Configure Multiple Kafka Producer in Spring Boot

This post describes how to configure multiple Kafka producers in a Spring Boot application from a property file, where each producer has its own configuration, such as the Kafka cluster and topic.

Setup Spring Boot Project

We recommend using Spring Initializr to generate the initial project. The project needs the Web and Kafka dependencies, and the examples in this post also use Lombok. Note that Spring Boot 3.x requires Java 17 or later.

Maven Project

Click the link below to generate a Maven project with a predefined configuration:

https://start.spring.io/#!type=maven-project&language=java&platformVersion=3.0.10&packaging=jar&jvmVersion=11&groupId=com.example&artifactId=springboot-kafka&name=springboot-kafka&description=Kafka%20producer%20and%20consumer%20configuration&packageName=com.example.kafka&dependencies=web,kafka,lombok

The dependencies in a typical pom.xml file for a Kafka project look like this:

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
</dependency>

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-test</artifactId>
	<scope>test</scope>
</dependency>
<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka-test</artifactId>
	<scope>test</scope>
</dependency>
<dependency>
	<groupId>org.projectlombok</groupId>
	<artifactId>lombok</artifactId>
	<optional>true</optional>
</dependency>

Gradle Project

Click the link below to generate a Gradle project with a predefined configuration:

https://start.spring.io/#!type=gradle-project&language=java&platformVersion=3.0.10&packaging=jar&jvmVersion=11&groupId=com.example&artifactId=springboot-kafka&name=springboot-kafka&description=Kafka%20producer%20and%20consumer%20configuration&packageName=com.example.kafka&dependencies=web,kafka,lombok

The dependencies in a typical build.gradle file for a Kafka project look like this:

dependencies {
  implementation 'org.springframework.boot:spring-boot-starter-web'
  implementation 'org.springframework.kafka:spring-kafka'
  testImplementation 'org.springframework.boot:spring-boot-starter-test'
  testImplementation 'org.springframework.kafka:spring-kafka-test'
  compileOnly 'org.projectlombok:lombok'
  annotationProcessor 'org.projectlombok:lombok'
}

Kafka Custom Properties

Spring Boot's Kafka auto-configuration supports only a single producer configuration, defined under spring.kafka.producer. For example:

spring:
  kafka:
    bootstrap-servers: localhost:9092, localhost:9093, localhost:9094
    producer:
      retries: 0
      acks: all
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

We will write a custom class, KafkaCustomProperties, which reads a property file containing multiple producer configurations. In the example below, two producers, producer1 and producer2, publish messages to different Kafka clusters, each defined by its own bootstrap-servers value:

application.yml
kafka:
  producer:
    producer1:
      topic: topic1
      bootstrap-servers: server1:9092, server2:9092, server3:9092
      retries: 0
      acks: all
    producer2:
      topic: topic2
      bootstrap-servers: another.server1:9092, another.server2:9092, another.server3:9092
      retries: 2
      acks: 1
    producer3:
      ...
    producer4:
      ...

The bootstrap-servers values in the above configuration are for illustration only. Replace them with the host:port addresses of your Kafka cluster servers as a comma-separated list.
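To make the comma-separated format concrete, here is a minimal plain-Java sketch of how such a value breaks down into individual host:port entries. BootstrapServers is a hypothetical helper written for this illustration; it is not a Spring or Kafka class, and it only mimics the splitting and trimming that property binding is expected to perform, with a basic sanity check added.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper for illustration only; not part of Spring or Kafka. */
final class BootstrapServers {

  private BootstrapServers() {}

  /** Splits "host1:9092, host2:9092" into trimmed host:port entries and rejects malformed ones. */
  static List<String> parse(String commaSeparated) {
    return Arrays.stream(commaSeparated.split(","))
        .map(String::trim)
        .filter(entry -> !entry.isEmpty())
        .map(BootstrapServers::validate)
        .collect(Collectors.toList());
  }

  /** Returns the entry unchanged if it looks like host:port, otherwise throws. */
  private static String validate(String entry) {
    int colon = entry.lastIndexOf(':');
    if (colon <= 0 || colon == entry.length() - 1) {
      throw new IllegalArgumentException("Expected host:port but got: " + entry);
    }
    // A non-numeric port throws NumberFormatException, a subclass of IllegalArgumentException.
    Integer.parseInt(entry.substring(colon + 1));
    return entry;
  }

  public static void main(String[] args) {
    // prints [server1:9092, server2:9092, server3:9092]
    System.out.println(parse("server1:9092, server2:9092, server3:9092"));
  }
}
```

A check like this could run at startup to fail fast on a mistyped address, although the Kafka client also validates the addresses when the producer is created.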

Here is a list of the important custom configuration properties:

| Property | Description |
| --- | --- |
| kafka.bootstrap-servers | Comma-separated list of Kafka servers (host:port) running as a cluster. Applies to all the producers unless overridden. |
| kafka.producer.producer1.bootstrap-servers | Kafka bootstrap servers for producer1. Overrides kafka.bootstrap-servers. |
| kafka.client-id | Client ID to pass to the server when making requests. Used for server-side logging. |
| kafka.producer.producer1.client-id | Client ID to pass for producer1. Overrides kafka.client-id. |
| kafka.ssl.* | Kafka SSL configuration, which provides secure communication between producer/consumer and the Kafka server. You need to generate key-store and trust-store files and configure their locations and passwords. |
| kafka.producer.producer1.* | Kafka producer related configurations. Supports the same configurations as spring.kafka.producer.* |
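The "applies to all the producers unless overridden" rule amounts to a layered map merge: start from the common settings, then let the producer-specific settings replace any keys they share. The following plain-Java sketch illustrates that precedence. ProducerPropertyMerge and the client ID "my-app" are hypothetical names chosen for this example; the keys are the standard Kafka client property names.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper for illustration only; shows the override order with plain maps. */
final class ProducerPropertyMerge {

  private ProducerPropertyMerge() {}

  /** Copies the common settings, then lets producer-specific settings win on key clashes. */
  static Map<String, Object> merge(Map<String, Object> common, Map<String, Object> producerSpecific) {
    Map<String, Object> merged = new HashMap<>(common);
    merged.putAll(producerSpecific);
    return merged;
  }

  public static void main(String[] args) {
    Map<String, Object> common = Map.of(
        "bootstrap.servers", "localhost:9092",
        "client.id", "my-app");
    Map<String, Object> producer1 = Map.of(
        "bootstrap.servers", "server1:9092,server2:9092,server3:9092",
        "acks", "all");

    Map<String, Object> merged = merge(common, producer1);
    System.out.println(merged.get("bootstrap.servers")); // prints server1:9092,server2:9092,server3:9092
    System.out.println(merged.get("client.id"));         // prints my-app
  }
}
```

Because the producer-specific map is applied last, any key that is not set for a producer falls back to the common value.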

The following custom configuration class reads the above configuration:

KafkaCustomProperties.java
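package com.example.kafka.config;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import lombok.Getter;
import lombok.Setter;
import org.apache.kafka.clients.CommonClientConfigs;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.CollectionUtils;

// Binds everything under the "kafka" prefix. Written against Spring Boot 3.0.x.
@Configuration
@ConfigurationProperties(prefix = "kafka")
@Getter
@Setter
public class KafkaCustomProperties {
  // Settings shared by all producers and consumers unless overridden per client
  private List<String> bootstrapServers = new ArrayList<>(Collections.singletonList("localhost:9092"));
  private String clientId;
  private Map<String, String> properties = new HashMap<>();
  // Per-client settings, keyed by name (for example "producer1")
  private Map<String, KafkaProperties.Producer> producer;
  private Map<String, KafkaProperties.Consumer> consumer;
  private KafkaProperties.Ssl ssl = new KafkaProperties.Ssl();
  private KafkaProperties.Security security = new KafkaProperties.Security();

  // Builds the properties common to every client; entries in "properties" are applied last
  public Map<String, Object> buildCommonProperties() {
    Map<String, Object> properties = new HashMap<>();
    if (this.bootstrapServers != null) {
      properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
    }
    if (this.clientId != null) {
      properties.put(CommonClientConfigs.CLIENT_ID_CONFIG, this.clientId);
    }
    properties.putAll(this.ssl.buildProperties());
    properties.putAll(this.security.buildProperties());
    if (!CollectionUtils.isEmpty(this.properties)) {
      properties.putAll(this.properties);
    }
    return properties;
  }
}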

Spring Boot Kafka Multiple Producer

Let’s define a KafkaTemplate bean for each producer and distinguish them with the @Qualifier annotation:

KafkaMultipleProducerConfig.java
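package com.example.kafka.config;

import static java.util.Objects.nonNull;

import java.util.HashMap;
import java.util.Map;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
@RequiredArgsConstructor
@Slf4j
public class KafkaMultipleProducerConfig {

  private final KafkaCustomProperties kafkaCustomProperties;

  @Bean
  @Qualifier("producer1")
  public KafkaTemplate<String, Object> producer1KafkaTemplate() {
    return new KafkaTemplate<>(producerFactory("producer1"));
  }

  @Bean
  @Qualifier("producer2")
  public KafkaTemplate<String, Object> producer2KafkaTemplate() {
    return new KafkaTemplate<>(producerFactory("producer2"));
  }

  // Starts from the common properties, then applies the named producer's overrides
  private ProducerFactory<String, Object> producerFactory(String producerName) {
    Map<String, Object> properties = new HashMap<>(kafkaCustomProperties.buildCommonProperties());
    if (nonNull(kafkaCustomProperties.getProducer())) {
      KafkaProperties.Producer producerProperties = kafkaCustomProperties.getProducer().get(producerName);
      if (nonNull(producerProperties)) {
        properties.putAll(producerProperties.buildProperties());
      }
    }
    // Note: this logs every property, including any configured SSL passwords
    log.info("Kafka Producer '{}' properties: {}", producerName, properties);
    return new DefaultKafkaProducerFactory<>(properties);
  }
}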
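The configuration above declares one bean method per producer, which grows with every producer added to the property file. As a possible alternative, templates could be created on demand and cached by producer name. The plain-Java sketch below shows only that lookup-and-cache idea. ProducerRegistry is a hypothetical class, and the type parameter T stands in for KafkaTemplate<String, Object>; in a real application the factory function would presumably be something like name -> new KafkaTemplate<>(producerFactory(name)).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical registry for illustration only; T stands in for KafkaTemplate<String, Object>. */
final class ProducerRegistry<T> {

  private final Map<String, T> templates = new ConcurrentHashMap<>();
  private final Function<String, T> templateFactory;

  ProducerRegistry(Function<String, T> templateFactory) {
    this.templateFactory = templateFactory;
  }

  /** Returns the cached template for the name, creating it on first use. */
  T get(String producerName) {
    return templates.computeIfAbsent(producerName, templateFactory);
  }

  public static void main(String[] args) {
    // A String is used as a stand-in template so the sketch runs without Spring on the classpath.
    ProducerRegistry<String> registry = new ProducerRegistry<>(name -> "template-for-" + name);
    System.out.println(registry.get("producer1")); // prints template-for-producer1
    System.out.println(registry.get("producer1") == registry.get("producer1")); // prints true
  }
}
```

The trade-off is that callers look templates up by name at runtime instead of having them injected with @Qualifier, so a mistyped producer name is only detected when the template is first requested.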

Let’s create two producer service classes, each of which publishes messages to its own topic on a different cluster:

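// Imports needed by both classes (each class lives in its own file)
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

// Producer 1
// KafkaProducerService is an interface not shown in this post; the @Override below implies it declares send(String).
@Service
@Slf4j
public class KafkaFirstProducerService implements KafkaProducerService {

  // The generic type must match the KafkaTemplate<String, Object> bean, otherwise injection fails
  @Autowired
  @Qualifier("producer1")
  private KafkaTemplate<String, Object> kafkaTemplate;

  @Value("${kafka.producer.producer1.topic}")
  private String topic;

  @Override
  public void send(String message) {
    log.info("sending message from first producer: {}", message);
    kafkaTemplate.send(topic, message);
  }
}

// Producer 2
@Service
@Slf4j
public class KafkaSecondProducerService {

  @Autowired
  @Qualifier("producer2")
  private KafkaTemplate<String, Object> kafkaTemplate;

  @Value("${kafka.producer.producer2.topic}")
  private String topic;

  public void send(String message) {
    log.info("sending message from second producer: {}", message);
    kafkaTemplate.send(topic, message);
  }
}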

Summary

Spring Boot doesn’t support multiple Kafka producer configurations through a property file out of the box, but we can reuse its existing Kafka properties classes to build a custom configuration that supports multiple producers.

This solves the use case where you want to publish messages to different Kafka topics on different Kafka clusters from the same Spring Boot project.

You can download the complete source code from GitHub and read the official Spring for Apache Kafka documentation for further exploration.

Also read: How to Configure Multiple Kafka Consumer in Spring Boot