CentOS7 下 kafka 集群安装部署

本文介绍了在 CentOS7 下安装和部署 Kafka 集群的步骤。首先介绍了 Kafka 的简介,包括其作为高吞吐量的分布式发布订阅消息系统的特点。然后说明了搭建 Kafka 集群环境所需的设备环境,包括准备一个 Zookeeper 环境。接下来详细介绍了安装步骤,包括下载 Kafka、解压安装、配置环境变量、编辑配置文件、创建 system 服务、启动和关闭 Kafka 以及查看集群状态。最后,介绍了 Kafka 命令行工具的使用,包括测试流程和测试步骤,涵盖了创建 topic、生产消息、消费消息等操作。

CentOS7 下 kafka 集群安装部署

1. kafka 简介

Apache kafka 是由 Apache 软件基金会开发的一个开源流处理平台,由 Scala 和 Java 编写。Kafka 是一种高吞吐量的分布式发布订阅消息系统,是消息中间件的一种,用于构建实时数据管道和流应用程序,非常流行。

Kafka官网:http://kafka.apache.org

学习推荐:http://orchome.com/kafka/index

官网下载:http://kafka.apache.org/downloads

2. 设备环境

Kafka 集群环境搭建,需要准备好一个 zookeeper 环境(集群)

Host IP Port OS Software
cnode1 10.128.170.21 9092 CentOS 7.9.2009 kafka 2.13-2.8.1
cnode2 10.128.170.22 9092 CentOS 7.9.2009 kafka 2.13-2.8.1
cnode3 10.128.170.23 9092 CentOS 7.9.2009 kafka 2.13-2.8.1

说明:kafka 名中的 2.13 是 Scala 语言版本,后面的 2.8.1 是 kafka 版本,端口默认为 9092。

3. 安装步骤

3.1 下载 kafka

官网下载太慢,推荐使用国内镜像进行下载,清华镜像下载地址:

https://mirrors.tuna.tsinghua.edu.cn/apache/kafka

这里以下载 2.8.1 版本为例:

1
wget -c https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.8.1/kafka_2.13-2.8.1.tgz

3.2 解压安装

这里解压至 /opt 目录:

1
tar -zxvf kafka_2.13-2.8.1.tgz -C /opt

解压后目录:

1
2
3
4
5
6
7
8
9
10
11
[root@cnode1 kafka_2.13-2.8.1]# pwd
/opt/kafka_2.13-2.8.1
[root@cnode1 kafka_2.13-2.8.1]# ll
total 40
drwxr-xr-x. 3 root root 4096 Sep 14 21:09 bin
drwxr-xr-x. 3 root root 4096 Sep 14 21:09 config
drwxr-xr-x. 2 root root 8192 Feb 18 10:26 libs
-rw-r--r--. 1 root root 14520 Sep 14 21:03 LICENSE
drwxr-xr-x. 2 root root 262 Sep 14 21:09 licenses
-rw-r--r--. 1 root root 953 Sep 14 21:03 NOTICE
drwxr-xr-x. 2 root root 44 Sep 14 21:09 site-docs

3.3 配置环境变量(可选)

编辑 /etc/profile 文件:

1
vim /etc/profile

在文件末尾添加如下配置:

1
2
3
4
# Kafka Environment
export KAFKA_HOME=/opt/kafka_2.13-2.8.1

export PATH=${PATH}:${KAFKA_HOME}/bin

使配置生效:

1
source /etc/profile

3.4 编辑配置文件

进入 config 目录:

1
cd /opt/kafka_2.13-2.8.1/config/

备份原配置文件:

1
cp server.properties server.properties.bak

编辑配置文件:

1
vim server.properties

修改配置如下:

1
2
3
4
5
6
7
############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1 # 默认是 0,这里自定义 cnode1:1 cnode2:2 cnode3:3

# Switch to enable topic deletion or not, default value is false
#delete.topic.enable # 这里为新增行,默认是 false,为 false 时,删除 topic 时不会立即被删掉
1
2
3
4
5
6
7
8
9
10
############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092 # 根据需要修改端口
host.name=cnode1 # 这里为新增行
1
2
3
4
############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/data/kafka-logs # 默认是 /tmp/kafka-logs,这里重新定义了路径
1
2
3
4
5
6
7
8
9
############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
# 这里写上已准备好的 zookeeper 集群环境
zookeeper.connect=cnode1:2181,cnode2:2181,cnode3:2181

其它配置可以保持默认,保存退出。在 cnode2 和 cnode3 上进行同样的操作,不再赘述。

3.5 创建 system 服务(可选)

1
vim /usr/lib/systemd/system/kafka.service

添加如下内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
[Unit]
Description=kafka service
After=network.target

[Service]
Type=forking
User=root
Group=root
# 启用 jmx
#Environment=JMX_PORT=9988
ExecStart=/opt/kafka_2.13-2.8.1/bin/kafka-server-start.sh -daemon /opt/kafka_2.13-2.8.1/config/server.properties
ExecStop=/opt/kafka_2.13-2.8.1/bin/kafka-server-stop.sh
Restart=always
RestartSec=1
StartLimitIntervalSec=0

[Install]
WantedBy=multi-user.target

https://lists.freedesktop.org/archives/systemd-devel/2017-July/039255.html

3.6 启动 kafka

这里以未配置环境变量为例。

切换到 bin 目录下:

1
cd /opt/kafka_2.13-2.8.1/bin/

启动服务:

1
[root@cnode1 bin]# ./kafka-server-start.sh -daemon ../config/server.properties

查看 9092 端口状态,确保服务已经启动:

1
2
3
4
[root@cnode1 bin]# netstat -anptl | grep 9092
tcp6 0 0 10.128.170.21:9092 :::* LISTEN 29553/java
tcp6 0 0 10.128.170.21:40092 10.128.170.21:9092 ESTABLISHED 29553/java
tcp6 0 0 10.128.170.21:9092 10.128.170.21:40092 ESTABLISHED 29553/java

同样地,启动 cnode2 和 cnode3:

1
[root@cnode2 bin]# ./kafka-server-start.sh -daemon ../config/server.properties
1
[root@cnode3 bin]# ./kafka-server-start.sh -daemon ../config/server.properties

至此,kafka 安装启动完成。

3.7 关闭 kafka

1
2
3
[root@cnode1 bin]# ./kafka-server-stop.sh
[root@cnode2 bin]# ./kafka-server-stop.sh
[root@cnode3 bin]# ./kafka-server-stop.sh

3.8 查看集群状态

登录 zookeeper(切换到 zookeeper 的 bin 目录下):

1
2
3
4
5
6
7
8
9
[root@cnode1 bin]# cd /opt/apache-zookeeper-3.6.3-bin/bin/
[root@cnode1 bin]# ./zkCli.sh -server cnode2
...
[zk: cnode2(CONNECTED) 0] ls /
[admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]
[zk: cnode2(CONNECTED) 1] ls /brokers
[ids, seqid, topics]
[zk: cnode2(CONNECTED) 2] ls /brokers/ids
[1, 2, 3]

说明:zookeeper 集群建好之后,通过 ls / 出来的只有 zookeeper,连接 kafka 使用后,/ 下面多了不少东西,其中通过查看 /brokers/ids 可以发现已经检查到了已经安装的三台 kafka 的 broker.id [1,2,3]。

4. kafka 命令行工具使用

4.1 测试流程

主线:创建 topic,生产消息,消费消息。

流程命令会连接不同的节点,可以顺带检测集群消息的同步情况。

(1)在 cnode1 上创建 topic

(2)在 cnode3 查看 topic

(3)在 cnode1 上生成消息

(4)在 cnode2 上消费消息

4.2 测试步骤

(1)先创建一个 topic

在 cnode1 节点执行

1
2
[root@cnode1 bin]# ./kafka-topics.sh --create --zookeeper cnode1:2181 --replication-factor 3 --partitions 2 --topic google
Created topic google.

创建一个名叫 "google" 的 topic 成功。

(2)查看一下建好的 topic

在 cnode3 节点执行

1
2
[root@cnode3 bin]# ./kafka-topics.sh --list --zookeeper cnode3:2181
google

(3)在建好的 topic 下生产消息

在 cnode1 节点执行

1
2
3
4
5
[root@cnode1 bin]# ./kafka-console-producer.sh --broker-list cnode1:9092 --topic google
>Apple
>Banana
>Cat
>Dog

这里生产了 4 个消息,每行一个。

(4)消费指定 topic 下的消息

在 cnode2 节点执行

1
2
3
4
5
[root@cnode2 bin]# ./kafka-console-consumer.sh --bootstrap-server cnode2:9092 --topic google --from-beginning
Banana
Dog
Apple
Cat

(5)删除指定 topic

先 list 一下然后删除

1
2
3
4
5
6
7
8
[root@cnode1 bin]# ./kafka-topics.sh --list --zookeeper cnode1:2181
__consumer_offsets
google
[root@cnode1 bin]# ./kafka-topics.sh --delete --zookeeper cnode3:2181 --topic google
Topic google is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
[root@cnode1 bin]# ./kafka-topics.sh --list --zookeeper cnode1:2181
__consumer_offsets

References

centos7下kafka集群安装部署

kafka命令行脚本使用