Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。
操作系统:CentOS-7.x\8.x\AnolisOS\openEuler\Kylin-Server-V10等
kafka版本:kafka_2.12-3.9.1
三台服务器ip地址:
192.168.21.100,192.168.21.101,192.168.21.128
1、关闭selinux
sed -i 's/^SELINUX=.*/SELINUX=disabled/' /etc/selinux/config
setenforce 0
2、关闭防火墙
kafka默认使用tcp9092端口号,建议在内网环境下部署kafka集群,并且关闭防火墙。
CentOS-7.x默认使用的是firewall作为防火墙,关闭
systemctl stop firewalld.service #停止firewall
systemctl disable firewalld.service #禁止firewall开机启动
systemctl mask firewalld
systemctl stop firewalld
yum remove firewalld
3、安装jdk
kafka依赖Java,需要安装java环境,我们使用Java21版本
Linux系统下安装Java JDK:https://www.osyunwei.com/archives/12872.html
4、安装配置zookeeper
在 Kafka 2.8.0 及其以上版本,引入了 KRaft 模式(Kafka Raft Metadata mode),不再依赖 ZooKeeper,所有元数据由 Kafka 自己通过 Raft 协议管理。
使用 Kafka 内部的 Quorum 控制器来取代 ZooKeeper管理元数据,元数据保存在controller中,
这样我们无需维护zk集群,只要维护Kafka集群就可以了,节省运算资源。
使用KRaft 模式,无需再安装部署zookeeper集群
5、添加hosts解析
vi /etc/hosts #编辑配置文件
192.168.21.100 kafka01
192.168.21.101 kafka02
192.168.21.128 kafka03
:wq! #保存退出
6、安装kafka
6.1下载地址
https://kafka.apache.org/downloads
https://dlcdn.apache.org/kafka/3.9.1/kafka_2.12-3.9.1.tgz
上传到/usr/local/src目录
#创建kafka安装目录
mkdir -p /data/server/kafka
mkdir -p /data/server/kafka/kafka-logs
mkdir -p /data/server/kafka/meta
cd /usr/local/src
#解压到安装目录
tar zxvf kafka_2.12-3.9.1.tgz -C /data/server/kafka --strip-components 1
6.2配置环境变量
vi /etc/profile
#set kafka environment
export KAFKA_HOME=/data/server/kafka
export PATH=$PATH:$KAFKA_HOME/bin
:wq! #保存退出
source /etc/profile #使配置立即生效
6.3设置kafka的KRaft模式配置文件
cp /data/server/kafka/config/kraft/server.properties /data/server/kafka/config/kraft/server.properties.bak
vi /data/server/kafka/config/kraft/server.properties #kafka配置文件
process.roles=broker,controller #节点角色(controller + broker 表示既是控制器又是 Broker)
node.id=1 #KRaft模式需要添加node.id代替broker.id,取值范围1-1000的整数,每个Kafka节点必须唯一
#控制器 Quorum 投票者列表(所有 controller 节点)
#controller.quorum.voters 中的前面数字(如 1, 2, 3)必须与各个 Kafka 节点中配置的 node.id 严格一致、一一对应。
#9092是客户端连接端口,9093 是控制器间通信端口(Controller-to-Controller,用于 Raft 协议)
controller.quorum.voters=1@kafka01:9093,2@kafka02:9093,3@kafka03:9093
listeners=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
advertised.listeners=PLAINTEXT://kafka01:9092,CONTROLLER://kafka01:9093 #每个Kafka节点写自己的名称kafka02、kafka03
controller.listener.names=CONTROLLER #定义 CONTROLLER 协议使用的监听名称
num.network.threads=32 #处理网络请求的最大线程数
num.io.threads=32 #处理I/O请求的线程数
log.dirs=/data/server/kafka/kafka-logs
metadata.log.dir=/data/server/kafka/meta #将元数据日志和其他数据日志分开,便于管理与维护
num.partitions=3 #全局默认分区,集群环境下修改为节点数量3
offsets.topic.replication.factor=3 #默认主题__consumer_offsets的副本数量,默认值是1,集群环境下修改为节点数量3,否则挂掉一个节点整个集群就不可用了
default.replication.factor=3 #全局默认副本,集群环境下修改为节点数量3
delete.topic.enable=true #允许通过kafka命令行就可以直接删除topic
auto.create.topics.enable=false #关闭自动创建topic
log.retention.hours=168 #默认日志保留时间7天(168小时)
log.retention.bytes=1073741824 #限制每个分区的日志文件大小为1GB。当日志文件大小达到此限制时,Kafka将开始删除最旧的消息以释放空间
buffer.memory=68719476736
batch.size=2097152
max.request.size=4194304
message.max.bytes=6291456
fetch.max.bytes=7340032
linger.ms=10
compression.type=snappy
zookeeper.connection="" #禁止ZooKeeper模式
:wq! #保存退出
6.4初始化kafka集群
#在其中一台服务器上执行下面命令生成储目录唯一的uuid
KAFKA_CLUSTER_ID="$(/data/server/kafka/bin/kafka-storage.sh random-uuid)"
#查看uuid
echo "$KAFKA_CLUSTER_ID"
HPLwMwurQgG_qIjTlSpUWg
#用该uuid格式化kafka存储目录,三台服务器都要执行以下命令
/data/server/kafka/bin/kafka-storage.sh format -t HPLwMwurQgG_qIjTlSpUWg -c /data/server/kafka/config/kraft/server.properties
6.5启动kafka
#启动参数设置,默认都是1GB,可以根据服务器配置时代做调整
vi /data/server/kafka/bin/kafka-server-start.sh
export KAFKA_HEAP_OPTS="-Xmx32G -Xms32G -Xmn32G -XX:MetaspaceSize=256M -XX:+UseG1GC -XX:MaxGCPauseMillis=50 -XX:G1HeapRegionSize=16M"
:wq! #保存退出
#三台机器分别启动kafka
cd /data/server/kafka/bin
./kafka-server-start.sh -daemon ../config/kraft/server.properties #以后台方式启动
验证 ,有Kafka了证明启动没有问题
jps
jps | grep Kafka
ps -ef |grep kafka
#查看控制器 Quorum 状态
/data/server/kafka/bin/kafka-metadata-quorum.sh \
--bootstrap-controller kafka01:9093 \
describe \
--status
#从控制器日志中查找元数据版本信息
grep -i "MetadataVersion" /data/server/kafka/logs/server.log
6.6创建topics
只在1台主节点操作
cd /data/server/kafka/bin
./kafka-topics.sh --create --topic testtopics --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3
6.7查看主题
cd /data/server/kafka/bin
./kafka-topics.sh --list --bootstrap-server localhost:9092
6.8生产测试
cd /data/server/kafka/bin
./kafka-console-producer.sh --broker-list localhost:9092 --topic testtopics
>>>
>test
>kafka-tets
6.9消费测试
cd /data/server/kafka/bin
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testtopics --from-beginning
test
kafka-tets
#查看所有消费者组
cd /data/server/kafka/bin
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
6.10查看topics信息
./kafka-topics.sh --bootstrap-server kafka01:9092,kafka02:9092,kafka:9092 --topic testtopics --describe
7、设置kafka开机启动
vi /etc/rc.d/rc.local
/bin/sh /data/server/kafka/bin/kafka-server-start.sh -daemon /data/server/kafka/config/kraft/server.properties
:wq! #保存退出
#设置普通用户启动,需要设置kafka目录所有者为myuser
su - myuser -c "/bin/sh /data/server/kafka/bin/kafka-server-start.sh -daemon /data/server/kafka/config/kraft/server.properties"
#默认/etc/rc.local没有执行权限,需要手动添加执行权限
chmod +x /etc/rc.d/rc.local
sh /data/server/kafka/bin/kafka-server-start.sh -daemon /data/server/kafka/config/kraft/server.properties #启动kafka
#关闭kafka
cd /data/server/kafka/bin
./kafka-server-stop.sh
ps -ef |grep kafka
#扩展阅读
kafka最新版本是kafka_2.13-4.0.0.tgz
https://dlcdn.apache.org/kafka/4.0.0/kafka_2.13-4.0.0.tgz
配置文件没有kraft目录了,默认就是config/server.propertie,默认支持KRaft模式
至此,Linux系统下kafka集群使用KRaft模式安装部署完成。



