Kafka consumers consume messages from Kafka topics. The article 2 ways to publish an Object to a Kafka topic explains how to publish an object to a topic. This article describes how to consume that object.
Here is the object that my consumer consumes in this example.
import java.io.Serializable;
import java.math.BigDecimal;
public class OrderDetails implements Serializable {
String OrderNo;
BigDecimal total;
public String getOrderNo() {
return OrderNo;
}
public void setOrderNo(String orderNo) {
OrderNo = orderNo;
}
public BigDecimal getTotal() {
return total;
}
public void setTotal(BigDecimal total) {
this.total = total;
}
}
To consume an Object, the consumer needs to have an Object Deserializer. The
import org.apache.kafka.common.serialization.Deserializer;
import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.util.Map;
public class OrderDeSerializer implements Deserializer<OrderDetails> {
private ObjectMapper objectMapper = new ObjectMapper();
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
@Override
public OrderDetails deserialize(String s, byte[] bytes) {
try {
ByteArrayInputStream in = new ByteArrayInputStream(bytes);
ObjectInputStream is = new ObjectInputStream(in);
return (OrderDetails)is.readObject();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}
Note that I refer to my DeSerializer in the
properties.put("value.deserializer", "com.core.json_objects.delete_later.OrderDeSerializer");
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
public class Consumer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "com.core.json_objects.delete_later.OrderDeSerializer");
properties.put("group.id", "test-group");
KafkaConsumer consumer = new KafkaConsumer(properties);
List topics = new ArrayList();
topics.add("TEST-TOPIC");
consumer.subscribe(topics);
try{
while (true) {
final ConsumerRecords<Long, OrderDetails> consumerRecords = consumer.poll(1000);
consumerRecords.forEach(record -> {
System.out.println(record.value());
});
}
}catch (Exception e){
System.out.println(e.getMessage());
}finally {
consumer.close();
}
}
}