Configure Kafka Producer and Consumer in spring boot Configure Kafka Producer and Consumer in spring boot

Page content

This post describes how to configure Kafka producer and consumer in spring boot application and also explains how to create service classes to send and receive Kafka messages to and from configured kafka topic respectively.

Setup Spring Boot Project

First of all, if you are using maven to build your spring boot project then add org.springframework.kafka dependency in pom.xml as highlighted below:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.1.3.RELEASE</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>com.abc</groupId>
	<artifactId>springboot-kafka</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>springboot-kafka</name>
	<description>Kafka configuration for Spring Boot</description>

	<properties>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

</project>

Note: If you build your spring boot project using gradle then please add kafka dependency similarly.

Kafka Producer and Consumer Configuration

Next, add the Kafka producer and consumer yml configuration in application.yml.

Things to note here,

  • app.topic.cosumer You can give the comma separated kafka topic names if you want consumer service to consume from multiple kafka topics
  • app.topic.producer You can give only one kafka topic name where you want producer service to publish messages
  • spring.kafka.properties.ssl If you want to configure secure SSL communication between consumer/producer and kafka server then configure key-store and trust-store otherwise remove this config.
  • spring.kafka.properties.ssl.endpoint.identification.algorithm Provide an empty string to this property if you have enabled SSL for kafka, otherwise spring boot startup throw error.
  • spring.kafka.properties.producer Kafka producer config
  • spring.kafka.properties.consumer Kafka consumer config

Here assumption is Kafka is running as a cluster of 3 server - localhost:9200, localhost:9300, localhost:9400

#APP SPECIFIC CUSTOM PROPERTIES
app:
  topic:
    producer: <PRODUCER_TOPIC_NAME>
    consumer: <CONSUMER_TOPIC_NAME_1, CONSUMER_TOPIC_NAME_2, CONSUMER_TOPIC_NAME_3>
#LOGGING PROPERTIES
logging:
  level:
    root: DEBUG
#SPRING PROPETIES
spring:
  kafka:
    properties:
      #Server host name verification is disabled by setting ssl.endpoint.identification.algorithm to an empty string
      ssl.endpoint.identification.algorithm:
    ssl:
      protocol: SSL
      trust-store-location: </app/store/truststore.jks>,
      trust-store-password: <TURST_STORE_PASSWORD>
      key-store-location: </app/store/keystore.jks>
      key-store-password: <KEY_STORE_PASSWORD>
	  key-password: <KEY_PASSWORD>
	producer:
      bootstrap-servers: <localhost:9200,localhost:9300,localhost:9400>
      retries: 0
      acks: all
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      bootstrap-servers: <localhost:9200,localhost:9300,localhost:9400>
      group-id: <KAFKA_CONSUMER_GROUP_ID>
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer


Spring Boot Kafka Producer

If you want to create a producer service to send messages to a Kafka topic then you need to create two Classes,

Create KafkaProducerConfig Class

First, create a KafkaProducerConfig class which uses producer configuration defined in application.yml and define a KafkaTemplate bean which creates an instance of Kafka producer. We will use this instance in our producer service class.

package com.abc.demo.config;

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.config.SslConfigs;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {

	@Autowired
	private ProducerFactory<Integer, String> producerFactory;
	
	public Map<String, Object> producerConfig(){
		Map<String, Object> kafkaAutoConfig = ((DefaultKafkaProducerFactory<Integer, String>) producerFactory).getConfigurationProperties();
		Map<String, Object> producerConfig = new HashMap<>();
		producerConfig.putAll(kafkaAutoConfig);
		producerConfig.compute(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, (k,v) -> (String)v);
		producerConfig.compute(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, (k,v) -> (String)v);
		producerConfig.compute(SslConfigs.SSL_KEY_PASSWORD_CONFIG, (k,v) -> (String)v);
		return producerConfig;
	}
	
	@Bean
	public KafkaTemplate<String, String> kafkaTemplate(){
		return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerConfig()));
	}
}


Create KafkaProducerService Class

Second, create a KafkaProducerService class and its implementation class KafkaProducerServiceImpl. We are using producer instance created by KafkaTemplate to send kafka message to given kafka topic.

Please read more about KafkaTemplate which comes with overloaded send method to send messages with key, partition and routing information.

package com.abc.demo.service;

public interface KafkaProducerService {
	public void send(String topic, String data);
}
package com.abc.demo.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerServiceImpl implements KafkaProducerService {

	@Autowired
	private KafkaTemplate<String, String> kafkaTemplate;

	@Override
	public void send(String topic, String data) {
		kafkaTemplate.send(topic, data);
	}
}


Spring Boot Kafka Consumer

If you want to create a consumer service to receive messages from a single Kafka topic or multiple Kafka topics then you need to create two Classes,

Create KafkaConsumerConfig Class

First, create a KafkaConsumerConfig class which uses consumer configuration defined in application.yml and define a ConcurrentKafkaListenerContainerFactory bean which is responsible to create listener for given Kafka bootstrap server. We have also used @EnableKafka annotation at class level which tells spring boot to detect @KafkaListener annotation applied to any method in spring boot application. We will use this annotation in our consumer service class.

package com.abc.demo.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.config.SslConfigs;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

	@Autowired
	private ConsumerFactory<Integer, String> consumerFactory;
	
	public Map<String, Object> consumerConfig(){
		Map<String, Object> kafkaAutoConfig = ((DefaultKafkaConsumerFactory<Integer, String>) consumerFactory).getConfigurationProperties();
		Map<String, Object> consumerConfig = new HashMap<>();
		consumerConfig.putAll(kafkaAutoConfig);
		consumerConfig.compute(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, (k,v) -> (String)v);
		consumerConfig.compute(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, (k,v) -> String)v);
		consumerConfig.compute(SslConfigs.SSL_KEY_PASSWORD_CONFIG, (k,v) -> (String)v);
		return consumerConfig;
	}
	
	@Bean
	public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(){
		ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
		factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfig()));
		return factory;
	}
}


Create KafkaConsumerService Class

Second, create a KafkaConsumerService class and its implementation class KafkaConsumerServiceImpl. We have used @KafkaListener annotation at method passing kafka consumer topic names, which will be detected by spring boot application automatically. You will be able to consumer messages from kafka topic using this method.

package com.abc.demo.service;

public interface KafkaConsumerService {
	public void receive(String data);
}
package com.abc.demo.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerServiceImpl implements KafkaConsumerService{

	private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerServiceImpl.class);
	
	@KafkaListener(topics = {"#{'${spring.kafka.consumer.topic}'.split(',')}"})
	public void receive(@Payload String data) {
		logger.info("data: {}", data);
	}
	
}


Summary

Spring boot provides a wrapper over kafka producer and consumer implementation in Java which helps us to easily configure-

  • Kafka Producer using KafkaTemplate which provides overloaded send method to send messages in multiple ways with keys, partitions and routing information.
  • Kafka Consumer using @EnableKafka annotation which auto detects @KafkaListener annotation applied to any method and that methods becomes a Kafka Listener.

You can download complete source code from github