Introduction

Apache Kafka is a popular distributed streaming platform for handling real-time data feeds reliably. Kafka has become a central piece in the design of event-driven systems and stream processing applications, providing horizontal scalability, high throughput, and fault tolerance. When combined with Spring Boot and reactive programming, Kafka’s capabilities can be harnessed in a non-blocking, scalable way.

In this article, we will explore how to implement a reactive Kafka application with Spring Boot using Project Reactor. We will cover setting up Kafka, using Spring Boot to create producers and consumers, and implementing reactive processing for handling high-throughput streams in a responsive and efficient manner.

Prerequisites

Before we start coding, ensure that you have the following:

  • Apache Kafka installed and running on your local machine or server.
  • Java 11 or higher installed (Java 17 or higher if you are on Spring Boot 3.x).
  • Maven or Gradle for managing dependencies.

Setting Up a Spring Boot Project

To start, let’s set up a Spring Boot project with the required dependencies for Kafka and Reactive support.

Maven Dependencies

Create a new Spring Boot project and add the following dependencies to your pom.xml:

xml
<dependencies>
    <!-- Spring Boot Starter WebFlux for reactive support -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <!-- Spring for Apache Kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <!-- Reactor Kafka for reactive Kafka support -->
    <dependency>
        <groupId>io.projectreactor.kafka</groupId>
        <artifactId>reactor-kafka</artifactId>
        <version>1.3.14</version>
    </dependency>
</dependencies>

The spring-boot-starter-webflux dependency provides WebFlux support for building reactive web applications, spring-kafka pulls in the Kafka client along with Spring’s Kafka integration, and reactor-kafka offers the reactive API on top of the Kafka client.

Kafka Configuration in Spring Boot

Let’s configure Kafka in our Spring Boot application. Create an application.yml file and add the following properties:

yaml
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: reactive-consumer-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

This configuration specifies the Kafka server, producer, and consumer settings. Make sure bootstrap-servers points to your Kafka instance; by default, Kafka listens on localhost:9092. Note that the reactor-kafka beans we define below do not read the spring.kafka.* properties automatically, so we will inject the bootstrap servers into them explicitly.
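
The examples below use a topic named topic-name. Many local brokers auto-create topics on first use; if yours does not, one option is to declare the topic as a bean and let Spring Boot’s auto-configured KafkaAdmin create it at startup. A minimal sketch (the partition and replica counts are illustrative):

java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {

    // Spring Boot's auto-configured KafkaAdmin creates NewTopic beans at startup.
    @Bean
    public NewTopic reactiveTopic() {
        return TopicBuilder.name("topic-name")
                .partitions(1) // illustrative; size for your expected throughput
                .replicas(1)   // 1 is enough for a single-broker local setup
                .build();
    }
}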

Implementing Reactive Kafka Producers

In reactive applications, producers need to be capable of sending messages in a non-blocking manner. We’ll use the KafkaSender from reactor-kafka to send messages to a Kafka topic reactively.

Kafka Producer Configuration

We need to define a KafkaSender bean, which will handle producing messages to Kafka topics.

java
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaProducerConfig {

    // Injected from spring.kafka.bootstrap-servers in application.yml
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public KafkaSender<String, String> kafkaSender() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        SenderOptions<String, String> senderOptions = SenderOptions.create(configProps);
        return KafkaSender.create(senderOptions);
    }
}

This configuration bean initializes a KafkaSender that uses StringSerializer for both keys and values, with the broker address injected from application.yml.
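
SenderOptions also exposes reactive-specific tuning that plain producer properties do not cover. A hedged sketch (the values here are illustrative, not recommendations):

java
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;

public class TunedSenderFactory {

    // Builds a KafkaSender with illustrative reactive tuning applied.
    public static KafkaSender<String, String> create(String bootstrapServers) {
        Map<String, Object> props = Map.of(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers,
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        SenderOptions<String, String> options = SenderOptions.<String, String>create(props)
                .maxInFlight(1024)   // bound memory used by records awaiting broker acks
                .stopOnError(false); // keep sending remaining records if one send fails

        return KafkaSender.create(options);
    }
}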

Sending Messages with KafkaSender

Now, we can create a service to send messages to Kafka.

java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderRecord;

@Service
public class ReactiveKafkaProducer {

    private final KafkaSender<String, String> kafkaSender;

    @Autowired
    public ReactiveKafkaProducer(KafkaSender<String, String> kafkaSender) {
        this.kafkaSender = kafkaSender;
    }

    public Mono<Void> sendMessage(String topic, String message) {
        // SenderRecord.create(topic, partition, timestamp, key, value, correlationMetadata)
        return kafkaSender.send(Mono.just(SenderRecord.create(topic, null, null, null, message, null)))
                .then();
    }
}

Here, KafkaSender.send() accepts a Publisher of SenderRecords, a Mono wrapping a single record in this case, and returns a Flux of send results; .then() collapses that into a Mono<Void> which completes once the send has finished.
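
Because send() accepts any Publisher, the same sender can handle a whole stream of records, and the correlation-metadata slot in SenderRecord lets you match each result back to its input. A minimal sketch, using each message’s index as illustrative correlation metadata:

java
import reactor.core.publisher.Flux;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderRecord;

public class BatchSendExample {

    // Sends a stream of messages and emits each record's correlation metadata
    // (here, its 0-based index) as the broker confirms the send.
    public static Flux<Long> sendAll(KafkaSender<String, String> sender,
                                     String topic, Flux<String> messages) {
        Flux<SenderRecord<String, String, Long>> records = messages
                .index() // pairs each message with its 0-based index
                .map(t -> SenderRecord.create(topic, null, null, null, t.getT2(), t.getT1()));

        return sender.send(records)
                .map(result -> result.correlationMetadata());
    }
}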

Implementing Reactive Kafka Consumers

Reactive consumers can be created using KafkaReceiver, which allows consuming messages from Kafka topics reactively.

Kafka Consumer Configuration

Define a KafkaReceiver bean to configure the Kafka consumer.

java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConsumerConfig {

    // Injected from spring.kafka.bootstrap-servers in application.yml
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public KafkaReceiver<String, String> kafkaReceiver() {
        Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "reactive-consumer-group");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        ReceiverOptions<String, String> receiverOptions = ReceiverOptions.<String, String>create(consumerProps)
                .subscription(Collections.singleton("topic-name"));

        return KafkaReceiver.create(receiverOptions);
    }
}

This bean sets up a KafkaReceiver that subscribes to the topic-name topic as part of the reactive-consumer-group consumer group.
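
ReceiverOptions also controls how acknowledged offsets are committed back to Kafka. A hedged sketch (the interval and batch size are illustrative):

java
import java.time.Duration;

import reactor.kafka.receiver.ReceiverOptions;

public class CommitTuningExample {

    // Applies illustrative commit tuning to existing receiver options.
    public static <K, V> ReceiverOptions<K, V> withCommitTuning(ReceiverOptions<K, V> options) {
        return options
                .commitInterval(Duration.ofSeconds(5)) // commit acknowledged offsets at most every 5 seconds
                .commitBatchSize(100);                 // ...or as soon as 100 records have been acknowledged
    }
}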

Consuming Messages with KafkaReceiver

Let’s create a service that processes incoming messages reactively.

java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverRecord;

@Service
public class ReactiveKafkaConsumer {

    private final KafkaReceiver<String, String> kafkaReceiver;

    @Autowired
    public ReactiveKafkaConsumer(KafkaReceiver<String, String> kafkaReceiver) {
        this.kafkaReceiver = kafkaReceiver;
    }

    public Flux<String> receiveMessages() {
        return kafkaReceiver.receive()
                .map(ReceiverRecord::value)
                .doOnNext(System.out::println); // log each message as it arrives
    }
}

In this service, KafkaReceiver.receive() returns a Flux that emits a ReceiverRecord for each incoming message; we map each record to its value and print it. Note that this version never acknowledges the records, so their offsets are not committed back to Kafka.
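
To have reactor-kafka commit offsets, acknowledge each record’s ReceiverOffset after you have processed it; a minimal sketch:

java
import reactor.core.publisher.Flux;
import reactor.kafka.receiver.KafkaReceiver;

public class AcknowledgingConsumerExample {

    // Emits message values, acknowledging each record so that its offset
    // becomes eligible for the next automatic commit.
    public static Flux<String> receiveAndAck(KafkaReceiver<String, String> receiver) {
        return receiver.receive()
                .map(record -> {
                    String value = record.value();         // extract the payload first
                    record.receiverOffset().acknowledge(); // then mark it as processed
                    return value;
                });
    }
}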

Integrating Kafka with REST Controller

To demonstrate the reactive Kafka producer and consumer, let’s set up a REST controller.

java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@RestController
@RequestMapping("/api/kafka")
public class KafkaController {

    private final ReactiveKafkaProducer producer;
    private final ReactiveKafkaConsumer consumer;

    @Autowired
    public KafkaController(ReactiveKafkaProducer producer, ReactiveKafkaConsumer consumer) {
        this.producer = producer;
        this.consumer = consumer;
    }

    @PostMapping("/send")
    public Mono<Void> sendMessage(@RequestParam String message) {
        return producer.sendMessage("topic-name", message);
    }

    // Server-sent events, so clients receive each message as it arrives
    @GetMapping(value = "/receive", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> receiveMessages() {
        return consumer.receiveMessages();
    }
}

This controller provides two endpoints:

  • /api/kafka/send: Sends a message to Kafka.
  • /api/kafka/receive: Streams incoming messages from Kafka as server-sent events.

Keep in mind that a single KafkaReceiver supports only one active subscriber; if several clients need the stream concurrently, share the Flux (for example with Flux.share()) rather than resubscribing per request.

Testing the Reactive Kafka Application

To test, start Kafka and the Spring Boot application. You can then use tools like Postman or curl to send messages and receive streams of Kafka messages reactively.

Send a Message:

bash
curl -X POST "http://localhost:8080/api/kafka/send?message=HelloReactiveKafka"

Receive Messages:

bash
curl -X GET "http://localhost:8080/api/kafka/receive"

You should see each message printed in real time, both in the application’s console (from the doOnNext logging) and in the terminal where the receive stream is running.
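
For an automated check, WebFlux’s WebTestClient can exercise the send endpoint end to end. A minimal sketch, assuming spring-boot-starter-test is on the test classpath and a broker is reachable at the configured bootstrap servers (for fully self-contained tests, spring-kafka-test’s @EmbeddedKafka or Testcontainers are common alternatives):

java
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.web.reactive.server.WebTestClient;

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@AutoConfigureWebTestClient
class KafkaControllerTest {

    @Autowired
    private WebTestClient webTestClient;

    @Test
    void sendEndpointAcceptsMessage() {
        webTestClient.post()
                .uri(uriBuilder -> uriBuilder
                        .path("/api/kafka/send")
                        .queryParam("message", "HelloReactiveKafka")
                        .build())
                .exchange()
                .expectStatus().isOk();
    }
}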

Conclusion

In this article, we’ve built a reactive Kafka application using Spring Boot, Reactor Kafka, and WebFlux. This setup allows the application to handle real-time Kafka streams in a non-blocking way, making it ideal for high-throughput and latency-sensitive use cases. By leveraging reactive programming, the system can process multiple Kafka messages concurrently and effectively manage resources, providing resilience and scalability.

Reactive programming in Kafka applications helps to unlock the full potential of modern architectures by delivering fast, responsive, and scalable solutions. Whether you are building event-driven microservices or real-time analytics platforms, using Spring Boot with Reactor Kafka offers an elegant, high-performance solution for working with streaming data.