Spring Boot集成RocketMQ详解
RocketMQ简介
官网:http://rocketmq.apache.org/docs/quick-start/
GitHub:https://github.com/apache/rocketmq/
RcoketMQ是一款低延迟、高可靠、可伸缩、易于使用的消息中间件。具有以下特性:
- 支持发布/订阅(Pub/Sub)和点对点(P2P)消息模型
- 在一个队列中可靠的先进先出(FIFO)和严格的顺序传递
- 支持拉(pull)和推(push)两种消息模式
- 单一队列百万消息的堆积能力
- 支持多种消息协议,如 JMS、MQTT 等
- 分布式高可用的部署架构,满足至少一次消息传递语义
- 提供 docker 镜像用于隔离测试和云集群部署
- 提供配置、指标和监控等功能丰富的 Dashboard
专业术语
(1)Producer
消息生产者,生产者的作用就是将消息发送到 MQ,生产者本身既可以产生消息,如读取文本信息等。也可以对外提供接口,由外部应用来调用接口,再由生产者将收到的消息发送到 MQ。
(2)Producer Group
生产者组,简单来说就是多个发送同一类消息的生产者称之为一个生产者组。
(3)Consumer
消息消费者,简单来说,消费 MQ 上的消息的应用程序就是消费者,至于消息是否进行逻辑处理,还是直接存储到数据库等取决于业务需要。
(4)Consumer Group
消费者组,和生产者类似,消费同一类消息的多个consumer实例组成一个消费者组。
(5)Topic
Topic 是一种消息的逻辑分类,比如有订单类的消息,也有库存类的消息,那么就需要进行分类,一个是订单Topic存放订单相关的消息,一个是库存Topic存储库存相关的消息。
(6)Message
Message是消息的载体。一个Message必须指定topic,相当于寄信的地址。Message还有一个可选的tag设置,以便消费端可以基于tag进行过滤消息。也可以添加额外的键值对,例如需要一个业务key来查找broker上的消息,方便在开发过程中诊断问题。
(7)Tag
标签可以被认为是对 Topic 进一步细化。一般在相同业务模块中通过引入标签来标记不同用途的消息。
(8)Broker
Broker 是 RocketMQ 系统的主要角色,其实就是前面一直说的 MQ。Broker 接收来自生产者的消息,储存以及为消费者拉取消息的请求做好准备。
(9)Name Server
Name Server 为 producer 和 consumer 提供路由信息。
RocketMQ架构
NameServer: 提供轻量级的服务发现和路由。每个NameServer记录完整的路由信息,提供等效的读写服务,并支持快速存储扩展
Broker: 通过提供轻量级的Topic和Queue机制来处理消息存储,同时支持推(push)和拉(pull)模式以及主从结构的容错机制
Producer:生产者,产生消息的实例,拥有相同Producer Group的Producer组成一个集群
Consumer:消费者,接收消息进行消费的实例,拥有相同Consumer Group的Consumer组成一个集群
从Broker开始,Broker Master和Broker Slave是主从结构,它们之间会进行数据同步,即Date Sync。同时每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer中。
Producer 与 NameServer 集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Broker Master 建立长连接,且定时向 Broker 发送心跳。Producer 只能将消息发送到 Broker master,但是 Consumer 则不一样,它同时和提供 Topic 服务的 Master 和 Slave
建立长连接,既可以从 Broker Master 订阅消息,也可以从 Broker Slave 订阅消息。
RocketMQ集群部署模式
单master模式
也就是只有一个master节点,称不上是集群,一旦这个master节点宕机,那么整个服务就不可用,适合个人学习使用。
多master模式
多个master节点组成集群,单个 master 节点宕机或者重启对应用没有影响。
优点:所有模式中性能最高
缺点:单个master节点宕机期间,未被消费的消息在节点恢复之前不可用,消息的实时性就受到影响
注意:使用同步刷盘可以保证消息不丢失,同时Topic相对应的queue应该分布在集群中各个节点,而不是只在某各节点上,否则,该节点宕机会对订阅该 topic 的应用造成影响。
多master多slave异步复制模式
在多master模式的基础上,每个master节点都有至少一个对应的slave。master节点可读可写,但是slave只能读不能写,类似于mysql的主备模式。
优点:在 master 宕机时,消费者可以从 slave 读取消息,消息的实时性不会受影响,性能几乎和多master一样
缺点:使用异步复制的同步方式有可能会有消息丢失的问题
多master多slave同步双写模式
同多master多slave异步复制模式类似,区别在于master和slave之间的数据同步方式。
优点:同步双写的同步模式能保证数据不丢失
缺点:发送单个消息 RT 会略长,性能相比异步复制低10%左右
刷盘策略:同步刷盘和异步刷盘(指的是节点自身数据是同步还是异步存储)
同步方式:同步双写和异步复制(指的一组 master 和 slave 之间数据的同步)
注意:要保证数据可靠,需采用同步刷盘和同步双写的方式,但性能会较其他方式低
Spring Boot集成RocketMQ
Maven依赖
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
DefaultMQProducer
单从producer分类来看主要分成3种:
DefaultMQProducer
TransactionMQProducer
messagingAccessPoint.createProducer()
(1)application.yml配置文件
spring:
application:
name: appblog
rocketmq:
# 生产者配置
producer:
#该应用是否启用生产者
isOnOff: on
default: true
transaction: false
#发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示
groupName: ${spring.application.name}
#mq的nameserver地址
namesrvAddr: 127.0.0.1:9876
#消息最大长度 默认1024*4(4M)
maxMessageSize: 4096
#发送消息超时时间,默认3000
sendMsgTimeout: 3000
#发送消息失败重试次数,默认2
retryTimesWhenSendFailed: 2
(2)yml文件配置读取类
@Getter
@Setter
@ConfigurationProperties(prefix = "rocketmq.producer")
@Configuration
@ToString
public class ProducerConfig {
private String namesrvAddr;
private String groupName;
}
Producer类的创建类,需要注意的是这个producer在一个程序里面只能出现一个,当重复创建时就会报错
@Log4j2
@Configuration
public class MyDefaultProducer {
@Autowired
private ProducerConfig producerConfig;
private DefaultMQProducer producer;
/**
* 创建普通消息发送者实例
*
* @return
* @throws MQClientException
*/
@Bean
@ConditionalOnProperty(prefix = "rocketmq.producer", value = "default", havingValue = "true")
public DefaultMQProducer defaultProducer() throws MQClientException {
log.info(producerConfig.toString());
log.info("DefaultMQProducer 正在创建...");
producer = new DefaultMQProducer(producerConfig.getGroupName());
//指定NameServer地址, 多个地址以 ; 隔开
producer.setNamesrvAddr(producerConfig.getNamesrvAddr());
producer.setVipChannelEnabled(false);
producer.setRetryTimesWhenSendAsyncFailed(10);
producer.start();
log.info("RocketMQ Producer Server 启动成功");
return producer;
}
public String send(String topic, String tags, String body) throws InterruptedException, RemotingException, MQClientException, MQBrokerException, UnsupportedEncodingException {
Message message = new Message(topic, tags, body.getBytes(RemotingHelper.DEFAULT_CHARSET));
StopWatch stop = new StopWatch();
stop.start();
SendResult result = producer.send(message);
System.out.println("发送响应 - MsgId:" + result.getMsgId() + ", 发送状态:" + result.getSendStatus());
stop.stop();
return "{\"MsgId\":\"" + result.getMsgId() + "\"}";
}
public String send(Message message) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
StopWatch stop = new StopWatch();
stop.start();
SendResult result = producer.send(message);
System.out.println("发送响应 - MsgId:" + result.getMsgId() + ", 发送状态:" + result.getSendStatus());
stop.stop();
return "{\"MsgId\":\"" + result.getMsgId() + "\"}";
}
public void send(Message message, SendCallback callback) throws InterruptedException, RemotingException, MQClientException {
StopWatch stop = new StopWatch();
stop.start();
producer.send(message, callback);
stop.stop();
}
}
当producer创建完毕之后就是consumer的公用设置
首先也是yml和配置类的定义
rocketmq:
# 消费者配置
consumer:
#该应用是否启用消费者
isOnOff: on
groupName: ${spring.application.name}
#mq的nameserver地址
namesrvAddr: 127.0.0.1:9876
#该消费者订阅的主题和tags("*"号表示订阅该主题下所有的tags),格式:topic~tag1||tag2||tag3;topic2~*;
topics: DemoTopic~*;
consumeThreadMin: 20
consumeThreadMax: 64
#设置一次消费消息的条数,默认为1条
rocketmq.consumer.consumeMessageBatchMaxSize: 1
@Getter
@Setter
@ConfigurationProperties(prefix = "rocketmq.consumer")
@Configuration
@ToString
public class ConsumerConfig {
private String groupName;
private String namesrvAddr;
}
Consumer类的创建类,对body的操作抽象出来,提供给实现类做处理,方便业务抽取
@Configuration
@Log4j2
public abstract class BaseDefaultConsumer {
@Autowired
private ConsumerConfig consumerConfig;
// 开启消费者监听服务
public void listener(String topic, String tag) throws MQClientException {
log.info("开启[" + topic + ":" + tag + "]消费者");
log.info(consumerConfig.toString());
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerConfig.getGroupName());
consumer.setNamesrvAddr(consumerConfig.getNamesrvAddr());
// 程序第一次启动从消息队列头获取数据
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 可以修改每次消费消息的数量,默认设置是每次消费一条
consumer.setConsumeMessageBatchMaxSize(1);
// 设置Consumer的消费策略
// CONSUME_FROM_LAST_OFFSET 默认策略,从该队列最尾开始消费,即跳过历史消息
// CONSUME_FROM_FIRST_OFFSET 从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍
// CONSUME_FROM_TIMESTAMP 从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 设置consumer所订阅的Topic和Tag,*代表全部的Tag
// consumer.subscribe("TopicTest", "*");
// 订阅topic下TAG为tag的消息
consumer.subscribe(topic, tag);
// 开启内部类实现监听
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) {
return BaseDefaultConsumer.this.dealBody(msgList);
}
});
// 调用start()方法启动Consumer
consumer.start();
log.info("RocketMQ Consumer Server 启动成功");
}
// 处理body的业务
public abstract ConsumeConcurrentlyStatus dealBody(List<MessageExt> msgList);
}
值得注意的是,这里DefaultConsumerConfigure没有定义在什么时候运行,这里以实现ApplicationListener的onApplicationEvent方法开启消费者监听服务
@Log4j2
@Configuration
public class MyConsumer extends BaseDefaultConsumer implements ApplicationListener<ContextRefreshedEvent> {
@Override
public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
try {
super.listener("my_topic", "my_tag");
} catch (MQClientException e) {
log.error("消费者监听器启动失败", e);
}
}
@Override
public ConsumeConcurrentlyStatus dealBody(List<MessageExt> msgList) {
int num = 1;
for(MessageExt msg : msgList) {
log.info("第" + (num++) + "次消息");
try {
String msgStr = new String(msg.getBody(), "utf-8");
log.info(msgStr);
} catch (UnsupportedEncodingException e) {
log.error("Body转字符串解析失败");
}
}
// 返回消费状态
// CONSUME_SUCCESS 消费成功
// RECONSUME_LATER 消费失败,需要稍后重新消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
Consumer构建类实现
ApplicationListener
,在Application启动时就开始执行注册Consumer。相信有些同学会喜欢用@PostConstruct
,但是不要这么做,因为它会在init之前执行,那么有些类会加载不完全,会导致无法启动
单元测试及运行结果
@Log4j2
@RunWith(SpringRunner.class)
@SpringBootTest
public class RocketTest {
@Autowired
private MyDefaultProducer producer;
@Test
public void testDefaultRocketMQ() throws Exception {
Message message = new Message("my_topic", "my_tag", "123456", "RocketMQ测试成功".getBytes());
// 这里用到了这个mq的异步处理,类似ajax,可以得到发送到mq的情况,并做相应的处理
// 不过要注意的是这个是异步的
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("传输成功");
log.info(JSON.toJSONString(sendResult));
}
@Override
public void onException(Throwable e) {
log.error("传输失败", e);
}
});
}
}
ProducerConfig(namesrvAddr=127.0.0.1:9876, groupName=appblog)
DefaultMQProducer 正在创建...
RocketMQ Producer Server 启动成功
开启[my_topic:my_tag]消费者
ConsumerConfig(groupName=appblog, namesrvAddr=127.0.0.1:9876)
RocketMQ Consumer Server 启动成功
Started RocketTest in 23.857 seconds (JVM running for 33.942)
第1次消息
RocketMQ测试成功
传输成功
{"messageQueue":{"brokerName":"LT-YEZHOU","queueId":2,"topic":"my_topic"},"msgId":"0200011A67C818B4AAC2616911F30000","offsetMsgId":"0200011A00002A9F000000000000030C","queueOffset":1,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true}
TransactionMQProducer
(1)application.yml配置文件
将default设置为false,transaction设置为true
spring:
application:
name: appblog
rocketmq:
# 生产者配置
producer:
#该应用是否启用生产者
isOnOff: on
default: false
transaction: true
#发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示
groupName: ${spring.application.name}
#mq的nameserver地址
namesrvAddr: 127.0.0.1:9876
#消息最大长度 默认1024*4(4M)
maxMessageSize: 4096
#发送消息超时时间,默认3000
sendMsgTimeout: 3000
#发送消息失败重试次数,默认2
retryTimesWhenSendFailed: 2
再来看TransactionMQProducer,需要注意的是ConditionalOnProperty这个必须得有,而且配置文件中transaction和default中只能有一个是true,不然就会同时创建两个producer,那么启动就会报错。
@Log4j2
@Configuration
public class MyTransactionProducer {
@Autowired
private ProducerConfig producerConfig;
private TransactionMQProducer producer;
/**
* 创建事务消息发送者实例
*
* @return
* @throws MQClientException
*/
@Bean
@ConditionalOnProperty(prefix = "rocketmq.producer", value = "transaction", havingValue = "true")
public TransactionMQProducer transactionMQProducer() throws MQClientException {
log.info(producerConfig.toString());
log.info("TransactionMQProducer 正在创建...");
producer = new TransactionMQProducer(producerConfig.getGroupName());
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setNamesrvAddr(producerConfig.getNamesrvAddr());
producer.setExecutorService(executorService);
producer.start();
log.info("TransactionMQ Producer Server 启动成功");
return producer;
}
public TransactionMQProducer getProducer() {
return producer;
}
}
因为Transaction的流程下,RocketMQ会先发送一个consumer不可见的消息,然后在调用TransactionListener这个接口中的executeLocalTransaction方法执行事务,然后方法内部需要返回一个LocalTransactionState的枚举信息,分别为
public enum LocalTransactionState {
COMMIT_MESSAGE, // 提交
ROLLBACK_MESSAGE, // 回滚
UNKNOW, // 未知
}
public interface TransactionListener {
/**
* When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
*
* @param msg Half(prepare) message
* @param arg Custom business parameter
* @return Transaction state
*/
LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);
/**
* When no response to prepare(half) message. broker will send check message to check the transaction status, and this
* method will be invoked to get local transaction status.
*
* @param msg Check message
* @return Transaction state
*/
LocalTransactionState checkLocalTransaction(final MessageExt msg);
}
相应的当我们返回的是COMMIT_MESSAGE
时,那么producer会把消息提交到MQ上,如果是ROLLBACK_MESSAGE
那么producer就会结束,并且不提交到MQ。需要注意的是checkLocalTransaction是MQ长时间没有收到producer的executeLocalTransaction响应的时候调用的,这个类在3.0之后的版本就被阉割了,只有接口,却没有实现,那么直接写一个空实现即可。在这边的代码上,做了一个抽象,把需要实现的executeLocalTransaction抽象出来
public abstract class AbstractTransactionListener implements TransactionListener {
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
return LocalTransactionState.COMMIT_MESSAGE;
}
}
下面是executeLocalTransaction的实现类,简单的做了些业务,然后返回了一个commit
@Configuration
@Log4j2
public class MyTransactionListener extends AbstractTransactionListener {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
log.info(new String(msg.getBody()));
return LocalTransactionState.COMMIT_MESSAGE;
}
}
Consumer是没有变化的,基本相同,上测试代码
@Log4j2
@RunWith(SpringRunner.class)
@SpringBootTest
public class RocketTest {
@Autowired
private MyTransactionProducer transactionProducer;
@Autowired
private MyTransactionListener transactionListener;
@Test
public void testTransactionRocketMQ() throws Exception {
Message message = new Message("my_topic", "my_tag", "123456", "RocketMQ测试成功".getBytes());
TransactionMQProducer producer = transactionProducer.getProducer();
producer.setTransactionListener(transactionListener);
producer.setSendMsgTimeout(10000);
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("传输成功");
log.info(JSON.toJSONString(sendResult));
}
@Override
public void onException(Throwable e) {
log.error("传输失败", e);
}
});
}
}
ProducerConfig(namesrvAddr=127.0.0.1:9876, groupName=appblog)
TransactionMQProducer 正在创建...
TransactionMQ Producer Server 启动成功
开启[my_topic:my_tag]消费者
ConsumerConfig(groupName=appblog_cn, namesrvAddr=127.0.0.1:9876)
RocketMQ Consumer Server 启动成功
Started RocketTest in 20.065 seconds (JVM running for 23.38)
传输成功
{"messageQueue":{"brokerName":"YEZHOU.ME","queueId":1,"topic":"my_topic"},"msgId":"0200011AA8B418B4AAC260E5F1FE0000","offsetMsgId":"0A030A7200002A9F0000000000000249","queueOffset":1,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true}
第1次消息
RocketMQ测试成功
常见错误
磁盘空间不足
org.apache.rocketmq.client.exception.MQBrokerException: CODE: 14 DESC: service not available now, maybe disk full, CL: 0.91 CQ: 0.91 INDEX: 0.91, maybe your broker machine memory too small.
- 清理磁盘空间
- 修改store路径
- 修改logs路径
- 修改rmq_bk_gc.log路径
- 修改rmq_srv_gc.log路径
修改store路径
① 获取RocketMQ源码:https://github.com/apache/rocketmq/
② 修改rocketmq/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig
类中的成员变量默认值,设置为自定义路径。例如:
private String storePathRootDir = File.separator + "app" + File.separator + "mqfile" + File.separator + "store";
private String storePathCommitLog = File.separator + "app" + File.separator + "mqfile" + File.separator + "store"+ File.separator + "commitlog";
③ 使用mvn install 命令打成jar包
④ 备注:storePathCommitLog可以在broker-a.properties等配置文件中指定。其余路径不可以指定
修改logs路径
rocketmq-all-4.4.0-bin-release/conf/logback_broker.xml
rocketmq-all-4.4.0-bin-release/conf/logback_namesrv.xml
rocketmq-all-4.4.0-bin-release/conf/logback_tools.xml
> start mqnamesrv.cmd
> start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
版权声明:
作者:Joe.Ye
链接:https://www.appblog.cn/index.php/2023/03/09/spring-boot-integrate-rocketmq/
来源:APP全栈技术分享
文章版权归作者所有,未经允许请勿转载。
共有 0 条评论