抱歉,您的浏览器无法访问本站

本页面需要浏览器支持(启用)JavaScript


了解详情 >

Binary方式安装

# 下载安装包
wget http://mirror.bit.edu.cn/apache/rocketmq/4.6.1/rocketmq-all-4.6.1-bin-release.zip
# 解压进入目录
cd /rent
unzip rocketmq-all-4.6.1-bin-release.zip
cd rocketmq-all-4.6.1-bin-release
# 启动nameserver
bin/mqnamesrv
# The Name Server boot success. serializeType=JSON 看到这个表示已经提供成功
# 启动broker
bin/mqbroker -n 192.168.123.121:9876 #-n 指定nameserver地址和端口

Java HotSpot(TM) 64-Bit Server VM warning: INFO:
os::commit_memory(0x00000005c0000000, 8589934592, 0) failed; error='Cannot allocate memory' (errno=12)
# 启动失败, 因为内存不够, RocketMQ的配置默认是生产环境的配置, 设置的jvm的内存大小值比较大, 对于学习而言没有必要设置这么大, 测试环境的内存往往都不是很大, 所以需要调整默认值.
# 调整默认的内存大小参数
cd bin/
vim runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m"
cd bin/
vim runbroker.sh 
JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn128m"

# 重新启动
bin/mqbroker -n 192.168.123.121:9876
The broker[localhost.localdomain, 172.17.0.1:10911] boot success. serializeType=JSON and name server is 192.168.123.121:9876

# 测试发送消息
cd /bin
export NAMESRV_ADDR=192.168.123.121:9876
sh tools.sh org.apache.rocketmq.example.quickstart.Producer
# 测试结果
06:37:54.213 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
SendResult [sendStatus=SEND_OK, msgId=AC11000100002B193F2D77DE90B80000, offsetMsgId=AC11000100002A9F0000000000000000, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=1], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=AC11000100002B193F2D77DE90E80001, offsetMsgId=AC11000100002A9F00000000000000C9, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=2], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=AC11000100002B193F2D77DE90F50002, offsetMsgId=AC11000100002A9F0000000000000192, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=3], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=AC11000100002B193F2D77DE91140003, offsetMsgId=AC11000100002A9F000000000000025B, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=0], queueOffset=0]
# 可正常发送消息

# 测试接收消息
sh tools.sh org.apache.rocketmq.example.quickstart.Consumer
# 测试结果
06:39:58.700 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework
Consumer Started.
ConsumeMessageThread_2 Receive New Messages: [MessageExt [queueId=0, storeSize=201, queueOffset=0, sysFlag=0, bornTimestamp=1582544274836, bornHost=/192.168.123.121:36652, storeTimestamp=1582544274838, storeHost=/172.17.0.1:10911, msgId=AC11000100002A9F000000000000025B, commitLogOffset=603, bodyCRC=1032136437, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1582544399271, UNIQ_KEY=AC11000100002B193F2D77DE91140003, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 51], transactionId='null'}]]
ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=1, storeSize=201, queueOffset=0, sysFlag=0, bornTimestamp=1582544274745, bornHost=/192.168.123.121:36652, storeTimestamp=1582544274774, storeHost=/172.17.0.1:10911, msgId=AC11000100002A9F0000000000000000, commitLogOffset=0, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1582544399273, UNIQ_KEY=AC11000100002B193F2D77DE90B80000, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], transactionId='null'}]]
# 可正常接收消息

Docker方式安装

# 查找镜像
docker search rocketmq
# 拉取镜像
docker pull rocketmqinc/rocketmq
# 创建nameserver容器
docker create --name rmqserver -p 9876:9876 -e "JAVA_OPTS=-Duser.home/opt" -e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" -v /data/namesrv/logs:/opt/logs -v /data/namesrv/store:/opt/store rocketmqinc/rocketmq sh mqnamesrv
# 创建broker容器 注意需要2个端口
docker create --name rmqbroker -p 10911:10911 -p 10909:10909 -e "JAVA_OPTS=-Duser.home=/opt" -e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" -v /home/lgq/rmq/rmqbroker/conf/broker.conf:/etc/rocketmq/broker.conf -v /data/rmq/rmqbroker/logs:/opt/logs -v /data/rmq/rmqbroker/store:/opt/store rocketmqinc/rocketmq sh mqbroker -c /etc/rocketmq/broker.conf
# 启动容器
docker start rmqserver rmqbroker
# 停止删除容器
docker stop rmqserver rmqbroker
docker rm rmqserver rmqbroker
# 查看启动日志
docker logs rmqserver

安装RocketMQ管理工具

# 使用Docker安装
# 拉取镜像
docker pull styletang/rocketmq-console-ng
#创建并启动容器
docker run -e "JAVA_OPTS=-Drocketmq.namesrv.addr=192.168.123.121:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8082:8080 -t styletang/rocketmq-console-ng
# 然后通过浏览器访问 http://192.168.123.123:8082

添加对自定义属性的支持

# 加入到broker的配置文件中
enablePropertyFilter=true

RocketMQ Consumer

在RocketMQ中, 消费者有两种模式, 一种是push模式, 另一种是pull模式.

push模式:客户端与服务端建立连接后, 当服务端有消息时, 将消息推送到客户端.

pull模式:客户端不断的轮询请求服务端, 来获取新的消息.

但在具体实现时, Push和Pull模式都是采用消费端主动拉取的方式, 即consumer轮询从broker拉取消息.

  • Push方式里, consumer把轮询过程封装了, 并注册MessageListener监听器, 取到消息后, 唤醒MessageListener的consumeMessage()来消费, 对用户而言, 感觉消息是被推送过来的.
  • Pull方式里, 取消息的过程需要用户自己写, 首先通过打算消费的Topic拿到MessageQueue的集合, 遍历MessageQueue集合, 然后针对每个MessageQueue批量取消息, 一次取完后, 记录该队列下一次要取的开始offset, 直到取完了, 再换另一个MessageQueue.

搭建2m2s集群

创建2个master nameserver

# 创建2个master nameserver
# nameserver1
docker create -p 9876:9876 --name rmqserver01 \
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" \
-e "JAVA_OPTS=-Duser.home=/opt" \
-v /data/rmq/rmqserver01/logs:/root/logs \
-v /data/rmq/rmqserver01/store:/root/store \
rocketmqinc/rocketmq sh mqnamesrv
# nameserver2
docker create -p 9877:9876 --name rmqserver02 \
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" \
-e "JAVA_OPTS=-Duser.home=/opt" \
-v /data/rmq/rmqserver02/logs:/root/logs \
-v /data/rmq/rmqserver02/store:/root/store \
rocketmqinc/rocketmq sh mqnamesrv

创建第1个master broker

# 创建第1个master broker
# broker需要暴露3个端口
# - 与客户端交互的端口
# - 本集群中与其他节点交互的端口
# - MS交互的端口
docker create --net host --name rmqbroker01 \
-e "JAVA_OPTS=-Duser.home=/opt" \
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" \
-v /data/rmq/rmqbroker01/conf/broker.conf:/etc/rocketmq/broker.conf \
-v /data/rmq/rmqbroker01/logs:/root/logs \
-v /data/rmq/rmqbroker01/store:/root/store \
rocketmqinc/rocketmq sh mqbroker -c /etc/rocketmq/broker.conf
# 配置
brokerIP1=192.168.123.121
brokerIP2=192.168.123.121
namesrvAddr=192.168.123.121:9876;192.168.123.121:9877
brokerClusterName=RentCluster
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerName=broker01
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
listenPort=10911
enablePropertyFilter=true

创建第2个master broker

# 创建第2个master broker
docker create --net host --name rmqbroker02 \
-e "JAVA_OPTS=-Duser.home=/opt" \
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" \
-v /data/rmq/rmqbroker02/conf/broker.conf:/etc/rocketmq/broker.conf \
-v /data/rmq/rmqbroker02/logs:/root/logs \
-v /data/rmq/rmqbroker02/store:/root/store \
rocketmqinc/rocketmq sh mqbroker -c /etc/rocketmq/broker.conf
# 配置
brokerIP1=192.168.123.121
brokerIP2=192.168.123.121
namesrvAddr=192.168.123.121:9876;192.168.123.121:9877
brokerClusterName=RentCluster
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerName=broker02
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
listenPort=10811
enablePropertyFilter=true

创建第1个slave broker

# 创建第1个slave broker
docker create --net host --name rmqbroker03 \
-e "JAVA_OPTS=-Duser.home=/opt" \
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" \
-v /data/rmq/rmqbroker03/conf/broker.conf:/etc/rocketmq/broker.conf \
-v /data/rmq/rmqbroker03/logs:/root/logs \
-v /data/rmq/rmqbroker03/store:/root/store \
rocketmqinc/rocketmq sh mqbroker -c /etc/rocketmq/broker.conf
# 配置
brokerIP1=192.168.123.121
brokerIP2=192.168.123.121
namesrvAddr=192.168.123.121:9876;192.168.123.121:9877
brokerClusterName=RentCluster
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerName=broker01
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
listenPort=10711
enablePropertyFilter=true

创建第2个slave broker

# 创建第2个slave broker
docker create --net host --name rmqbroker04 \
-e "JAVA_OPTS=-Duser.home=/opt" \
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" \
-v /data/rmq/rmqbroker04/conf/broker.conf:/etc/rocketmq/broker.conf \
-v /data/rmq/rmqbroker04/logs:/root/logs \
-v /data/rmq/rmqbroker04/store:/root/store \
rocketmqinc/rocketmq sh mqbroker -c /etc/rocketmq/broker.conf
# 配置
brokerIP1=192.168.123.121
brokerIP2=192.168.123.121
namesrvAddr=192.168.123.121:9876;192.168.123.121:9877
brokerClusterName=RentCluster
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerName=broker02
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
listenPort=10611
enablePropertyFilter=true

其他配置

# 所属集群名字
brokerClusterName=DefaultCluster
# broker 名字,注意此处不同的配置文件填写的不一样,如果在 broker-a.properties 使用: broker-a,
# 在 broker-b.properties 使用: broker-b
brokerName=broker-a
# 0 表示 Master,> 0 表示 Slave
brokerId=0
# nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
# 启动IP,如果 docker 报 com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.0.120:10909> failed
# 解决方式1 加上一句 producer.setVipChannelEnabled(false);,解决方式2 brokerIP1 设置宿主机IP,不要使用docker 内部IP
brokerIP1=192.168.0.253
# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
# 是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭 !!!这里仔细看是 false,false,false
autoCreateTopicEnable=true
# 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
# Broker 对外服务的监听端口
listenPort=10911
# 删除文件时间点,默认凌晨4点
deleteWhen=04
# 文件保留时间,默认48小时
fileReservedTime=120
# commitLog 每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
# ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
destroyMapedFileIntervalForcibly=120000
redeleteHangedFileInterval=120000
# 检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
# 存储路径
storePathRootDir=/home/lgq/rocketmqg/store
# commitLog 存储路径
storePathCommitLog=/home/lgq/rocketmq/store/commitlog
# 消费队列存储
storePathConsumeQueue=/lgq/ztztdata/rocketmq/store/consumequeue
# 消息索引存储路径
storePathIndex=/home/lgq/rocketmq/store/index
# checkpoint 文件存储路径
storeCheckpoint=/home/lgq/rocketmq/store/checkpoint
# abort 文件存储路径
abortFile=/home/lgq/rocketmq/store/abort
# 限制的消息大小
maxMessageSize=65536
flushCommitLogLeastPages=4
flushConsumeQueueLeastPages=2
flushCommitLogThoroughInterval=10000
flushConsumeQueueThoroughInterval=60000
# Broker 的角色
# - ASYNC_MASTER 异步复制Master
# - SYNC_MASTER 同步双写Master
# - SLAVE
brokerRole=ASYNC_MASTER
# 刷盘方式
# - ASYNC_FLUSH 异步刷盘
# - SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
# 发消息线程池数量
sendMessageThreadPoolNums=128
# 拉消息线程池数量
pullMessageThreadPoolNums=128

启动测试

docker start rmqserver01 rmqserver02
docker start rmqbroker01 rmqbroker02 rmqbroker03 rmqbroker04

评论