Overview
Kafka is a distributed system consisting of servers and clients that communicate via a high-performance TCP network protocol. It can be deployed on bare-metal hardware, virtual machines, and containers in on-premise as well as cloud environments.
Servers: Kafka is run as a cluster of one or more servers that can span multiple datacenters or cloud regions. Some of these servers form the storage layer, called the brokers. Other servers run Kafka Connect to continuously import and export data as event streams to integrate Kafka with your existing systems such as relational databases as well as other Kafka clusters. To let you implement mission-critical use cases, a Kafka cluster is highly scalable and fault-tolerant: if any of its servers fails, the other servers will take over their work to ensure continuous operations without any data loss.
Clients: They allow you to write distributed applications and microservices that read, write, and process streams of events in parallel, at scale, and in a fault-tolerant manner even in the case of network problems or machine failures. Kafka ships with some such clients included, which are augmented by dozens of clients provided by the Kafka community: clients are available for Java and Scala including the higher-level Kafka Streams library, for Go, Python, C/C++, and many other programming languages as well as REST APIs.
Prerequisites
This tutorial requires Docker and Docker Compose to be pre-installed. We run the Kafka server using docker-compose as shown below; alternatively, you can follow the official guide, Quick Start for Apache Kafka using Confluent Platform (Docker).
```yaml
---
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.2.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:6.2.0
    hostname: kafka
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
      - "9092:9092"
      - "9101:9101"
    restart: always
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
```
Then start the services from a terminal. Assuming the file above is saved as docker-compose.yml, the standard Compose command is:
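```bash
# start ZooKeeper and Kafka in the background
docker-compose up -d
```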
To stop the server when you are done:
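```bash
# stop the Kafka stack and remove its containers
docker-compose down
```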
Project Setup and Dependencies
I rely on Spring Initializr for this, as it is much easier: generate a Spring Boot project with Maven. Our example application will be a Spring Boot application, so we need to add the spring-kafka and spring-boot-starter-web dependencies to our pom.xml.
```xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.7.4</version>
</dependency>
```
We also need to create an application.yml configuration file. Note that the Kafka bootstrap address is a plain host:port pair, not an HTTP URL.
```yaml
kafka:
  bootstrapAddress: "localhost:9092"

server:
  port: 8080
```
Implementation
Configuring Topics
Create the constant class com.piinalpin.kafkademo.constant.KafkaTopicConstant, which holds the topic name.
```java
public class KafkaTopicConstant {

    public static final String HELLO_WORLD = "hello-world";

}
```
Create the bean configuration com.piinalpin.kafkademo.config.KafkaTopicConfiguration to define topics on Kafka. Note that a topic must have at least one partition, so the NewTopic bean uses 1 here.
```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;

import com.piinalpin.kafkademo.constant.KafkaTopicConstant;

@Configuration
public class KafkaTopicConfiguration {

    @Value("${kafka.bootstrapAddress:}")
    private String bootstrapAddress;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> config = new HashMap<>();
        config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(config);
    }

    @Bean
    public NewTopic helloWorld() {
        // one partition; replication factor 1 matches our single-broker setup
        return new NewTopic(KafkaTopicConstant.HELLO_WORLD, 1, (short) 1);
    }

}
```
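As a side note, spring-kafka (since 2.3) also provides a fluent TopicBuilder. A minimal sketch that could replace the helloWorld() bean above, if you prefer that style:

```java
// Alternative bean inside KafkaTopicConfiguration, using
// org.springframework.kafka.config.TopicBuilder (spring-kafka 2.3+).
@Bean
public NewTopic helloWorld() {
    return TopicBuilder.name(KafkaTopicConstant.HELLO_WORLD)
            .partitions(1)  // one partition is enough for this demo
            .replicas(1)    // single broker, so replication factor 1
            .build();
}
```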
Producer Configuration
Create the bean configuration com.piinalpin.kafkademo.config.KafkaProducerConfiguration to define the producer beans.
```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfiguration {

    @Value("${kafka.bootstrapAddress:}")
    private String bootstrapAddress;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}
```
Publisher Service
Create the service com.piinalpin.kafkademo.service.KafkaPublisherService to publish messages.
```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import com.piinalpin.kafkademo.constant.KafkaTopicConstant;

@Service
public class KafkaPublisherService {

    private static final Logger log = LoggerFactory.getLogger(KafkaPublisherService.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send(String message) {
        log.info("Sending message to Kafka...");
        // the payload is the message; the topic is the constant
        log.info(String.format("Payload: %s, Topic: %s", message, KafkaTopicConstant.HELLO_WORLD));
        kafkaTemplate.send(KafkaTopicConstant.HELLO_WORLD, message);
    }

}
```
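KafkaTemplate.send() is asynchronous and returns immediately. If you also want to log the broker's acknowledgment, you can register a callback on the returned future. A minimal sketch under spring-kafka 2.7.x, where send() returns a ListenableFuture; the sendWithCallback method is illustrative, not part of the original project:

```java
// Additional imports for KafkaPublisherService:
// import org.springframework.kafka.support.SendResult;
// import org.springframework.util.concurrent.ListenableFuture;
// import org.springframework.util.concurrent.ListenableFutureCallback;

// Illustrative variant of send(): logs partition/offset once the broker acknowledges.
public void sendWithCallback(String message) {
    ListenableFuture<SendResult<String, String>> future =
            kafkaTemplate.send(KafkaTopicConstant.HELLO_WORLD, message);
    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
        @Override
        public void onSuccess(SendResult<String, String> result) {
            log.info(String.format("Delivered '%s' to partition %d at offset %d", message,
                    result.getRecordMetadata().partition(),
                    result.getRecordMetadata().offset()));
        }

        @Override
        public void onFailure(Throwable ex) {
            log.error("Failed to deliver message", ex);
        }
    });
}
```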
Send Message
Create the REST controller com.piinalpin.kafkademo.controller.KafkaDemoController to send a message via REST.
```java
import java.util.HashMap;
import java.util.Map;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

import com.piinalpin.kafkademo.service.KafkaPublisherService;

@RestController
public class KafkaDemoController {

    @Autowired
    private KafkaPublisherService kafkaPublisherService;

    @GetMapping(value = "/")
    public Map<String, String> main() {
        return okMessage("ok");
    }

    @PostMapping(value = "/greeting")
    public Map<String, String> greeting(@RequestBody Map<String, String> request) {
        if (null != request.get("greeting")) {
            kafkaPublisherService.send(request.get("greeting"));
        }
        return okMessage("Sending message...");
    }

    private Map<String, String> okMessage(String message) {
        Map<String, String> ret = new HashMap<>();
        ret.put("message", message);
        return ret;
    }

}
```
Try running the application with mvn spring-boot:run, then open Postman and send the request below.
URL: http://localhost:8080/greeting (POST)
Request Body
```json
{
  "greeting": "Hello my name is Maverick"
}
```
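If you prefer the terminal over Postman, the same request can be sent with curl (assuming the application runs on localhost:8080 as configured above):

```bash
curl -X POST http://localhost:8080/greeting \
     -H "Content-Type: application/json" \
     -d '{"greeting": "Hello my name is Maverick"}'
```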
And the log will display lines like the following.
```
2021-07-23 19:27:00.306  INFO 87879 --- [nio-8080-exec-4] c.p.k.service.KafkaPublisherService  : Sending message to Kafka...
2021-07-23 19:27:00.307  INFO 87879 --- [nio-8080-exec-4] c.p.k.service.KafkaPublisherService  : Payload: Hello my name is Maverick, Topic: hello-world
```
Consumer Configuration
For consuming messages, we need to configure a ConsumerFactory and a KafkaListenerContainerFactory. Once these beans are available in the Spring bean factory, POJO-based consumers can be configured using the @KafkaListener annotation. The @EnableKafka annotation is required on the configuration class to enable detection of @KafkaListener on Spring-managed beans.
Create the bean configuration com.piinalpin.kafkademo.config.KafkaConsumerConfiguration to define the consumer (listener) beans.
```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
@EnableKafka
public class KafkaConsumerConfiguration {

    @Value("${kafka.bootstrapAddress:}")
    private String bootstrapAddress;

    public ConsumerFactory<String, String> consumerFactory(String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(String groupId) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory(groupId));
        return factory;
    }

    // container factory named "greeting", referenced from @KafkaListener below
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> greeting() {
        return kafkaListenerContainerFactory("greeting");
    }

}
```
Consuming Messages
Create the service com.piinalpin.kafkademo.service.KafkaConsumerService as a listener that consumes messages from Kafka.
```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import com.piinalpin.kafkademo.constant.KafkaTopicConstant;

@Service
public class KafkaConsumerService {

    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerService.class);

    @KafkaListener(topics = KafkaTopicConstant.HELLO_WORLD, containerFactory = "greeting")
    public void greeting(String payload) {
        log.info(String.format("Received payload: %s", payload));
    }

}
```
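If you also need the record metadata (partition, offset, key), @KafkaListener can accept the full ConsumerRecord instead of just the String payload. A hedged sketch; the listenWithMetadata method and its group id are illustrative additions, not part of the original project:

```java
// Requires: import org.apache.kafka.clients.consumer.ConsumerRecord;
// The explicit groupId keeps this listener in a separate consumer group,
// so it receives messages independently of greeting() above.
@KafkaListener(topics = KafkaTopicConstant.HELLO_WORLD,
        containerFactory = "greeting",
        groupId = "greeting-metadata")
public void listenWithMetadata(ConsumerRecord<String, String> record) {
    log.info(String.format("Received '%s' from partition %d at offset %d",
            record.value(), record.partition(), record.offset()));
}
```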
Run the application again with mvn spring-boot:run, then send the same request from Postman.
URL: http://localhost:8080/greeting (POST)
Request Body
```json
{
  "greeting": "Hello my name is Maverick"
}
```
And the log will now show the message being both published and consumed:
```
2021-07-23 20:57:09.511  INFO 8569 --- [nio-8080-exec-2] c.p.k.service.KafkaPublisherService  : Sending message to Kafka...
2021-07-23 20:57:09.511  INFO 8569 --- [nio-8080-exec-2] c.p.k.service.KafkaPublisherService  : Payload: Hello my name is Maverick, Topic: hello-world
2021-07-23 20:57:09.565  INFO 8569 --- [ad | producer-1] org.apache.kafka.clients.Metadata    : [Producer clientId=producer-1] Cluster ID: wlyOQ1ksRQ6eVAu6Q0JXYA
2021-07-23 20:57:09.798  INFO 8569 --- [ntainer#0-0-C-1] c.p.k.service.KafkaConsumerService   : Received payload: Hello my name is Maverick
```
Clone or Download
You can clone or download the complete project here:

```
https://github.com/piinalpin/kafka-demo.git
```
Thank you
Baeldung - Intro to Apache Kafka with Spring
Medium - Apache Kafka CLI commands cheat sheet
Github - Spring Kafka
Stack Overflow - How to set groupId to null in @KafkaListeners
Programmer Sought - Springboot integrated kafka-No group.id found in consumer config