Configure Multiple Kafka Producers in Spring Boot
This post describes how to configure multiple Kafka producers in a Spring Boot application from a property file, where each producer can have its own configuration such as Kafka cluster, topic, etc.
Setup Spring Boot Project
It is recommended to use Spring Initializr to generate the initial project. Our project should have Web and Kafka dependencies.
Maven Project
Click on the link below to generate a Maven project with a pre-defined configuration:-
A typical pom.xml file for a Kafka project looks like this:-
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
Gradle Project
Click on the link below to generate a Gradle project with a pre-defined configuration:-
A typical build.gradle file for a Kafka project looks like this:-
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.kafka:spring-kafka'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.kafka:spring-kafka-test'
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'
}
Kafka Custom Properties
Out of the box, Spring Boot's Kafka auto-configuration supports only a single producer configuration under spring.kafka.producer. For example:-
spring:
  kafka:
    bootstrap-servers: localhost:9092, localhost:9093, localhost:9094
    producer:
      retries: 0
      acks: all
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
We will write a custom class, KafkaCustomProperties, which reads a property file containing multiple producer configurations. Below is a typical example with two producers, producer1 and producer2, that publish messages to different Kafka clusters defined in bootstrap-servers:-
application.yml
kafka:
  producer:
    producer1:
      topic: topic1
      bootstrap-servers: server1:9092, server2:9092, server3:9092
      retries: 0
      acks: all
    producer2:
      topic: topic2
      bootstrap-servers: another.server1:9092, another.server2:9092, another.server3:9092
      retries: 2
      acks: 1
    producer3:
      ...
    producer4:
      ...
The values for bootstrap-servers in the above configuration are shown for illustration purposes. Replace them with the servers of your Kafka clusters, written as comma-separated host:port values.
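To make the expected format concrete, here is a minimal plain-Java sketch that splits such a comma-separated value into trimmed host:port entries. The class and method names (BootstrapServersParser, parse) are hypothetical and are not part of Spring or Kafka; Spring Boot performs its own conversion when it binds the property to a List<String>, so this helper is for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of Spring or Kafka.
final class BootstrapServersParser {

    private BootstrapServersParser() {
    }

    // Splits "host1:9092, host2:9092" into trimmed, non-empty "host:port" entries.
    static List<String> parse(String commaSeparated) {
        List<String> servers = new ArrayList<>();
        for (String entry : commaSeparated.split(",")) {
            String server = entry.trim();
            if (server.isEmpty()) {
                continue; // tolerate stray commas
            }
            if (server.indexOf(':') < 0) {
                throw new IllegalArgumentException("Expected host:port but got: " + server);
            }
            servers.add(server);
        }
        return servers;
    }

    public static void main(String[] args) {
        System.out.println(parse("server1:9092, server2:9092, server3:9092"));
    }
}
```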
Here is a list of important custom auto-configuration properties:-
Property | Description |
---|---|
kafka.bootstrap-servers | Comma-separated list of Kafka servers (host:port) running as a cluster. Applies to all producers unless overridden. |
kafka.producer.producer1.bootstrap-servers | Kafka bootstrap servers for producer1. Overrides kafka.bootstrap-servers. |
kafka.client-id | Client ID to pass to the server when making requests. Used for server-side logging. |
kafka.producer.producer1.client-id | Client ID to pass for producer1. Overrides kafka.client-id. |
kafka.ssl.* | Kafka SSL configuration, which secures communication between the producer/consumer and the Kafka server. You need to generate key-store and trust-store files and configure their locations and passwords. |
kafka.producer.producer1.* | Kafka producer related configuration. Supports the same settings as spring.kafka.producer.* |
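The "overrides" entries in the table above come down to a simple merge order: common properties are applied first, then producer-specific ones replace any matching keys. The following plain-Java sketch illustrates that order only. The class name ProducerPropertyMerge and the sample values are hypothetical; the keys are the standard Kafka client property names.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of the override order; not one of the article's Spring classes.
final class ProducerPropertyMerge {

    private ProducerPropertyMerge() {
    }

    // Producer-specific entries win over common entries with the same key.
    static Map<String, Object> merge(Map<String, Object> common, Map<String, Object> producerSpecific) {
        Map<String, Object> merged = new HashMap<>(common);
        merged.putAll(producerSpecific);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> common = new HashMap<>();
        common.put("bootstrap.servers", "localhost:9092");
        common.put("client.id", "my-app");

        Map<String, Object> producer1 = new HashMap<>();
        producer1.put("bootstrap.servers", "server1:9092,server2:9092,server3:9092");
        producer1.put("acks", "all");

        // bootstrap.servers comes from producer1, client.id from the common settings
        System.out.println(merge(common, producer1));
    }
}
```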
The custom Kafka configuration class which reads the above custom configuration:-
KafkaCustomProperties.java
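package com.example.kafka.config;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import lombok.Getter;
import lombok.Setter;
import org.apache.kafka.clients.CommonClientConfigs;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.CollectionUtils;

@Configuration
@ConfigurationProperties(prefix = "kafka")
@Getter
@Setter
public class KafkaCustomProperties {

    private List<String> bootstrapServers = new ArrayList<>(Collections.singletonList("localhost:9092"));
    private String clientId;
    private Map<String, String> properties = new HashMap<>();
    // Per-producer and per-consumer settings, keyed by name (for example "producer1")
    private Map<String, KafkaProperties.Producer> producer;
    private Map<String, KafkaProperties.Consumer> consumer;
    private KafkaProperties.Ssl ssl = new KafkaProperties.Ssl();
    private KafkaProperties.Security security = new KafkaProperties.Security();

    // Builds the properties shared by all producers and consumers
    public Map<String, Object> buildCommonProperties() {
        Map<String, Object> properties = new HashMap<>();
        if (this.bootstrapServers != null) {
            properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
        }
        if (this.clientId != null) {
            properties.put(CommonClientConfigs.CLIENT_ID_CONFIG, this.clientId);
        }
        properties.putAll(this.ssl.buildProperties());
        properties.putAll(this.security.buildProperties());
        if (!CollectionUtils.isEmpty(this.properties)) {
            properties.putAll(this.properties);
        }
        return properties;
    }
}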
Spring Boot Kafka Multiple Producer
Let’s initialize a KafkaTemplate bean for each producer using the @Qualifier annotation:-
KafkaMultipleProducerConfig.java
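package com.example.kafka.config;

import java.util.HashMap;
import java.util.Map;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import static java.util.Objects.nonNull;

@Configuration
@RequiredArgsConstructor
@Slf4j
public class KafkaMultipleProducerConfig {

    private final KafkaCustomProperties kafkaCustomProperties;

    @Bean
    @Qualifier("producer1")
    public KafkaTemplate<String, Object> producer1KafkaTemplate() {
        return new KafkaTemplate<>(producerFactory("producer1"));
    }

    @Bean
    @Qualifier("producer2")
    public KafkaTemplate<String, Object> producer2KafkaTemplate() {
        return new KafkaTemplate<>(producerFactory("producer2"));
    }

    // Starts from the common properties, then applies the named producer's overrides
    private ProducerFactory<String, Object> producerFactory(String producerName) {
        Map<String, Object> properties = new HashMap<>(kafkaCustomProperties.buildCommonProperties());
        if (nonNull(kafkaCustomProperties.getProducer())) {
            KafkaProperties.Producer producerProperties = kafkaCustomProperties.getProducer().get(producerName);
            if (nonNull(producerProperties)) {
                properties.putAll(producerProperties.buildProperties());
            }
        }
        log.info("Kafka Producer '{}' properties: {}", producerName, properties);
        return new DefaultKafkaProducerFactory<>(properties);
    }
}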
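The configuration class above declares one bean method per producer. If the list of producers grows (the YAML example hints at producer3 and producer4), one possible alternative is a small name-keyed registry that creates each template on first use. The sketch below is plain Java with a hypothetical class name, ProducerRegistry; the type parameter T stands in for KafkaTemplate<String, Object> so that the example runs without Kafka on the classpath.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical name-keyed cache; T stands in for KafkaTemplate<String, Object>.
final class ProducerRegistry<T> {

    private final Map<String, T> templates = new ConcurrentHashMap<>();
    private final Function<String, T> factory;

    ProducerRegistry(Function<String, T> factory) {
        this.factory = factory;
    }

    // Returns the cached instance for the name, creating it on first use.
    T get(String producerName) {
        return templates.computeIfAbsent(producerName, factory);
    }

    public static void main(String[] args) {
        // A String stands in for the real template so the sketch runs without Kafka.
        ProducerRegistry<String> registry = new ProducerRegistry<>(name -> "template-for-" + name);
        System.out.println(registry.get("producer1")); // prints "template-for-producer1"
    }
}
```

Inside KafkaMultipleProducerConfig, the factory function could be something like name -> new KafkaTemplate<>(producerFactory(name)). Whether this is preferable to explicit beans depends on whether the rest of the application needs to inject templates by qualifier.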
Let’s create two producer service classes, which publish messages to different topics configured in different clusters:-
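// Imports used by both service classes
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

// Producer 1
@Service
@Slf4j
public class KafkaFirstProducerService implements KafkaProducerService {

    // @Autowired is needed for field injection; the generic type matches the bean definition
    @Autowired
    @Qualifier("producer1")
    private KafkaTemplate<String, Object> kafkaTemplate;

    @Value("${kafka.producer.producer1.topic}")
    private String topic;

    @Override
    public void send(String message) {
        log.info("sending message from first producer: {}", message);
        kafkaTemplate.send(topic, message);
    }
}

// Producer 2
@Service
@Slf4j
public class KafkaSecondProducerService {

    @Autowired
    @Qualifier("producer2")
    private KafkaTemplate<String, Object> kafkaTemplate;

    @Value("${kafka.producer.producer2.topic}")
    private String topic;

    public void send(String message) {
        log.info("sending message from second producer: {}", message);
        kafkaTemplate.send(topic, message);
    }
}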
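The KafkaProducerService interface implemented by the first service is not shown in this post. Judging from the overridden send(String) method, it could be as small as the sketch below; treat that shape as an assumption. RecordingProducerService is a hypothetical in-memory stand-in that could replace a real producer in unit tests, so that code depending on the interface can be exercised without a Kafka cluster.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Assumed shape of the interface, inferred from the send(String) override.
interface KafkaProducerService {
    void send(String message);
}

// Hypothetical test double: records messages instead of publishing to Kafka.
final class RecordingProducerService implements KafkaProducerService {

    private final List<String> sent = new ArrayList<>();

    @Override
    public void send(String message) {
        sent.add(message);
    }

    // Read-only view of everything passed to send(), in call order.
    List<String> sentMessages() {
        return Collections.unmodifiableList(sent);
    }
}
```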
Summary
Spring Boot doesn’t support multiple Kafka producer configurations through a property file out of the box, but we can reuse its existing KafkaProperties classes to build a custom configuration that supports multiple producers.
This solves the use case where you want to publish messages to different Kafka topics configured in different Kafka clusters from the same Spring Boot project.
You can download the complete source code from GitHub and read the official Spring documentation, Spring for Apache Kafka, for further exploration.