Kafka 客户端开发
前两篇文章讲述了 Kafka 的 工作机制 和 服务器集群部署。至此,Kafka 服务器已就绪,本文分别以官方API、Spring、SpringBoot三种构建方式,讲述了 Kafka 消费生产者和消费者的开发。
作者:王克锋
出处:https://kefeng.wang/2017/11/18/kafka-development/
版权:自由转载-非商用-非衍生-保持署名,转载请标明作者和出处。
1 开发概述
Kafka 中,客户端与服务端是通过 TCP 协议进行的;
Kafka 公布了所有功能协议(与特定语言无关),并把 Java 客户端作为 kafka 项目的一部分进行维护。
其他非 Java 语言的客户端则作为独立的开源项目提供,非 Java 客户端的名单可在 这里。
Kafka 提供了五类 API:
- Producer API: 向主题(一个或多个)发布消息;
- Consumer API: 订阅主题(一个或多个),拉取这些主题上发布的消息;
- Stream API: 作为流处理器,从主题消费消息,向主题发布消息,把输出流转换为输入流;可参考 例子;
- Connect API: 作为下游或上游,把主题连接到应用程序或数据系统(比如关系数据库),通常不需要直接使用这些API,而是使用 现成的连接器;
- AdminClient API: 管理(或巡查) topic, brokers, 或其他 kafka 对象;
2 基于官方 API 开发
2.1 Maven 依赖
1 | <dependency> |
2.2 logback.xml(日志配置,可选)
1 | <?xml version="1.0" encoding="UTF-8"?> |
2.3 演示类 KafkaClientDemo.java
生产者:相应函数为 KafkaClientDemo.producerDemo(),其中 props 完整参数配置项见 Producer Configs
消费者:相应函数为 KafkaClientDemo.consumerDemo(),其中 props 完整参数配置项见 New Consumer Configs 和 Old Consumer Configs1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61public class KafkaClientDemo {
private static final Logger logger = LoggerFactory.getLogger(KafkaClientDemo.class);
private static final String BROKER_SERVERS = "centos:9091,centos:9092,centos:9093";
private static final String TOPIC_NAME = "topicName";
public static void producerDemo() {
// 配置选项
Properties props = new Properties();
props.put("bootstrap.servers", BROKER_SERVERS); // [必填] Kafka Broker 地址列表
props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer"); // [必填] KEY 的序列化类
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // [必填] VALUE 的序列化类
// props.put("partitioner.class", "org.apache.kafka.clients.producer.internals.DefaultPartitioner"); // [默认值] 结合主题的分区个数和KEY,使得消息平均地分配给分区
Producer<Integer, String> producer = new KafkaProducer<Integer, String>(props); // 建立连接
for (int id = 1; id <= 8; ++id) {
final int key = id;
final String value = String.format("msg#%d", key);
ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>(TOPIC_NAME, key, value);
producer.send(record, new Callback() {
public void onCompletion(RecordMetadata meta, Exception e) {
logger.info("KafkaProducer.push(\"{}\", {}, {}, {}, \"{}\") OK.",
meta.topic(), meta.partition(), meta.offset(), key, value);
}
}); // 推送消息
}
producer.flush(); // 提交
producer.close(); // 关闭连接
}
public static void consumerDemo() {
// 配置选项
Properties props = new Properties();
props.put("bootstrap.servers", BROKER_SERVERS); // [必填] Kafka Broker 地址列表
props.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"); // [必填] KEY 的反序列化类
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // [必填] VALUE 的反序列化类
props.put("group.id", "groupName"); // [必填] 本消费者所属分组
// 开始消费
KafkaConsumer<Integer, String> consumer = new KafkaConsumer<Integer, String>(props); // 建立连接
consumer.subscribe(Arrays.asList(TOPIC_NAME)); // 订阅主题
while (true) {
ConsumerRecords<Integer, String> records = consumer.poll(1000); // 拉取消息
if (records.isEmpty()) {
break;
}
logger.info("KafkaConsumer.poll({}) OK.", records.count());
for (ConsumerRecord<Integer, String> record : records) {
logger.info("KafkaConsumer.poll(\"{}\", {}, {}, {}, \"{}\") OK.",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
}
}
consumer.close(); // 关闭连接
}
public static void main(String[] args) {
KafkaClientDemo.producerDemo();
KafkaClientDemo.consumerDemo();
}
}
2.4 运行结果
1 | ## 生产者 |
3 基于 Spring 开发
官网: http://projects.spring.io/spring-kafka/
介绍: https://docs.spring.io/spring-kafka/docs/2.1.2.RELEASE/reference/html/
API: http://docs.spring.io/spring-kafka/docs/2.1.2.RELEASE/api/
3.1 Maven 依赖
环境要求: Apache Kafka 1.0.0, Java 8+1
2
3
4
5
6
7
8
9
10<dependency><!-- logback 日志 -->
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
<dependency><!-- spring-kafka -->
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.1.2.RELEASE</version>
</dependency>
3.2 spring.xml
1 | <?xml version="1.0" encoding="UTF-8"?> |
3.3 KafkaProducerListener.java
1 | public class KafkaProducerListener extends ProducerListenerAdapter<Integer, String> { |
3.4 KafkaProducerListener.java
1 | public class KafkaConsumerListener implements MessageListener<Integer, String> { |
3.5 KafkaClientDemo.java
1 |
|
3.6 运行结果
1 | ## 生产者 |
4 基于 SpringBoot 开发
创建 SpringBoot 工程。
4.1 Maven 依赖
1 | <dependency> |
4.2 application.properties
1 | # Brokers 地址列表 |
4.3 KafkaClientDemo.java
1 |
|
4.4 运行结果
运行 SpringBoot 的 Application 类(无需任何调整),结果如下:1
2
3
4
5
6
7
8
9
10
11## 可见:一个生产者定时投递消息;两个消费者(属于同一消费者组 groupName)交替收取消息。
14:36:52.586 INFO [KafkaClientDemo.java:30] - KafkaClientDemo.producerDemo("msg#1") ...
14:36:52.889 INFO [KafkaClientDemo.java:37] - KafkaClientDemo.consumerDemo1("msg#1") OK.
14:36:53.583 INFO [KafkaClientDemo.java:30] - KafkaClientDemo.producerDemo("msg#2") ...
14:36:53.603 INFO [KafkaClientDemo.java:43] - KafkaClientDemo.consumerDemo2("msg#2") OK.
14:36:54.583 INFO [KafkaClientDemo.java:30] - KafkaClientDemo.producerDemo("msg#3") ...
14:36:54.613 INFO [KafkaClientDemo.java:37] - KafkaClientDemo.consumerDemo1("msg#3") OK.
14:36:55.583 INFO [KafkaClientDemo.java:30] - KafkaClientDemo.producerDemo("msg#4") ...
14:36:55.600 INFO [KafkaClientDemo.java:37] - KafkaClientDemo.consumerDemo1("msg#4") OK.
14:36:56.583 INFO [KafkaClientDemo.java:30] - KafkaClientDemo.producerDemo("msg#5") ...
14:36:56.598 INFO [KafkaClientDemo.java:43] - KafkaClientDemo.consumerDemo2("msg#5") OK.