Configure Multiple Kafka Consumers in Spring Boot
This post describes how to configure multiple Kafka consumers in a Spring Boot application from a property file, where each consumer can have its own configuration such as Kafka cluster, topic, etc.
Setup Spring Boot Project
It is recommended to use Spring Initializr to generate the initial project. Our project should have Web and Kafka dependencies.
Maven Project
Click on the link below to generate a Maven project with a pre-defined configuration:-
A typical pom.xml file for a Kafka project looks like this:-
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
Gradle Project
Click on the link below to generate a Gradle project with a pre-defined configuration:-
A typical build.gradle file for a Kafka project looks like this:-
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.kafka:spring-kafka'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.kafka:spring-kafka-test'
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'
}
Kafka Custom Properties
Out of the box, Spring Boot's Kafka support (the spring.kafka.* properties) only covers a single consumer configuration. For example:-
spring:
  kafka:
    bootstrap-servers: localhost:9092, localhost:9093, localhost:9094
    consumer:
      group-id: group-1
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
We will write a custom class, KafkaCustomProperties, which reads a property file containing multiple consumer configurations. Below is a typical example with two consumers, consumer1 and consumer2, consuming messages from different topics configured in different Kafka clusters:-
application.yml
kafka:
  consumer:
    consumer1:
      bootstrap-servers: server1:9092, server2:9092, server3:9092
      topic: topic1
      group-id: group1
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    consumer2:
      bootstrap-servers: another.server1:9092, another.server2:9092, another.server3:9092
      topic: topic2
      group-id: group2
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    consumer3:
      ...
    consumer4:
      ...
The value for bootstrap-servers in the above configuration is shown for illustration purposes. Replace it with the host:port addresses of your Kafka cluster servers, separated by commas.
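To make "comma-separated values" concrete, here is a minimal plain-Java sketch of how a value such as `server1:9092, server2:9092, server3:9092` can be turned into a list of host:port entries. The class and method names (`BootstrapServersSketch`, `parse`) are hypothetical, and the trimming of whitespace after each comma is an assumption about how the value is treated, not a copy of Spring's binding code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper, for illustration only: not part of Spring or Kafka.
public class BootstrapServersSketch {

    // Splits a comma-separated string into trimmed, non-empty host:port entries.
    public static List<String> parse(String commaSeparated) {
        return Arrays.stream(commaSeparated.split(","))
                .map(String::trim)
                .filter(entry -> !entry.isEmpty())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> servers = parse("server1:9092, server2:9092, server3:9092");
        // Prints one host:port entry per line.
        servers.forEach(System.out::println);
    }
}
```

Under that assumption, the spaces after the commas in the YAML above are harmless, and each entry ends up as a separate bootstrap server.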
Here is a list of important custom auto-configuration properties:-
Property | Description |
---|---|
kafka.bootstrap-servers | Comma-separated list of Kafka servers (host:port) running as a cluster. Applies to all the consumers unless overridden. |
kafka.consumer.consumer1.bootstrap-servers | Kafka bootstrap servers for consumer1. Overrides kafka.bootstrap-servers. |
kafka.client-id | Client ID to pass to the server when making requests. Used for server-side logging. |
kafka.consumer.consumer1.client-id | Client ID to pass for consumer1. Overrides kafka.client-id. |
kafka.ssl.* | Kafka SSL configuration, which secures communication between the producer/consumer and the Kafka server. You need to generate key-store and trust-store files and configure their locations and passwords. |
kafka.consumer.consumer1.* | Consumer-related configuration for consumer1. Supports the same settings as spring.kafka.consumer.* |
The custom Kafka configuration class which reads the above custom configuration:-
KafkaCustomProperties.java
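package com.example.kafka.config;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import lombok.Getter;
import lombok.Setter;
import org.apache.kafka.clients.CommonClientConfigs;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.CollectionUtils;

// Binds everything under the "kafka" prefix in application.yml.
@Configuration
@ConfigurationProperties(prefix = "kafka")
@Getter
@Setter
public class KafkaCustomProperties {
    private List<String> bootstrapServers = new ArrayList<>(Collections.singletonList("localhost:9092"));
    private String clientId;
    private Map<String, String> properties = new HashMap<>();
    // Keyed by the producer/consumer name used in the property file, e.g. "consumer1".
    private Map<String, KafkaProperties.Producer> producer;
    private Map<String, KafkaProperties.Consumer> consumer;
    private KafkaProperties.Ssl ssl = new KafkaProperties.Ssl();
    private KafkaProperties.Security security = new KafkaProperties.Security();

    // Builds the properties shared by all consumers; the free-form "properties" map is applied last.
    public Map<String, Object> buildCommonProperties() {
        Map<String, Object> properties = new HashMap<>();
        if (this.bootstrapServers != null) {
            properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
        }
        if (this.clientId != null) {
            properties.put(CommonClientConfigs.CLIENT_ID_CONFIG, this.clientId);
        }
        properties.putAll(this.ssl.buildProperties());
        properties.putAll(this.security.buildProperties());
        if (!CollectionUtils.isEmpty(this.properties)) {
            properties.putAll(this.properties);
        }
        return properties;
    }
}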
Spring Boot Kafka Multiple Consumer
Let’s define a ConcurrentKafkaListenerContainerFactory bean for each consumer. Each bean is marked with a @Qualifier annotation, and its bean name (the method name) is what the listeners refer to later:-
KafkaMultipleConsumerConfig.java
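package com.example.kafka.multi.config;

import java.util.HashMap;
import java.util.Map;

import com.example.kafka.config.KafkaCustomProperties;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import static java.util.Objects.nonNull;

@Configuration
@RequiredArgsConstructor
@Slf4j
public class KafkaMultipleConsumerConfig {
    private final KafkaCustomProperties kafkaCustomProperties;

    @Bean
    @Qualifier("consumer1")
    public ConcurrentKafkaListenerContainerFactory<String, String> consumer1ContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory("consumer1"));
        return factory;
    }

    @Bean
    @Qualifier("consumer2")
    public ConcurrentKafkaListenerContainerFactory<String, String> consumer2ContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory("consumer2"));
        return factory;
    }

    // Starts from the common properties, then applies the named consumer's own settings on top.
    private ConsumerFactory<String, Object> consumerFactory(String consumerName) {
        Map<String, Object> properties = new HashMap<>(kafkaCustomProperties.buildCommonProperties());
        if (nonNull(kafkaCustomProperties.getConsumer())) {
            KafkaProperties.Consumer consumerProperties = kafkaCustomProperties.getConsumer().get(consumerName);
            if (nonNull(consumerProperties)) {
                properties.putAll(consumerProperties.buildProperties());
            }
        }
        log.info("Kafka Consumer '{}' properties: {}", consumerName, properties);
        return new DefaultKafkaConsumerFactory<>(properties);
    }
}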
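The order of the two steps in consumerFactory is what lets a consumer's own settings win over the shared ones. Here is a minimal plain-Java sketch of that merge, using the standard Kafka client keys `bootstrap.servers`, `client.id` and `group.id`. The class name `ConsumerPropertyMergeSketch`, the `merge` helper and the sample values are hypothetical; plain maps stand in for the Spring-bound objects.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the merge done in consumerFactory(...), for illustration only.
public class ConsumerPropertyMergeSketch {

    // Starts from the shared properties, then lets consumer-specific entries overwrite them.
    public static Map<String, Object> merge(Map<String, Object> common, Map<String, Object> consumerSpecific) {
        Map<String, Object> effective = new HashMap<>(common);
        if (consumerSpecific != null) {
            effective.putAll(consumerSpecific);
        }
        return effective;
    }

    public static void main(String[] args) {
        Map<String, Object> common = new HashMap<>();
        common.put("bootstrap.servers", "localhost:9092");
        common.put("client.id", "shared-client");

        Map<String, Object> consumer2 = new HashMap<>();
        consumer2.put("bootstrap.servers", "another.server1:9092");
        consumer2.put("group.id", "group2");

        Map<String, Object> effective = merge(common, consumer2);
        System.out.println(effective.get("bootstrap.servers")); // another.server1:9092
        System.out.println(effective.get("client.id"));         // shared-client
        System.out.println(effective.get("group.id"));          // group2
    }
}
```

In this sketch, a consumer that defines no bootstrap servers of its own simply keeps the shared value, which mirrors the "applies to all the consumers unless overridden" rule from the property table.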
Let’s create two consumer service classes, which consume messages from different topics configured in different clusters:-
// Imports needed by both classes (each public class lives in its own file).
// KafkaConsumerService is the application's own interface and is not shown here.
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

// Consumer 1
@Service
@Slf4j
public class KafkaFirstConsumerServiceImpl implements KafkaConsumerService {
    @KafkaListener(topics = {"${kafka.consumer.consumer1.topic}"}, groupId = "${kafka.consumer.consumer1.group-id}",
            containerFactory = "consumer1ContainerFactory")
    public void receive(@Payload String message) {
        log.info("message received in consumer1: {}", message);
    }
}

// Consumer 2
@Service
@Slf4j
public class KafkaSecondConsumerServiceImpl implements KafkaConsumerService {
    @KafkaListener(topics = {"${kafka.consumer.consumer2.topic}"}, groupId = "${kafka.consumer.consumer2.group-id}",
            containerFactory = "consumer2ContainerFactory")
    public void receive(@Payload String message) {
        log.info("message received in consumer2: {}", message);
    }
}
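Make note of the containerFactory attribute passed in the @KafkaListener annotation: it names the container factory bean, and therefore the consumer configuration, that the listener uses. The topics and groupId values are resolved from the same property file through the ${...} placeholders.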
Summary
Spring Boot doesn’t provide support for multiple Kafka consumer configurations through a property file, but we can leverage the existing Kafka properties classes to create a custom configuration that supports multiple consumers.
This solves the use case where you want to consume the messages from different Kafka topics configured in different Kafka clusters from the same Spring Boot Project.
You can download the complete source code from GitHub and read the official Spring documentation, Spring for Apache Kafka, for further exploration.