Writing a Simple Kafka Consumer using Java Example

We will write a simple consumer to receive messages from a Kafka broker using Java. We will be using the high-level consumer API here; an example using the low-level API will follow in a later post.

 

Summary of Steps

  1. Setting up the environment

  2. Defining Properties

  3. Reading messages from a topic

  4. Complete code

  5. Executing and verifying the program

 

 

Setting up the environment

  1. Install, configure and verify the Kafka installation as described in http://datastudy.club/en/blog/getting-started-with-kafka-cluster-example....

  2. We will use the Eclipse IDE for this example.

  3. Create a Java project.

  4. Add all jars from the libs folder to the classpath. If you are using an Eclipse Java project, right-click the project and go to Build Path > Configure Build Path > Libraries > Add External Jars.

 

Create an empty class SimpleKafkaConsumerHL, add the import statements below, and check that the code compiles to verify that the libraries were added correctly.

import kafka.consumer.ConsumerConfig;

import kafka.consumer.ConsumerIterator;

import kafka.consumer.KafkaStream;

import kafka.javaapi.consumer.ConsumerConnector;

 

public class SimpleKafkaConsumerHL {

}

 

Defining Properties

We need to define properties for connecting to ZooKeeper and pass them to the Kafka consumer:

Properties props = new Properties();

props.put("zookeeper.connect", "localhost:2181");

props.put("group.id", "testgroup");

props.put("zookeeper.session.timeout.ms", "500");

props.put("zookeeper.sync.time.ms", "250");

props.put("auto.commit.interval.ms", "1000");

 

ConsumerConfig consumerConfig = new ConsumerConfig(props);
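As a quick sanity check that uses only the standard library (no Kafka jars needed), the values can be read back from the `Properties` object before it is passed to `ConsumerConfig`; `getProperty` also lets you supply a default for a key that was not set. The `auto.offset.reset` key and its `largest` default below are illustrative assumptions, not part of the original snippet:

```java
import java.util.Properties;

public class PropsCheck {

    // Build the same properties used for the consumer configuration
    static Properties buildProps() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "testgroup");
        props.put("zookeeper.session.timeout.ms", "500");
        props.put("zookeeper.sync.time.ms", "250");
        props.put("auto.commit.interval.ms", "1000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildProps();
        // Read back a value that was set, and a default for one that was not
        System.out.println(props.getProperty("group.id"));
        System.out.println(props.getProperty("auto.offset.reset", "largest"));
    }
}
```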

 

Reading messages from a topic

We can read messages from the topic as:

Map<String, Integer> topicMap = new HashMap<String, Integer>();

topicMap.put(topic, new Integer(1)); // 1 represents a single consumer thread for the topic

Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreamsMap = consumer.createMessageStreams(topicMap);

List<KafkaStream<byte[], byte[]>> streamList = consumerStreamsMap.get(topic); // Get the list of message streams for the topic, using the default decoder.

for (final KafkaStream <byte[], byte[]> stream : streamList) {

  ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();

  while (consumerIte.hasNext())

    System.out.println("Message from Single Topic :: "+ new String(consumerIte.next().message()));

}
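Note that `message()` returns the raw `byte[]` payload, and `new String(bytes)` decodes it with the platform default charset; specifying UTF-8 explicitly is safer. A minimal standard-library sketch of that decoding step (the payload below is simulated, not read from Kafka):

```java
import java.nio.charset.StandardCharsets;

public class DecodeSketch {

    // Decode a raw Kafka payload explicitly as UTF-8
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Simulated payload, as returned by consumerIte.next().message()
        byte[] payload = "hello kafka".getBytes(StandardCharsets.UTF_8);
        System.out.println("Message from Single Topic :: " + decode(payload));
    }
}
```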

 

Complete Code

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import java.util.Properties;

import kafka.consumer.ConsumerConfig;

import kafka.consumer.ConsumerIterator;

import kafka.consumer.KafkaStream;

import kafka.javaapi.consumer.ConsumerConnector;

 

public class SimpleKafkaConsumerHL {

  private final ConsumerConnector consumer;

  private final String  topic;

  

  public SimpleKafkaConsumerHL(String zooKeeperUrl, String groupId, String topic) {

    consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig(zooKeeperUrl, groupId));

    this.topic = topic;

  }

  

  private static ConsumerConfig createConsumerConfig(String zooKeeperUrl, String groupId) {

    Properties props = new Properties();

    props.put("zookeeper.connect", zooKeeperUrl);

    props.put("group.id", groupId);

    props.put("zookeeper.session.timeout.ms", "500");

    props.put("zookeeper.sync.time.ms", "250");

    props.put("auto.commit.interval.ms", "1000");

    return new ConsumerConfig(props);

  }

  

  public void testConsumer() {

    Map<String, Integer> topicMap = new HashMap<String, Integer>();

    topicMap.put(topic, new Integer(1)); // Define single thread for topic

    Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreamsMap = consumer.createMessageStreams(topicMap);

    List<KafkaStream<byte[], byte[]>> streamList = consumerStreamsMap.get(topic);

    for (final KafkaStream<byte[], byte[]> stream : streamList) {

      ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();

      while (consumerIte.hasNext())

        System.out.println("Message from Topic : " + new String(consumerIte.next().message()));

    }

    if (consumer != null)

      consumer.shutdown();

  }

  

  public static void main(String[] args) {

    String zooKeeperUrl = "localhost:2181";

    String groupId = "testgroup";

    String topic = "mykafkatopic";

    SimpleKafkaConsumerHL simpleHLConsumer = new SimpleKafkaConsumerHL(zooKeeperUrl, groupId, topic);

    simpleHLConsumer.testConsumer();

  }

}
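If you prefer the command line to Eclipse, the class can be compiled and run with the Kafka jars on the classpath. The paths below are assumptions (source file in the current directory, Kafka's libs folder at ./libs); on Windows use `;` instead of `:` as the classpath separator:

```shell
# Compile against the Kafka jars, then run with both the current directory and the jars on the classpath
javac -cp "libs/*" SimpleKafkaConsumerHL.java
java -cp ".:libs/*" SimpleKafkaConsumerHL
```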

 

Executing and verifying the program

Before executing the program, make sure the Kafka server is running and the topic mykafkatopic has been created, as per the steps in http://datastudy.club/en/blog/getting-started-with-kafka-cluster-example....

Once you execute the program, go to the console producer and type in some text (or use the SimpleKafkaProducer class we created in a previous example).

You will now see the messages sent from the producer in the Eclipse console.
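For reference, with Kafka 0.8.2 the topic can be created and a console producer started with the commands below, run from the Kafka installation directory; the host and port values are the defaults assumed throughout this example:

```shell
# Create the topic if it does not exist yet
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mykafkatopic

# Start a console producer; each line you type is sent as one message
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mykafkatopic
```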

 

Software Versions Tested Against

  •  kafka_2.10-0.8.2.1, JDK 1.8
