步骤
- 安装配置JDK
- 安装配置zookeeper
- 安装配置kafka
- kafka消息队列测试
安装配置JDK
详见文章:https://jz.wohunlfry.top/2020/11/11/CentOS%E4%BD%BF%E7%94%A8yum%E5%AE%89%E8%A3%85jdk/
安装配置zookeeper
下载安装包
地址:https://archive.apache.org/dist/zookeeper/zookeeper-3.6.1/apache-zookeeper-3.6.1-bin.tar.gz
安装配置
新建zookeeper安装目录
cd /opt
mkdir zookeeper
cd zookeeper
mkdir zkdata //存放快照日志
mkdir zkdatalog //存放事务日志
传送安装包到CentOS虚拟机
通过xftp传送文件到CentOS虚拟机/opt/zookeeper目录
解压安装
cd /opt/zookeeper
tar -zxvf apache-zookeeper-3.6.1-bin.tar.gz
修改配置文件
cd /opt/zookeeper/apache-zookeeper-3.6.1-bin/conf
cp zoo_sample.cfg zoo.cfg
vi zoo.cfg
修改dataDir和dataLogDir如下:
dataDir=/opt/zookeeper/zkdata
dataLogDir=/opt/zookeeper/zkdataLog
:wq保存退出
另外,在/opt/zookeeper/zkdata下新建myid文件
cd /opt/zookeeper/zkdata
vi myid
输入1,:wq保存退出
添加环境变量
vim /etc/profile
文件末尾新增
## zookeeper env
export ZOOKEEPER_HOME=/opt/zookeeper/apache-zookeeper-3.6.1-bin
export PATH=$ZOOKEEPER_HOME/bin:$PATH
:wq保存文件退出
source /etc/profile
开防火墙端口
1 | # firewall-cmd --permanent --add-port=2181/tcp |
启动测试
zkServer.sh start
jps //QuorumPeerMain是zookeeper进程,表示正常启动
zkServer.sh stop //停止
安装配置kafka(单Broker)
下载安装包
地址:https://apache.website-solution.net/kafka/2.5.0/kafka_2.12-2.5.0.tgz
安装配置
新建kafka安装目录
cd /opt
mkdir kafka
cd kafka
mkdir kafkalogs //存放事务日志
传送安装包到CentOS虚拟机
通过xftp传送文件到CentOS虚拟机/opt/kafka目录
解压安装
cd /opt/kafka
tar -zxvf kafka_2.12-2.5.0.tgz
修改配置文件
cd /opt/kafka/kafka_2.12-2.5.0/config
vi server.properties
修改log.dirs如下:
log.dirs=/opt/kafka/kafkalogs
:wq保存退出
添加环境变量
vim /etc/profile
文件末尾新增
## kafka env
export $KAFKA_HOME=/opt/kafka/kafka_2.12-2.5.0
export PATH=$KAFKA_HOME/bin:$PATH
:wq保存文件退出
source /etc/profile
开防火墙端口
1 | # firewall-cmd --permanent --add-port=9092/tcp |
启动测试
kafka-server-start.sh $KAFKA_HOME/config/server.properties(前台启动)
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties(后台启动)
jps //出现kafka,表示正常启动
kafka-server-stop.sh $KAFKA_HOME/config/server.properties //停止
kafka消息队列测试
开启zookeeper
开启kafka
开启至少两个终端窗口,一个用于生产者环境,一个用于消费者环境。生产者终端输入数据,消费者终端打印生产者输入的数据。
开启多个CentOS终端的方式:CentOS虚拟机本地终端,xshell,cmd之telnet
新建topic
1 | kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test |
查看所有topic信息
1 | kafka-topics.sh --list --zookeeper localhost:2181 |
启动生产者
1 | kafka-console-producer.sh --broker-list localhost:9092 --topic test |
启动消费者
1 | kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test |
删除topic
1 | kafka-topics.sh --delete --zookeeper localhost:2181 --topic test |
配置kafka(多Broker)
配置
拷贝server.properties三份
cd /opt/kafka/kafka_2.12-2.5.0/config
cp server.properties server-1.properties
cp server.properties server-2.properties
cp server.properties server-3.properties
修改server-1.properties,server-2.properties,server-3.properties
1 | # broker的全局唯一编号,不能重复 |
1 | # broker的全局唯一编号,不能重复 |
1 | # broker的全局唯一编号,不能重复 |
启动测试
1 | kafka-server-start.sh $KAFKA_HOME/config/server-1.properties(前台启动) |
kafka多Broker消息队列测试
开启zookeeper
开启kafka
新建topic
1 | kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic |
启动生产者
1 | kafka-console-producer.sh --broker-list localhost:9092,localhost:9093,localhost:9094 --topic my-replicated-topic |
启动消费者
1 | kafka-console-consumer.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --topic my-replicated-topic |
查看Broker角色
1 | kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic |
多Broker容错性测试
jps -m (查看Broker对应的进程号)
kill -9 进程号 (无论kill掉Broker的leader还是follower,生产者消费者的信息测试仍然正确)
总结:不管当前状态的borker是leader还是follower,当我们kill掉后,只要有一个borker能够正常使用,则消息仍然能够正常的生产和发送。