技术交流QQ群:①185473046   ②190706903   ③203744115   网站地图
登录

下次自动登录
现在的位置: 首页kafka>正文
Linux系统下kafka集群安装部署
2023年05月01日 kafka 暂无评论 ⁄ 被围观 2,908次+

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。

操作系统:CentOS-7.x

kafka版本:kafka_2.12-3.4.0

三台服务器ip地址:

192.168.21.100,192.168.21.101,192.168.21.128

1、关闭selinux

sed -i 's/^SELINUX=.*/SELINUX=disabled/' /etc/selinux/config

setenforce 0

2、关闭防火墙

kafka默认使用tcp9092端口号,建议在内网环境下部署kafka集群,关闭服务器防火墙。

CentOS-7.x默认使用的是firewall作为防火墙,关闭

systemctl stop firewalld.service #停止firewall

systemctl disable firewalld.service #禁止firewall开机启动

systemctl mask firewalld

systemctl stop firewalld

yum remove firewalld

3、安装jdk

kafka依赖Java,需要安装java环境,我们使用Java 1.8版本

Linux系统下安装Java JDK:https://www.osyunwei.com/archives/12872.html

4、安装配置zookeeper

kafka依赖zookeeper,需要先安装部署,并且优先于kafka启动

也可以使用kafka自带的zookeeper,这里我们使用自定义安装的zookeeper

Linux系统下部署ZooKeeper集群:https://www.osyunwei.com/archives/13465.html

5、添加hosts解析

vi /etc/hosts #编辑配置文件

192.168.21.100 kafka01

192.168.21.101 kafka02

192.168.21.128 kafka03

:wq! #保存退出

6、安装kafka

6.1、下载地址

https://kafka.apache.org/downloads

https://downloads.apache.org/kafka/3.4.0/kafka_2.12-3.4.0.tgz

上传到/usr/local/src目录

6.1创建kafka安装目录

mkdir -p /data/server/kafka

mkdir -p /data/server/kafka/kafka-logs

cd /usr/local/src

#解压到安装目录

tar zxvf kafka_2.12-3.4.0.tgz -C /data/server/kafka --strip-components 1

6.2配置环境变量

vi /etc/profile

#set kafka environment

export KAFKA_HOME=/data/server/kafka

export PATH=$PATH:$KAFKA_HOME/bin

:wq! #保存退出

source /etc/profile #使配置立即生效

6.3配置kafka

cp /data/server/kafka/config/server.properties /data/server/kafka/config/server.properties.bak

vi /data/server/kafka/config/server.properties #kafka配置文件

broker.id=1 # 唯一标识在集群中的ID,每个节点的broker.id值唯一,1,2,3(192.168.21.100是1,192.168.21.101是2,192.168.21.128是3),最小是0,最大1000

listeners=PLAINTEXT://0.0.0.0:6667 #broker 服务器要监听的地址及端口,默认监听端口9092,这里改为6667,是kafka真正bind的地址

num.network.threads=32 #处理网络请求的最大线程数

num.io.threads=32 #处理I/O请求的线程数

advertised.listeners=PLAINTEXT://192.168.21.100:6667 #各自填写每个节点ip地址,暴露对外访问的地址,这个是对外提供的地址 , 当client请求到kafka时, 会分发这个地址,如果没有设置,会用listeners地址

log.dirs=/data/server/kafka/kafka-logs

zookeeper.connect=192.168.21.100:2181,192.168.21.100:2181,192.168.21.128:2181

offsets.topic.replication.factor=3 #默认主题__consumer_offsets的副本数量,默认值是1,集群环境下修改为节点数量3,否则挂掉一个节点整个集群就不可用了

num.partitions=3 #全局默认分区,集群环境下修改为节点数量3

default.replication.factor=3 #全局默认副本,集群环境下修改为节点数量3

delete.topic.enable=true #允许通过kafka命令行就可以直接删除topic

auto.create.topics.enable=false #关闭自动创建topic

log.retention.hours=7 #默认日志保留时间7天(168小时)

buffer.memory=2684354560

batch.size=524288

max.request.size=5242880

linger.ms=50

:wq! #保存退出

6.4启动kafka

zookeeper集群一定要在Kafka启动之前启动,因为kafka的节点需要向zookeeper注册,三台机器分别启动kafka

cd /data/server/kafka/bin

./kafka-server-start.sh -daemon ../config/server.properties #以后台方式启动

验证 ,有Kafka了证明启动没有问题

jps

jps | grep Kafka

ps -ef |grep kafka

6.5创建topics

只在1台主节点操作

./kafka-topics.sh --create --topic hostScreen --bootstrap-server localhost:6667 --replication-factor 3 --partitions 3

./kafka-topics.sh --create --topic inspection --bootstrap-server localhost:6667 --replication-factor 3 --partitions 3

./kafka-topics.sh --create --topic inspection.hardware.host --bootstrap-server localhost:6667 --replication-factor 3 --partitions 3

./kafka-topics.sh --create --topic inspection.hardware.log --bootstrap-server localhost:6667 --replication-factor 3 --partitions 3

./kafka-topics.sh --create --topic network --bootstrap-server localhost:6667 --replication-factor 3 --partitions 3

6.6查看主题

cd /data/server/kafka/bin

./kafka-topics.sh --list --bootstrap-server localhost:6667

6.7生产测试

cd /data/server/kafka/bin

./kafka-console-producer.sh --broker-list localhost:6667 --topic hostScreen

>>>

>test

>kafka-tets

6.8消费测试

cd /data/server/kafka/bin

./kafka-console-consumer.sh --bootstrap-server localhost:6667 --topic hostScreen --from-beginning

test

kafka-tets

#查看所有消费者组

cd /data/server/kafka/bin

./kafka-consumer-groups.sh --bootstrap-server localhost:6667 --list

6.9查看topics信息

./kafka-topics.sh --bootstrap-server kafka01:6667,kafka02:6667,kafka:6667 --topic inspection --describe

./kafka-topics.sh --bootstrap-server kafka01:6667,kafka02:6667,kafka:6667 --topic inspection.hardware.log --describe

7、设置kafka开机启动

vi /etc/rc.d/rc.local

/bin/sh /data/server/kafka/bin/kafka-server-start.sh -daemon /data/server/kafka/config/server.properties

:wq! #保存退出

#设置普通用户启动,需要设置kafka目录所有者为myuser

su - myuser -c "/bin/sh /data/server/kafka/bin/kafka-server-start.sh -daemon /data/server/kafka/config/server.properties"

#默认/etc/rc.local没有执行权限,需要手动添加执行权限

chmod +x /etc/rc.d/rc.local

sh /data/server/kafka/bin/kafka-server-start.sh -daemon /data/server/kafka/config/server.properties #启动kafka

#关闭kafka

cd /data/server/kafka/bin

./kafka-server-stop.sh

ps -ef |grep kafka

#扩容分区

cd /data/server/kafka/bin

1、topic分区扩容,分区数只能增加不能减少

#扩容inspection.hardware.host这个topic

./kafka-topics.sh --bootstrap-server kafka01:6667kafka02:6667,kafka03:6667 --topic inspection.hardware.host -alter --partitions 3

2、根据topic的分区情况自行修改 partitions-topic.json 文件配置

#partitions-topic.json文件要放在执行副本搬迁命令的目录下

vi partitions-topic.json

{

"partitions":

[

{

"topic": "inspection.hardware.host",

"partition": 0,

"replicas": [2,3,1]

},

{

"topic": "inspection.hardware.host",

"partition": 1,

"replicas": [3,2,1]

},

{

"topic": "inspection.hardware.host",

"partition": 2,

"replicas": [1,2,3]

}

],

"version":1

}

:wq! #保存退出

3、执行副本搬迁

#partitions-topic.json文件要放在执行副本搬迁命令的目录下

./kafka-reassign-partitions.sh --bootstrap-server kafka01:6667,kafka02:6667,kafka03:6667 --reassignment-json-file partitions-topic.json --execute

4、查看迁移情况

../bin/kafka-reassign-partitions.sh --zookeeper 127.0.0.1:2181 --reassignment-json-file partitions-topic.json --verify

说明:

kafka-reassign-partitions.sh工具来重新分布分区。该工具有三种使用模式:

generate模式,给定需要重新分配的Topic,自动生成reassign plan(并不执行)

execute模式,根据指定的reassign plan重新分配Partition

verify模式,验证重新分配Partition是否成功

#相关概念

Broker:kafka节点,多个broker组成kafka集群。

Topic:即主题,kafka通过Topic对消息进行分类,发布到kafka的消息都需要指定Topic。

Producer:即消息生产者,向Broker发送消息的客户端。

Consumer:即消息消费者,从Broker消费(接收)消息的客户端。

ConsumerGroup:即消费者组,消费者隶属于消费者组,同一个分区的消息可以被多个消费者消费,但是同一个消费者组中只能有一个消费者可以消费。

Partition:即分区,每个Topic下都至少有一个分区,分区内部的消息是有序的。

#结束进程

ps -aux | grep -E 'kafka' | grep -v grep |awk '{print $2}' |sudo xargs kill -s 9

#Kafka架构

#kafka配置参数

至此,Linux系统下kafka集群安装部署完成。

     

  系统运维技术交流QQ群:①185473046 系统运维技术交流□Ⅰ ②190706903 系统运维技术交流™Ⅱ ③203744115 系统运维技术交流™Ⅲ

给我留言

您必须 [ 登录 ] 才能发表留言!



Copyright© 2011-2024 系统运维 All rights reserved
版权声明:本站所有文章均为作者原创内容,如需转载,请注明出处及原文链接
陕ICP备11001040号-3