We will write a simple consumer to receive messages from a Kafka broker using Java. We will use the high-level consumer API here; a later example will cover the low-level API.
Summary of Steps
- Setting up the environment
- Defining Properties
- Reading messages from a topic
- Complete code
- Executing and verifying the program
Setting up the environment
- Install, configure and verify the Kafka installation as in http://datastudy.club/en/blog/getting-started-with-kafka-cluster-example....
- We will use the Eclipse IDE for this example.
- Create a Java project.
- Add all jars from the libs folder of the Kafka installation to the class path. In an Eclipse Java project, right-click the project and go to Build Path > Configure Build Path > Libraries > Add External Jars.

Create an empty class SimpleKafkaConsumerHL, add the import statements below, and check that the code compiles to verify that the libraries were added correctly.
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
public class SimpleKafkaConsumerHL {
}
Defining Properties
We need to define properties for making a connection with ZooKeeper and pass these properties to the Kafka consumer:
Properties props = new Properties();
props.put("zookeeper.connect", "localhost:2181");
props.put("group.id", "testgroup");
props.put("zookeeper.session.timeout.ms", "500");
props.put("zookeeper.sync.time.ms", "250");
props.put("auto.commit.interval.ms", "1000");
ConsumerConfig config = new ConsumerConfig(props);
Reading messages from a topic
We can read messages from the topic as:
Map<String, Integer> topicMap = new HashMap<String, Integer>();
topicMap.put(topic, new Integer(1)); // 1 represents a single consumer thread for this topic
Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreamsMap = consumer.createMessageStreams(topicMap);
List<KafkaStream<byte[], byte[]>> streamList = consumerStreamsMap.get(topic); // Get the list of message streams for the topic, using the default decoder
for (final KafkaStream<byte[], byte[]> stream : streamList) {
    ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
    while (consumerIte.hasNext())
        System.out.println("Message from Single Topic :: " + new String(consumerIte.next().message()));
}
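Note that `new String(bytes)` in the snippet above decodes the message payload with the platform default charset, which can corrupt non-ASCII messages. A minimal stdlib-only sketch of decoding explicitly with UTF-8 instead (the `payload` array is a stand-in for `consumerIte.next().message()`):

```java
import java.nio.charset.StandardCharsets;

public class DecodeExample {
    public static void main(String[] args) {
        // Kafka delivers message payloads as byte[]; this array stands in for
        // the bytes returned by consumerIte.next().message().
        byte[] payload = "hello kafka".getBytes(StandardCharsets.UTF_8);
        // Decode explicitly with UTF-8 rather than relying on the platform default charset.
        String text = new String(payload, StandardCharsets.UTF_8);
        System.out.println(text);
    }
}
```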
Complete Code
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class SimpleKafkaConsumerHL {

    private final ConsumerConnector consumer;
    private final String topic;

    public SimpleKafkaConsumerHL(String zooKeeperUrl, String groupId, String topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig(zooKeeperUrl, groupId));
        this.topic = topic;
    }

    private static ConsumerConfig createConsumerConfig(String zooKeeperUrl, String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zooKeeperUrl);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "500");
        props.put("zookeeper.sync.time.ms", "250");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

    public void testConsumer() {
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        topicMap.put(topic, new Integer(1)); // Define a single consumer thread for the topic
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreamsMap = consumer.createMessageStreams(topicMap);
        List<KafkaStream<byte[], byte[]>> streamList = consumerStreamsMap.get(topic);
        for (final KafkaStream<byte[], byte[]> stream : streamList) {
            ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
            while (consumerIte.hasNext())
                System.out.println("Message from Topic : " + new String(consumerIte.next().message()));
        }
        if (consumer != null)
            consumer.shutdown();
    }

    public static void main(String[] args) {
        String zooKeeperUrl = "localhost:2181";
        String groupId = "testgroup";
        String topic = "mykafkatopic";
        SimpleKafkaConsumerHL simpleHLConsumer = new SimpleKafkaConsumerHL(zooKeeperUrl, groupId, topic);
        simpleHLConsumer.testConsumer();
    }
}
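One caveat about testConsumer: consumer.shutdown() is only reached once the stream iterator stops blocking, and with the high-level consumer the iterator normally blocks indefinitely waiting for new messages. A common pattern is to place the shutdown call in a JVM shutdown hook so it runs on Ctrl+C or SIGTERM. A minimal stdlib-only sketch, where shutdownConsumer is a hypothetical stand-in for consumer.shutdown():

```java
public class ShutdownHookSketch {

    // Hypothetical stand-in for ConsumerConnector.shutdown()
    static void shutdownConsumer() {
        System.out.println("Shutting down consumer");
    }

    public static void main(String[] args) {
        // The hook runs when the JVM exits (including Ctrl+C / SIGTERM), so the
        // consumer is closed even though the consume loop never returns normally.
        Runtime.getRuntime().addShutdownHook(new Thread(ShutdownHookSketch::shutdownConsumer));
        System.out.println("Consuming messages (press Ctrl+C to stop)");
        // The blocking while (consumerIte.hasNext()) loop would run here.
    }
}
```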
Executing and verifying
Before executing the program, make sure the Kafka server is running and a topic mykafkatopic has been created as per the steps in http://datastudy.club/en/blog/getting-started-with-kafka-cluster-example....
Once you execute the program, go to the console producer and type some text (or use the SimpleKafkaProducer class we created in a previous example).
You will now see the messages sent from the producer in the Eclipse console.
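If you followed the linked setup post, the topic and console producer can be started from the Kafka installation directory roughly as below (Kafka 0.8.x command syntax; the broker host and port are assumptions, so adjust them to your setup):

```shell
# Create the topic if it does not exist yet
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
    --replication-factor 1 --partitions 1 --topic mykafkatopic

# Start a console producer and type messages, one per line
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mykafkatopic
```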
Software Versions Tested Against
- kafka_2.10-0.8.2.1, JDK 1.8