Spring Boot集成RocketMQ详解

RocketMQ简介

官网:http://rocketmq.apache.org/docs/quick-start/
GitHub:https://github.com/apache/rocketmq/

RcoketMQ是一款低延迟、高可靠、可伸缩、易于使用的消息中间件。具有以下特性:

  • 支持发布/订阅(Pub/Sub)和点对点(P2P)消息模型
  • 在一个队列中可靠的先进先出(FIFO)和严格的顺序传递
  • 支持拉(pull)和推(push)两种消息模式
  • 单一队列百万消息的堆积能力
  • 支持多种消息协议,如 JMS、MQTT 等
  • 分布式高可用的部署架构,满足至少一次消息传递语义
  • 提供 docker 镜像用于隔离测试和云集群部署
  • 提供配置、指标和监控等功能丰富的 Dashboard

专业术语

(1)Producer

消息生产者,生产者的作用就是将消息发送到 MQ,生产者本身既可以产生消息,如读取文本信息等。也可以对外提供接口,由外部应用来调用接口,再由生产者将收到的消息发送到 MQ。

(2)Producer Group

生产者组,简单来说就是多个发送同一类消息的生产者称之为一个生产者组。

(3)Consumer

消息消费者,简单来说,消费 MQ 上的消息的应用程序就是消费者,至于消息是否进行逻辑处理,还是直接存储到数据库等取决于业务需要。

(4)Consumer Group

消费者组,和生产者类似,消费同一类消息的多个consumer实例组成一个消费者组。

(5)Topic

Topic 是一种消息的逻辑分类,比如有订单类的消息,也有库存类的消息,那么就需要进行分类,一个是订单Topic存放订单相关的消息,一个是库存Topic存储库存相关的消息。

(6)Message

Message是消息的载体。一个Message必须指定topic,相当于寄信的地址。Message还有一个可选的tag设置,以便消费端可以基于tag进行过滤消息。也可以添加额外的键值对,例如需要一个业务key来查找broker上的消息,方便在开发过程中诊断问题。

(7)Tag

标签可以被认为是对 Topic 进一步细化。一般在相同业务模块中通过引入标签来标记不同用途的消息。

(8)Broker

Broker 是 RocketMQ 系统的主要角色,其实就是前面一直说的 MQ。Broker 接收来自生产者的消息,储存以及为消费者拉取消息的请求做好准备。

(9)Name Server

Name Server 为 producer 和 consumer 提供路由信息。

RocketMQ架构

NameServer: 提供轻量级的服务发现和路由。每个NameServer记录完整的路由信息,提供等效的读写服务,并支持快速存储扩展
Broker: 通过提供轻量级的Topic和Queue机制来处理消息存储,同时支持推(push)和拉(pull)模式以及主从结构的容错机制
Producer:生产者,产生消息的实例,拥有相同Producer Group的Producer组成一个集群
Consumer:消费者,接收消息进行消费的实例,拥有相同Consumer Group的Consumer组成一个集群

从Broker开始,Broker Master和Broker Slave是主从结构,它们之间会进行数据同步,即Date Sync。同时每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer中。

Producer 与 NameServer 集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Broker Master 建立长连接,且定时向 Broker 发送心跳。Producer 只能将消息发送到 Broker master,但是 Consumer 则不一样,它同时和提供 Topic 服务的 Master 和 Slave
建立长连接,既可以从 Broker Master 订阅消息,也可以从 Broker Slave 订阅消息。

RocketMQ集群部署模式

单master模式

也就是只有一个master节点,称不上是集群,一旦这个master节点宕机,那么整个服务就不可用,适合个人学习使用。

多master模式

多个master节点组成集群,单个 master 节点宕机或者重启对应用没有影响。

优点:所有模式中性能最高
缺点:单个master节点宕机期间,未被消费的消息在节点恢复之前不可用,消息的实时性就受到影响

注意:使用同步刷盘可以保证消息不丢失,同时Topic相对应的queue应该分布在集群中各个节点,而不是只在某各节点上,否则,该节点宕机会对订阅该 topic 的应用造成影响。

多master多slave异步复制模式

在多master模式的基础上,每个master节点都有至少一个对应的slave。master节点可读可写,但是slave只能读不能写,类似于mysql的主备模式。

优点:在 master 宕机时,消费者可以从 slave 读取消息,消息的实时性不会受影响,性能几乎和多master一样
缺点:使用异步复制的同步方式有可能会有消息丢失的问题

多master多slave同步双写模式

同多master多slave异步复制模式类似,区别在于master和slave之间的数据同步方式。

优点:同步双写的同步模式能保证数据不丢失
缺点:发送单个消息 RT 会略长,性能相比异步复制低10%左右
刷盘策略:同步刷盘和异步刷盘(指的是节点自身数据是同步还是异步存储)
同步方式:同步双写和异步复制(指的一组 master 和 slave 之间数据的同步)

注意:要保证数据可靠,需采用同步刷盘和同步双写的方式,但性能会较其他方式低

Spring Boot集成RocketMQ

Maven依赖

1
2
3
4
5
6
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>

DefaultMQProducer

单从producer分类来看主要分成3种:

  • DefaultMQProducer
  • TransactionMQProducer
  • messagingAccessPoint.createProducer()

(1)application.yml配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
spring:
application:
name: appblog

rocketmq:
# 生产者配置
producer:
#该应用是否启用生产者
isOnOff: on
default: true
transaction: false
#发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示
groupName: ${spring.application.name}
#mq的nameserver地址
namesrvAddr: 127.0.0.1:9876
#消息最大长度 默认1024*4(4M)
maxMessageSize: 4096
#发送消息超时时间,默认3000
sendMsgTimeout: 3000
#发送消息失败重试次数,默认2
retryTimesWhenSendFailed: 2

(2)yml文件配置读取类

1
2
3
4
5
6
7
8
9
10
@Getter
@Setter
@ConfigurationProperties(prefix = "rocketmq.producer")
@Configuration
@ToString
public class ProducerConfig {
private String namesrvAddr;

private String groupName;
}

Producer类的创建类,需要注意的是这个producer在一个程序里面只能出现一个,当重复创建时就会报错

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@Log4j2
@Configuration
public class MyDefaultProducer {
@Autowired
private ProducerConfig producerConfig;

private DefaultMQProducer producer;

/**
* 创建普通消息发送者实例
*
* @return
* @throws MQClientException
*/
@Bean
@ConditionalOnProperty(prefix = "rocketmq.producer", value = "default", havingValue = "true")
public DefaultMQProducer defaultProducer() throws MQClientException {
log.info(producerConfig.toString());
log.info("DefaultMQProducer 正在创建...");
producer = new DefaultMQProducer(producerConfig.getGroupName());
//指定NameServer地址, 多个地址以 ; 隔开
producer.setNamesrvAddr(producerConfig.getNamesrvAddr());
producer.setVipChannelEnabled(false);
producer.setRetryTimesWhenSendAsyncFailed(10);
producer.start();
log.info("RocketMQ Producer Server 启动成功");
return producer;
}

public String send(String topic, String tags, String body) throws InterruptedException, RemotingException, MQClientException, MQBrokerException, UnsupportedEncodingException {
Message message = new Message(topic, tags, body.getBytes(RemotingHelper.DEFAULT_CHARSET));
StopWatch stop = new StopWatch();
stop.start();
SendResult result = producer.send(message);
System.out.println("发送响应 - MsgId:" + result.getMsgId() + ", 发送状态:" + result.getSendStatus());
stop.stop();
return "{\"MsgId\":\"" + result.getMsgId() + "\"}";
}

public String send(Message message) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
StopWatch stop = new StopWatch();
stop.start();
SendResult result = producer.send(message);
System.out.println("发送响应 - MsgId:" + result.getMsgId() + ", 发送状态:" + result.getSendStatus());
stop.stop();
return "{\"MsgId\":\"" + result.getMsgId() + "\"}";
}

public void send(Message message, SendCallback callback) throws InterruptedException, RemotingException, MQClientException {
StopWatch stop = new StopWatch();
stop.start();
producer.send(message, callback);
stop.stop();
}

}

当producer创建完毕之后就是consumer的公用设置

首先也是yml和配置类的定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
rocketmq:
# 消费者配置
consumer:
#该应用是否启用消费者
isOnOff: on
groupName: ${spring.application.name}
#mq的nameserver地址
namesrvAddr: 127.0.0.1:9876
#该消费者订阅的主题和tags("*"号表示订阅该主题下所有的tags),格式:topic~tag1||tag2||tag3;topic2~*;
topics: DemoTopic~*;
consumeThreadMin: 20
consumeThreadMax: 64
#设置一次消费消息的条数,默认为1条
rocketmq.consumer.consumeMessageBatchMaxSize: 1
1
2
3
4
5
6
7
8
9
10
@Getter
@Setter
@ConfigurationProperties(prefix = "rocketmq.consumer")
@Configuration
@ToString
public class ConsumerConfig {
private String groupName;

private String namesrvAddr;
}

Consumer类的创建类,对body的操作抽象出来,提供给实现类做处理,方便业务抽取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
@Configuration
@Log4j2
public abstract class BaseDefaultConsumer {
@Autowired
private ConsumerConfig consumerConfig;

// 开启消费者监听服务
public void listener(String topic, String tag) throws MQClientException {
log.info("开启[" + topic + ":" + tag + "]消费者");
log.info(consumerConfig.toString());

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerConfig.getGroupName());
consumer.setNamesrvAddr(consumerConfig.getNamesrvAddr());

// 程序第一次启动从消息队列头获取数据
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 可以修改每次消费消息的数量,默认设置是每次消费一条
consumer.setConsumeMessageBatchMaxSize(1);

// 设置Consumer的消费策略
// CONSUME_FROM_LAST_OFFSET 默认策略,从该队列最尾开始消费,即跳过历史消息
// CONSUME_FROM_FIRST_OFFSET 从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍
// CONSUME_FROM_TIMESTAMP 从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

// 设置consumer所订阅的Topic和Tag,*代表全部的Tag
// consumer.subscribe("TopicTest", "*");
// 订阅topic下TAG为tag的消息
consumer.subscribe(topic, tag);

// 开启内部类实现监听
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) {
return BaseDefaultConsumer.this.dealBody(msgList);
}
});

// 调用start()方法启动Consumer
consumer.start();

log.info("RocketMQ Consumer Server 启动成功");
}

// 处理body的业务
public abstract ConsumeConcurrentlyStatus dealBody(List<MessageExt> msgList);
}

值得注意的是,这里DefaultConsumerConfigure没有定义在什么时候运行,这里以实现ApplicationListener的onApplicationEvent方法开启消费者监听服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Log4j2
@Configuration
public class MyConsumer extends BaseDefaultConsumer implements ApplicationListener<ContextRefreshedEvent> {

@Override
public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
try {
super.listener("my_topic", "my_tag");
} catch (MQClientException e) {
log.error("消费者监听器启动失败", e);
}
}

@Override
public ConsumeConcurrentlyStatus dealBody(List<MessageExt> msgList) {
int num = 1;
for(MessageExt msg : msgList) {
log.info("第" + (num++) + "次消息");
try {
String msgStr = new String(msg.getBody(), "utf-8");
log.info(msgStr);
} catch (UnsupportedEncodingException e) {
log.error("Body转字符串解析失败");
}
}
// 返回消费状态
// CONSUME_SUCCESS 消费成功
// RECONSUME_LATER 消费失败,需要稍后重新消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

}

Consumer构建类实现ApplicationListener,在Application启动时就开始执行注册Consumer。相信有些同学会喜欢用@PostConstruct,但是不要这么做,因为它会在init之前执行,那么有些类会加载不完全,会导致无法启动

单元测试及运行结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Log4j2
@RunWith(SpringRunner.class)
@SpringBootTest
public class RocketTest {
@Autowired
private MyDefaultProducer producer;

@Test
public void testDefaultRocketMQ() throws Exception {
Message message = new Message("my_topic", "my_tag", "123456", "RocketMQ测试成功".getBytes());
// 这里用到了这个mq的异步处理,类似ajax,可以得到发送到mq的情况,并做相应的处理
// 不过要注意的是这个是异步的
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("传输成功");
log.info(JSON.toJSONString(sendResult));
}
@Override
public void onException(Throwable e) {
log.error("传输失败", e);
}
});
}
}
1
2
3
4
5
6
7
8
9
10
11
ProducerConfig(namesrvAddr=127.0.0.1:9876, groupName=appblog)
DefaultMQProducer 正在创建...
RocketMQ Producer Server 启动成功
开启[my_topic:my_tag]消费者
ConsumerConfig(groupName=appblog, namesrvAddr=127.0.0.1:9876)
RocketMQ Consumer Server 启动成功
Started RocketTest in 23.857 seconds (JVM running for 33.942)
第1次消息
RocketMQ测试成功
传输成功
{"messageQueue":{"brokerName":"LT-YEZHOU","queueId":2,"topic":"my_topic"},"msgId":"0200011A67C818B4AAC2616911F30000","offsetMsgId":"0200011A00002A9F000000000000030C","queueOffset":1,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true}

TransactionMQProducer

(1)application.yml配置文件

将default设置为false,transaction设置为true

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
spring:
application:
name: appblog

rocketmq:
# 生产者配置
producer:
#该应用是否启用生产者
isOnOff: on
default: false
transaction: true
#发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示
groupName: ${spring.application.name}
#mq的nameserver地址
namesrvAddr: 127.0.0.1:9876
#消息最大长度 默认1024*4(4M)
maxMessageSize: 4096
#发送消息超时时间,默认3000
sendMsgTimeout: 3000
#发送消息失败重试次数,默认2
retryTimesWhenSendFailed: 2

再来看TransactionMQProducer,需要注意的是ConditionalOnProperty这个必须得有,而且配置文件中transaction和default中只能有一个是true,不然就会同时创建两个producer,那么启动就会报错。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@Log4j2
@Configuration
public class MyTransactionProducer {
@Autowired
private ProducerConfig producerConfig;

private TransactionMQProducer producer;

/**
* 创建事务消息发送者实例
*
* @return
* @throws MQClientException
*/
@Bean
@ConditionalOnProperty(prefix = "rocketmq.producer", value = "transaction", havingValue = "true")
public TransactionMQProducer transactionMQProducer() throws MQClientException {
log.info(producerConfig.toString());
log.info("TransactionMQProducer 正在创建...");
producer = new TransactionMQProducer(producerConfig.getGroupName());

ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setNamesrvAddr(producerConfig.getNamesrvAddr());
producer.setExecutorService(executorService);
producer.start();
log.info("TransactionMQ Producer Server 启动成功");
return producer;
}

public TransactionMQProducer getProducer() {
return producer;
}
}

因为Transaction的流程下,RocketMQ会先发送一个consumer不可见的消息,然后在调用TransactionListener这个接口中的executeLocalTransaction方法执行事务,然后方法内部需要返回一个LocalTransactionState的枚举信息,分别为

1
2
3
4
5
public enum LocalTransactionState {
COMMIT_MESSAGE, // 提交
ROLLBACK_MESSAGE, // 回滚
UNKNOW, // 未知
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public interface TransactionListener {
/**
* When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
*
* @param msg Half(prepare) message
* @param arg Custom business parameter
* @return Transaction state
*/
LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);

/**
* When no response to prepare(half) message. broker will send check message to check the transaction status, and this
* method will be invoked to get local transaction status.
*
* @param msg Check message
* @return Transaction state
*/
LocalTransactionState checkLocalTransaction(final MessageExt msg);
}

相应的当我们返回的是COMMIT_MESSAGE时,那么producer会把消息提交到MQ上,如果是ROLLBACK_MESSAGE那么producer就会结束,并且不提交到MQ。需要注意的是checkLocalTransaction是MQ长时间没有收到producer的executeLocalTransaction响应的时候调用的,这个类在3.0之后的版本就被阉割了,只有接口,却没有实现,那么直接写一个空实现即可。在这边的代码上,做了一个抽象,把需要实现的executeLocalTransaction抽象出来

1
2
3
4
5
6
7
8
public abstract class AbstractTransactionListener implements TransactionListener {

@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
return LocalTransactionState.COMMIT_MESSAGE;
}

}

下面是executeLocalTransaction的实现类,简单的做了些业务,然后返回了一个commit

1
2
3
4
5
6
7
8
9
10
11
@Configuration
@Log4j2
public class MyTransactionListener extends AbstractTransactionListener {

@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
log.info(new String(msg.getBody()));
return LocalTransactionState.COMMIT_MESSAGE;
}

}

Consumer是没有变化的,基本相同,上测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Log4j2
@RunWith(SpringRunner.class)
@SpringBootTest
public class RocketTest {
@Autowired
private MyTransactionProducer transactionProducer;
@Autowired
private MyTransactionListener transactionListener;

@Test
public void testTransactionRocketMQ() throws Exception {
Message message = new Message("my_topic", "my_tag", "123456", "RocketMQ测试成功".getBytes());
TransactionMQProducer producer = transactionProducer.getProducer();
producer.setTransactionListener(transactionListener);
producer.setSendMsgTimeout(10000);
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("传输成功");
log.info(JSON.toJSONString(sendResult));
}
@Override
public void onException(Throwable e) {
log.error("传输失败", e);
}
});
}

}
1
2
3
4
5
6
7
8
9
10
11
ProducerConfig(namesrvAddr=127.0.0.1:9876, groupName=appblog)
TransactionMQProducer 正在创建...
TransactionMQ Producer Server 启动成功
开启[my_topic:my_tag]消费者
ConsumerConfig(groupName=appblog_cn, namesrvAddr=127.0.0.1:9876)
RocketMQ Consumer Server 启动成功
Started RocketTest in 20.065 seconds (JVM running for 23.38)
传输成功
{"messageQueue":{"brokerName":"YEZHOU.ME","queueId":1,"topic":"my_topic"},"msgId":"0200011AA8B418B4AAC260E5F1FE0000","offsetMsgId":"0A030A7200002A9F0000000000000249","queueOffset":1,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true}
第1次消息
RocketMQ测试成功

常见错误

磁盘空间不足

1
org.apache.rocketmq.client.exception.MQBrokerException: CODE: 14  DESC: service not available now, maybe disk full, CL:  0.91 CQ:  0.91 INDEX:  0.91, maybe your broker machine memory too small.
  1. 清理磁盘空间
  2. 修改store路径
  3. 修改logs路径
  4. 修改rmq_bk_gc.log路径
  5. 修改rmq_srv_gc.log路径

修改store路径

① 获取RocketMQ源码:https://github.com/apache/rocketmq/
② 修改rocketmq/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig类中的成员变量默认值,设置为自定义路径。例如:

1
2
private String storePathRootDir = File.separator + "app" + File.separator + "mqfile" + File.separator + "store";
private String storePathCommitLog = File.separator + "app" + File.separator + "mqfile" + File.separator + "store"+ File.separator + "commitlog";

③ 使用mvn install 命令打成jar包
④ 备注:storePathCommitLog可以在broker-a.properties等配置文件中指定。其余路径不可以指定

修改logs路径

1
2
3
rocketmq-all-4.4.0-bin-release/conf/logback_broker.xml
rocketmq-all-4.4.0-bin-release/conf/logback_namesrv.xml
rocketmq-all-4.4.0-bin-release/conf/logback_tools.xml
1
2
3
> start mqnamesrv.cmd

> start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

Powered by AppBlog.CN     浙ICP备14037229号

Copyright © 2012 - 2020 APP开发技术博客 All Rights Reserved.

访客数 : | 访问量 :