Configure Kafka Producer and Consumer in Spring Boot
This post describes how to configure a Kafka producer and consumer in a Spring Boot application, and how to create service classes that send messages to and receive messages from the configured Kafka topics.
Set Up the Spring Boot Project
First of all, if you use Maven to build your Spring Boot project, add the org.springframework.kafka:spring-kafka dependency to pom.xml as highlighted below:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.3.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.abc</groupId>
    <artifactId>springboot-kafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>springboot-kafka</name>
    <description>Kafka configuration for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
Note: If you build your Spring Boot project with Gradle, add the spring-kafka dependency to build.gradle in the same way.
Kafka Producer and Consumer Configuration
Next, add the Kafka producer and consumer configuration to application.yml.
Things to note here:
- app.topic.consumer You can provide comma-separated Kafka topic names if you want the consumer service to consume from multiple Kafka topics.
- app.topic.producer You can provide only one Kafka topic name, to which the producer service publishes messages.
- spring.kafka.ssl If you want secure SSL communication between the consumer/producer and the Kafka servers, configure the key store and trust store; otherwise remove this section.
- spring.kafka.properties.ssl.endpoint.identification.algorithm Set this property to an empty string if you have enabled SSL for Kafka; otherwise Spring Boot throws an error at startup.
- spring.kafka.producer Kafka producer configuration
- spring.kafka.consumer Kafka consumer configuration
The assumption here is that Kafka is running as a cluster of three servers: localhost:9200, localhost:9300 and localhost:9400.
#APP SPECIFIC CUSTOM PROPERTIES
app:
  topic:
    producer: <PRODUCER_TOPIC_NAME>
    consumer: <CONSUMER_TOPIC_NAME_1, CONSUMER_TOPIC_NAME_2, CONSUMER_TOPIC_NAME_3>
#LOGGING PROPERTIES
logging:
  level:
    root: DEBUG
#SPRING PROPERTIES
spring:
  kafka:
    properties:
      #Server host name verification is disabled by setting ssl.endpoint.identification.algorithm to an empty string
      ssl.endpoint.identification.algorithm: ""
    ssl:
      protocol: SSL
      trust-store-location: </app/store/truststore.jks>
      trust-store-password: <TRUST_STORE_PASSWORD>
      key-store-location: </app/store/keystore.jks>
      key-store-password: <KEY_STORE_PASSWORD>
      key-password: <KEY_PASSWORD>
    producer:
      bootstrap-servers: localhost:9200,localhost:9300,localhost:9400
      retries: 0
      acks: all
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      bootstrap-servers: localhost:9200,localhost:9300,localhost:9400
      group-id: <KAFKA_CONSUMER_GROUP_ID>
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
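The consumer topic list above is a single comma-separated string; the listener class shown later splits it with a SpEL expression. The splitting logic itself can be sketched in plain Java (the topic names below are illustrative placeholders, not part of the configuration above):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class TopicListDemo {

    // Mirrors what the SpEL expression "#{'${app.topic.consumer}'.split(',')}" does
    static List<String> parseTopics(String commaSeparated) {
        return Arrays.stream(commaSeparated.split(","))
                .map(String::trim) // tolerate spaces after commas
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> topics = parseTopics("orders, payments, shipments");
        System.out.println(topics); // [orders, payments, shipments]
    }
}
```

Each resulting element becomes one topic subscription for the listener container.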
Spring Boot Kafka Producer
If you want to create a producer service that sends messages to a Kafka topic, you need to create two classes.
Create KafkaProducerConfig Class
First, create a KafkaProducerConfig class which uses the producer configuration defined in application.yml and defines a KafkaTemplate bean, which creates an instance of the Kafka producer. We will use this instance in our producer service class.
package com.abc.demo.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.config.SslConfigs;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {

    @Autowired
    private ProducerFactory<Integer, String> producerFactory;

    public Map<String, Object> producerConfig() {
        // Start from the producer properties Spring Boot auto-configured from application.yml
        Map<String, Object> kafkaAutoConfig = ((DefaultKafkaProducerFactory<Integer, String>) producerFactory).getConfigurationProperties();
        Map<String, Object> producerConfig = new HashMap<>();
        producerConfig.putAll(kafkaAutoConfig);
        // Re-map the SSL password entries as plain Strings
        producerConfig.compute(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, (k, v) -> (String) v);
        producerConfig.compute(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, (k, v) -> (String) v);
        producerConfig.compute(SslConfigs.SSL_KEY_PASSWORD_CONFIG, (k, v) -> (String) v);
        return producerConfig;
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerConfig()));
    }
}
Create KafkaProducerService Class
Second, create a KafkaProducerService interface and its implementation class KafkaProducerServiceImpl. It uses the producer instance created by KafkaTemplate to send a Kafka message to the given Kafka topic.
Please read more about KafkaTemplate, which comes with overloaded send methods to send messages with key, partition and routing information.
package com.abc.demo.service;

public interface KafkaProducerService {
    public void send(String topic, String data);
}
package com.abc.demo.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerServiceImpl implements KafkaProducerService {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Override
    public void send(String topic, String data) {
        kafkaTemplate.send(topic, data);
    }
}
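To see how callers interact with this contract without a running broker, here is a minimal plain-Java sketch with an in-memory stub standing in for the Kafka-backed implementation (the stub class and topic name are illustrative, not part of the post's code):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ProducerServiceDemo {

    // Same contract as the KafkaProducerService interface above
    interface KafkaProducerService {
        void send(String topic, String data);
    }

    // In-memory stub: records messages per topic instead of publishing to Kafka
    static class InMemoryProducerService implements KafkaProducerService {
        final Map<String, List<String>> sent = new HashMap<>();

        @Override
        public void send(String topic, String data) {
            sent.computeIfAbsent(topic, t -> new ArrayList<>()).add(data);
        }
    }

    public static void main(String[] args) {
        InMemoryProducerService producer = new InMemoryProducerService();
        producer.send("demo-topic", "hello");
        producer.send("demo-topic", "world");
        System.out.println(producer.sent.get("demo-topic")); // [hello, world]
    }
}
```

In the real application the same calls go through KafkaProducerServiceImpl, and KafkaTemplate publishes the records to the broker.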
Spring Boot Kafka Consumer
If you want to create a consumer service that receives messages from one or more Kafka topics, you need to create two classes.
Create KafkaConsumerConfig Class
First, create a KafkaConsumerConfig class which uses the consumer configuration defined in application.yml and defines a ConcurrentKafkaListenerContainerFactory bean, which is responsible for creating listeners for the given Kafka bootstrap servers. We have also used the @EnableKafka annotation at class level, which tells Spring Boot to detect the @KafkaListener annotation applied to any method in the application. We will use this annotation in our consumer service class.
package com.abc.demo.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.config.SslConfigs;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Autowired
    private ConsumerFactory<Integer, String> consumerFactory;

    public Map<String, Object> consumerConfig() {
        // Start from the consumer properties Spring Boot auto-configured from application.yml
        Map<String, Object> kafkaAutoConfig = ((DefaultKafkaConsumerFactory<Integer, String>) consumerFactory).getConfigurationProperties();
        Map<String, Object> consumerConfig = new HashMap<>();
        consumerConfig.putAll(kafkaAutoConfig);
        // Re-map the SSL password entries as plain Strings
        consumerConfig.compute(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, (k, v) -> (String) v);
        consumerConfig.compute(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, (k, v) -> (String) v);
        consumerConfig.compute(SslConfigs.SSL_KEY_PASSWORD_CONFIG, (k, v) -> (String) v);
        return consumerConfig;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfig()));
        return factory;
    }
}
Create KafkaConsumerService Class
Second, create a KafkaConsumerService interface and its implementation class KafkaConsumerServiceImpl. We have used the @KafkaListener annotation on the method, passing the consumer topic names, and the method is detected by the Spring Boot application automatically. You will be able to consume messages from the Kafka topics using this method.
package com.abc.demo.service;

public interface KafkaConsumerService {
    public void receive(String data);
}
package com.abc.demo.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerServiceImpl implements KafkaConsumerService {

    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerServiceImpl.class);

    // Splits the comma-separated topic list defined under app.topic.consumer in application.yml
    @Override
    @KafkaListener(topics = "#{'${app.topic.consumer}'.split(',')}")
    public void receive(@Payload String data) {
        logger.info("data: {}", data);
    }
}
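To make the consuming side concrete without a running broker, here is a minimal plain-Java sketch of what the listener container does for the method above: it hands each record's payload to the receive callback, one at a time. The class and payload names are illustrative only:

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class ListenerDispatchDemo {

    // Stand-in for the listener container: delivers each record's payload to the
    // listener callback, roughly what Spring does for a @KafkaListener method
    static void dispatch(List<String> payloads, Consumer<String> receive) {
        payloads.forEach(receive);
    }

    public static void main(String[] args) {
        StringBuilder log = new StringBuilder();
        dispatch(Arrays.asList("msg-1", "msg-2"),
                data -> log.append("data: ").append(data).append('\n'));
        System.out.print(log); // logs "data: msg-1" and "data: msg-2", one per line
    }
}
```

In the real application, Spring polls the subscribed topics and performs this dispatch for you; the service method only has to handle one payload at a time.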
Summary
Spring Boot provides a wrapper over the Kafka producer and consumer implementation in Java, which helps us to easily configure:
- A Kafka producer, using KafkaTemplate, which provides overloaded send methods to send messages in multiple ways with key, partition and routing information.
- A Kafka consumer, using the @EnableKafka annotation, which auto-detects the @KafkaListener annotation applied to any method, and that method becomes a Kafka listener.
You can download the complete source code from GitHub.