Configure Multiple Kafka Consumer in Spring Boot
			
		This post describes how to configure Multiple Kafka Consumer in Spring Boot application from a property file having different configurations such as Kafka cluster, topic, etc.
Setup Spring Boot Project
It is recommended to use Spring Initializr to generate the initial project. Our project should have Web and Kafka dependencies.
Maven Project
Click on the below link to generate maven project with pre-defined configuration:-
A typical pom.xml file for a kafka project look like this:-
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-test</artifactId>
	<scope>test</scope>
</dependency>
<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka-test</artifactId>
	<scope>test</scope>
</dependency>
<dependency>
  <groupId>org.projectlombok</groupId>
  <artifactId>lombok</artifactId>
  <optional>true</optional>
</dependency>
Gradle Project
Click on the below link to generate gradle project with pre-defined configuration:-
A typical build.gradle file for a kafka project look like this:-
dependencies {
  implementation 'org.springframework.boot:spring-boot-starter-web'
  implementation 'org.springframework.kafka:spring-kafka'
  testImplementation 'org.springframework.boot:spring-boot-starter-test'
  testImplementation 'org.springframework.kafka:spring-kafka-test'
  compileOnly 'org.projectlombok:lombok'
  annotationProcessor 'org.projectlombok:lombok'
}
Kafka Custom Properties
Spring Boot Kafka only provides support for single-consumer configuration. For example:-
spring:
  kafka:
    bootstrap-servers: localhost:9092, localhost:9093, localhost:9094
    consumer:
      group-id: group-1
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
We will write a custom class KafkaCustomProperties which will read the property file having multiple consumer configurations. A typical example of  multiple consumer configuration properties where we have two consumers consumer1 and consumer2 consuming messages from different topics configured in different Kafka clusters:-
application.yml
kafka:
  consumer:
    consumer1:
      bootstrap-servers: server1:9092, server2:9092, server3:9092
      topic: topic1
      group-id: group1
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    consumer2:
      bootstrap-servers: another.server1:9092, another.server2:9092, another.server3:9092
      topic: topic2
      group-id: group2
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    consumer3:
      ...
    consumer4:
      ...
The value for bootstrap-servers in the above configuration is shown for illustration purposes. It should be replaced with Kafka cluster servers in comma-separated values.
Here is a list of important custom auto-configuration properties:-
| Property | Description | 
|---|---|
kafka.bootstrap-servers | 
Comma separated list of kafka servers (host:port) running as a cluster. Applies to all the consumers unless overridden. | 
kafka.consumer.consumer1.bootstrap-servers | 
Kafka bootstrap server for consumer1. Overrides kafka.bootstrap-servers | 
kafka.client-id | 
Client-ID to pass to the server when making requests. Used for server-side logging. | 
kafka.consumer.consumer1.client-id | 
Client-ID to pass for consumer1. Overrides kafka.client-id | 
kafka.ssl.* | 
Kafka SSL configuration is to provide secure communication between producer/consumer and Kafka server. You need to generate key-store and trust-store files and configure the location and password | 
kafka.consumer.consumer1.* | 
Kafka Consumer related configurations. Support all configurations same as spring.kafka.consumer.* | 
The custom Kafka configuration class which reads the above custom configuration:-
KafkaCustomProperties.java
package com.example.kafka.config;
@Configuration
@ConfigurationProperties(prefix = "kafka")
@Getter
@Setter
public class KafkaCustomProperties {
  private List<String> bootstrapServers = new ArrayList<>(Collections.singletonList("localhost:9092"));
  private String clientId;
  private Map<String, String> properties = new HashMap<>();
  private Map<String, KafkaProperties.Producer> producer;
  private Map<String, KafkaProperties.Consumer> consumer;
  private KafkaProperties.Ssl ssl = new KafkaProperties.Ssl();
  private KafkaProperties.Security security = new KafkaProperties.Security();
  public Map<String, Object> buildCommonProperties() {
    Map<String, Object> properties = new HashMap<>();
    if (this.bootstrapServers != null) {
      properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
    }
    if (this.clientId != null) {
      properties.put(CommonClientConfigs.CLIENT_ID_CONFIG, this.clientId);
    }
    properties.putAll(this.ssl.buildProperties());
    properties.putAll(this.security.buildProperties());
    if (!CollectionUtils.isEmpty(this.properties)) {
      properties.putAll(this.properties);
    }
    return properties;
  }
}
Spring Boot Kafka Multiple Consumer
Let’s initialize a ConcurrentKafkaListenerContainerFactory bean for each consumer using @Qualifier annotation:-
KafkaMultipleConsumerConfig.java
package com.example.kafka.multi.config;
@Configuration
@RequiredArgsConstructor
@Slf4j
public class KafkaMultipleConsumerConfig {
  private final KafkaCustomProperties kafkaCustomProperties;
  @Bean
  @Qualifier("consumer1")
  public ConcurrentKafkaListenerContainerFactory<String, String> consumer1ContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory("consumer1"));
    return factory;
  }
  @Bean
  @Qualifier("consumer2")
  public ConcurrentKafkaListenerContainerFactory<String, String> consumer2ContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory("consumer2"));
    return factory;
  }
  private ConsumerFactory<String, Object> consumerFactory(String consumerName) {
    Map<String, Object> properties = new HashMap<>(kafkaCustomProperties.buildCommonProperties());
    if (nonNull(kafkaCustomProperties.getConsumer())) {
      KafkaProperties.Consumer consumerProperties = kafkaCustomProperties.getConsumer().get(consumerName);
      if (nonNull(consumerProperties)) {
        properties.putAll(consumerProperties.buildProperties());
      }
    }
    log.info("Kafka Consumer '{}' properties: {}", consumerName, properties);
    return new DefaultKafkaConsumerFactory<>(properties);
  }
}
Let’s create two consumer service classes, which consume messages from different topics configured in different clusters:-
// Consumer 1
@Service
@Slf4j
public class KafkaFirstConsumerServiceImpl implements KafkaConsumerService {
    @KafkaListener(topics = {"${kafka.consumer.consumer1.topic}"}, groupId = "${kafka.consumer.consumer1.group-id}", 
      containerFactory = "consumer1ContainerFactory")
    public void receive(@Payload String message) {
        log.info("message received in consumer1: {}", message);
    }
}
// Consumer 2
@Service
@Slf4j
public class KafkaSecondConsumerServiceImpl implements KafkaConsumerService {
    @KafkaListener(topics = {"${kafka.consumer.consumer2.topic}"}, groupId = "${kafka.consumer.consumer2.group-id}", 
      containerFactory = "consumer2ContainerFactory")
    public void receive(@Payload String message) {
        log.info("message received in consumer2: {}", message);
    }
}
Make note of containerFactory passed in @KafkaListener annotation, which tells which consumer configuration to use.
Summary
Spring boot doesn’t provide support for multiple Kafka consumer configurations through a property file but we can leverage existing Kafka properties to create a custom configuration to support multiple consumers.
This solves the use case where you want to consume the messages from different Kafka topics configured in different Kafka clusters from the same Spring Boot Project.
You can download the complete source code from github and read the official spring documentation Spring for Apache Kafka for further exploration.
Also Read How to Configure Multiple Kafka Producer in Spring Boot