概念

延迟队列,队列内部是有序的,最重要的特性就体现在它的延迟属性上,延迟队列中元素是希望在指定时间到了以后或之前取出和处理,简单来说,延迟队列就是用来存放需要在指定时间被处理的消息的队列。延迟队列相当于是死信队列思想的一种变体。

延迟队列使用场景

  1. 订单在十分钟内未支付则自动取消
  2. 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒
  3. 用户注册成功后,如果三天内没有登录则进行短信提醒
  4. ....

这些场景都有一个特点:需要在某个事件发生之后或之前的指定时间点完成某项任务。例如:发生订单生成时间,在10分钟之后检查该订单支付状态,然后将未支付的订单进行关闭。看起来似乎使用定时任务,一直轮循数据,每秒检查一次,取出需要被处理的数据,然后处理不就完事了吗?

如果数据量比较少或者任务的时效性不强,确实可以这样做,比如:对于“如果账单一周内未支付则进行自动结算”这样的需求,如果对于时间不是严格限制,而是宽松意义上的一周,那么每晚跑个定时任务检查一下所有未支付的账单,确实是一个可行的方案。

但是对于数据量大并且时效性较强的场景,如:“订单十分钟内未支付则关闭”,短期内未支付的订单数据可能有很多,在活动期间甚至会达到百万甚至千万的级别,对于这么庞大的数据量任然使用轮循的方式是不可取的,不仅耗费资源性能低下,给数据库带来压力,而且很可能在短时间内无法完成所有订单的检查,无法满足业务的需求。这时,延迟队列站出来了!

以购票订单的场景为例,业务流程图如下:
订单业务流程图

接下来用SpringBoot整合rabbitmq写一个延时队列的简单案例

基于死信的延迟队列代码架构图:
基于死信的延迟队列

我们创建两个队列QA和QB,它们的消息TTL分别设置为10s和40s,在创建一个普通交换机X和死信交换机Y以及死信队列QD,交换机的类型都是direct。
注意,使用SpringBoot整合之后,像这种交换机与队列之间的绑定关系以及它们的申明等在broker中的操作都通过配置类来实现。

配置类实现:

@Configuration
public class RabbitMQConfig {

    private static final String ORDINARY_EXCHANGE="X";
    private static final String DEAD_EXCHANGE="Y";
    private static final String QUEUE_A="QA";
    private static final String QUEUE_B="QB";
    private static final String DEAD_QUEUE="QD";

    //声明普通交换机X
    @Bean("xExchange") //参数是这个交换机对象的别名
    public DirectExchange xExchange(){
        return new DirectExchange(ORDINARY_EXCHANGE);
    }

    //声明死信交换机
    @Bean("yExchange")
    public DirectExchange yExchange(){
        return new DirectExchange(DEAD_EXCHANGE);
    }

    //声明普通队列QA
    @Bean("QA")
    public Queue qaQueue(){
        HashMap<String,Object> arguments=new HashMap(3);
        arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        arguments.put("x-dead-letter-routing-key","YD");
        arguments.put("x-message-ttl",10000);
        return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
    }

    //声明普通队列QB
    @Bean("QB")
    public Queue qbQueue(){
        HashMap<String,Object> arguments=new HashMap(3);
        arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        arguments.put("x-dead-letter-routing-key","YD");
        arguments.put("x-message-ttl",40000);
        return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();
    }

    //声明死信队列QD
    @Bean("QD")
    public Queue qdQueue(){
        return QueueBuilder.durable(DEAD_QUEUE).build();
    }
    //绑定交换机X与队列QA
    @Bean
    public Binding queueQABindingX(@Qualifier("QA") Queue QA,@Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(QA).to(xExchange).with("XA");
    }
    //绑定交换机X与队列QB
    @Bean
    public Binding queueQBBindingX(@Qualifier("QB") Queue QB,@Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(QB).to(xExchange).with("XB");
    }
    //绑定交换机Y与队列QD
    @Bean
    public Binding queueQDBindingY(@Qualifier("QD") Queue QD,@Qualifier("yExchange") DirectExchange yExchange){
        return BindingBuilder.bind(QD).to(yExchange).with("YD");
    }
}

实现配置类中就相当于把一个个的配置封装成一个个Bean,将它交给IOC容器去管理。使得整体的架构更加的清晰明朗。

生产者

我们用Controller层来代替,将要发送的消息放在请求中,然后接收到请求,得到请求中的消息参数,将消息进行发送。

@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    //发送数据
    @RequestMapping("/sendMsg/{message}")
    public void sendMsg(@PathVariable("message") String message){
        log.info("当前时间:{},要发送的消息为{}",new Date().toString(),message);
        rabbitTemplate.convertAndSend("X","XA","来自TTL为10s的QA队列的消息:"+message);
        rabbitTemplate.convertAndSend("X","XB","来自TTL为40s的QB队列的消息:"+message);
    }
}

我们把消息发送给两个不同TTL的队列QA和QB,并使用log.info()进行日志打印

消费者

编写一个消息的监听器去接收延迟队列中的消息,需要使用到@RabbitListener()注解,参数为该队列的名称

@Slf4j
@Component
public class MessageConsumer {
    //接收基于死信实现的延迟队列的消息
    @RabbitListener(queues = "QD")
    public void receiveMsg(Message message, Channel channel) throws UnsupportedEncodingException {
        String msg = new String(message.getBody(),"UTF-8");
        log.info("当前时间:{},接收到延迟队列中的消息为:{}",new Date().toString(),msg);
    }
}

结果
我们发起一个Web请求:http://localhost:8080/ttl/sendMsg/下午好!
结果

这样一个延时队列就打造完成了。

但是这种方式有一个弊端,就是我们把消息的TTL设置在队列中,这样的话一个队列对应一种过期时间,假如我们很多种过期时间的需求,那就要生成很多个队列。因此我们需要优化:队列不需要设置消息的TTL,我们在发送消息的时候设置消息的TTL。这样一个队列就可以存放多种不同TTL的消息

试图优化

我们再声明一个不设置TTL的队列QC。

    //声明普通队列QC
    @Bean("QC")
    public Queue qcQueue(){
        HashMap<String,Object> arguments=new HashMap(2);
        arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        arguments.put("x-dead-letter-routing-key","YD");
        return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build();
    }
    //绑定交换机X与队列QC
    @Bean
    public Binding queueQCBindingX(@Qualifier("QC")Queue QC,@Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(QC).to(xExchange).with("XC");
    }

我们在Controller中再定义一个接口,接收的参数为消息和过期时间。

//发送消息,带过期时间
@RequestMapping("/sendMsg/{message}/{timeout}")
public void sendMsg(@PathVariable("message") String message,@PathVariable("timeout") String timeout){
    log.info("当前时间:{},要发送的消息为:{},该消息的TTL为:{}",new Date().toString(),message,timeout);
    rabbitTemplate.convertAndSend("X",
            "XC",
            "来自TTL为"+timeout+"ms的QC队列的消息:"+message,
            msg -> {
                //设置消息的TTL
                msg.getMessageProperties().setExpiration(timeout);
                return msg;
            });
}

发送两个不同过期时间web请求:

http://localhost:8080/ttl/sendMsg/下午好!/10000

http://localhost:8080/ttl/sendMsg/下午好!/20000

此时我们想要设置消息的TTL只需在请求后面追加即可。

结果:
看似没毛病

看似没什么问题,但我们再发两次请求:第一次请求的消息TTL为20000ms,第二次TTL为2000ms,按照正常的逻辑,消费者应该先收到第二次请求的消息,因为它的TTL时间更短。但是结果:
哦豁

我们发现,尽管第一次消息的TTL比第二次更长,但还是先接收到第一次发送的消息。

这是因为消息在队列中是排队等待的,RabbitMQ只会检查第一个消息是否过期,如果过期则转发到死信队列中,如果没有过期则会一直等待第一条消息过期,期间就算后面的消息过期了,也不会得到检查。所以这种优化是有问题的。因此只有同种过期时间的消息放在一个队列才不会存在这种问题。但是那样又太麻烦太耗费资源了。

最终解决方案:使用RabbitMQ插件实现延迟队列

基于插件实现延迟队列

安装延迟队列插件:

官方下载:https://www.rabbitmq.com/community-plugins.html

将下载的插件rabbitmq_delayed_message_exchange插件,然后解压放置到RabbitMQ的插件目录plugins

进入RabbitMQ的安装目录下的plugins目录:

cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.x.x/plugins

执行命令启动插件:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

启动完成后重启RabbitMQ服务
此时如果安装好了,在RabbitMQ的管理页面新增交换机时可以看到type=x-delayed-message这一项

此时,我们可以创建类型为x-delayed-message的交换机。这种基于插件实现的延迟队列的实现过程与基于死信实现的是不同的。
基于插件实现延迟队列,消息是放在了交换机上,等消息的TTL满足,交换机才会把消息推送到队列中去。因此这种方式是不需要再额外创建死信交换机以及死信队列。

基于插件实现延迟队列流程图:
基于插件实现延迟队列

配置类实现:

//基于插件实现延迟队列
@Configuration
public class DelayedQueueConfig {
    private static final String EXCHANGE_NAME="delayed_exchange";
    private static final String QUEUE_NAME="delayed_queue";
    private static final String ROUTING_KEY="delayed";
    private static final String EXCHANGE_TYPE="x-delayed-message";

    //声明自定义(延迟)交换机
    @Bean("delayed_exchange")
    public CustomExchange delayedExchange(){
        Map<String, Object> arguments=new HashMap<>();
        arguments.put("x-delayed-type","direct");
        return new CustomExchange(EXCHANGE_NAME,EXCHANGE_TYPE,false,false,arguments);
    }

    //声明延迟队列
    @Bean("delayed_queue")
    public Queue  delayedQueue(){
        return QueueBuilder.durable(QUEUE_NAME).build();
    }

    //将延迟交换机与队列进行绑定
    @Bean
    public Binding delayedBinding(@Qualifier("delayed_exchange") CustomExchange exchange,
                                  @Qualifier("delayed_queue") Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY).noargs();
    }
}

生产者:

//发送延迟消息,基于插件
@RequestMapping("/sendDelayedMsg/{message}/{delayedTime}")
public void sendDelayedMsg(@PathVariable("message")String message,
                           @PathVariable("delayedTime")Integer delayedTime){
    log.info("当前时间:{},要发送的消息为:{},该消息的延迟时间为:{}ms",new Date().toString(),message,delayedTime);
    rabbitTemplate.convertAndSend("delayed_exchange","delayed",message,msg -> {
        //设置消息的延迟时间,单位ms
        msg.getMessageProperties().setDelay(delayedTime);
        return msg;
    });
}

消费者:

//接收基于插件实现的延迟消息
@RabbitListener(queues = "delayed_queue")
public void receiveDelayedMsg(Message message) throws UnsupportedEncodingException {
    String msg = new String(message.getBody(), "UTF-8");
    log.info("当前时间:{},接收到延迟队列delayed_queue中的消息为:{}",new Date().toString(),msg);
}

我们继续连发两次请求:

第一次请求:http://localhost:8080/ttl/sendDelayedMsg/good night!/20000

第二次请求:http://localhost:8080/ttl/sendDelayedMsg/good morning!/2000

结果:
成功!

第二次发送的消息因为延迟时间短,所以达到延迟等待的时间后先被消费,复合我们的预期!

因此,通过插件的方式自定义延迟交换机比基于死信实现更加灵活和方便!

总结:延时队列在需要延时处理的场景下非常有用,使用RabbitMQ来实现延时队列可以很好的利用RabbitMQ的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会丢弃。另外,通过RabbitMQ集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。

当然,延时队列还有其他很多选择,比如Java中的DelayQueue,利用Redis的ZSet,利用Quartz或者利用kafka的时间轮,这些方式各有特点,有各自适用的场景。

最后修改:2021 年 07 月 25 日 10 : 40 PM
如果觉得我的文章对你有用,请随意赞赏