2 ways to publish an Object to a Kafka topic

Last updated : Jul 30, 2023 12:00 AM

Before I can publish an object to a Kafka topic, I must serialize it. Kafka stores record keys and values as byte arrays, so the producer needs a way to turn the object into bytes.

I will show you two different ways to serialize an object before publishing it to a topic.

  1. Serialize the object using Apache Kafka's Serializer interface
  2. Serialize the object by converting the object to a byte array

When I publish something to a Kafka topic, I must explicitly specify a serializer for both the key and the value. In this case, the value is an object, so I must provide a serializer that can handle it.

In this example, I publish a simple OrderDetails object to the Kafka topic orders-topic.

Using Apache Kafka's Serializer interface.

Below is the class on which my order details are based. Note that it implements Serializable.

Serializable object
import java.io.Serializable;
import java.math.BigDecimal;

public class OrderDetails implements Serializable {
   private String orderNo;
   private BigDecimal total;
   public String getOrderNo() {
      return orderNo;
   }
   public void setOrderNo(String orderNo) {
      this.orderNo = orderNo;
   }
   public BigDecimal getTotal() {
      return total;
   }
   public void setTotal(BigDecimal total) {
      this.total = total;
   }
}

Here I use Kafka's Serializer interface to serialize my object.

Serializer interface
import org.apache.kafka.common.serialization.Serializer;
import org.springframework.util.SerializationUtils;
import java.io.Serializable;
import java.util.Map;

public class OrderSerializer<T extends Serializable> implements Serializer<T> {
   @Override
   public void configure(Map<String, ?> configs, boolean isKey) {
   }
   @Override
   public byte[] serialize(String topic, T data) {
      return SerializationUtils.serialize(data);
   }
   @Override
   public void close() {
   }
}
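To make it clearer what this serializer hands to Kafka, here is a minimal sketch that uses only the JDK. It assumes that SerializationUtils.serialize is a thin wrapper around ObjectOutputStream for non-null values, so the payload is in Java's native serialization format and starts with the magic number 0xACED. The class SerializedPayloadPeek and both of its methods are hypothetical names I made up for illustration; they are not part of Kafka or Spring.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamConstants;
import java.io.Serializable;
import java.math.BigDecimal;

// Hypothetical, JDK-only stand-in for what OrderSerializer.serialize returns.
class SerializedPayloadPeek {

   // Assumed equivalent of SerializationUtils.serialize for a non-null value.
   static byte[] toBytes(Serializable value) throws IOException {
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      // Closing the ObjectOutputStream flushes everything into 'out'
      try (ObjectOutputStream oos = new ObjectOutputStream(out)) {
         oos.writeObject(value);
      }
      return out.toByteArray();
   }

   // True when the payload starts with the Java serialization magic number 0xACED.
   static boolean hasJavaSerializationHeader(byte[] payload) {
      if (payload == null || payload.length < 2) {
         return false;
      }
      int firstTwoBytes = ((payload[0] & 0xFF) << 8) | (payload[1] & 0xFF);
      return firstTwoBytes == (ObjectStreamConstants.STREAM_MAGIC & 0xFFFF);
   }

   public static void main(String[] args) throws IOException {
      byte[] payload = toBytes(BigDecimal.valueOf(500));
      System.out.println(hasJavaSerializationHeader(payload));
   }
}
```

In practice this means a consumer needs to be a JVM application with the same class on its classpath to read the value back.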

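Now I can create a Kafka producer to publish my OrderDetails objects with OrderSerializer as the value.serializer. The property takes the fully qualified class name, so this example assumes OrderSerializer lives in the com.serializers package.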

Producer
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.math.BigDecimal;
import java.util.Properties;

public class Producer {
   public static void main(String[] args) throws Exception{
      String topic = "orders-topic";
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer", "com.serializers.OrderSerializer");

      org.apache.kafka.clients.producer.Producer<String, OrderDetails> producer = new KafkaProducer<String, OrderDetails>(props);

      OrderDetails od = new OrderDetails();
      od.setOrderNo("100200300");
      od.setTotal(BigDecimal.valueOf(500));

      producer.send(new ProducerRecord<String, OrderDetails>(topic, od));
      producer.close();
   }
}

Converting the object to a byte array and publishing to a topic.

Here I use Java's ByteArrayOutputStream and ObjectOutputStream to serialize my OrderDetails object myself. Because the value is already a byte array by the time it reaches the producer, my value serializer is org.apache.kafka.common.serialization.ByteArraySerializer.

Producer with serializer
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.math.BigDecimal;
import java.util.Properties;

public class Producer {
public static void main(String[] args) {

   Properties config = new Properties();
   config.put("bootstrap.servers", "localhost:9092");
   config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
   config.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

   KafkaProducer<String, byte[]> producer = new KafkaProducer<>(config);

   OrderDetails od = new OrderDetails();
   od.setOrderNo("100200300");
   od.setTotal(BigDecimal.valueOf(500));
   byte[] serializedOd = serializeObject(od);

   ProducerRecord<String, byte[]> message = new ProducerRecord<>("orders-topic", serializedOd);
   producer.send(message);
   producer.close();
}

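private static byte[] serializeObject(OrderDetails obj) {
   ByteArrayOutputStream stream = new ByteArrayOutputStream();
   // try-with-resources closes (and therefore flushes) the ObjectOutputStream
   try (ObjectOutputStream oos = new ObjectOutputStream(stream)) {
      oos.writeObject(obj);
   } catch (Exception e) {
      // Fail loudly instead of publishing an empty or truncated payload
      throw new IllegalStateException("Could not serialize OrderDetails", e);
   }
   return stream.toByteArray();
}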
}
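
I don't show a consumer in this post, but on the consuming side the bytes have to be turned back into an OrderDetails object. Below is a minimal sketch of that reverse step, using only the JDK. It assumes the consumer reads the value as a byte array (for example with Kafka's ByteArrayDeserializer) and has the same OrderDetails class on its classpath. The class OrderRoundTrip and the method deserializeObject are hypothetical names, and the nested OrderDetails is only a stand-in so the sketch compiles on its own.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.math.BigDecimal;

// Hypothetical helper; not part of Kafka.
class OrderRoundTrip {

   // Stand-in for the OrderDetails class from the first example.
   static class OrderDetails implements Serializable {
      private static final long serialVersionUID = 1L;
      private String orderNo;
      private BigDecimal total;
      String getOrderNo() { return orderNo; }
      void setOrderNo(String orderNo) { this.orderNo = orderNo; }
      BigDecimal getTotal() { return total; }
      void setTotal(BigDecimal total) { this.total = total; }
   }

   // Producer side: object -> byte[]
   static byte[] serializeObject(OrderDetails obj) throws IOException {
      ByteArrayOutputStream stream = new ByteArrayOutputStream();
      try (ObjectOutputStream oos = new ObjectOutputStream(stream)) {
         oos.writeObject(obj);
      }
      return stream.toByteArray();
   }

   // Consumer side: byte[] (the record value) -> object
   static OrderDetails deserializeObject(byte[] bytes) throws IOException, ClassNotFoundException {
      try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
         return (OrderDetails) ois.readObject();
      }
   }

   public static void main(String[] args) throws Exception {
      OrderDetails od = new OrderDetails();
      od.setOrderNo("100200300");
      od.setTotal(BigDecimal.valueOf(500));

      // Round trip without a broker: what goes in should come back out
      OrderDetails copy = deserializeObject(serializeObject(od));
      System.out.println(copy.getOrderNo() + " " + copy.getTotal());
   }
}
```

Running the round trip locally like this is a quick way to check that the class really is serializable before a broker gets involved.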

By: Lance

Hi, I'm Lance Raney, a dedicated Fullstack Developer based in Oklahoma with over 15 years of exp
