RocketMQ生产者Producer发送消息的三种方式

可靠同步发送

同步发送是指消息发送方发出数据后,会在收到接收方发回响应之后才发下一个数据包的通讯方式。

调用DefaultMQProducersend方法

public class SyncProducer {
    public static void main(String[] args) throws Exception {
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("example_group_name");
        //Launch the instance.
        producer.start();
        for (int i = 0; i < 100; i++) {
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("TopicTest", "TagA",
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            //Call send message to deliver message to one of brokers.
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        //Shut down once the producer instance is not longer in use.
        producer.shutdown();
    }
}

可靠异步发送

异步发送是指发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式。 MQ 的异步发送,需要用户实现异步发送回调接口(SendCallback)。消息发送方在发送了一条消息后,不需要等待服务器响应即可返回,进行第二条消息发送。发送方通过回调接口接收服务器响应,并对响应结果进行处理。

public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("example_group_name");
        //Launch the instance.
        producer.start();
        producer.setRetryTimesWhenSendAsyncFailed(0);
        for (int i = 0; i < 100; i++) {
            final int index = i;
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("TopicTest", "TagA", "OrderID188",
                    "Hello World".getBytes(RemotingHelper.DEFAULT_CHARSET));
            producer.send(msg, new SendCallback() {
                public void onSuccess(SendResult sendResult) {
                    System.out.printf("%-10d OK %s %n", index,
                            sendResult.getMsgId());
                }

                public void onException(Throwable e) {
                    System.out.printf("%-10d Exception %s %n", index, e);
                    e.printStackTrace();
                }
            });
        }
        //Shut down once the producer instance is not longer in use.
        producer.shutdown();
    }
}

单向(Oneway)发送

单向(Oneway)发送特点为发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。

调用DefaultMQProducersendOneway方法

public class OnewayProducer {
    public static void main(String[] args) throws Exception{
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("example_group_name");
        //Launch the instance.
        producer.start();
        for (int i = 0; i < 100; i++) {
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("TopicTest", "TagA",
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
            );
            //Call send message to deliver message to one of brokers.
            producer.sendOneway(msg);
        }
        //Shut down once the producer instance is not longer in use.
        producer.shutdown();
    }
}

比较特点

发送方式 发送TPS 发送结果反馈 可靠性
同步发送 不丢失
异步发送 不丢失
单项发送 最快 可能丢失

参考:https://help.aliyun.com/document_detail/29547.html

版权声明:
作者:Joe.Ye
链接:https://www.appblog.cn/index.php/2023/03/19/three-ways-for-rocketmq-producers-to-send-messages/
来源:APP全栈技术分享
文章版权归作者所有,未经允许请勿转载。

THE END
分享
二维码
打赏
海报
RocketMQ生产者Producer发送消息的三种方式
可靠同步发送 同步发送是指消息发送方发出数据后,会在收到接收方发回响应之后才发下一个数据包的通讯方式。 调用DefaultMQProducer的send方法 public class ……
<<上一篇
下一篇>>
文章目录
关闭
目 录