技术交流QQ群:①185473046   ②190706903   ③203744115   网站地图
登录

下次自动登录
现在的位置: 首页kafka>正文
Linux系统下kafka集群使用KRaft模式安装部署
2025年05月26日 kafka 暂无评论 ⁄ 被围观 248次+

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。

操作系统:CentOS-7.x\8.x\‌AnolisOS‌\openEuler\Kylin-Server-V10等

kafka版本:kafka_2.12-3.9.1

三台服务器ip地址:

192.168.21.100,192.168.21.101,192.168.21.128

1、关闭selinux

sed -i 's/^SELINUX=.*/SELINUX=disabled/' /etc/selinux/config

setenforce 0

2、关闭防火墙

kafka默认使用tcp9092端口号,建议在内网环境下部署kafka集群,并且关闭防火墙。

CentOS-7.x默认使用的是firewall作为防火墙,关闭

systemctl stop firewalld.service #停止firewall

systemctl disable firewalld.service #禁止firewall开机启动

systemctl mask firewalld

systemctl stop firewalld

yum remove firewalld

3、安装jdk

kafka依赖Java,需要安装java环境,我们使用Java21版本

Linux系统下安装Java JDK:https://www.osyunwei.com/archives/12872.html

4、安装配置zookeeper

在 Kafka 2.8.0 及其以上版本,引入了 KRaft 模式(Kafka Raft Metadata mode),不再依赖 ZooKeeper,所有元数据由 Kafka 自己通过 Raft 协议管理。

使用 Kafka 内部的 Quorum 控制器来取代 ZooKeeper管理元数据,元数据保存在controller中,

这样我们无需维护zk集群,只要维护Kafka集群就可以了,节省运算资源。

使用KRaft 模式,无需再安装部署zookeeper集群

5、添加hosts解析

vi /etc/hosts #编辑配置文件

192.168.21.100 kafka01

192.168.21.101 kafka02

192.168.21.128 kafka03

:wq! #保存退出

6、安装kafka

6.1下载地址

https://kafka.apache.org/downloads

https://dlcdn.apache.org/kafka/3.9.1/kafka_2.12-3.9.1.tgz

上传到/usr/local/src目录

#创建kafka安装目录

mkdir -p /data/server/kafka

mkdir -p /data/server/kafka/kafka-logs

mkdir -p /data/server/kafka/meta

cd /usr/local/src

#解压到安装目录

tar zxvf kafka_2.12-3.9.1.tgz -C /data/server/kafka --strip-components 1

6.2配置环境变量

vi /etc/profile

#set kafka environment

export KAFKA_HOME=/data/server/kafka

export PATH=$PATH:$KAFKA_HOME/bin

:wq! #保存退出

source /etc/profile #使配置立即生效

6.3设置kafka的KRaft模式配置文件

cp /data/server/kafka/config/kraft/server.properties /data/server/kafka/config/kraft/server.properties.bak

vi /data/server/kafka/config/kraft/server.properties #kafka配置文件

process.roles=broker,controller #节点角色(controller + broker 表示既是控制器又是 Broker)

node.id=1 #KRaft模式需要添加node.id代替broker.id,取值范围1-1000的整数,每个Kafka节点必须唯一

#控制器 Quorum 投票者列表(所有 controller 节点)

#controller.quorum.voters 中的前面数字(如 1, 2, 3)必须与各个 Kafka 节点中配置的 node.id 严格一致、一一对应。

#9092是客户端连接端口,9093 是控制器间通信端口(Controller-to-Controller,用于 Raft 协议)

controller.quorum.voters=1@kafka01:9093,2@kafka02:9093,3@kafka03:9093

listeners=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093

advertised.listeners=PLAINTEXT://kafka01:9092,CONTROLLER://kafka01:9093 #每个Kafka节点写自己的名称kafka02、kafka03

controller.listener.names=CONTROLLER #定义 CONTROLLER 协议使用的监听名称

num.network.threads=32 #处理网络请求的最大线程数

num.io.threads=32 #处理I/O请求的线程数

log.dirs=/data/server/kafka/kafka-logs

metadata.log.dir=/data/server/kafka/meta #将元数据日志和其他数据日志分开,便于管理与维护

num.partitions=3 #全局默认分区,集群环境下修改为节点数量3

offsets.topic.replication.factor=3 #默认主题__consumer_offsets的副本数量,默认值是1,集群环境下修改为节点数量3,否则挂掉一个节点整个集群就不可用了

default.replication.factor=3 #全局默认副本,集群环境下修改为节点数量3

delete.topic.enable=true #允许通过kafka命令行就可以直接删除topic

auto.create.topics.enable=false #关闭自动创建topic

log.retention.hours=168 #默认日志保留时间7天(168小时)

log.retention.bytes=1073741824 #限制每个分区的日志文件大小为1GB。当日志文件大小达到此限制时,Kafka将开始删除最旧的消息以释放空间

buffer.memory=68719476736

batch.size=2097152

max.request.size=4194304

message.max.bytes=6291456

fetch.max.bytes=7340032

linger.ms=10

compression.type=snappy

zookeeper.connection="" #禁止ZooKeeper模式

:wq! #保存退出

6.4初始化kafka集群

#在其中一台服务器上执行下面命令生成储目录唯一的uuid

KAFKA_CLUSTER_ID="$(/data/server/kafka/bin/kafka-storage.sh random-uuid)"

#查看uuid

echo "$KAFKA_CLUSTER_ID"

HPLwMwurQgG_qIjTlSpUWg

#用该uuid格式化kafka存储目录,三台服务器都要执行以下命令

/data/server/kafka/bin/kafka-storage.sh format -t HPLwMwurQgG_qIjTlSpUWg -c /data/server/kafka/config/kraft/server.properties

6.5启动kafka

#启动参数设置,默认都是1GB,可以根据服务器配置时代做调整

vi /data/server/kafka/bin/kafka-server-start.sh

export KAFKA_HEAP_OPTS="-Xmx32G -Xms32G -Xmn32G -XX:MetaspaceSize=256M -XX:+UseG1GC -XX:MaxGCPauseMillis=50 -XX:G1HeapRegionSize=16M"

:wq! #保存退出

#三台机器分别启动kafka

cd /data/server/kafka/bin

./kafka-server-start.sh -daemon ../config/kraft/server.properties #以后台方式启动

验证 ,有Kafka了证明启动没有问题

jps

jps | grep Kafka

ps -ef |grep kafka

#查看控制器 Quorum 状态

/data/server/kafka/bin/kafka-metadata-quorum.sh \

--bootstrap-controller kafka01:9093 \

describe \

--status

#从控制器日志中查找元数据版本信息

grep -i "MetadataVersion" /data/server/kafka/logs/server.log

6.6创建topics

只在1台主节点操作

cd /data/server/kafka/bin

./kafka-topics.sh --create --topic testtopics --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3

6.7查看主题

cd /data/server/kafka/bin

./kafka-topics.sh --list --bootstrap-server localhost:9092

6.8生产测试

cd /data/server/kafka/bin

./kafka-console-producer.sh --broker-list localhost:9092 --topic testtopics

>>>

>test

>kafka-tets

6.9消费测试

cd /data/server/kafka/bin

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testtopics --from-beginning

test

kafka-tets

#查看所有消费者组

cd /data/server/kafka/bin

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

6.10查看topics信息

./kafka-topics.sh --bootstrap-server kafka01:9092,kafka02:9092,kafka:9092 --topic testtopics --describe

7、设置kafka开机启动

vi /etc/rc.d/rc.local

/bin/sh /data/server/kafka/bin/kafka-server-start.sh -daemon /data/server/kafka/config/kraft/server.properties

:wq! #保存退出

#设置普通用户启动,需要设置kafka目录所有者为myuser

su - myuser -c "/bin/sh /data/server/kafka/bin/kafka-server-start.sh -daemon /data/server/kafka/config/kraft/server.properties"

#默认/etc/rc.local没有执行权限,需要手动添加执行权限

chmod +x /etc/rc.d/rc.local

sh /data/server/kafka/bin/kafka-server-start.sh -daemon /data/server/kafka/config/kraft/server.properties #启动kafka

#关闭kafka

cd /data/server/kafka/bin

./kafka-server-stop.sh

ps -ef |grep kafka

#扩展阅读

kafka最新版本是kafka_2.13-4.0.0.tgz

https://dlcdn.apache.org/kafka/4.0.0/kafka_2.13-4.0.0.tgz

配置文件没有kraft目录了,默认就是config/server.propertie,默认支持KRaft模式

至此,Linux系统下kafka集群使用KRaft模式安装部署完成。

     

  系统运维技术交流QQ群:①185473046 系统运维技术交流□Ⅰ ②190706903 系统运维技术交流™Ⅱ ③203744115 系统运维技术交流™Ⅲ

给我留言

您必须 [ 登录 ] 才能发表留言!



Copyright© 2011-2025 系统运维 All rights reserved
版权声明:本站所有文章均为作者原创内容,如需转载,请注明出处及原文链接
陕ICP备11001040号-3