Kafka 服务器集群部署
上篇文章 Kafka 工作机制 讲述了 Kafka 的各组件(包括配置中心、Broker、消息生产者和消费者)的作用,分区与复制的机制等。有了这些概念,本文以三个 Broker 为例,讲述了 Kafka 集群的搭建步骤和方法,并以官方自带的命令行脚本进行消息的生产、消费、查看等操作。
作者:王克锋
出处:https://kefeng.wang/2017/11/16/kafka-deploy/
版权:自由转载-非商用-非衍生-保持署名,转载请标明作者和出处。
1 下载
最新版本为 1.0.0,发布于 2017-11-01。
http://kafka.apache.org/downloads
https://www.apache.org/dyn/closer.cgi?path=/kafka/1.0.0/kafka_2.11-1.0.0.tgz
2 安装(解压)
1 | wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/1.0.0/kafka_2.11-1.0.0.tgz |
3 调整配置
打算部署成三个节点的集群,把 config/server.properties
复制成三份:1
2
3sudo cp $KAFKA_HOME/config/server.properties $KAFKA_HOME/config/server-1.properties
sudo cp $KAFKA_HOME/config/server.properties $KAFKA_HOME/config/server-2.properties
sudo cp $KAFKA_HOME/config/server.properties $KAFKA_HOME/config/server-3.properties
修改三个文件 config/server-X.properties
如下内容(前三行末字符分别用 1/2/3):
修改后用命令检查: grep -E "^broker.id|^log.dirs|^listeners" $KAFKA_HOME/config/server-?.properties
其中的参数 zookeeper.connect
用来指定 ZooKeeper 服务器地址,三个文件内容一样。
注意:三个 host:port
共用一个 /kafka
,表示三个 ZooKeeper 服务器中都使用 /kafka
作为 kafka 存储的根目录。1
2
3
4
5
6
7
8
9
10
11
12
13### 必设参数
broker.id=1 ## 服务器的代理ID(默认值-1),需与 zookeeper 的代理ID不同,建议 brokerId 从 maxZookeeperId+1 开始设置;
log.dirs=/tmp/kafka-logs-1 ## 消息日志数据保存的目录;
listeners=PLAINTEXT://:9091 ## 面向客户端的监听器列表;
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183/kafka ## ZooKeeper服务器连接字符串;
### 重要参数
auto.create.topics.enable=true ## 当主题不存在时,允许自动创建主题(默认为 true);
num.partitions=3 ## 每个主题的默认日志分区数量(默认为1);
default.replication.factor=3 ## 自动创建主题的默认复制因子(默认为3);
log.retention.hours=168 ## 比这个保留期更早的消息将被丢弃(默认为 168小时,即7天);
delete.topic.enable=true ## 允许删除主题(默认为 true);
controlled.shutdown.enable=true ## 支持优雅的关机(默认为true)
4 设置环境变量
1 | # sudo vim /etc/profile ## 所有用户有效 |
保存文件,Linux 用户重新登录后生效。
5 防火墙放行
如果非本机应用需要连接,必须把监听端口放行。1
2
3
4
5## sudo vim /etc/sysconfig/iptables
-A INPUT -m state --state NEW -m tcp -p tcp --dport 9091 -j ACCEPT
-A INPUT -m state --state NEW -m tcp -p tcp --dport 9092 -j ACCEPT
-A INPUT -m state --state NEW -m tcp -p tcp --dport 9093 -j ACCEPT
## 重启生效: sudo systemctl restart iptables
6 启动服务器
指定选项 -daemon
时采用“守护进程”启动,否则以“控制台进程”启动。1
2
3
4
5# sudo vim $KAFKA_HOME/bin/kafka-server-start-all.sh
# sudo chmod +x $KAFKA_HOME/bin/kafka-server-start-all.sh
sudo $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server-1.properties
sudo $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server-2.properties
sudo $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server-3.properties
7 停止服务器
Kafka 集群将自动检测到任何 Broker 故障或关机(包括人为地),并为该机器上的分区选择新的领导。
因为维护或配置更改而故意停机时,Kafka 支持更优雅的关机机制(默认配置已开启 controlled.shutdown.enable=true
):
- 将所有的日志同步到磁盘,以避免重新启动时需要做任何日志恢复;
- 在关闭之前将服务器领导者的任何分区迁移到其他副本;
1 | # sudo vim $KAFKA_HOME/bin/kafka-server-stop-all.sh |
8 命令行测试
注意:zookeeper 的 host:port/kafka
,表示 ZooKeeper 中使用 /kafka
作为 kafka 存储信息的根目录。
8.1 主题的创建与查看
创建一个分区数为1、复制因子为 3 的主题,名称为 topicName
默认配置时(auto.create.topics.enable=true
),针对不存在的主题发布或消费时,主题会自动创建,而且采用的分区数和复制因子都有相应的配置(num.partitions=1
和default.replication.factor=3
)。
1 | kafka-topics.sh --create --zookeeper localhost:2181/kafka --partitions 1 --replication-factor 3 --topic topicName |
8.2 主题描述信息查询
1 | # kafka-topics.sh --create --zookeeper localhost:2181/kafka --partitions 3 --replication-factor 2 --topic topicName |
输出信息如下:1
2
3
4Topic:topicName3 PartitionCount:3 ReplicationFactor:2 Configs:
Topic: topicName3 Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: topicName3 Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: topicName3 Partition: 2 Leader: 3 Replicas: 3,1 Isr: 3,1
可看到每个 broker 的角色:
- 第一行(主题概要):分区数 3,复制因子 2;
- 后面各行是各个分区(0/1/2)的信息,字段含义如下:
- Leader: 作为主题 Leader 的 brokerId;
- Replicas: 表示复制数据的节点的 brokerId(Leader 也可以在其中);
- isr(in-sync replicas): 是 Replicas 的子集(当前存活者),等待升级为 Leader 的 brokerId;
8.3 消息的发布和消费
1 | kafka-console-producer.sh --broker-list localhost:9092 --topic topicName |
8.4 主题的删除【测试环境用】
1 | grep "delete.topic.enable" $KAFKA_HOME/logs/server.log ## 确保为 true(默认) |
9 清空数据【测试环境用】
1 | sudo $KAFKA_HOME/bin/kafka-server-stop-all.sh |