Spring Boot and Kafka – Practical Configuration Examples

This blog post shows how to configure Spring Kafka and Spring Boot to send messages using JSON and receive them in multiple formats: JSON, plain Strings or byte arrays. Based on this configuration, you could also switch your Kafka producer from sending JSON to other serialization methods.

This sample application also demonstrates the usage of three Kafka consumers within the same consumer group, so the messages are load-balanced between the three. Each consumer implements a different deserialization approach.

In addition, at the end of this post you will find some practical exercises in case you want to grasp Kafka concepts like the Consumer Group and Topic partitions.

Multiple consumers in a consumer group

Logical View

To better understand the configuration, have a look at the diagram below. As you can see, we create a Kafka topic with three partitions. On the consumer side, there is only one application, but it implements three Kafka consumers with the same group.id property. This is the configuration needed for having them in the same Kafka Consumer Group.

Kafka deserialization examples

When we start the application, Kafka assigns each consumer a different partition. This consumer group will receive the messages in a load-balanced manner. Later in this post, you’ll see what the difference is if we give them different group identifiers (you probably know the result if you are familiar with Kafka).

The Example Use Case

The logic we are going to build is simple. Each time we call a given REST endpoint, hello, the app will produce a configurable number of messages and send them to the same topic, using a sequence number as the Kafka key. It will wait (using a CountDownLatch) for all messages to be consumed before returning a message, Hello Kafka!. There will be three consumers, each using a different deserialization mechanism, that will decrement the latch count when they receive a new message.
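As a minimal, stdlib-only sketch of that latch pattern (plain threads stand in here for the Kafka listeners; the class and method names are just for illustration):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchSketch {

    // Blocks until `count` simulated consumers have each called countDown(),
    // or until the timeout expires; returns true if all arrived in time.
    public static boolean produceAndAwait(int count) {
        CountDownLatch latch = new CountDownLatch(count);
        for (int i = 0; i < count; i++) {
            // In the real app, a Kafka listener calls countDown() per received message
            new Thread(latch::countDown).start();
        }
        try {
            return latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("All messages received: " + produceAndAwait(10));
    }
}
```

The controller we build later follows the same shape: create the latch sized to the number of messages, send them, then await before responding.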

Easy, right? Let’s see how to build it.

Setting up Spring Boot and Kafka

All the code in this post is available on GitHub: Kafka and Spring Boot Example. If you find it useful, please give it a star!

Starting up Kafka

First, you need to have a running Kafka cluster to connect to. For this application, I will use docker-compose and Kafka running in a single node. This is clearly far from being a production configuration, but it is good enough for the goal of this post.

docker-compose.yml
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
    - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
    - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'

Note that I configured Kafka to not create topics automatically. We will create our topic from the Spring Boot application since we want to pass some custom configuration anyway. If you want to play around with these Docker images (e.g. to use multiple nodes), have a look at the wurstmeister/zookeeper image docs.

To start up Kafka and Zookeeper containers, just run docker-compose up from the folder where this file lives.

Get the book Practical Software Architecture

Basic Spring Boot and Kafka application

Spring Initializr Kafka

The easiest way to get a skeleton for our app is to navigate to start.spring.io, fill in the basic details for our project and select Kafka as a dependency. Then, download the zip file and use your favorite IDE to load the sources.

Let’s use YAML for our configuration. You may need to rename the application.properties file inside src/main/resources to application.yml. These are the configuration values we are going to use for this sample application:

spring:
  kafka:
    consumer:
      group-id: tpd-loggers
      auto-offset-reset: earliest
    # change this property if you are using your own
    # Kafka cluster or your Docker IP is different
    bootstrap-servers: localhost:9092

tpd:
  topic-name: advice-topic
  messages-per-request: 10

The first block of properties is Spring Kafka configuration:

  • The group-id that will be used by default by our consumers.
  • The auto-offset-reset property is set to earliest, which means that the consumers will start reading messages from the earliest one available when there is no existing offset for that consumer.
  • The server to use to connect to Kafka, in this case, the only one available if you use the single-node configuration. Note that this property is redundant if you use the default value, localhost:9092.

The second block is application-specific. We define the Kafka topic name and the number of messages to send every time we do an HTTP REST request.

The Message class

This is the Java class that we will use as our Kafka message. Nothing complex here: just an immutable class with @JsonProperty annotations in the constructor parameters so Jackson can deserialize it properly.

PracticalAdvice class
package io.tpd.kafkaexample;

import com.fasterxml.jackson.annotation.JsonProperty;

public class PracticalAdvice {
    private final String message;
    private final int identifier;

    public PracticalAdvice(@JsonProperty("message") final String message,
                           @JsonProperty("identifier") final int identifier) {
        this.message = message;
        this.identifier = identifier;
    }

    public String getMessage() {
        return message;
    }

    public int getIdentifier() {
        return identifier;
    }

    @Override
    public String toString() {
        return "PracticalAdvice::toString() {" +
                "message='" + message + '\'' +
                ", identifier=" + identifier +
                '}';
    }
}

Kafka Producer configuration in Spring Boot

To keep the application simple, we will add the configuration in the main Spring Boot class. Eventually, we want to include here both producer and consumer configuration, and use three different variations for deserialization. Remember that you can find the complete source code in the GitHub repository.

First, let’s focus on the Producer configuration.

Spring Boot Kafka Producer
@SpringBootApplication
public class KafkaExampleApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaExampleApplication.class, args);
    }

    @Autowired
    private KafkaProperties kafkaProperties;

    @Value("${tpd.topic-name}")
    private String topicName;

    // Producer configuration

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props =
                new HashMap<>(kafkaProperties.buildProducerProperties());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                JsonSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public NewTopic adviceTopic() {
        return new NewTopic(topicName, 3, (short) 1);
    }
}

In this configuration, we are setting up two parts of the application:

  • The KafkaTemplate instance, which is the object we will use to send messages to Kafka. We don't want to use the default one, so we need to inject our custom version into Spring's application context.
    • We type (with generics) the KafkaTemplate to have a plain String key, and an Object as value. The reason to have Object as a value is that we want to send multiple object types with the same template. The KafkaTemplate accepts as a parameter a ProducerFactory that we also create in our configuration.
    • The ProducerFactory we use is the default one, but we need to configure it explicitly here since we want to pass it our custom producer configuration.
    • The Producer Configuration is a simple key-value map. We inject the default properties using @Autowired to obtain the KafkaProperties bean and then we build our map passing the default values for the producer, and overriding the default Kafka key and value serializers. The producer will serialize keys as Strings using the Kafka library's StringSerializer and will do the same for values but this time using JSON, with a JsonSerializer, in this case provided by Spring Kafka.
  • The Kafka topic we're going to use. By declaring a NewTopic bean, we're instructing Kafka's AdminClient bean (already in the context) to create a topic with the given configuration. The first parameter is the name (advice-topic, from the app configuration), the second is the number of partitions (3) and the third one is the replication factor (one, since we're using a single node anyway).

About Kafka Serializers and Deserializers for Java

There are a few basic Serializers available in the core Kafka library (javadoc) for Strings, all kinds of number classes and byte arrays, plus the JSON ones provided by Spring Kafka (javadoc).

On top of that, you can create your own Serializers and Deserializers just by implementing Serializer or ExtendedSerializer, or their corresponding versions for deserialization. That gives you a lot of flexibility to optimize the amount of data traveling through Kafka, in case you need to do so. As you can see in those interfaces, Kafka works with plain byte arrays so, eventually, no matter what complex type you’re working with, it needs to be transformed to a byte[].

Knowing that, you may wonder why someone would want to use JSON with Kafka. It’s quite inefficient since you’re transforming your objects to JSON and then to a byte array. But you have to consider two main advantages of doing this:

  • JSON is more readable by a human than an array of bytes. If you want to debug or analyze the contents of your Kafka topics, it's going to be way simpler than looking at bare bytes.
  • JSON is a standard, whereas default byte array serializers depend on the programming language implementation. Thus, if you want to consume messages from multiple programming languages, you would need to replicate the (de)serializer logic in all those languages.

On the other hand, if you are concerned about the traffic load in Kafka, storage, or speed in (de)serialization, you may want to choose byte arrays and even go for your own serializer/deserializer implementation.
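To make that trade-off concrete, here is a hypothetical, stdlib-only sketch of a hand-rolled format. It only mirrors the byte[] contract of Kafka's Serializer/Deserializer interfaces (a real version would implement org.apache.kafka.common.serialization.Serializer and Deserializer), encoding the advice as a compact identifier|message String instead of JSON:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical compact codec: illustrates the byte[] contract only;
// a real one would implement Kafka's Serializer/Deserializer interfaces.
public class CompactAdviceCodec {

    // Encodes e.g. "7|A Practical Advice" as UTF-8 bytes: smaller than the JSON form
    public static byte[] serialize(int identifier, String message) {
        return (identifier + "|" + message).getBytes(StandardCharsets.UTF_8);
    }

    // Splits the bytes back into [identifier, message]
    public static String[] deserialize(byte[] data) {
        return new String(data, StandardCharsets.UTF_8).split("\\|", 2);
    }

    public static void main(String[] args) {
        byte[] bytes = serialize(7, "A Practical Advice");
        String[] parts = deserialize(bytes);
        System.out.println(parts[0] + " -> " + parts[1]); // 7 -> A Practical Advice
    }
}
```

The downside is exactly the one described above: every consumer, in every language, has to replicate this ad-hoc format.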

Sending messages with Spring Boot and Kafka

Following the plan, we create a Rest Controller and use the injected KafkaTemplate to produce some JSON messages when the endpoint is requested.

This is the first implementation of the controller, containing only the logic producing the messages.

HelloKafkaController
@RestController
public class HelloKafkaController {

    private static final Logger logger =
            LoggerFactory.getLogger(HelloKafkaController.class);

    private final KafkaTemplate<String, Object> template;
    private final String topicName;
    private final int messagesPerRequest;
    private CountDownLatch latch;

    public HelloKafkaController(
            final KafkaTemplate<String, Object> template,
            @Value("${tpd.topic-name}") final String topicName,
            @Value("${tpd.messages-per-request}") final int messagesPerRequest) {
        this.template = template;
        this.topicName = topicName;
        this.messagesPerRequest = messagesPerRequest;
    }

    @GetMapping("/hello")
    public String hello() throws Exception {
        latch = new CountDownLatch(messagesPerRequest);
        IntStream.range(0, messagesPerRequest)
                .forEach(i -> this.template.send(topicName, String.valueOf(i),
                        new PracticalAdvice("A Practical Advice", i))
                );
        latch.await(60, TimeUnit.SECONDS);
        logger.info("All messages received");
        return "Hello Kafka!";
    }
}

In the constructor, we pass some configuration parameters and the KafkaTemplate that we customized to send String keys and JSON values. Then, when the API client requests the /hello endpoint, we send 10 messages (that’s the configuration value) and then we block the thread for a maximum of 60 seconds. As you can see, there is no implementation yet for the Kafka consumers to decrease the latch count. After the latch gets unlocked, we return the message Hello Kafka! to our client.

This entire lock idea is not a pattern that you would see in a real application, but it’s good for the sake of this example. That way, you can check the number of messages received. If you prefer, you can remove the latch and return the “Hello Kafka!” message before receiving the messages.

Kafka Consumer configuration

As mentioned earlier in this post, we want to demonstrate different ways of deserialization with Spring Boot and Spring Kafka and, at the same time, see how multiple consumers can work in a load-balanced manner when they are part of the same consumer group.

Spring Boot Kafka configuration - Consumer
@SpringBootApplication
public class KafkaExampleApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaExampleApplication.class, args);
    }

    @Autowired
    private KafkaProperties kafkaProperties;

    @Value("${tpd.topic-name}")
    private String topicName;

    // Producer configuration
    // omitted...

    // Consumer configuration

    // If you only need one kind of deserialization, you only need to set the
    // Consumer configuration properties. Uncomment this and remove all others below.
//    @Bean
//    public Map<String, Object> consumerConfigs() {
//        Map<String, Object> props = new HashMap<>(
//                kafkaProperties.buildConsumerProperties()
//        );
//        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
//                StringDeserializer.class);
//        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
//                JsonDeserializer.class);
//        props.put(ConsumerConfig.GROUP_ID_CONFIG,
//                "tpd-loggers");
//
//        return props;
//    }

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        final JsonDeserializer<Object> jsonDeserializer = new JsonDeserializer<>();
        jsonDeserializer.addTrustedPackages("*");
        return new DefaultKafkaConsumerFactory<>(
                kafkaProperties.buildConsumerProperties(), new StringDeserializer(), jsonDeserializer
        );
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }

    // String Consumer Configuration

    @Bean
    public ConsumerFactory<String, String> stringConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(
                kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new StringDeserializer()
        );
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerStringContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(stringConsumerFactory());

        return factory;
    }

    // Byte Array Consumer Configuration

    @Bean
    public ConsumerFactory<String, byte[]> byteArrayConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(
                kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new ByteArrayDeserializer()
        );
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, byte[]> kafkaListenerByteArrayContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, byte[]> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(byteArrayConsumerFactory());
        return factory;
    }
}

This configuration may look extensive, but take into account that, to demonstrate these three types of deserialization, we have repeated the creation of the ConsumerFactory and the KafkaListenerContainerFactory instances three times so we can switch between them in our consumers.

The basic steps to configure a consumer are:

  1. [Omitted] Set up the Consumer properties in a similar way as we did for the Producer. We can skip this step since the only configuration we need is the Group ID, specified in the Spring Boot properties file, and the key and value deserializers, which we will override while creating the customized consumer and KafkaListener factories. If you only need one configuration, meaning always the same type of Key and Value deserializers, this commented code block is the only thing you need. Adjust the deserializer types to the ones you want to use.
  2. Create the ConsumerFactory to be used by the KafkaListenerContainerFactory. We create three, switching the value deserializer in each case to 1) a JSON deserializer, 2) a String deserializer and 3) a Byte Array deserializer.
    1. Note that, after creating the JSON Deserializer, we're including an extra step to specify that we trust all packages. You can fine-tune this in your application if you want. If we don't do this, we will get an error message saying something like: java.lang.IllegalArgumentException: The class [] is not in the trusted packages.
  3. Construct the Kafka Listener container factory (a concurrent one) using the previously configured Consumer Factory. Again, we do this three times to use a different one per instance.

Receiving messages with Spring Boot and Kafka in JSON, String and byte[] formats

It’s time to show what the Kafka consumers look like. We will use the @KafkaListener annotation since it simplifies the process and takes care of the deserialization to the passed Java type.

Kafka listeners
@RestController
public class HelloKafkaController {

    private static final Logger logger =
            LoggerFactory.getLogger(HelloKafkaController.class);

    private final KafkaTemplate<String, Object> template;
    private final String topicName;
    private final int messagesPerRequest;
    private CountDownLatch latch;

    public HelloKafkaController(
            final KafkaTemplate<String, Object> template,
            @Value("${tpd.topic-name}") final String topicName,
            @Value("${tpd.messages-per-request}") final int messagesPerRequest) {
        this.template = template;
        this.topicName = topicName;
        this.messagesPerRequest = messagesPerRequest;
    }

    @GetMapping("/hello")
    public String hello() throws Exception {
        latch = new CountDownLatch(messagesPerRequest);
        IntStream.range(0, messagesPerRequest)
                .forEach(i -> this.template.send(topicName, String.valueOf(i),
                        new PracticalAdvice("A Practical Advice", i))
                );
        latch.await(60, TimeUnit.SECONDS);
        logger.info("All messages received");
        return "Hello Kafka!";
    }

    @KafkaListener(topics = "advice-topic", clientIdPrefix = "json",
            containerFactory = "kafkaListenerContainerFactory")
    public void listenAsObject(ConsumerRecord<String, PracticalAdvice> cr,
                               @Payload PracticalAdvice payload) {
        logger.info("Logger 1 [JSON] received key {}: Type [{}] | Payload: {} | Record: {}", cr.key(),
                typeIdHeader(cr.headers()), payload, cr.toString());
        latch.countDown();
    }

    @KafkaListener(topics = "advice-topic", clientIdPrefix = "string",
            containerFactory = "kafkaListenerStringContainerFactory")
    public void listenAsString(ConsumerRecord<String, String> cr,
                               @Payload String payload) {
        logger.info("Logger 2 [String] received key {}: Type [{}] | Payload: {} | Record: {}", cr.key(),
                typeIdHeader(cr.headers()), payload, cr.toString());
        latch.countDown();
    }

    @KafkaListener(topics = "advice-topic", clientIdPrefix = "bytearray",
            containerFactory = "kafkaListenerByteArrayContainerFactory")
    public void listenAsByteArray(ConsumerRecord<String, byte[]> cr,
                                  @Payload byte[] payload) {
        logger.info("Logger 3 [ByteArray] received key {}: Type [{}] | Payload: {} | Record: {}", cr.key(),
                typeIdHeader(cr.headers()), payload, cr.toString());
        latch.countDown();
    }

    private static String typeIdHeader(Headers headers) {
        return StreamSupport.stream(headers.spliterator(), false)
                .filter(header -> header.key().equals("__TypeId__"))
                .findFirst().map(header -> new String(header.value())).orElse("N/A");
    }
}

There are three listeners in this class. First, let’s describe the @KafkaListener annotation’s parameters:

  • All listeners are consuming from the same topic, advice-topic. This parameter is mandatory.
  • The parameter clientIdPrefix is optional. I'm using it here so the logs are more human-friendly. You will know which consumer does what by its name prefix. Kafka will append a number to this prefix.
  • The containerFactory parameter is optional; you can also rely on the naming convention. If you don't specify it, Spring will look for a bean named kafkaListenerContainerFactory, which is also the default name used by Spring Boot when autoconfiguring Kafka. You could also override it by using the same name (although it looks like magic to someone who doesn't know about the convention). We need to set it explicitly because we want to use a different one for each listener, to be able to use different deserializers.

Note that the first argument passed to all listeners is the same, a ConsumerRecord. The second one, annotated with @Payload, is redundant if we use the first. We can access the payload using the method value() in ConsumerRecord, but I included it so you can see how simple it is to get the message payload directly via inferred deserialization.

The TypeId Header in Kafka

The __TypeId__ header is automatically set by the Kafka library by default. The utility method typeIdHeader that I use here just extracts the string representation, since you will only see a byte array in the output of ConsumerRecord’s toString() method. This TypeId header can be useful for deserialization, so you can find the type to map the data to. It’s not needed for JSON deserialization because that specific deserializer, provided by the Spring team, infers the type from the listener method’s argument.
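Since the header value is just the UTF-8 bytes of the class name, you can verify what typeIdHeader extracts with a stdlib-only snippet (the byte values below are copied from the log output in this post):

```java
import java.nio.charset.StandardCharsets;

public class TypeIdDecode {

    // The __TypeId__ header bytes exactly as they appear in the ConsumerRecord logs
    static final byte[] TYPE_ID = {105, 111, 46, 116, 112, 100, 46, 107, 97,
            102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97,
            99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101};

    public static String decode() {
        return new String(TYPE_ID, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(decode()); // prints io.tpd.kafkaexample.PracticalAdvice
    }
}
```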

Running the application

All the code in this post is available on GitHub: Kafka and Spring Boot Example. If you find it useful, please give it a star!

Now that we finished the Kafka producer and consumers, we can run Kafka and the Spring Boot app:

$ docker-compose up -d
Starting kafka-example_zookeeper_1 ... done
Starting kafka-example_kafka_1     ... done
$ mvn spring-boot:run
...

The Spring Boot app starts and the consumers are registered in Kafka, which assigns a partition to them. We configured the topic with three partitions, so each consumer gets one of them assigned.

Output - Kafka Topic partitions
[Consumer clientId=string-0, groupId=tpd-loggers] Successfully joined group with generation 28
[Consumer clientId=string-0, groupId=tpd-loggers] Setting newly assigned partitions [advice-topic-2]
[Consumer clientId=bytearray-0, groupId=tpd-loggers] Successfully joined group with generation 28
[Consumer clientId=bytearray-0, groupId=tpd-loggers] Setting newly assigned partitions [advice-topic-0]
[Consumer clientId=json-0, groupId=tpd-loggers] Successfully joined group with generation 28
[Consumer clientId=json-0, groupId=tpd-loggers] Setting newly assigned partitions [advice-topic-1]
partitions assigned: [advice-topic-1]
partitions assigned: [advice-topic-2]
partitions assigned: [advice-topic-0]

Now we can try an HTTP call to the service. You can use your browser or curl, for example:

Invoking the endpoint
$ curl localhost:8080/hello

The output in the logs should look like this:

Kafka Listeners output
INFO 15292 --- [ntainer#1-0-C-1] i.tpd.kafkaexample.HelloKafkaController  : Logger 2 [String] received key 0: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: {"message":"A Practical Advice","identifier":0} | Record: ConsumerRecord(topic = advice-topic, partition = 2, offset = 44, CreateTime = 1542911788418, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 0, value = {"message":"A Practical Advice","identifier":0})
INFO 15292 --- [ntainer#2-0-C-1] i.tpd.kafkaexample.HelloKafkaController  : Logger 3 [ByteArray] received key 1: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: [123, 34, 109, 101, 115, 115, 97, 103, 101, 34, 58, 34, 65, 32, 80, 114, 97, 99, 116, 105, 99, 97, 108, 32, 65, 100, 118, 105, 99, 101, 34, 44, 34, 105, 100, 101, 110, 116, 105, 102, 105, 101, 114, 34, 58, 49, 125] | Record: ConsumerRecord(topic = advice-topic, partition = 0, offset = 44, CreateTime = 1542911788422, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 1, value = [[email protected])
INFO 15292 --- [ntainer#1-0-C-1] i.tpd.kafkaexample.HelloKafkaController  : Logger 2 [String] received key 2: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: {"message":"A Practical Advice","identifier":2} | Record: ConsumerRecord(topic = advice-topic, partition = 2, offset = 45, CreateTime = 1542911788422, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 2, value = {"message":"A Practical Advice","identifier":2})
INFO 15292 --- [ntainer#2-0-C-1] i.tpd.kafkaexample.HelloKafkaController  : Logger 3 [ByteArray] received key 5: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: [123, 34, 109, 101, 115, 115, 97, 103, 101, 34, 58, 34, 65, 32, 80, 114, 97, 99, 116, 105, 99, 97, 108, 32, 65, 100, 118, 105, 99, 101, 34, 44, 34, 105, 100, 101, 110, 116, 105, 102, 105, 101, 114, 34, 58, 53, 125] | Record: ConsumerRecord(topic = advice-topic, partition = 0, offset = 45, CreateTime = 1542911788422, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 5, value = [[email protected])
INFO 15292 --- [ntainer#1-0-C-1] i.tpd.kafkaexample.HelloKafkaController  : Logger 2 [String] received key 3: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: {"message":"A Practical Advice","identifier":3} | Record: ConsumerRecord(topic = advice-topic, partition = 2, offset = 46, CreateTime = 1542911788422, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 3, value = {"message":"A Practical Advice","identifier":3})
INFO 15292 --- [ntainer#2-0-C-1] i.tpd.kafkaexample.HelloKafkaController  : Logger 3 [ByteArray] received key 7: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: [123, 34, 109, 101, 115, 115, 97, 103, 101, 34, 58, 34, 65, 32, 80, 114, 97, 99, 116, 105, 99, 97, 108, 32, 65, 100, 118, 105, 99, 101, 34, 44, 34, 105, 100, 101, 110, 116, 105, 102, 105, 101, 114, 34, 58, 55, 125] | Record: ConsumerRecord(topic = advice-topic, partition = 0, offset = 46, CreateTime = 1542911788423, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 7, value = [[email protected])
INFO 15292 --- [ntainer#1-0-C-1] i.tpd.kafkaexample.HelloKafkaController  : Logger 2 [String] received key 9: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: {"message":"A Practical Advice","identifier":9} | Record: ConsumerRecord(topic = advice-topic, partition = 2, offset = 47, CreateTime = 1542911788423, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 9, value = {"message":"A Practical Advice","identifier":9})
INFO 15292 --- [ntainer#2-0-C-1] i.tpd.kafkaexample.HelloKafkaController  : Logger 3 [ByteArray] received key 8: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: [123, 34, 109, 101, 115, 115, 97, 103, 101, 34, 58, 34, 65, 32, 80, 114, 97, 99, 116, 105, 99, 97, 108, 32, 65, 100, 118, 105, 99, 101, 34, 44, 34, 105, 100, 101, 110, 116, 105, 102, 105, 101, 114, 34, 58, 56, 125] | Record: ConsumerRecord(topic = advice-topic, partition = 0, offset = 47, CreateTime = 1542911788423, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 8, value = [[email protected])
INFO 15292 --- [ntainer#0-0-C-1] i.tpd.kafkaexample.HelloKafkaController  : Logger 1 [JSON] received key 4: Type [N/A] | Payload: PracticalAdvice::toString() {message='A Practical Advice', identifier=4} | Record: ConsumerRecord(topic = advice-topic, partition = 1, offset = 22, CreateTime = 1542911788422, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [], isReadOnly = false), key = 4, value = PracticalAdvice::toString() {message='A Practical Advice', identifier=4})
INFO 15292 --- [ntainer#0-0-C-1] i.tpd.kafkaexample.HelloKafkaController  : Logger 1 [JSON] received key 6: Type [N/A] | Payload: PracticalAdvice::toString() {message='A Practical Advice', identifier=6} | Record: ConsumerRecord(topic = advice-topic, partition = 1, offset = 23, CreateTime = 1542911788422, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [], isReadOnly = false), key = 6, value = PracticalAdvice::toString() {message='A Practical Advice', identifier=6})
INFO 15292 --- [nio-8080-exec-1] i.tpd.kafkaexample.HelloKafkaController  : All messages received

Explanation

Kafka is hashing the message key (a simple string identifier) and, based on that, placing messages into different partitions. Each consumer gets the messages in its assigned partition and uses its deserializer to convert it to a Java object. Remember, our producer always sends JSON values.

As you can see in the logs, each deserializer achieves its task: the String consumer prints the raw JSON message, the Byte Array consumer shows the byte representation of that JSON String, and the JSON deserializer uses the Java Type Mapper to convert it back to the original class, PracticalAdvice. If you look at the logged ConsumerRecord, you'll also see the headers, the assigned partition, the offset, and so on.

And that's how you can send and receive JSON messages with Spring Boot and Kafka. I hope you found this guide useful. Below, you have some code variations so you can explore a bit more how Kafka works.

Should you have any feedback, let me know via Twitter or comments.

Try some Kafka Exercises

If you are new to Kafka, you may want to try some code changes to better understand how Kafka works.

Request /hello multiple times

Make a few requests and then look at how the messages are distributed across partitions. Kafka messages with the same key are always placed in the same partition. This feature is very useful when you need all messages for a given user, process, or whatever logic you're working on to be received by the same consumer, in the same order as they were produced, no matter how much load balancing you're doing.


Reduce the number of partitions

Kafka - more consumers in a group than partitions

First, make sure to restart Kafka to discard the previous topic configuration; you can't reduce the number of partitions of an existing topic.

Then, redefine the topic in the application to have only 2 partitions:

Redefine Kafka Topic
@Bean
public NewTopic adviceTopic() {
    // topic name, two partitions, replication factor of one
    return new NewTopic(topicName, 2, (short) 1);
}

Now, run the app again and make a request to the /hello endpoint.

Kafka consumer without partition
Logger 3 [ByteArray] received key 0: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: [123, 34, 109, 101, 115, 115, 97, 103, 101, 34, 58, 34, 65, 32, 80, 114, 97, 99, 116, 105, 99, 97, 108, 32, 65, 100, 118, 105, 99, 101, 34, 44, 34, 105, 100, 101, 110, 116, 105, 102, 105, 101, 114, 34, 58, 48, 125] | Record: ConsumerRecord(topic = advice-topic, partition = 0, offset = 0, CreateTime = 1542952988174, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 0, value = [B@...)
Logger 3 [ByteArray] received key 2: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: [123, 34, 109, 101, 115, 115, 97, 103, 101, 34, 58, 34, 65, 32, 80, 114, 97, 99, 116, 105, 99, 97, 108, 32, 65, 100, 118, 105, 99, 101, 34, 44, 34, 105, 100, 101, 110, 116, 105, 102, 105, 101, 114, 34, 58, 50, 125] | Record: ConsumerRecord(topic = advice-topic, partition = 0, offset = 1, CreateTime = 1542952988177, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 2, value = [B@...)
Logger 3 [ByteArray] received key 5: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: [123, 34, 109, 101, 115, 115, 97, 103, 101, 34, 58, 34, 65, 32, 80, 114, 97, 99, 116, 105, 99, 97, 108, 32, 65, 100, 118, 105, 99, 101, 34, 44, 34, 105, 100, 101, 110, 116, 105, 102, 105, 101, 114, 34, 58, 53, 125] | Record: ConsumerRecord(topic = advice-topic, partition = 0, offset = 2, CreateTime = 1542952988178, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 5, value = [B@...)
Logger 3 [ByteArray] received key 6: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: [123, 34, 109, 101, 115, 115, 97, 103, 101, 34, 58, 34, 65, 32, 80, 114, 97, 99, 116, 105, 99, 97, 108, 32, 65, 100, 118, 105, 99, 101, 34, 44, 34, 105, 100, 101, 110, 116, 105, 102, 105, 101, 114, 34, 58, 54, 125] | Record: ConsumerRecord(topic = advice-topic, partition = 0, offset = 3, CreateTime = 1542952988178, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 6, value = [B@...)
Logger 1 [JSON] received key 1: Type [N/A] | Payload: PracticalAdvice::toString() {message='A Practical Advice', identifier=1} | Record: ConsumerRecord(topic = advice-topic, partition = 1, offset = 0, CreateTime = 1542952988177, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [], isReadOnly = false), key = 1, value = PracticalAdvice::toString() {message='A Practical Advice', identifier=1})
Logger 1 [JSON] received key 3: Type [N/A] | Payload: PracticalAdvice::toString() {message='A Practical Advice', identifier=3} | Record: ConsumerRecord(topic = advice-topic, partition = 1, offset = 1, CreateTime = 1542952988178, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [], isReadOnly = false), key = 3, value = PracticalAdvice::toString() {message='A Practical Advice', identifier=3})
Logger 1 [JSON] received key 4: Type [N/A] | Payload: PracticalAdvice::toString() {message='A Practical Advice', identifier=4} | Record: ConsumerRecord(topic = advice-topic, partition = 1, offset = 2, CreateTime = 1542952988178, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [], isReadOnly = false), key = 4, value = PracticalAdvice::toString() {message='A Practical Advice', identifier=4})
Logger 1 [JSON] received key 7: Type [N/A] | Payload: PracticalAdvice::toString() {message='A Practical Advice', identifier=7} | Record: ConsumerRecord(topic = advice-topic, partition = 1, offset = 3, CreateTime = 1542952988178, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [], isReadOnly = false), key = 7, value = PracticalAdvice::toString() {message='A Practical Advice', identifier=7})
Logger 1 [JSON] received key 8: Type [N/A] | Payload: PracticalAdvice::toString() {message='A Practical Advice', identifier=8} | Record: ConsumerRecord(topic = advice-topic, partition = 1, offset = 4, CreateTime = 1542952988178, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [], isReadOnly = false), key = 8, value = PracticalAdvice::toString() {message='A Practical Advice', identifier=8})
Logger 1 [JSON] received key 9: Type [N/A] | Payload: PracticalAdvice::toString() {message='A Practical Advice', identifier=9} | Record: ConsumerRecord(topic = advice-topic, partition = 1, offset = 5, CreateTime = 1542952988178, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [], isReadOnly = false), key = 9, value = PracticalAdvice::toString() {message='A Practical Advice', identifier=9})
All messages received

One of the consumers is not receiving any messages. This is the expected behavior since there are no more partitions available for it within the same consumer group.
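You can reproduce this idle-consumer situation with plain Java, no broker needed. The sketch below (the class, method, and consumer names are made up for illustration) hands out partitions one by one to the consumers of a group: with 2 partitions and 3 consumers, one assignment list necessarily stays empty. The real distribution is decided by the configured partition assignor (range, by default), but the pigeonhole argument is the same.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of why a consumer can end up idle: with fewer partitions than
// consumers in a group, at least one consumer gets no assignment.
public class AssignmentSketch {

    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        consumers.forEach(c -> assignment.put(c, new ArrayList<>()));
        for (int p = 0; p < numPartitions; p++) {
            // distribute partitions one by one, at most one per consumer per round
            assignment.get(consumers.get(p % consumers.size())).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> result =
                assign(List.of("json", "string", "bytearray"), 2);
        System.out.println(result); // {json=[0], string=[1], bytearray=[]}
    }
}
```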

Change one Consumer’s Group Identifier

Kafka - Two consumer groups

Keep the changes from the previous case: the topic now has only 2 partitions. This time, we change the group id of one of our consumers, so it works independently.

Changing Kafka group id
@KafkaListener(topics = "advice-topic", clientIdPrefix = "bytearray",
        containerFactory = "kafkaListenerByteArrayContainerFactory",
        groupId = "tpd-loggers-2")
public void listenAsByteArray(ConsumerRecord<String, byte[]> cr,
                              @Payload byte[] payload) {
    logger.info("Logger 3 [ByteArray] received a payload with size {}", payload.length);
    latch.countDown();
}

Note that we also changed the logged message. Now, this consumer is in charge of printing the size of the payload, not the payload itself. Also, we need to change the CountDownLatch so it expects twice the number of messages.

Latch set to twice the number of messages
latch = new CountDownLatch(messagesPerRequest * 2);

Why? This time, let’s explain what is going to happen before running the app. As I described at the beginning of this post, when consumers belong to the same Consumer Group they’re (conceptually) working on the same task. We’re implementing a load-balanced mechanism in which concurrent workers get messages from different partitions without needing to process each other’s messages.

In this example, I also changed the "task" of the last consumer to make this easier to see: it prints something different. Since we changed its group id, this consumer works independently and Kafka assigns both partitions to it. The Byte Array consumer will receive all messages, working separately from the other two.
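The latch arithmetic can be simulated without a broker. The sketch below (class and method names are made up for illustration) counts down once per message for the first group and once more per message for the second, independent group; only then does the latch reach zero, which is why it has to start at twice messagesPerRequest.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Minimal sketch of the latch logic: with two independent consumer groups,
// every message is delivered twice, so the latch starts at twice the count.
public class LatchSketch {

    static boolean simulate(int messagesPerRequest) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(messagesPerRequest * 2);

        // first group: the JSON and String consumers share the messages
        for (int i = 0; i < messagesPerRequest; i++) {
            latch.countDown();
        }
        // second group: the ByteArray consumer receives every message again
        for (int i = 0; i < messagesPerRequest; i++) {
            latch.countDown();
        }
        return latch.await(1, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("All messages received: " + simulate(10)); // true
    }
}
```

If you kept the latch at messagesPerRequest, the controller would return before the second group had logged all its messages.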

Two Kafka Consumers
Logger 3 [ByteArray] received a payload with size 47
Logger 3 [ByteArray] received a payload with size 47
Logger 3 [ByteArray] received a payload with size 47
Logger 3 [ByteArray] received a payload with size 47
Logger 2 [String] received key 1: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: {"message":"A Practical Advice","identifier":1} | Record: ConsumerRecord(topic = advice-topic, partition = 1, offset = 12, CreateTime = 1542954145932, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 1, value = {"message":"A Practical Advice","identifier":1})
Logger 2 [String] received key 3: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: {"message":"A Practical Advice","identifier":3} | Record: ConsumerRecord(topic = advice-topic, partition = 1, offset = 13, CreateTime = 1542954145933, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 3, value = {"message":"A Practical Advice","identifier":3})
Logger 2 [String] received key 4: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: {"message":"A Practical Advice","identifier":4} | Record: ConsumerRecord(topic = advice-topic, partition = 1, offset = 14, CreateTime = 1542954145933, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 4, value = {"message":"A Practical Advice","identifier":4})
Logger 2 [String] received key 7: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: {"message":"A Practical Advice","identifier":7} | Record: ConsumerRecord(topic = advice-topic, partition = 1, offset = 15, CreateTime = 1542954145933, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 7, value = {"message":"A Practical Advice","identifier":7})
Logger 2 [String] received key 8: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: {"message":"A Practical Advice","identifier":8} | Record: ConsumerRecord(topic = advice-topic, partition = 1, offset = 16, CreateTime = 1542954145933, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 8, value = {"message":"A Practical Advice","identifier":8})
Logger 3 [ByteArray] received a payload with size 47
Logger 2 [String] received key 9: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: {"message":"A Practical Advice","identifier":9} | Record: ConsumerRecord(topic = advice-topic, partition = 1, offset = 17, CreateTime = 1542954145934, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 9, value = {"message":"A Practical Advice","identifier":9})
Logger 3 [ByteArray] received a payload with size 47
Logger 3 [ByteArray] received a payload with size 47
Logger 3 [ByteArray] received a payload with size 47
Logger 3 [ByteArray] received a payload with size 47
Logger 3 [ByteArray] received a payload with size 47
Logger 1 [JSON] received key 0: Type [N/A] | Payload: PracticalAdvice::toString() {message='A Practical Advice', identifier=0} | Record: ConsumerRecord(topic = advice-topic, partition = 0, offset = 8, CreateTime = 1542954145929, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [], isReadOnly = false), key = 0, value = PracticalAdvice::toString() {message='A Practical Advice', identifier=0})
Logger 1 [JSON] received key 2: Type [N/A] | Payload: PracticalAdvice::toString() {message='A Practical Advice', identifier=2} | Record: ConsumerRecord(topic = advice-topic, partition = 0, offset = 9, CreateTime = 1542954145933, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [], isReadOnly = false), key = 2, value = PracticalAdvice::toString() {message='A Practical Advice', identifier=2})
Logger 1 [JSON] received key 5: Type [N/A] | Payload: PracticalAdvice::toString() {message='A Practical Advice', identifier=5} | Record: ConsumerRecord(topic = advice-topic, partition = 0, offset = 10, CreateTime = 1542954145933, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [], isReadOnly = false), key = 5, value = PracticalAdvice::toString() {message='A Practical Advice', identifier=5})
Logger 1 [JSON] received key 6: Type [N/A] | Payload: PracticalAdvice::toString() {message='A Practical Advice', identifier=6} | Record: ConsumerRecord(topic = advice-topic, partition = 0, offset = 11, CreateTime = 1542954145933, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [], isReadOnly = false), key = 6, value = PracticalAdvice::toString() {message='A Practical Advice', identifier=6})
All messages received

With these exercises, and by changing parameters here and there, I think you can better grasp these concepts. Remember: if you liked this post, please share it or comment on Twitter.


About Moisés Macero

Software Developer, Architect, and Author.

Amsterdam, The Netherlands https://thepracticaldeveloper.com