How to consume object messages from a Kafka topic

Last updated: Jul 30, 2023 12:00 AM

A Kafka consumer reads messages from one or more Kafka topics. The article "2 ways to publish an Object to a Kafka topic" explains how to publish an object to a topic; this article shows how to consume that object.

Serializable Kafka consumer object

Here is the object that my consumer consumes in this example.

Consuming object
import java.io.Serializable;
import java.math.BigDecimal;

// This must be the same class the producer serialized: Java serialization
// matches incoming bytes on the class name and serialVersionUID.
public class OrderDetails implements Serializable {
   String OrderNo;
   BigDecimal total;

   public String getOrderNo() {
      return OrderNo;
   }

   public void setOrderNo(String orderNo) {
      OrderNo = orderNo;
   }

   public BigDecimal getTotal() {
      return total;
   }

   public void setTotal(BigDecimal total) {
      this.total = total;
   }
}

Kafka object de-serializer

To consume an object, the consumer needs a de-serializer that turns the raw bytes of each record back into that object. In this example, OrderDetails is the class being de-serialized.

Object de-serializer
import org.apache.kafka.common.serialization.Deserializer;
import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.util.Map;

public class OrderDeSerializer implements Deserializer<OrderDetails> {

   @Override
   public void configure(Map<String, ?> configs, boolean isKey) {
      // No configuration needed.
   }

   @Override
   public OrderDetails deserialize(String topic, byte[] bytes) {
      // Kafka passes null for records that have no value.
      if (bytes == null) {
         return null;
      }
      // try-with-resources closes the stream even if readObject() fails.
      try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
         return (OrderDetails) in.readObject();
      } catch (Exception e) {
         // Log the failure; an unreadable record is returned as null below.
         e.printStackTrace();
      }
      return null;
   }
}
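
A quick way to sanity-check this logic without a broker is a round trip through Java's native serialization. The sketch below assumes the producer writes the object with ObjectOutputStream, which is the format ObjectInputStream expects to read. Order and OrderRoundTrip are hypothetical stand-ins so the example compiles on its own; they are not classes from this article.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.math.BigDecimal;

class OrderRoundTrip {

    // Hypothetical stand-in for OrderDetails.
    static class Order implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String orderNo;
        private final BigDecimal total;

        Order(String orderNo, BigDecimal total) {
            this.orderNo = orderNo;
            this.total = total;
        }

        String getOrderNo() { return orderNo; }
        BigDecimal getTotal() { return total; }
    }

    // Assumed producer side: write the object with Java native serialization.
    static byte[] serialize(Order order) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(order);
        }
        return buffer.toByteArray();
    }

    // Consumer side: the same steps as a deserialize method, minus the Kafka interface.
    static Order deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (Order) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Order original = new Order("A-100", new BigDecimal("19.99"));
        Order copy = deserialize(serialize(original));
        System.out.println(copy.getOrderNo() + " " + copy.getTotal());
    }
}
```

If the round trip fails here, it will fail in the consumer too, so this is a cheap check to run before starting Kafka.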

Kafka consumer that consumes objects

Note that I refer to my DeSerializer in the value.deserializer property.

properties.put("value.deserializer", "com.core.json_objects.delete_later.OrderDeSerializer");
That's how the consumer knows how to turn each record's value back into an OrderDetails object.
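
A hard-coded class name is easy to mistype, and the mistake only shows up when the consumer starts. One option, sketched below, is to build the properties from the class itself with Class.getName(). ConsumerProps, build, and PlaceholderDeserializer are hypothetical names used for illustration; in this article's setup you would pass OrderDeSerializer.class instead.

```java
import java.util.Properties;

class ConsumerProps {

    // Hypothetical placeholder so the sketch runs without Kafka on the classpath.
    static class PlaceholderDeserializer { }

    static Properties build(String bootstrapServers, String groupId, Class<?> valueDeserializer) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Class.getName() returns the fully qualified name that the property expects.
        props.put("value.deserializer", valueDeserializer.getName());
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092", "test-group", PlaceholderDeserializer.class);
        System.out.println(props.getProperty("value.deserializer"));
    }
}
```

With this approach, renaming or moving the de-serializer class becomes a compile-time concern rather than a runtime surprise.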

Consumer
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class Consumer {
   public static void main(String[] args) {
      Properties properties = new Properties();
      properties.put("bootstrap.servers", "localhost:9092");
      properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      properties.put("value.deserializer", "com.core.json_objects.delete_later.OrderDeSerializer");
      properties.put("group.id", "test-group");

      // The key and value types match the configured de-serializers.
      KafkaConsumer<String, OrderDetails> consumer = new KafkaConsumer<>(properties);
      consumer.subscribe(Collections.singletonList("TEST-TOPIC"));
      try {
         while (true) {
            // poll(Duration) replaces the deprecated poll(long) overload.
            final ConsumerRecords<String, OrderDetails> consumerRecords = consumer.poll(Duration.ofMillis(1000));
            consumerRecords.forEach(record -> {
               OrderDetails order = record.value();
               // The de-serializer returns null for records it cannot read.
               if (order != null) {
                  System.out.println(order.getOrderNo() + " " + order.getTotal());
               }
            });
         }
      } catch (Exception e) {
         System.out.println(e.getMessage());
      } finally {
         consumer.close();
      }
   }
}

By: Lance

Hi, I'm Lance Raney, a dedicated Fullstack Developer based in Oklahoma with over 15 years of exp
