Kafka stores and transmits records as byte arrays, so before I publish an object to a Kafka topic, I must serialize it first.
I will show you two different ways to serialize an object before publishing it to a topic.
When I create a producer, I must explicitly specify the serializer classes for the record key and the record value. In this case, the value is an object, so I must provide a serializer that can handle that object's type.
In this example, I publish a simple OrderDetails object.
Below is the class that my order details are based on. Note that it is Serializable.
import java.io.Serializable;
import java.math.BigDecimal;

// Plain data class for an order. It implements Serializable so that
// Java serialization can turn it into bytes.
public class OrderDetails implements Serializable {

    private String orderNo;
    private BigDecimal total;

    public String getOrderNo() {
        return orderNo;
    }

    public void setOrderNo(String orderNo) {
        this.orderNo = orderNo;
    }

    public BigDecimal getTotal() {
        return total;
    }

    public void setTotal(BigDecimal total) {
        this.total = total;
    }
}
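To see why the Serializable marker matters, here is a minimal sketch using only the JDK. The classes MarkedOrder and UnmarkedOrder and the helper canSerialize are hypothetical names I made up for illustration; they are not part of the example above. The sketch shows that Java serialization rejects an object whose class does not implement Serializable.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class SerializableCheck {

    // Hypothetical stand-in for a class that carries the marker interface.
    public static class MarkedOrder implements Serializable {
        String orderNo = "100200300";
    }

    // Hypothetical stand-in for the same class without the marker interface.
    public static class UnmarkedOrder {
        String orderNo = "100200300";
    }

    // Returns true if Java serialization accepts the object, and false if it
    // rejects the object with NotSerializableException.
    public static boolean canSerialize(Object obj) {
        try (ObjectOutputStream oos = new ObjectOutputStream(new ByteArrayOutputStream())) {
            oos.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            // Any other I/O failure is unexpected for an in-memory stream.
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("marked:   " + canSerialize(new MarkedOrder()));
        System.out.println("unmarked: " + canSerialize(new UnmarkedOrder()));
    }
}
```

Running main should report true for the marked class and false for the unmarked one. Both serialization approaches in this post rely on Java serialization, so both need the value class to be Serializable.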
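Here I use Kafka's Serializer interface to write a custom serializer that delegates the actual work to Spring's SerializationUtils.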
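// The package must match the class name configured as value.serializer in the producer.
package com.serializers;

import org.apache.kafka.common.serialization.Serializer;
import org.springframework.util.SerializationUtils;

import java.io.Serializable;
import java.util.Map;

// Generic Kafka serializer for any Serializable type, backed by Java serialization.
public class OrderSerializer<T extends Serializable> implements Serializer<T> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // No configuration needed.
    }

    @Override
    public byte[] serialize(String topic, T data) {
        return SerializationUtils.serialize(data);
    }

    @Override
    public void close() {
        // Nothing to release.
    }
}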
Now I can create a Kafka producer to publish my OrderDetails object, with OrderSerializer registered as the value serializer.
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.math.BigDecimal;
import java.util.Properties;

public class Producer {

    public static void main(String[] args) throws Exception {
        String topic = "orders-topic";

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "com.serializers.OrderSerializer");

        org.apache.kafka.clients.producer.Producer<String, OrderDetails> producer = new KafkaProducer<>(props);

        OrderDetails od = new OrderDetails();
        od.setOrderNo("100200300");
        od.setTotal(BigDecimal.valueOf(500));

        producer.send(new ProducerRecord<>(topic, od));
        producer.close();
    }
}
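Here I use Java's ObjectOutputStream to serialize the object into a byte array myself, and then publish the bytes with Kafka's built-in ByteArraySerializer.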
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.math.BigDecimal;
import java.util.Properties;

public class Producer {

    public static void main(String[] args) {
        Properties config = new Properties();
        config.put("bootstrap.servers", "localhost:9092");
        config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // The value is already a byte array, so Kafka's built-in serializer is enough.
        config.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        KafkaProducer<String, byte[]> producer = new KafkaProducer<>(config);

        OrderDetails od = new OrderDetails();
        od.setOrderNo("100200300");
        od.setTotal(BigDecimal.valueOf(500));

        byte[] serializedOd = serializeObject(od);
        ProducerRecord<String, byte[]> message = new ProducerRecord<>("orders-topic", serializedOd);
        producer.send(message);
        producer.close();
    }

    private static byte[] serializeObject(OrderDetails obj) {
        ByteArrayOutputStream stream = new ByteArrayOutputStream();
        // try-with-resources closes the ObjectOutputStream, which flushes it into the byte stream.
        try (ObjectOutputStream oos = new ObjectOutputStream(stream)) {
            oos.writeObject(obj);
        } catch (IOException e) {
            // Fail loudly instead of publishing an empty or partial payload.
            throw new UncheckedIOException(e);
        }
        return stream.toByteArray();
    }
}
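Before sending bytes like these to a topic, it can help to confirm that they turn back into an equal object. The following is a minimal round-trip sketch using only the JDK, not a Kafka consumer. The names RoundTripCheck, Order, toBytes, and fromBytes are hypothetical, and Order is just a stand-in that mirrors the two OrderDetails fields.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.math.BigDecimal;

public class RoundTripCheck {

    // Hypothetical stand-in mirroring the OrderDetails fields.
    public static class Order implements Serializable {
        private static final long serialVersionUID = 1L;
        public String orderNo;
        public BigDecimal total;
    }

    // Serializes any Serializable object into a byte array.
    public static byte[] toBytes(Serializable obj) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(buffer)) {
            oos.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Reads a single object back from bytes produced by toBytes.
    public static Object fromBytes(byte[] bytes) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Class of the serialized object is not on the classpath", e);
        }
    }

    public static void main(String[] args) {
        Order original = new Order();
        original.orderNo = "100200300";
        original.total = BigDecimal.valueOf(500);

        Order restored = (Order) fromBytes(toBytes(original));
        System.out.println(restored.orderNo + " " + restored.total);
    }
}
```

Whatever reads the topic needs the same class on its classpath to reverse the process. Java deserialization should only be applied to bytes from a trusted source.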