We will write a simple producer in Java that sends messages to a Kafka broker.
This program does not use any custom partitioning. We will create another Java producer with custom partitioning in a later note.
Summary of Steps
- Setting up the environment
- Defining properties
- Building the message and sending it
- Complete program
- Executing and verifying
Setting up the environment
- Install, configure and verify the Kafka installation as described in http://datastudy.club/en/blog/getting-started-with-kafka-cluster-example....
- We will use the Eclipse IDE for this example.
- Create a Java project.
- Add all JARs from the Kafka libs folder to the classpath. In an Eclipse Java project, right-click the project and go to Build Path > Configure Build Path > Libraries > Add External JARs.
Create an empty class SimpleKafkaProducer with the import statements below. If the code compiles, the libraries have been added correctly.
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class SimpleKafkaProducer {
}
Defining properties
To connect to the Kafka broker, define the properties below and pass them to the Kafka producer (Properties is java.util.Properties):
Properties props = new Properties();
// Broker(s) the producer contacts to fetch cluster metadata
props.put("metadata.broker.list","localhost:9092");
// Encoder used to serialize String messages
props.put("serializer.class","kafka.serializer.StringEncoder");
// Wait for an acknowledgement from the partition leader
props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);
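The three settings can also be collected in a small helper so that the broker list and acknowledgement level are easy to change. The following is only a sketch: the class ProducerSettings and its build method are hypothetical names introduced for illustration, not part of the Kafka API, and the sketch uses only java.util.Properties so it can be compiled without the Kafka JARs.

```java
import java.util.Properties;

// Hypothetical helper (not part of Kafka): gathers the producer settings
// used in this note in one place.
class ProducerSettings {

    // brokerList is a comma-separated list of host:port pairs,
    // acks is the value for request.required.acks (for example "1").
    static Properties build(String brokerList, String acks) {
        if (brokerList == null || brokerList.trim().isEmpty()) {
            throw new IllegalArgumentException("brokerList must not be empty");
        }
        Properties props = new Properties();
        props.put("metadata.broker.list", brokerList);
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", acks);
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092", "1");
        // Print each setting so the configuration can be checked by eye.
        for (String name : props.stringPropertyNames()) {
            System.out.println(name + " = " + props.getProperty(name));
        }
    }
}
```

The returned Properties object could then be passed to new ProducerConfig(...) exactly as in the snippet above.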
Building the message and sending it
Create a message and send it as shown below, where topic is a String holding the name of the target topic and Date is java.util.Date:
String runtime = new Date().toString();
String msg = "Message Publishing Time is " + runtime;
KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, msg);
producer.send(data);
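The message text itself is plain string concatenation and can be tried without a broker. As a minimal sketch, the hypothetical class MessageText below (a name introduced only for this illustration) builds the same text from a Date passed in by the caller, which makes the format easy to check on its own.

```java
import java.util.Date;

// Hypothetical helper: builds the message text used in this note.
class MessageText {

    // Returns the same text as the snippet above for the given time.
    static String build(Date publishTime) {
        return "Message Publishing Time is " + publishTime.toString();
    }

    public static void main(String[] args) {
        // Prints the message for the current time.
        System.out.println(build(new Date()));
    }
}
```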
Complete Java Program
import java.util.Date;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class SimpleKafkaProducer {

    private static final String TOPIC_NAME = "mykafkatopic";

    // Instance field: each SimpleKafkaProducer object owns its producer
    private final Producer<String, String> producer;

    public SimpleKafkaProducer() {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);
        producer = new Producer<String, String>(config);
    }

    public static void main(String[] args) {
        String topic = TOPIC_NAME;
        System.out.println("Topic Name - " + topic);
        SimpleKafkaProducer simpleKafkaProducer = new SimpleKafkaProducer();
        simpleKafkaProducer.publishMessage(topic);
    }

    private void publishMessage(String topic) {
        String runtime = new Date().toString();
        String msg = "Message Publishing Time is " + runtime;
        KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, msg);
        try {
            producer.send(data);
        } finally {
            // Release network resources even if send fails
            producer.close();
        }
    }
}
Executing and verifying the program
Before executing the program, make sure the Kafka server is running and the topic mykafkatopic has been created, following the steps in http://datastudy.club/en/blog/getting-started-with-kafka-cluster-example....
Once you execute the program, you should see the message in the console-based consumer.
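If the program fails to connect, it can help to first confirm that something is listening on the broker port. The sketch below is one possible pre-flight check using only the JDK; the class BrokerCheck and the method isPortOpen are hypothetical names, and the host, port and timeout are assumptions matching the localhost:9092 setting used in this note. A successful connection only shows that the port is open, not that the listener is a healthy Kafka broker.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: checks whether a TCP port accepts connections.
class BrokerCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    static boolean isPortOpen(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host not resolvable
            return false;
        }
    }

    public static void main(String[] args) {
        boolean up = isPortOpen("localhost", 9092, 2000);
        System.out.println("localhost:9092 reachable: " + up);
    }
}
```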
Software Versions Tested Against
- kafka_2.10-0.8.2.1, JDK 1.8
- heartin's blog