RocketMQ5.x教程-RocketMQ集群架构

发布时间:2024-01-17 11:49:07
 

RocketMQ集群架构

刚才的演示中,我们已经体验到了RocketMQ是如何工作的。我们回头看RocketMQ的集群架构,就能够有更全面的理解了。

4.1.RocketMQ集群架构解析

一个完整的RocketMQ集群中,有如下几种角色 :

  • Producer:消息的发送者;举例:发信者
  • Consumer:消息接收者;举例:收信者
  • Broker:暂存和传输消息;举例:邮局
  • NameServer:管理Broker;举例:各个邮局的管理机构
  • Topic:区分消息的种类;一个发送者可以发送消息给一个或者多个Topic;一个消息的接收者可以订阅一个或者多个Topic消息 。
  • Message Queue:相当于是Topic的分区;用于并行发送和接收消息。

4.2.RocketMQ集群搭建

准备三台虚机,并配置机器名。可以利用安装好的虚机通过克隆出另外两个机器。

4.2.1.系统配置

1.使用vi /etc/hosts命令,配置机器名,在文件末尾加上以下配置:
[root@localhost ~]# vi /etc/hosts
192.168.43.134 worker1
192.168.43.135 worker2
192.168.43.136 worker3

2.服务之间设置免密登陆,三个机器都使用ssh-keygen生成秘钥。提示录入直接回车即可
[root@localhost ~]# ssh-keygen

3.三个机器都使用以下命令分发给其他机器,输入yes,然后输入密码;这样可以直接某个机器使用ssh或者scp到另外的机器。
[root@localhost ~]# ssh-copy-id worker1
[root@localhost ~]# ssh-copy-id worker2
[root@localhost ~]# ssh-copy-id worker3

4.停止并禁用防火墙或者删除防火墙,我这边使用的是删除防火墙。
#检查防火墙状态
[root@localhost ~]# firewall-cmd --state
#停止并禁用防火墙
[root@localhost ~]# systemctl stop firewalld
[root@localhost ~]# systemctl disable firewalld
#删除防火墙
[root@localhost ~]# yum remove firewalld

4.2.2.配置RocketMQ主从集群

使用conf/2m-2s-async下的配置文件搭建一个2主2从异步刷盘的集群。设计的集群情况如下:

机器名

nemaeServer节点部署

broker节点部署

worker1

nameserver

 

worker2

nameserver

broker-a,broker-b-s

worker3

nameserver

broker-b,broker-a-s

4.2.2.1.配置方式:conf目录下存在三种配置方式
  • 2m-2s-async:2主2从异步刷盘(吞吐量较大,但是消息可能丢失)
  • 2m-2s-sync:2主2从同步刷盘(吞吐量会下降,但是消息更安全)
  • 2m-noslave:2主无从(单点故障),然后还可以直接配置broker.conf,进行单点环境配置

而dleger就是用来实现主从切换的。集群中的节点会基于Raft协议随机选举出一个leader,其他的就都是follower。通常正式环境都会采用这种方式来搭建集群。

4.2.2.2.搭建2主2从模式,配置2m-2s-async目录Broker文件:
1.进入conf/2m-2s-async下:
[root@localhost /]# cd /app/rocketMQ/rocketmq-all-5.1.0-bin-release/conf/2m-2s-async/

2.配置worker2机器的主节点,将下方broker-a.properties内容配置到相应文件中,原有配置使用 #号屏蔽
[root@localhost 2m-2s-async]# vi broker-a.properties 

3.配置worker2机器的主节点,将下方broker-b-s.properties内容配置到相应文件中,原有配置使用 #号屏蔽
[root@localhost 2m-2s-async]# vi broker-b-s.properties 

4.配置worker3机器的主节点,将下方broker-b.properties内容配置到相应文件中,原有配置使用 #号屏蔽
[root@localhost 2m-2s-async]# vi broker-b.properties 

5.配置worker3机器的主节点,将下方broker-a-s.properties内容配置到相应文件中,原有配置使用 #号屏蔽
[root@localhost 2m-2s-async]# vi broker-a-s.properties 
#所属集群名字,名字一样的节点就在同一个集群内
brokerClusterName=DefaultCluster
#broker名字,名字一样的节点就是一组主从节点。
brokerName=broker-a
#brokerid,0就表示是Master,>0的都是表示Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=worker1:9876;worker2:9876;worker3:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueuEnums=4
#是否允许Broker自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许Broker自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨4点
deleteWhen=04
#文件保留时间,默认48小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/app/rocketMQ/store
#commitLog存储路径
storePathCommitLog=/app/rocketMQ/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/app/rocketMQ/store/consumequeue
#消息索引存储路径
storePathIndex=/app/rocketMQ/store/index
#checkpoint文件存储路径
storeCheckpoint=/app/rocketMQ/store/checkpoint
#abort文件存储路径
abortFile=/app/rocketMQ/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker的角色
#-ASYNC_MASTER异步复制Master
#-SYNC_MASTER同步双写Master
#-SLAVE
brokerRole=ASYNC_MASTER
#刷盘方式
#-ASYNC_FLUSH异步刷盘
#-SYNC_FLUSH同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
#开启Sql过滤
enablePropertyFilter=true
#重试支持过滤
filterSupportRetry=true
#所属集群名字,名字一样的节点就在同一个集群内
brokerClusterName=DefaultCluster
#broker名字,名字一样的节点就是一组主从节点。
brokerName=broker-b
#brokerid,0就表示是Master,>0的都是表示Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=worker1:9876;worker2:9876;worker3:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许Broker自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许Broker自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨4点
deleteWhen=04
#文件保留时间,默认48小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/app/rocketMQ/store
#commitLog存储路径
storePathCommitLog=/app/rocketMQ/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/app/rocketmQ/store/consumequeue
#消息索引存储路径
storePathIndex=/app/rocketMQ/store/index
#checkpoint文件存储路径
storeCheckpoint=/app/rocketMQ/store/checkpoint
#abort文件存储路径
abortFile=/app/rocketMQ/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker的角色
#-ASYNC_MASTER异步复制Master
#-SYNC_MASTER同步双写Master
#-SLAVE
brokerRole=ASYNC_MASTER
#刷盘方式
#-ASYNC_FLUSH异步刷盘
#-SYNC_FLUSH同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
#开启Sql过滤
enablePropertyFilter=true
#重试支持过滤
filterSupportRetry=true
#所属集群名字,名字一样的节点就在同一个集群内
brokerClusterName=DefaultCluster
#broker名字,名字一样的节点就是一组主从节点。
brokerName=broker-a
#brokerid,0就表示是Master,>0的都是表示Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=worker1:9876;worker2:9876;worker3:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许Broker自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许Broker自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker对外服务的监听端口
listenPort=11011
#删除文件时间点,默认凌晨4点
deleteWhen=04
#文件保留时间,默认48小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/app/rocketMQ/storeSlave
#commitLog存储路径
storePathCommitLog=/app/rocketMQ/storeSlave/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/app/rocketMQ/storeSlave/consumequeue
#消息索引存储路径
storePathIndex=/app/rocketMQ/storeSlave/index
#checkpoint文件存储路径
storeCheckpoint=/app/rocketMQ/storeSlave/checkpoint
#abort文件存储路径
abortFile=/app/rocketMQ/storeSlave/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
  #Broker的角色
#-ASYNC_MASTER异步复制Master
#-SYNC_MASTER同步双写Master
#-SLAVE
brokerRole=SLAVE
#刷盘方式
#-ASYNC_FLUSH异步刷盘
#-SYNC_FLUSH同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
#开启Sql过滤
enablePropertyFilter=true
#重试支持过滤
filterSupportRetry=true
#所属集群名字,名字一样的节点就在同一个集群内
brokerClusterName=DefaultCluster
#broker名字,名字一样的节点就是一组主从节点。
brokerName=broker-b
#brokerid,0就表示是Master,>0的都是表示Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=worker1:9876;worker2:9876;worker3:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许Broker自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许Broker自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker对外服务的监听端口
listenPort=11011
#删除文件时间点,默认凌晨4点
deleteWhen=04
#文件保留时间,默认48小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/app/rocketMQ/storeSlave
#commitLog存储路径
storePathCommitLog=/app/rocketMQ/storeSlave/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/app/rocketMQ/storeSlave/consumequeue
#消息索引存储路径
storePathIndex=/app/rocketMQ/storeSlave/index
#checkpoint文件存储路径
storeCheckpoint=/app/rocketMQ/storeSlave/checkpoint
#abort文件存储路径
abortFile=/app/rocketMQ/storeSlave/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker的角色
#-ASYNC_MASTER异步复制Master
#-SYNC_MASTER同步双写Master
#-SLAVE
brokerRole=SLAVE
#刷盘方式
#-ASYNC_FLUSH异步刷盘
#-SYNC_FLUSH同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
#开启Sql过滤
enablePropertyFilter=true
#重试支持过滤
filterSupportRetry=true

这样2主2从的集群配置基本就完成了。搭建过程中需要注意的配置项:

      • 同一机器上两个实例的store目录不能相同,否则会报错 Lock failed,MQ already started
      • 同一机器上两个实例的listenPort也不能相同。否则会报端口占用的错
      • 如果是多网卡的机器,比如云服务器,那么需要在broker.conf中增加brokerIP1属性,指定所在机器的外网网卡地址。

4.2.3.启动集群

由于我们之前已经在worker1单机部署过,所以相关的启动JVM参数已经调整过,如果是新配置需要注意jvm参数根据实际的内存大小分配。其他两个机器是克隆过来的所以无需在进行调整,nameServer不需要进行配置,直接启动nameServer即可。这也看出nameserver是无状态的。

RocketMQ5.X版本兼容之前旧版本的启动方式,即如下部署方式:

4.2.3.1.启动worker1、worker2、worker3的nameServer,并观察启动日志
[root@localhost 2m-2s-async]# cd ../../bin/
#启动之前使用jps命令查看下环境是否正常,有时候会出现环境变量异常,需要重新使用source ~/.bash_profile命令刷新配置
[root@localhost bin]# nohup ./mqnamesrv &
#观察日志查看是否启动成功,同样出现The Name Server boot success. serializeType=JSON 即成功启动
[root@localhost bin]# tail -f nohup.out
#也可以使用tail -f ~/logs/rocketmqlogs/namesrv.log 观察日志

4.2.3.2.worker2上启动broker-a节点与broker-b-s节点
[root@localhost bin]# nohup ./mqbroker -c ../conf/2m-2s-async/broker-a.properties & 
#出现以下日志即启动成功,观察注册的nameServer服务
#The broker[broker-a, 192.168.43.135:10911] boot success. serializeType=JSON and name server is worker1:9876;worker2:9876;worker3:9876

[root@localhost bin]# nohup ./mqbroker -c ../conf/2m-2s-async/broker-b-s.properties & 
[root@localhost bin]# tail -f nohup.out 
#出现以下日志即启动成功,观察注册的nameServer服务
#The broker[broker-b, 192.168.43.135:11011] boot success. serializeType=JSON and name server is worker1:9876;worker2:9876;worker3:9876
#也可以使用tail -f ~/logs/rocketmqlogs/broker.log 观察日志

4.2.3.3.worker3上启动broker-b节点与broker-a-s节点
[root@localhost bin]# nohup ./mqbroker -c ../conf/2m-2s-async/broker-b.properties & 
#出现以下日志即启动成功,观察注册的nameServer服务
#The broker[broker-b, 192.168.43.136:10911] boot success. serializeType=JSON and name server is worker1:9876;worker2:9876;worker3:9876

[root@localhost bin]# nohup ./mqbroker -c ../conf/2m-2s-async/broker-a-s.properties &
#出现以下日志即启动成功,观察注册的nameServer服务
#The broker[broker-b, 192.168.43.136:10911] boot success. serializeType=JSON and name server is worker1:9876;worker2:9876;worker3:9876

4.2.3.4.使用测试工具测试消息收发
# worker2发送消息
[root@localhost bin]# ./tools.sh org.apache.rocketmq.example.quickstart.Producer

# worker3接受消息
[root@localhost bin]# ./tools.sh org.apache.rocketmq.example.quickstart.Consumer

RocketMQ5.X版本兼容之前旧版本部署完成。在部署新版之前先通过maven安装一个rocketmq-dashboard可视化界面查看我们的集群。

4.2.4.安装rocketmq-dashboard

4.2.4.1.在1号机通过maven安装dashboard,所以要先安装maven服务
1.回到app目录,创建一个maven目录,将maven压缩包上载到该目录。
#maven包可以自行在官网下载,也可以使用课件资料中的maven包,官网下载地址:http://maven.apache.org/download.cgi
[root@localhost bin]# cd /app/
[root@localhost app]# mkdir maven
[root@localhost app]# cd maven/
#上载完成后解压
[root@localhost maven]# tar zxvf apache-maven-3.9.1-bin.tar.gz
#创建jar包仓库目录,然后修改maven配置
[root@localhost maven]# mkdir repository

4.2.4.2.配置maven环境变量
#配置MAVEN_HOME,并在path目录最前加上$MAVEN_HOME/bin:
[root@localhost maven]# vi /etc/profile
export MAVEN_HOME=/app/maven/apache-maven-3.9.1
$MAVEN_HOME/bin:

4.2.4.3.更新环境配置,查看maven是否成功
[root@localhost maven]# source /etc/profile
[root@localhost maven]# mvn -v
#显示如下信息即安装成功

4.2.4.4.修改maven仓库配置
#进入maven配置目录
[root@localhost maven]# cd apache-maven-3.9.1/conf/
#修改配置文件
[root@localhost conf]# vi settings.xml
#找到<localRepository>节点,添加本地仓库目录,注意不要添加在注释的代码中,单独拷贝节点,然后修改路径。
<localRepository>/app/maven/repository</localRepository>

#找到<mirror>节点,将节点内容更换成以下配置:
<mirror>
  <id>alimaven</id>
  <name>aliyun maven</name>
  <url>https://maven.aliyun.com/repository/public/</url>
  <mirrorOf>*</mirrorOf>
</mirror>

4.2.4.5.安装dashboard
#返回app目录创建dashboard目录,将下好的压缩包上载到该目录,同样可以使用资料中的压缩包或者自行下载。
[root@localhost dashboard]# cd /app/
[root@localhost app]# mkdir dashboard
[root@localhost app]# cd dashboard/
#上载后解压
[root@localhost dashboard]# unzip rocketmq-dashboard-rocketmq-dashboard-1.0.0.zip

4.2.4.6.编译dashboard
#进入源码目录使用mvn打包,也可以使用资料包中已经编译好的jar包。
#因为rocketMQ5.1版本对应dashboard的一些类还没有升级,最后编译时会异常,所以还是使用4.9版本打包dashboard,使用中未发现明显bug。
[root@localhost rocketmq-dashboard-rocketmq-dashboard-1.0.0]# cd rocketmq-dashboard-rocketmq-dashboard-1.0.0/
[root@localhost rocketmq-dashboard-rocketmq-dashboard-1.0.0]# mvn clean package -Dmaven.test.skip=true

#打包失败可以使用以下命令,清除编译结果,重新打包。(编译成功无需使用)
[root@localhost rocketmq-dashboard-rocketmq-dashboard-1.0.0]# mvn clean install -U -Dmaven.test.skip=true

4.2.4.7.启动dashboard
#编译成功后,进入target目录,静默启动dashboard
[root@localhost rocketmq-dashboard-rocketmq-dashboard-1.0.0]# cd target/
[root@localhost target]# nohup java -jar rocketmq-dashboard-1.0.0.jar &
#查看日志
[root@worker1 target]# tail -f nohup.out

4.2.4.8.启动成功后访问:http://192.168.43.134:8080/#/ 操作dashboard界面:

4.2.5.部署5.x版本-Local模式

Apache RocketMQ 5.0 版本完成基本消息收发,包括 NameServer、Broker、Proxy 组件。 在 5.0 版本中 Proxy 和 Broker 根据实际诉求可以分为 Local 模式和 Cluster 模式,一般情况下如果没有特殊需求,或者遵循从早期版本平滑升级的思路,可以选用Local模式。

  • 在 Local 模式下,Broker 和 Proxy 是同进程部署,只是在原有 Broker 的配置基础上新增 Proxy 的简易配置就可以运行。
  • 在 Cluster 模式下,Broker 和 Proxy 分别部署,即在原有的集群基础上,额外再部署 Proxy 即可。
4.2.5.1.关闭worker2,worker3的broker服务
[root@localhost bin]# sh ./mqshutdown broker
4.2.5.2.使用Local方式部署,每个机器只能部署一个broker,否则会出现端口占用的异常,这里启用worker2的broker-a与worker3的broker-b节点。
# worker2机器
[root@localhost bin]# nohup ./mqbroker -c ../conf/2m-2s-async/broker-a.properties --enable-proxy &
# worker3机器
[root@localhost bin]# nohup ./mqbroker -c ../conf/2m-2s-async/broker-b.properties --enable-proxy &

#使用tail -f ~/logs/rocketmqlogs/proxy.log查看日志
[root@localhost bin]# tail -f ~/logs/rocketmqlogs/proxy.log
# 出现以下异常即成功启动
# 2023-04-23 15:09:33 INFO main - The broker[broker-a, 192.168.43.135:10911] boot success. serializeType=JSON and name server is worker1:9876;worker2:9876;worker3:9876
# 2023-04-23 15:09:34 INFO main - grpc server start successfully.

4.2.5.3.测试消息收发
[root@localhost bin]# ./tools.sh org.apache.rocketmq.example.quickstart.Producer
[root@localhost bin]# ./tools.sh org.apache.rocketmq.example.quickstart.Consumer

4.2.5.4.登录dashboard页面查看注册成功

4.2.5.5.其他部署模式

官网还提供了其他部署模式,有兴趣的小伙伴可以自行研究,官网部署方式:https://rocketmq.apache.org/zh/docs/deploymentOperations/01deploy。

集群部署搭建的过程我们到此结束,接下来我们使用官方提供的exmaple代码进行实战。


 
上一篇 RocketMQ5.x教程-安装RocketMQ
下一篇 RocketMQ5.x教程-官方API实战

文章素材均来源于网络,如有侵权,请联系管理员删除。

标签: Java教程Java基础Java编程技巧面试题Java面试题