Configure Multiple Kafka Consumer in Spring Boot

This post describes how to configure multiple Kafka consumers in a Spring Boot application from a property file, where each consumer can have its own configuration, such as Kafka cluster, topic, and so on.

Setup Spring Boot Project

It is recommended to use Spring Initializr to generate the initial project. The project should have the Web, Kafka, and Lombok dependencies. Note that Spring Boot 3.x requires Java 17 or later.

Maven Project

Click the link below to generate a Maven project with a pre-defined configuration:

https://start.spring.io/#!type=maven-project&language=java&platformVersion=3.0.10&packaging=jar&jvmVersion=11&groupId=com.example&artifactId=springboot-kafka&name=springboot-kafka&description=Kafka%20producer%20and%20consumer%20configuration&packageName=com.example.kafka&dependencies=web,kafka,lombok

The dependencies section of a typical pom.xml file for a Kafka project looks like this:

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
</dependency>

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-test</artifactId>
	<scope>test</scope>
</dependency>
<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka-test</artifactId>
	<scope>test</scope>
</dependency>
<dependency>
	<groupId>org.projectlombok</groupId>
	<artifactId>lombok</artifactId>
	<optional>true</optional>
</dependency>

Gradle Project

Click the link below to generate a Gradle project with a pre-defined configuration:

https://start.spring.io/#!type=gradle-project&language=java&platformVersion=3.0.10&packaging=jar&jvmVersion=11&groupId=com.example&artifactId=springboot-kafka&name=springboot-kafka&description=Kafka%20producer%20and%20consumer%20configuration&packageName=com.example.kafka&dependencies=web,kafka,lombok

The dependencies section of a typical build.gradle file for a Kafka project looks like this:

dependencies {
  implementation 'org.springframework.boot:spring-boot-starter-web'
  implementation 'org.springframework.kafka:spring-kafka'
  testImplementation 'org.springframework.boot:spring-boot-starter-test'
  testImplementation 'org.springframework.kafka:spring-kafka-test'
  compileOnly 'org.projectlombok:lombok'
  annotationProcessor 'org.projectlombok:lombok'
}

Kafka Custom Properties

Spring Boot's Kafka auto-configuration supports only a single consumer configuration out of the box, defined under spring.kafka.consumer. For example:

spring:
  kafka:
    bootstrap-servers: localhost:9092, localhost:9093, localhost:9094
    consumer:
      group-id: group-1
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

We will write a custom class, KafkaCustomProperties, that reads a property file containing multiple consumer configurations. Here is a typical example with two consumers, consumer1 and consumer2, which consume messages from different topics in different Kafka clusters:

application.yml
kafka:
  consumer:
    consumer1:
      bootstrap-servers: server1:9092, server2:9092, server3:9092
      topic: topic1
      group-id: group1
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    consumer2:
      bootstrap-servers: another.server1:9092, another.server2:9092, another.server3:9092
      topic: topic2
      group-id: group2
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    consumer3:
      ...
    consumer4:
      ...

The bootstrap-servers values in the above configuration are for illustration only. Replace them with a comma-separated list of your Kafka cluster's servers (host:port).
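
Spring Boot's binder turns a comma-separated value such as the one above into a List<String>. To make the expected format concrete, here is a minimal plain-Java sketch of that splitting. The class and method names are hypothetical, and the sketch only assumes that surrounding whitespace is trimmed and blank entries are dropped; it is not Spring's own implementation.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper for illustration only; not part of Spring or Kafka.
final class BootstrapServersSketch {

  // Splits "host1:9092, host2:9092" into trimmed, non-empty entries.
  static List<String> parse(String commaSeparated) {
    return Arrays.stream(commaSeparated.split(","))
        .map(String::trim)
        .filter(entry -> !entry.isEmpty())
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    // prints [server1:9092, server2:9092, server3:9092]
    System.out.println(parse("server1:9092, server2:9092, server3:9092"));
  }
}
```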

Here is a list of the important custom configuration properties:

| Property | Description |
| --- | --- |
| kafka.bootstrap-servers | Comma-separated list of Kafka servers (host:port) running as a cluster. Applies to all consumers unless overridden. |
| kafka.consumer.consumer1.bootstrap-servers | Kafka bootstrap servers for consumer1. Overrides kafka.bootstrap-servers. |
| kafka.client-id | Client ID to pass to the server when making requests. Used for server-side logging. |
| kafka.consumer.consumer1.client-id | Client ID to pass for consumer1. Overrides kafka.client-id. |
| kafka.ssl.* | SSL configuration for secure communication between producers/consumers and the Kafka server. You need to generate key-store and trust-store files and configure their locations and passwords. |
| kafka.consumer.consumer1.* | Consumer-related configuration for consumer1. Supports the same properties as spring.kafka.consumer.* |

The following custom configuration class reads the above configuration:

KafkaCustomProperties.java
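package com.example.kafka.config;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import lombok.Getter;
import lombok.Setter;
import org.apache.kafka.clients.CommonClientConfigs;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.CollectionUtils;
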
@Configuration
@ConfigurationProperties(prefix = "kafka")
@Getter
@Setter
public class KafkaCustomProperties {
  private List<String> bootstrapServers = new ArrayList<>(Collections.singletonList("localhost:9092"));
  private String clientId;
  private Map<String, String> properties = new HashMap<>();
  private Map<String, KafkaProperties.Producer> producer;
  private Map<String, KafkaProperties.Consumer> consumer;
  private KafkaProperties.Ssl ssl = new KafkaProperties.Ssl();
  private KafkaProperties.Security security = new KafkaProperties.Security();

  public Map<String, Object> buildCommonProperties() {
    Map<String, Object> properties = new HashMap<>();
    if (this.bootstrapServers != null) {
      properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
    }
    if (this.clientId != null) {
      properties.put(CommonClientConfigs.CLIENT_ID_CONFIG, this.clientId);
    }
    properties.putAll(this.ssl.buildProperties());
    properties.putAll(this.security.buildProperties());
    if (!CollectionUtils.isEmpty(this.properties)) {
      properties.putAll(this.properties);
    }
    return properties;
  }
}
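
The consumer field above is a map keyed by the name that appears under kafka.consumer (consumer1, consumer2, and so on). The actual binding is done by Spring Boot; the plain-Java sketch below only illustrates the grouping idea on flat property keys. The class name, method name, and prefix constant are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of how per-consumer settings end up keyed by consumer name.
final class ConsumerGroupingSketch {

  private static final String PREFIX = "kafka.consumer.";

  // Groups "kafka.consumer.<name>.<property>" entries by <name>.
  static Map<String, Map<String, String>> groupByConsumer(Map<String, String> flat) {
    Map<String, Map<String, String>> grouped = new TreeMap<>();
    for (Map.Entry<String, String> entry : flat.entrySet()) {
      String key = entry.getKey();
      if (!key.startsWith(PREFIX)) {
        continue; // not a per-consumer setting
      }
      String rest = key.substring(PREFIX.length());
      int dot = rest.indexOf('.');
      if (dot <= 0 || dot == rest.length() - 1) {
        continue; // malformed key: missing consumer name or property name
      }
      String consumerName = rest.substring(0, dot);
      String property = rest.substring(dot + 1);
      grouped.computeIfAbsent(consumerName, name -> new LinkedHashMap<>())
          .put(property, entry.getValue());
    }
    return grouped;
  }

  public static void main(String[] args) {
    Map<String, String> flat = new LinkedHashMap<>();
    flat.put("kafka.consumer.consumer1.topic", "topic1");
    flat.put("kafka.consumer.consumer1.group-id", "group1");
    flat.put("kafka.consumer.consumer2.topic", "topic2");
    flat.put("kafka.consumer.consumer2.group-id", "group2");
    // prints {consumer1={topic=topic1, group-id=group1}, consumer2={topic=topic2, group-id=group2}}
    System.out.println(groupByConsumer(flat));
  }
}
```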

Spring Boot Kafka Multiple Consumer

Let’s define a ConcurrentKafkaListenerContainerFactory bean for each consumer, each marked with a @Qualifier annotation:

KafkaMultipleConsumerConfig.java
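package com.example.kafka.multi.config;

import static java.util.Objects.nonNull;

import java.util.HashMap;
import java.util.Map;

import com.example.kafka.config.KafkaCustomProperties;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
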
@Configuration
@RequiredArgsConstructor
@Slf4j
public class KafkaMultipleConsumerConfig {

  private final KafkaCustomProperties kafkaCustomProperties;

  @Bean
  @Qualifier("consumer1")
  public ConcurrentKafkaListenerContainerFactory<String, String> consumer1ContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory("consumer1"));
    return factory;
  }

  @Bean
  @Qualifier("consumer2")
  public ConcurrentKafkaListenerContainerFactory<String, String> consumer2ContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory("consumer2"));
    return factory;
  }

  // Builds a consumer factory from the common properties plus the named consumer's overrides.
  private ConsumerFactory<String, String> consumerFactory(String consumerName) {
    Map<String, Object> properties = new HashMap<>(kafkaCustomProperties.buildCommonProperties());
    if (nonNull(kafkaCustomProperties.getConsumer())) {
      KafkaProperties.Consumer consumerProperties = kafkaCustomProperties.getConsumer().get(consumerName);
      if (nonNull(consumerProperties)) {
        properties.putAll(consumerProperties.buildProperties());
      }
    }
    log.info("Kafka Consumer '{}' properties: {}", consumerName, properties);
    return new DefaultKafkaConsumerFactory<>(properties);
  }
}
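
The consumerFactory method above starts from the common properties and then applies the consumer-specific ones, so a per-consumer value wins whenever both define the same key. Here is a minimal plain-Java sketch of that merge order using ordinary maps. The class and method names are hypothetical; the keys are standard Kafka client configuration names.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of the "common first, consumer-specific last" merge order.
final class PropertyMergeSketch {

  // Copies the common properties, then lets consumer-specific entries overwrite them.
  static Map<String, Object> merge(Map<String, Object> common, Map<String, Object> consumerSpecific) {
    Map<String, Object> merged = new HashMap<>(common);
    if (consumerSpecific != null) {
      merged.putAll(consumerSpecific);
    }
    return merged;
  }

  public static void main(String[] args) {
    Map<String, Object> common = new HashMap<>();
    common.put("bootstrap.servers", "localhost:9092");
    common.put("client.id", "shared-client");

    Map<String, Object> consumer1 = new HashMap<>();
    consumer1.put("bootstrap.servers", "server1:9092,server2:9092,server3:9092");
    consumer1.put("group.id", "group1");

    Map<String, Object> merged = merge(common, consumer1);
    System.out.println(merged.get("bootstrap.servers")); // server1:9092,server2:9092,server3:9092
    System.out.println(merged.get("client.id"));         // shared-client
    System.out.println(merged.get("group.id"));          // group1
  }
}
```

A consumer with no entry of its own simply ends up with the common properties, which matches the null checks in consumerFactory.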

Let’s create two consumer service classes that consume messages from different topics in different clusters. Both implement a KafkaConsumerService interface, which is not shown in this post:

// Consumer 1
@Service
@Slf4j
public class KafkaFirstConsumerServiceImpl implements KafkaConsumerService {

    @KafkaListener(topics = {"${kafka.consumer.consumer1.topic}"}, groupId = "${kafka.consumer.consumer1.group-id}", 
      containerFactory = "consumer1ContainerFactory")
    public void receive(@Payload String message) {
        log.info("message received in consumer1: {}", message);
    }
}

// Consumer 2
@Service
@Slf4j
public class KafkaSecondConsumerServiceImpl implements KafkaConsumerService {

    @KafkaListener(topics = {"${kafka.consumer.consumer2.topic}"}, groupId = "${kafka.consumer.consumer2.group-id}", 
      containerFactory = "consumer2ContainerFactory")
    public void receive(@Payload String message) {
        log.info("message received in consumer2: {}", message);
    }
}

Note the containerFactory attribute passed to the @KafkaListener annotation: it names the container factory bean and therefore determines which consumer configuration the listener uses.

Summary

Spring Boot doesn’t provide support for multiple Kafka consumer configurations through a property file, but we can reuse the existing KafkaProperties classes to build a custom configuration that supports multiple consumers.

This solves the use case where you want to consume messages from different Kafka topics in different Kafka clusters from the same Spring Boot project.

You can download the complete source code from GitHub and read the official Spring for Apache Kafka documentation for further exploration.

Also read: How to Configure Multiple Kafka Producer in Spring Boot