文章目录
  1. 1 下载
  2. 2 安装(解压)
  3. 3 调整配置
  4. 4 设置环境变量
  5. 5 防火墙放行
  6. 6 启动服务器
  7. 7 停止服务器
  8. 8 命令行测试
    1. 8.1 主题的创建与查看
    2. 8.2 主题描述信息查询
    3. 8.3 消息的发布和消费
    4. 8.4 主题的删除【测试环境用】
  9. 9 清空数据【测试环境用】

上篇文章 Kafka 工作机制 讲述了 Kafka 的各组件(包括配置中心、Broker、消息生产者和消费者)的作用,分区与复制的机制等。有了这些概念,本文以三个 Broker 为例,讲述了 Kafka 集群的搭建步骤和方法,并以官方自带的命令行脚本进行消息的生产、消费、查看等操作。

作者:王克锋
出处:https://kefeng.wang/2017/11/16/kafka-deploy/
版权:自由转载-非商用-非衍生-保持署名,转载请标明作者和出处。

1 下载

最新版本为 1.0.0,发布于 2017-11-01。
http://kafka.apache.org/downloads
https://www.apache.org/dyn/closer.cgi?path=/kafka/1.0.0/kafka_2.11-1.0.0.tgz

2 安装(解压)

1
2
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/1.0.0/kafka_2.11-1.0.0.tgz
sudo tar -zxf kafka_2.11-1.0.0.tgz -C /opt

3 调整配置

打算部署成三个节点的集群,把 config/server.properties 复制成三份:

1
2
3
sudo cp $KAFKA_HOME/config/server.properties $KAFKA_HOME/config/server-1.properties
sudo cp $KAFKA_HOME/config/server.properties $KAFKA_HOME/config/server-2.properties
sudo cp $KAFKA_HOME/config/server.properties $KAFKA_HOME/config/server-3.properties

修改三个文件 config/server-X.properties 如下内容(前三行末字符分别用 1/2/3):
修改后用命令检查: grep -E "^broker.id|^log.dirs|^listeners" $KAFKA_HOME/config/server-?.properties
其中的参数 zookeeper.connect 用来指定 ZooKeeper 服务器地址,三个文件内容一样。
注意:三个 host:port 共用一个 /kafka,表示三个 ZooKeeper 服务器中都使用 /kafka 作为 kafka 存储的根目录。

1
2
3
4
5
6
7
8
9
10
11
12
13
### 必设参数
broker.id=1 ## 服务器的代理ID(默认值-1),需与 zookeeper 的代理ID不同,建议 brokerId 从 maxZookeeperId+1 开始设置;
log.dirs=/tmp/kafka-logs-1 ## 消息日志数据保存的目录;
listeners=PLAINTEXT://:9091 ## 面向客户端的监听器列表;
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183/kafka ## ZooKeeper服务器连接字符串;

### 重要参数
auto.create.topics.enable=true ## 当主题不存在时,允许自动创建主题(默认为 true);
num.partitions=3 ## 每个主题的默认日志分区数量(默认为1);
default.replication.factor=3 ## 自动创建主题的默认复制因子(默认为3);
log.retention.hours=168 ## 比这个保留期更早的消息将被丢弃(默认为 168小时,即7天);
delete.topic.enable=true ## 允许删除主题(默认为 true);
controlled.shutdown.enable=true ## 支持优雅的关机(默认为true)

4 设置环境变量

1
2
3
## sudo vim /etc/profile ## 所有用户有效
export KAFKA_HOME=/opt/kafka_2.11-1.0.0
export PATH=$PATH:$KAFKA_HOME/bin

保存文件,Linux 用户重新登录后生效。

5 防火墙放行

如果非本机应用需要连接,必须把监听端口放行。

1
2
3
4
5
### sudo vim /etc/sysconfig/iptables
-A INPUT -m state --state NEW -m tcp -p tcp --dport 9091 -j ACCEPT
-A INPUT -m state --state NEW -m tcp -p tcp --dport 9092 -j ACCEPT
-A INPUT -m state --state NEW -m tcp -p tcp --dport 9093 -j ACCEPT
### 重启生效: sudo systemctl restart iptables

6 启动服务器

指定选项 -daemon 时采用“守护进程”启动,否则以“控制台进程”启动。

1
2
3
4
5
## sudo vim $KAFKA_HOME/bin/kafka-server-start-all.sh
## sudo chmod +x $KAFKA_HOME/bin/kafka-server-start-all.sh
sudo $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server-1.properties
sudo $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server-2.properties
sudo $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server-3.properties

7 停止服务器

Kafka 集群将自动检测到任何 Broker 故障或关机(包括人为地),并为该机器上的分区选择新的领导。
因为维护或配置更改而故意停机时,Kafka 支持更优雅的关机机制(默认配置已开启 controlled.shutdown.enable=true):

  • 将所有的日志同步到磁盘,以避免重新启动时需要做任何日志恢复;
  • 在关闭之前将服务器领导者的任何分区迁移到其他副本;
1
2
3
4
5
## sudo vim $KAFKA_HOME/bin/kafka-server-stop-all.sh
## sudo chmod +x $KAFKA_HOME/bin/kafka-server-stop-all.sh
## cat $KAFKA_HOME/bin/kafka-server-{stop,start}-all.sh | sudo tee $KAFKA_HOME/bin/kafka-server-restart-all.sh
## sudo chmod +x $KAFKA_HOME/bin/kafka-server-restart-all.sh
sudo $KAFKA_HOME/bin/kafka-server-stop.sh

8 命令行测试

注意:zookeeper 的 host:port/kafka,表示 ZooKeeper 中使用 /kafka 作为 kafka 存储信息的根目录。

8.1 主题的创建与查看

创建一个分区数为1、复制因子为 3 的主题,名称为 topicName
默认配置时(auto.create.topics.enable=true),针对不存在的主题发布或消费时,主题会自动创建,而且采用的分区数和复制因子都有相应的配置(num.partitions=1default.replication.factor=3)。

1
2
3
4
5
kafka-topics.sh --create --zookeeper localhost:2181/kafka --partitions 1 --replication-factor 3 --topic topicName
# Result: Created topic "topicName".

kafka-topics.sh --list --zookeeper localhost:2181/kafka
# Result: topicName / topicName2 / topicName3

8.2 主题描述信息查询

1
2
# kafka-topics.sh --create --zookeeper localhost:2181/kafka --partitions 3 --replication-factor 2 --topic topicName
kafka-topics.sh --describe --zookeeper localhost:2181/kafka --topic topicName

输出信息如下:

1
2
3
4
Topic:topicName3       PartitionCount:3             ReplicationFactor:2   Configs:
Topic: topicName3 Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: topicName3 Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: topicName3 Partition: 2 Leader: 3 Replicas: 3,1 Isr: 3,1

可看到每个 broker 的角色:

  • 第一行(主题概要):分区数 3,复制因子 2;
  • 后面各行是各个分区(0/1/2)的信息,字段含义如下:
  •   Leader: 作为主题 Leader 的 brokerId;
  •   Replicas: 表示复制数据的节点的 brokerId(Leader 也可以在其中);
  •   isr(in-sync replicas): 是 Replicas 的子集(当前存活者),等待升级为 Leader 的 brokerId;

8.3 消息的发布和消费

1
2
3
4
5
kafka-console-producer.sh --broker-list localhost:9092 --topic topicName
## 等待提示符(大于号)出现后,输入文本,每行作为一个消息来发布。按 CTRL+C 结束

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicName
## 会逐行打印收到的消息,按 CTRL+C 结束。如果要从头接收,要增加选项 --from-beginning

8.4 主题的删除【测试环境用】

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
grep "delete.topic.enable" $KAFKA_HOME/logs/server.log ## 确保为 true(默认)
kafka-topics.sh --delete --zookeeper localhost:2181/kafka --topic topicName

## 【可能不需要此步】前面只是对主题做了删除标记,必须手工再删除
$KAFKA_HOME/bin/kafka-server-stop-all.sh
zkCli.sh -server localhost:2181 <<EOF
rmr /kafka/brokers/topics/topicName
rmr /kafka/admin/delete_topics/topicName
rmr /kafka/config/topics/topicName
quit
EOF
sudo rm -rf /tmp/kafka-logs-{1,2,3}/topicName
$KAFKA_HOME/bin/kafka-server-start-all.sh

## 确认删除结果
kafka-topics.sh --list --zookeeper localhost:2181/kafka

9 清空数据【测试环境用】

1
2
3
sudo $KAFKA_HOME/bin/kafka-server-stop-all.sh
sudo rm -rf /tmp/kafka-logs /tmp/kafka-logs-{1,2,3}
sudo $KAFKA_HOME/bin/kafka-server-start-all.sh
文章目录
  1. 1 下载
  2. 2 安装(解压)
  3. 3 调整配置
  4. 4 设置环境变量
  5. 5 防火墙放行
  6. 6 启动服务器
  7. 7 停止服务器
  8. 8 命令行测试
    1. 8.1 主题的创建与查看
    2. 8.2 主题描述信息查询
    3. 8.3 消息的发布和消费
    4. 8.4 主题的删除【测试环境用】
  9. 9 清空数据【测试环境用】