RabbitMQ应用场景:订单自动过期取消
场景描述
场景描述:当用户下单后,状态为待支付,假如在规定的过期时间内尚未支付金额,那么就应该设置订单状态为取消。在不用MQ的情况下,我们可以设置一个定时器,每秒轮询数据库查找超出过期时间且未支付的订单,然后修改状态,但是这种方式会占用很多资源,所以在这里我们可以利用RabbitMQ的死信队列。
死信队列与普通队列一样,在以下情况下会变成死信队列:
- 消息被拒绝(
basic.reject
/basic.nack
)并且不再重新投递requeue=false
- 消息过期 (
rabbitmq Time-To-Live -> messageProperties.setExpiration()
) - 队列超出最大长度
模拟案例
下面是基于Spring Boot的简单模拟案例:
(1)pom.xml
文件
<!-- rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
(2)application.properties
文件
##rabbitmq
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
# 消息发送到交换机确认机制,是否确认回调
spring.rabbitmq.publisher-confirms=true
(3)queue以及Exchange的创建与绑定
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitMQConfig {
/**
* 订单死信队列交换机标识符 属性值不能改,写死
*/
private static final String ORDER_DEAD_LETTER_QUEUE_KEY = "x-dead-letter-exchange";
/**
* 订单死信队列交换机绑定键 标识符 属性值不能改,写死
*/
private static final String ORDER_DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";
//----------------------------订单死信定义------------------------------
// 订单过期流程: 消息(创建的订单号)---> 发送到订单死信队列,不消费(设置过期时间)---> (超过设定的过期时间)根据ORDER_DEAD_LETTER_QUEUE_KEY路由死信交换机 ---> 重新消费,根据ORDER_DEAD_LETTER_ROUTING_KEY转发到转发队列(取出消息订单号查找订单,假如仍然未支付就取消订单)---> end
/**
* orderDeadLetterExchange(direct类型交换机)
*/
@Bean("orderDeadLetterExchange")
public Exchange orderDeadLetterExchange() {
return ExchangeBuilder.directExchange("ORDER_DL_EXCHANGE").durable(true).build();
}
/**
* 声明一个订单死信队列.
* x-dead-letter-exchange 对应 死信交换机
* x-dead-letter-routing-key 对应 死信队列
*/
@Bean("orderDeadLetterQueue")
public Queue orderDeadLetterQueue() {
// 参数
Map<String, Object> args = new HashMap<>(2);
// 出现dead letter之后将dead letter重新发送到指定exchange
args.put(ORDER_DEAD_LETTER_QUEUE_KEY, "ORDER_DL_EXCHANGE");
// 出现dead letter之后将dead letter重新按照指定的routing-key发送
args.put(ORDER_DEAD_LETTER_ROUTING_KEY, "RED_KEY");
// name队列名字 durable是否持久化,true保证消息的不丢失, exclusive是否排他队列,如果一个队列被声明为排他队列,该队列仅对首次申明它的连接可见,并在连接断开时自动删除, autoDelete如果该队列没有任何订阅的消费者的话,该队列是否会被自动删除, arguments参数map
return new Queue("ORDER_DL_QUEUE", true, false, false, args);
}
/**
* 定义订单死信队列转发队列.
*/
@Bean("orderRedirectQueue")
public Queue orderRedirectQueue() {
return new Queue("ORDER_REDIRECT_QUEUE", true, false, false);
}
/**
* 死信路由通过 DL_KEY 绑定键绑定到订单死信队列上.
*/
@Bean
public Binding orderDeadLetterBinding() {
return new Binding("ORDER_DL_QUEUE", Binding.DestinationType.QUEUE, "ORDER_DL_EXCHANGE", "DL_KEY", null);
}
/**
* 死信路由通过 RED_KEY 绑定键绑定到订单转发队列上.
*/
@Bean
public Binding orderRedirectBinding() {
return new Binding("ORDER_REDIRECT_QUEUE", Binding.DestinationType.QUEUE, "ORDER_DL_EXCHANGE", "RED_KEY", null);
}
}
(4)消息的发送
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
import java.util.UUID;
@RestController
public class OrderController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/test")
public void sendMessage(@RequestParam String orderNo){
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); // 生成一个消息的唯一id,可不选
// 声明消息处理器 设置消息的编码以及消息的过期时间 时间毫秒值 为字符串
MessagePostProcessor messagePostProcessor = message -> {
MessageProperties messageProperties = message.getMessageProperties();
// 设置编码
messageProperties.setContentEncoding("utf-8");
// 设置过期时间 一分钟
int expiration = 1000 * 20;
messageProperties.setExpiration(String.valueOf(expiration));
return message;
};
// 向ORDER_DL_EXCHANGE 发送消息 形成死信 在OrderQueueReceiver类处理死信交换机转发给转发队列的信息
rabbitTemplate.convertAndSend("ORDER_DL_EXCHANGE", "DL_KEY", orderNo, messagePostProcessor, correlationData);
System.out.println(new Date() + "发送消息,订单号为" + orderNo);
}
}
(5)消息的消费
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Date;
@Component
public class MesageReceiver {
/**
* 监听转发队列 走逻辑判断,尚未支付且超过过期时间的订单号设置为失效订单
* @param message 信息包装类
* @param channel 通道
*/
@RabbitListener(queues = {"ORDER_REDIRECT_QUEUE"})
public void redirect(Message message, Channel channel) throws IOException {
// 从队列中取出订单号
byte[] body = message.getBody();
String orderNo = new String(body,"UTF-8");
System.out.println(new Date() + "消费消息,订单号为" + orderNo);
// 确认消息有没有被收到,false表示手动确认 在处理完消息时,返回应答状态
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
(6)测试结果
发送消息时可以看到RabbitMQ管理界面的ORDER_DL_QUEUE
队列有一条待消费的消息,然后在20秒过期后变成死信队列发送至ORDER_DL_EXCHANGE
交换器,然后交换器根据路由转发到ORDER_REDIRECT_QUEUE
队列,并被监听消费掉。
版权声明:
作者:Joe.Ye
链接:https://www.appblog.cn/index.php/2023/03/27/rabbitmq-application-scenario-automatic-expiration-and-cancellation-of-orders/
来源:APP全栈技术分享
文章版权归作者所有,未经允许请勿转载。
共有 0 条评论