Spring Boot集成RocketMQ详解

RocketMQ简介

官网:http://rocketmq.apache.org/docs/quick-start/
GitHub:https://github.com/apache/rocketmq/

RcoketMQ是一款低延迟、高可靠、可伸缩、易于使用的消息中间件。具有以下特性:

  • 支持发布/订阅(Pub/Sub)和点对点(P2P)消息模型
  • 在一个队列中可靠的先进先出(FIFO)和严格的顺序传递
  • 支持拉(pull)和推(push)两种消息模式
  • 单一队列百万消息的堆积能力
  • 支持多种消息协议,如 JMS、MQTT 等
  • 分布式高可用的部署架构,满足至少一次消息传递语义
  • 提供 docker 镜像用于隔离测试和云集群部署
  • 提供配置、指标和监控等功能丰富的 Dashboard

专业术语

(1)Producer

消息生产者,生产者的作用就是将消息发送到 MQ,生产者本身既可以产生消息,如读取文本信息等。也可以对外提供接口,由外部应用来调用接口,再由生产者将收到的消息发送到 MQ。

(2)Producer Group

生产者组,简单来说就是多个发送同一类消息的生产者称之为一个生产者组。

(3)Consumer

消息消费者,简单来说,消费 MQ 上的消息的应用程序就是消费者,至于消息是否进行逻辑处理,还是直接存储到数据库等取决于业务需要。

(4)Consumer Group

消费者组,和生产者类似,消费同一类消息的多个consumer实例组成一个消费者组。

(5)Topic

Topic 是一种消息的逻辑分类,比如有订单类的消息,也有库存类的消息,那么就需要进行分类,一个是订单Topic存放订单相关的消息,一个是库存Topic存储库存相关的消息。

(6)Message

Message是消息的载体。一个Message必须指定topic,相当于寄信的地址。Message还有一个可选的tag设置,以便消费端可以基于tag进行过滤消息。也可以添加额外的键值对,例如需要一个业务key来查找broker上的消息,方便在开发过程中诊断问题。

(7)Tag

标签可以被认为是对 Topic 进一步细化。一般在相同业务模块中通过引入标签来标记不同用途的消息。

(8)Broker

Broker 是 RocketMQ 系统的主要角色,其实就是前面一直说的 MQ。Broker 接收来自生产者的消息,储存以及为消费者拉取消息的请求做好准备。

(9)Name Server

Name Server 为 producer 和 consumer 提供路由信息。

RocketMQ架构

NameServer: 提供轻量级的服务发现和路由。每个NameServer记录完整的路由信息,提供等效的读写服务,并支持快速存储扩展
Broker: 通过提供轻量级的Topic和Queue机制来处理消息存储,同时支持推(push)和拉(pull)模式以及主从结构的容错机制
Producer:生产者,产生消息的实例,拥有相同Producer Group的Producer组成一个集群
Consumer:消费者,接收消息进行消费的实例,拥有相同Consumer Group的Consumer组成一个集群

从Broker开始,Broker Master和Broker Slave是主从结构,它们之间会进行数据同步,即Date Sync。同时每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer中。

Producer 与 NameServer 集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Broker Master 建立长连接,且定时向 Broker 发送心跳。Producer 只能将消息发送到 Broker master,但是 Consumer 则不一样,它同时和提供 Topic 服务的 Master 和 Slave
建立长连接,既可以从 Broker Master 订阅消息,也可以从 Broker Slave 订阅消息。

RocketMQ集群部署模式

单master模式

也就是只有一个master节点,称不上是集群,一旦这个master节点宕机,那么整个服务就不可用,适合个人学习使用。

多master模式

多个master节点组成集群,单个 master 节点宕机或者重启对应用没有影响。

优点:所有模式中性能最高
缺点:单个master节点宕机期间,未被消费的消息在节点恢复之前不可用,消息的实时性就受到影响

注意:使用同步刷盘可以保证消息不丢失,同时Topic相对应的queue应该分布在集群中各个节点,而不是只在某各节点上,否则,该节点宕机会对订阅该 topic 的应用造成影响。

多master多slave异步复制模式

在多master模式的基础上,每个master节点都有至少一个对应的slave。master节点可读可写,但是slave只能读不能写,类似于mysql的主备模式。

优点:在 master 宕机时,消费者可以从 slave 读取消息,消息的实时性不会受影响,性能几乎和多master一样
缺点:使用异步复制的同步方式有可能会有消息丢失的问题

多master多slave同步双写模式

同多master多slave异步复制模式类似,区别在于master和slave之间的数据同步方式。

优点:同步双写的同步模式能保证数据不丢失
缺点:发送单个消息 RT 会略长,性能相比异步复制低10%左右
刷盘策略:同步刷盘和异步刷盘(指的是节点自身数据是同步还是异步存储)
同步方式:同步双写和异步复制(指的一组 master 和 slave 之间数据的同步)

注意:要保证数据可靠,需采用同步刷盘和同步双写的方式,但性能会较其他方式低

Spring Boot集成RocketMQ

Maven依赖

<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.4.0</version>
</dependency>

DefaultMQProducer

单从producer分类来看主要分成3种:

  • DefaultMQProducer
  • TransactionMQProducer
  • messagingAccessPoint.createProducer()

(1)application.yml配置文件

spring:
  application:
    name: appblog

rocketmq:
  # 生产者配置
  producer:
    #该应用是否启用生产者
    isOnOff: on
    default: true
    transaction: false
    #发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示
    groupName: ${spring.application.name}
    #mq的nameserver地址
    namesrvAddr: 127.0.0.1:9876
    #消息最大长度 默认1024*4(4M)
    maxMessageSize: 4096
    #发送消息超时时间,默认3000
    sendMsgTimeout: 3000
    #发送消息失败重试次数,默认2
    retryTimesWhenSendFailed: 2

(2)yml文件配置读取类

@Getter
@Setter
@ConfigurationProperties(prefix = "rocketmq.producer")
@Configuration
@ToString
public class ProducerConfig {
    private String namesrvAddr;

    private String groupName;
}

Producer类的创建类,需要注意的是这个producer在一个程序里面只能出现一个,当重复创建时就会报错

@Log4j2
@Configuration
public class MyDefaultProducer {
    @Autowired
    private ProducerConfig producerConfig;

    private DefaultMQProducer producer;

    /**
     * 创建普通消息发送者实例
     *
     * @return
     * @throws MQClientException
     */
    @Bean
    @ConditionalOnProperty(prefix = "rocketmq.producer", value = "default", havingValue = "true")
    public DefaultMQProducer defaultProducer() throws MQClientException {
        log.info(producerConfig.toString());
        log.info("DefaultMQProducer 正在创建...");
        producer = new DefaultMQProducer(producerConfig.getGroupName());
        //指定NameServer地址, 多个地址以 ; 隔开
        producer.setNamesrvAddr(producerConfig.getNamesrvAddr());
        producer.setVipChannelEnabled(false);
        producer.setRetryTimesWhenSendAsyncFailed(10);
        producer.start();
        log.info("RocketMQ Producer Server 启动成功");
        return producer;
    }

    public String send(String topic, String tags, String body) throws InterruptedException, RemotingException, MQClientException, MQBrokerException, UnsupportedEncodingException {
        Message message = new Message(topic, tags, body.getBytes(RemotingHelper.DEFAULT_CHARSET));
        StopWatch stop = new StopWatch();
        stop.start();
        SendResult result = producer.send(message);
        System.out.println("发送响应 - MsgId:" + result.getMsgId() + ", 发送状态:" + result.getSendStatus());
        stop.stop();
        return "{\"MsgId\":\"" + result.getMsgId() + "\"}";
    }

    public String send(Message message) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        StopWatch stop = new StopWatch();
        stop.start();
        SendResult result = producer.send(message);
        System.out.println("发送响应 - MsgId:" + result.getMsgId() + ", 发送状态:" + result.getSendStatus());
        stop.stop();
        return "{\"MsgId\":\"" + result.getMsgId() + "\"}";
    }

    public void send(Message message, SendCallback callback) throws InterruptedException, RemotingException, MQClientException {
        StopWatch stop = new StopWatch();
        stop.start();
        producer.send(message, callback);
        stop.stop();
    }

}

当producer创建完毕之后就是consumer的公用设置

首先也是yml和配置类的定义

rocketmq:
  # 消费者配置
  consumer:
    #该应用是否启用消费者
    isOnOff: on
    groupName: ${spring.application.name}
    #mq的nameserver地址
    namesrvAddr: 127.0.0.1:9876
    #该消费者订阅的主题和tags("*"号表示订阅该主题下所有的tags),格式:topic~tag1||tag2||tag3;topic2~*;
    topics: DemoTopic~*;
    consumeThreadMin: 20
    consumeThreadMax: 64
    #设置一次消费消息的条数,默认为1条
    rocketmq.consumer.consumeMessageBatchMaxSize: 1
@Getter
@Setter
@ConfigurationProperties(prefix = "rocketmq.consumer")
@Configuration
@ToString
public class ConsumerConfig {
    private String groupName;

    private String namesrvAddr;
}

Consumer类的创建类,对body的操作抽象出来,提供给实现类做处理,方便业务抽取

@Configuration
@Log4j2
public abstract class BaseDefaultConsumer {
    @Autowired
    private ConsumerConfig consumerConfig;

    // 开启消费者监听服务
    public void listener(String topic, String tag) throws MQClientException {
        log.info("开启[" + topic + ":" + tag + "]消费者");
        log.info(consumerConfig.toString());

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerConfig.getGroupName());
        consumer.setNamesrvAddr(consumerConfig.getNamesrvAddr());

        // 程序第一次启动从消息队列头获取数据
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // 可以修改每次消费消息的数量,默认设置是每次消费一条
        consumer.setConsumeMessageBatchMaxSize(1);

        // 设置Consumer的消费策略
        // CONSUME_FROM_LAST_OFFSET 默认策略,从该队列最尾开始消费,即跳过历史消息
        // CONSUME_FROM_FIRST_OFFSET 从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍
        // CONSUME_FROM_TIMESTAMP 从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        // 设置consumer所订阅的Topic和Tag,*代表全部的Tag
        // consumer.subscribe("TopicTest", "*");
        // 订阅topic下TAG为tag的消息
        consumer.subscribe(topic, tag);

        // 开启内部类实现监听
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) {
                return BaseDefaultConsumer.this.dealBody(msgList);
            }
        });

        // 调用start()方法启动Consumer
        consumer.start();

        log.info("RocketMQ Consumer Server 启动成功");
    }

    // 处理body的业务
    public abstract ConsumeConcurrentlyStatus dealBody(List<MessageExt> msgList);
}

值得注意的是,这里DefaultConsumerConfigure没有定义在什么时候运行,这里以实现ApplicationListener的onApplicationEvent方法开启消费者监听服务

@Log4j2
@Configuration
public class MyConsumer extends BaseDefaultConsumer implements ApplicationListener<ContextRefreshedEvent> {

    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        try {
            super.listener("my_topic", "my_tag");
        } catch (MQClientException e) {
            log.error("消费者监听器启动失败", e);
        }
    }

    @Override
    public ConsumeConcurrentlyStatus dealBody(List<MessageExt> msgList) {
        int num = 1;
        for(MessageExt msg : msgList) {
            log.info("第" + (num++) + "次消息");
            try {
                String msgStr = new String(msg.getBody(), "utf-8");
                log.info(msgStr);
            } catch (UnsupportedEncodingException e) {
                log.error("Body转字符串解析失败");
            }
        }
        // 返回消费状态
        // CONSUME_SUCCESS 消费成功
        // RECONSUME_LATER 消费失败,需要稍后重新消费
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

}

Consumer构建类实现ApplicationListener,在Application启动时就开始执行注册Consumer。相信有些同学会喜欢用@PostConstruct,但是不要这么做,因为它会在init之前执行,那么有些类会加载不完全,会导致无法启动

单元测试及运行结果

@Log4j2
@RunWith(SpringRunner.class)
@SpringBootTest
public class RocketTest {
    @Autowired
    private MyDefaultProducer producer;

    @Test
    public void testDefaultRocketMQ() throws Exception {
        Message message = new Message("my_topic", "my_tag", "123456", "RocketMQ测试成功".getBytes());
        // 这里用到了这个mq的异步处理,类似ajax,可以得到发送到mq的情况,并做相应的处理
        // 不过要注意的是这个是异步的
        producer.send(message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("传输成功");
                log.info(JSON.toJSONString(sendResult));
            }
            @Override
            public void onException(Throwable e) {
                log.error("传输失败", e);
            }
        });
    }
}
ProducerConfig(namesrvAddr=127.0.0.1:9876, groupName=appblog)
DefaultMQProducer 正在创建...
RocketMQ Producer Server 启动成功
开启[my_topic:my_tag]消费者
ConsumerConfig(groupName=appblog, namesrvAddr=127.0.0.1:9876)
RocketMQ Consumer Server 启动成功
Started RocketTest in 23.857 seconds (JVM running for 33.942)
第1次消息
RocketMQ测试成功
传输成功
{"messageQueue":{"brokerName":"LT-YEZHOU","queueId":2,"topic":"my_topic"},"msgId":"0200011A67C818B4AAC2616911F30000","offsetMsgId":"0200011A00002A9F000000000000030C","queueOffset":1,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true}

TransactionMQProducer

(1)application.yml配置文件

将default设置为false,transaction设置为true

spring:
  application:
    name: appblog

rocketmq:
  # 生产者配置
  producer:
    #该应用是否启用生产者
    isOnOff: on
    default: false
    transaction: true
    #发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示
    groupName: ${spring.application.name}
    #mq的nameserver地址
    namesrvAddr: 127.0.0.1:9876
    #消息最大长度 默认1024*4(4M)
    maxMessageSize: 4096
    #发送消息超时时间,默认3000
    sendMsgTimeout: 3000
    #发送消息失败重试次数,默认2
    retryTimesWhenSendFailed: 2

再来看TransactionMQProducer,需要注意的是ConditionalOnProperty这个必须得有,而且配置文件中transaction和default中只能有一个是true,不然就会同时创建两个producer,那么启动就会报错。

@Log4j2
@Configuration
public class MyTransactionProducer {
    @Autowired
    private ProducerConfig producerConfig;

    private TransactionMQProducer producer;

    /**
     * 创建事务消息发送者实例
     *
     * @return
     * @throws MQClientException
     */
    @Bean
    @ConditionalOnProperty(prefix = "rocketmq.producer", value = "transaction", havingValue = "true")
    public TransactionMQProducer transactionMQProducer() throws MQClientException {
        log.info(producerConfig.toString());
        log.info("TransactionMQProducer 正在创建...");
        producer = new TransactionMQProducer(producerConfig.getGroupName());

        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });
        producer.setNamesrvAddr(producerConfig.getNamesrvAddr());
        producer.setExecutorService(executorService);
        producer.start();
        log.info("TransactionMQ Producer Server 启动成功");
        return producer;
    }

    public TransactionMQProducer getProducer() {
        return producer;
    }
}

因为Transaction的流程下,RocketMQ会先发送一个consumer不可见的消息,然后在调用TransactionListener这个接口中的executeLocalTransaction方法执行事务,然后方法内部需要返回一个LocalTransactionState的枚举信息,分别为

public enum LocalTransactionState {
    COMMIT_MESSAGE, // 提交
    ROLLBACK_MESSAGE, // 回滚
    UNKNOW, // 未知
}
public interface TransactionListener {
    /**
     * When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
     *
     * @param msg Half(prepare) message
     * @param arg Custom business parameter
     * @return Transaction state
     */
    LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);

    /**
     * When no response to prepare(half) message. broker will send check message to check the transaction status, and this
     * method will be invoked to get local transaction status.
     *
     * @param msg Check message
     * @return Transaction state
     */
    LocalTransactionState checkLocalTransaction(final MessageExt msg);
}

相应的当我们返回的是COMMIT_MESSAGE时,那么producer会把消息提交到MQ上,如果是ROLLBACK_MESSAGE那么producer就会结束,并且不提交到MQ。需要注意的是checkLocalTransaction是MQ长时间没有收到producer的executeLocalTransaction响应的时候调用的,这个类在3.0之后的版本就被阉割了,只有接口,却没有实现,那么直接写一个空实现即可。在这边的代码上,做了一个抽象,把需要实现的executeLocalTransaction抽象出来

public abstract class AbstractTransactionListener implements TransactionListener {

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        return LocalTransactionState.COMMIT_MESSAGE;
    }

}

下面是executeLocalTransaction的实现类,简单的做了些业务,然后返回了一个commit

@Configuration
@Log4j2
public class MyTransactionListener extends AbstractTransactionListener {

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        log.info(new String(msg.getBody()));
        return LocalTransactionState.COMMIT_MESSAGE;
    }

}

Consumer是没有变化的,基本相同,上测试代码

@Log4j2
@RunWith(SpringRunner.class)
@SpringBootTest
public class RocketTest {
    @Autowired
    private MyTransactionProducer transactionProducer;
    @Autowired
    private MyTransactionListener transactionListener;

    @Test
    public void testTransactionRocketMQ() throws Exception {
        Message message = new Message("my_topic", "my_tag", "123456", "RocketMQ测试成功".getBytes());
        TransactionMQProducer producer = transactionProducer.getProducer();
        producer.setTransactionListener(transactionListener);
        producer.setSendMsgTimeout(10000);
        producer.send(message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("传输成功");
                log.info(JSON.toJSONString(sendResult));
            }
            @Override
            public void onException(Throwable e) {
                log.error("传输失败", e);
            }
        });
    }

}
ProducerConfig(namesrvAddr=127.0.0.1:9876, groupName=appblog)
TransactionMQProducer 正在创建...
TransactionMQ Producer Server 启动成功
开启[my_topic:my_tag]消费者
ConsumerConfig(groupName=appblog_cn, namesrvAddr=127.0.0.1:9876)
RocketMQ Consumer Server 启动成功
Started RocketTest in 20.065 seconds (JVM running for 23.38)
传输成功
{"messageQueue":{"brokerName":"YEZHOU.ME","queueId":1,"topic":"my_topic"},"msgId":"0200011AA8B418B4AAC260E5F1FE0000","offsetMsgId":"0A030A7200002A9F0000000000000249","queueOffset":1,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true}
第1次消息
RocketMQ测试成功

常见错误

磁盘空间不足

org.apache.rocketmq.client.exception.MQBrokerException: CODE: 14  DESC: service not available now, maybe disk full, CL:  0.91 CQ:  0.91 INDEX:  0.91, maybe your broker machine memory too small.
  1. 清理磁盘空间
  2. 修改store路径
  3. 修改logs路径
  4. 修改rmq_bk_gc.log路径
  5. 修改rmq_srv_gc.log路径

修改store路径

① 获取RocketMQ源码:https://github.com/apache/rocketmq/
② 修改rocketmq/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig类中的成员变量默认值,设置为自定义路径。例如:

private String storePathRootDir = File.separator + "app" + File.separator + "mqfile" + File.separator + "store";
private String storePathCommitLog = File.separator + "app" + File.separator + "mqfile" + File.separator + "store"+ File.separator + "commitlog";

③ 使用mvn install 命令打成jar包
④ 备注:storePathCommitLog可以在broker-a.properties等配置文件中指定。其余路径不可以指定

修改logs路径

rocketmq-all-4.4.0-bin-release/conf/logback_broker.xml
rocketmq-all-4.4.0-bin-release/conf/logback_namesrv.xml
rocketmq-all-4.4.0-bin-release/conf/logback_tools.xml
> start mqnamesrv.cmd

> start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

版权声明:
作者:Joe.Ye
链接:https://www.appblog.cn/index.php/2023/03/09/spring-boot-integrate-rocketmq/
来源:APP全栈技术分享
文章版权归作者所有,未经允许请勿转载。

THE END
分享
二维码
打赏
海报
Spring Boot集成RocketMQ详解
RocketMQ简介 官网:http://rocketmq.apache.org/docs/quick-start/ GitHub:https://github.com/apache/rocketmq/ RcoketMQ是一款低延迟、高可靠、可伸缩、易……
<<上一篇
下一篇>>
文章目录
关闭
目 录