当前位置: 首页 > 图灵资讯 > 技术篇> 延迟消息的五种实现方案

延迟消息的五种实现方案

来源:图灵教育
时间:2023-06-14 09:41:22

在生产者将消息发送到消息队列后,他们不期望立即消费,而是等待消费者在指定时间后消费。这种消息通常被称为延迟消息。事实上,延迟消息的应用场景非常广泛,如以下场景:

  • 在线直播教学时,在课程开始前15分钟通知所有学生准备上课。
  • 订单提交成功后一小时内未付款,订单需及时关闭并释放相应商品的库存。
  • 当用户未登录超过15天时,将召回推送发送给用户。
  • 提交工单后24小时以上未处理,并向相关负责人发出催促处理提醒。

针对延迟消息,本文分享了五个实现方案。让我们逐一讨论各种方案的总体实现和优缺点。

一、Redis

Redis中有一个有序的集合(Sorted Set)在有序集合中的数据结构中,所有元素都是基于它的 Score 排序。我们可以把消息的预期时间戳作为Score,定期任务可以不断读取Score大于当前时间的元素。基本流程如下:

  1. 调用API,传输执行时间、消息体等数据。
  2. 在Redis的String结构中,生成唯一的key,序列化消息体数据。
  3. 将key和执行时间戳存储在Redis的有序集合结构中,而不是存储特定的信息数据,而是存储唯一的key。
  4. 定时任务不断读取时间戳最小的消息。
  5. 如果时间戳小于当前时间,将key放入Redis作为队列的List结构中。
  6. 另一个定时任务不断地从队列中读取需要消费的信息。
  7. 根据key获取消息体数据,消费消息。
  8. 如果消费信息成功,删除key对应的消息数据。
  9. 如果消费信息失败,重新存储key和时间戳(加60秒)。

具体方案如下:

延迟消息的五种实现方案_有序集合

为了避免在有序集合中存储过多的延迟信息、存储操作和查询操作速度较慢的问题,可以建立多个有序集合,并通过哈希算法将信息路由集中到不同的有序集合中。

优点

简单实用,落地快。

缺点
  • 单个有序集合不能支持太多的数据。
  • 不断阅读定期任务可能会导致不必要的请求。

因此,Redis方案不是一个非常成熟的方案,而是一个支持小新闻快速实施的方案。

二、RabbitMQ

RabitMQ本身并不支持延迟消息功能,一般的做法是通过最大的生存时间(Time-To-Live)和死信交换机(Dead Letter Exchanges)模拟延迟消息功能的两个特性。如果消息超过最大生存时间,不被消费,就会变成死信,重新送到死信交换机,然后死信交换机按照绑定规则转发到相应的死信队列。监控队列可以重新消耗消息。

然而,RabitMQ的3.5.8版本结束后,我们可以使用官方推荐的rabbitmq delayed message exchange插件很容易实现延迟消息的功能。

安装插件

先在官方插件列表页面下载rabbitmq__delayed_message_exchang插件,然后复制到RabitMQ每个节点的plugins目录中。使用命令启用插件:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

一旦插件启用,我们就可以开始使用它了。

使用示例

安装插件后,将生成Exchange类型,支持延迟交付机制:x-delayed-message。收到这类消息后,消息不会立即发送到目标队列,而是存储在mnesia表中,然后在检测到可交付时间后发送到目标队列。

使用延迟消息时,需要先声明一个x-delayed-message交换器类型:

Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct");channel.exchangeDeclare("one-more-exchange", "x-delayed-message", true, false, args);

在header中添加延迟消息x-delay,毫秒表示延迟:

byte[] messageBodyBytes = "This is a delayed message".getBytes();AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder();headers = new HashMap<String, Object>();headers.put("x-delay", 5000);props.headers(headers);channel.basicPublish("one-more-exchange", "", props.build(), messageBodyBytes);

优点

大品牌中间件,可靠稳定。

缺点

由于master单节点,性能瓶颈和吞吐量有限。

三、ActiveMQ

ActiveMQ开始支持5.4及以上版本的持久延迟信息功能,甚至支持Cron表达式。默认情况下,如果需要修改配置文件activemq,则不会打开该功能.xml,将schedulerSupport属性设置为broker节点的true,例如:

<broker xmlns="http://activemq.apache.org/schema/core" schedulerSupport="true"></broker>

当服务端打开延迟消息功能时,客户端可以利用以下属性发送延迟消息:

  • AMQ_SCHEDULED_DELAY:该消息延迟发送时间,单位为毫秒。
  • AMQ_SCHEDULED_PERIOD:单位每次重新发送消息的时间间隔为毫秒。
  • AMQ_SCHEDULED_REPEAT:重新发送消息的次数。
  • AMQ_SCHEDULED_CRON:使用Cron表达式设置发送消息的时机。

使用示例
  1. 发送消息延迟60秒:

MessageProducer producer = session.createProducer(destination);TextMessage message = session.createTextMessage("This is a delayed message");message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 60 * 1000);producer.send(message);

  1. 每次间隔10秒,消息延迟60秒,重复发送5次:

MessageProducer producer = session.createProducer(destination);TextMessage message = session.createTextMessage("This is a delayed message");message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 60 * 1000);message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 10 * 1000);message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 4);producer.send(message);

  1. 每天凌晨3点,使用Cron表达式发送一条消息:

MessageProducer producer = session.createProducer(destination);TextMessage message = session.createTextMessage("This is a delayed message");message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, " 0 3 * * *");producer.send(message);

需要注意的是,Cron表达式由五位组成,分别表示分钟(059)、小时(023)、日(131)、月(112)、星期(0~6,表示从周日到周六)。

  1. Cron表达式的优先级高于其他参数。如果在设置Cron表达式的同时设置其他参数,则将在每次Cron执行时应用其他参数。例如,该消息延迟60秒,重复5次,间隔10秒,每小时发送一系列信息:

MessageProducer producer = session.createProducer(destination);TextMessage message = session.createTextMessage("test msg");message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *");message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 60 * 1000);message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 10 * 1000);message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 4);producer.send(message);

优点

大品牌中间件,可靠稳定,甚至支持Cron表达式。

缺点

由于master单节点,性能瓶颈和吞吐量有限。

四、RocketMQ

在RocketMQ中,支持延迟信息,但不支持任何时间精度的延迟信息,只支持特定级别的延迟信息。如果你想支持任何时间精度,你不能避免在Broker层面进行新闻排序,然后涉及持久性考虑,那么新闻排名将不可避免地产生巨大的性能成本。

消息延迟水平分别为1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,共18个级别。发送消息时,可以设置消息延迟级别,设置消息延迟级别时有以下三种情况:

  1. 如果设置消息延迟等级为0,则该消息为非延迟消息。
  2. 设置消息延迟级别大于或等于1,小于或等于18。如果设置消息延迟级别等于1,则延迟1s;设置新闻延迟等级为2,则延迟5s,以此类推。
  3. 如果新闻延迟级别大于18,则新闻延迟级别为18,如果新闻延迟级别等于20,则延迟2h。
使用示例

首先,为消费延迟写一个消费者信息:

public class Consumer {    public static void main(String[] args) throws MQClientException {        SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS");        // 实例消费者        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OneMoreGroup");        // 设置Nameserver的地址        consumer.setNamesrvAddr("localhost:9876");        // 订阅一个或多个Topic,标签过滤需要消费的消息        consumer.subscribe("OneMoreTopic", "*");        // 注册回调实现类处理从broker拉回来的消息        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {            System.out.printf("%s %s Receive New Messages:%n"                    , sdf.format(new Date())                    , Thread.currentThread().getName());            for (MessageExt msg : msgs) {                System.out.printf("\tMsg Id: %s%n", msg.getMsgId());                System.out.printf("\tBody: %s%n", new String(msg.getBody()));            }            // 这个消息已经成功消费了            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;        });        // 启动消费者实例        consumer.start();        System.out.println("Consumer Started.");    }}

另一位制作延迟信息的制作人用于发送延迟信息:

public class DelayProducer {    public static void main(String[] args) throws Exception {        SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS");        // Producer        DefaultMQProducer producer = new DefaultMQProducer("OneMoreGroup");        // 设置Nameserver的地址        producer.setNamesrvAddr("localhost:9876");        // 启动Producer实例        producer.start();        Message msg = new Message("OneMoreTopic"                , "DelayMessage", "This is a delay message.".getBytes());        //"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"        //设置消息延迟级别为3,也就是说,延迟10s。        msg.setDelayTimeLevel(3);        // 将消息发送到Broker        SendResult sendResult = producer.send(msg);        // 通过sendresult返回消息是否成功送达        System.out.printf("%s Send Status: %s, Msg Id: %s %n"                , sdf.format(new Date())                , sendResult.getSendStatus()                , sendResult.getMsgId());        // 如果不发消息,关闭Producer实例。        producer.shutdown();    }}

生产者运营后,会发送延迟消息:

10:37:14.992 Send Status: SEND_OK, Msg Id: C0A8006DAB018B4AC216EDB60000000000000

10秒后,消费者收到了这条延迟消息:

10:37:25.026 ConsumeMessageThread_1 Receive New Messages:Msg Id: B6000Body,C006DAB018B4AC216E: This is a delay message.

优点

分布式、高吞吐量、高性能、高可靠性。

缺点

不能自定义延迟时间,只支持18个特定级别的延迟。

五、定制RocketMQ

上面提到的不支持定制延迟时间的RocketMQ是开源版,但是阿里云商业版的RocketMQ是支持的,可能是因为业务需求弱或者考虑业务因素,原因不得而知。如果可能的话,可以直接去阿里云;如果可能的话,可以修改开源版RocketMQ的源码,满足自己的需求。知己知彼,百战不殆,先看看RocketMQ开源版,如何支持18个时间级别:

原理分析

RocketMQ源码的版本号为4.7.1.不同版本的源码略有差异。

CommitLog

在Commitlog中,对延迟消息进行了一些处理:

// 延迟级别大于0,即延迟消息if (msg.getDelayTimeLevel() > 0) {    // 如果当前延迟水平大于最大延迟水平,则判断当前延迟水平    // 将当前延迟水平设置为最大延迟水平。    if (msg.getDelayTimeLevel() > this.defaultMessageStore            .getScheduleMessageService().getMaxDelayLevel()) {        msg.setDelayTimeLevel(this.defaultMessageStore                .getScheduleMessageService().getMaxDelayLevel());    }    // 获取延迟消息的主题,    // RMQ_其中RMQ__SYS_SCHEDULESCHEDULE__TOPIC值TOPIC_XXXX    topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;    // 队列ID根据延迟级别获取延迟消息,    // 事实上,队列ID的延迟级别减少了1    queueId = ScheduleMessageService.delaylevel2QueueId(msg.getDelayTimeLevel());    // 备份真正的主题和队列ID    MessageAccessor.putProperty(msg            , MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());    MessageAccessor.putProperty(msg            , MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));    msg.setPropertiesString(MessageDecoder.messageproperties2String(msg.getProperties()));    // 设置延迟消息的主题和队列ID    msg.setTopic(topic);    msg.setQueueId(queueId);}

可以看出,每一个延迟消息的主题都被暂时改为SCHEDULE_TOPIC_XXXX,并根据延迟级别延迟消息更改新的队列ID。接下来,ScheduleMessageservice将处理延迟消息。

ScheduleMessageService

Schedulemessageservice由defaultmessagestore初始化,包括构建对象和调用load方法。最后,执行ScheduleMessageService。start方法:

public void start() {    // 使用Atomicbolean确保start方法只能有效执行一次    if (started.compareAndSet(false, true)) {        this.timer = new Timer("ScheduleMessageTimerThread", true);        // 所有延迟级别遍历        for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {            // key是延迟级别            Integer level = entry.getKey();            // value对应于延迟级别的毫秒数            Long timeDelay = entry.getValue();            // 根据延迟级别获得相应队列的偏移            Long offset = this.offsetTable.get(level);            // 若偏移量为null,则设置为0            if (null == offset) {                offset = 0L;            }            if (timeDelay != null) {                // 为每个延迟级别创建定时任务,                // 第一次启动任务延迟为FIRST_DELAY_TIME,也就是1秒                this.timer.schedule(                        new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);            }        }        // 每隔flushDelayoffsetinterval延迟10秒后执行一次任务。        // 其中,flushdelayoffsetinterval默认配置也是10秒        this.timer.scheduleAtFixedRate(new TimerTask() {            @Override            public void run() {                try {                    // 每个队列消费的持续偏移                    if (started.get()) ScheduleMessageService.this.persist();                } catch (Throwable e) {                    log.error("scheduleAtFixedRate flush exception", e);                }            }        }, 10000, this.defaultMessageStore        .getMessageStoreConfig().getFlushDelayOffsetInterval());    }}

遍历所有延迟级别,根据延迟级别获得相应队列的偏移。如果没有偏移,则设置为0。然后为每个延迟级别创建一个定时任务。第一个启动任务延迟1秒,第二个和以后的启动任务延迟是延迟级别的相应延迟时间。

然后,为每个队列的消费偏移创建了一个定时任务。持久频率由flushdelayofsetinterval属性配置,默认为10秒。

定时任务

ScheduleMessageServicestart方法实施后,每个延迟级别都会创建自己的定时任务。这里定时任务的具体实现在deliverdelayedmesagetimertask类中。它的核心代码是executeontimeup方法。我们来看看主要部分:

// 根据主题和队列ID获取消息队列ConsumeQueue cq =        ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(                TopicValidator.RMQ_SYS_SCHEDULE_TOPIC                , delaylevel2QueueId(delayLevel));

如果没有获得相应的消息队列,则在DELAY_FOR_A_WHILE(默认为100)毫秒后执行任务。如果获得,继续执行以下操作:

// SelectMapedbuferesulttersultter根据消费偏移量从消息队列中获取所有有效消息 bufferCQ = cq.getIndexBuffer(this.offset);

如果你没有得到有效的信息,那就在DELAY_FOR_A_WHILE(默认为100)毫秒后执行任务。如果获得,继续执行以下操作:

// 所有新闻for遍历 (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {    // 获取消息的物理偏移量    long offsetPy = bufferCQ.getByteBuffer().getLong();    // 获取消息的物理长度    int sizePy = bufferCQ.getByteBuffer().getInt();    long tagsCode = bufferCQ.getByteBuffer().getLong();        // 省略部分代码...    long now = System.currentTimeMillis();    // 计算消息应该消耗的时间    long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);// 计算下一条消息的偏移量    nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE)long countdown = deliverTimestamp - now;    // 省略部分代码...}

如果当前消息不到消费时间,那就是countdown毫秒后执行任务。在消费时间内,继续执行以下操作:

// Messageexttt根据消息的物理偏移量和大小获取消息 msgExt =    ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(            offsetPy, sizePy);

若获得消息,则继续执行以下操作:

// 新消息的重建包括:/// 1.删除消息的延迟级别// 2.恢复真实的消息主题和队列Idmessageeextbrokeriner msgInner = this.messageTimeup(msgExt);

// 将消息重新发送到PutMessageresultteresulter putMessageResult =        ScheduleMessageService.this.writeMessageStore                .putMessage(msgInner);

消除了消息的延迟级别,恢复了真实的消息主题和队列ID,并将消息发送到真实的消息队列之后,消费者可以立即消费。

由于篇幅有限,源码的细节没有展开太多,感兴趣的朋友可以去GitHub下载源码仔细阅读。

定制化方案

通过以上对源码的分析,可以总结出延迟消息的实现步骤:

  1. 如果消息的延迟级别大于0,则表示该消息为延迟消息,修改该消息的主题为SCHEDULE_TOPIC_XXXX,队列ID为延迟级减1。
  2. SCHEDULE_TOPIC_XXXX队列中。
  3. 根据上次提取的偏移量,定时任务不断从队列中提取所有消息。
  4. 根据消息的物理偏移量和大小再次获取消息。
  5. 根据新闻属性重新创建新闻,删除延迟级别,恢复原主题和队列ID。
  6. 将消息重新发送到原主题队列,供消费者消费。

总结如下图所示:

延迟消息的五种实现方案_定时任务_02

在commitlog中,我们可以根据自定义的延迟时间选择最大的延迟级别,比如15分钟消费的消息,那么最大的延迟级别是10分钟。在ScheduleMessageservice中,判断消息是否真的是消费时间。如果是消费时间,恢复原主题和队列ID;如果没有时间消费,则选择最大延迟级别重新修改主题和队列ID。如下图:

延迟消息的五种实现方案_Redis_03

Commitlog中存储着新闻主体和元数据,只存储在Commitlog中的起始物理偏移、新闻大小和新闻标签的哈希值。虽然新闻需要重新放入队列,但空间浪费仍然相对有限。

优点

支持自定义延迟时间的分布式、高吞吐量、高性能、高可靠性。

缺点

定制RocketMQ不易维护,不能升级新版本。

总结

从延迟消息的概念和应用场景出发,我们逐一讨论了五种不同的实现方案:

  • 使用Redis的Sorted Set结构。
  • rabbitmq使用RabbitMQ delayed message exchange插件。
  • 使用ActiveMQ的5.4及以上版本的延迟消息功能。
  • 使用RocketMQ只支持特定级别的延迟消息。
  • 定制RocketMQ,通过重新计算延迟级别来实现自定义延迟。

以上每个方案都有自己的优缺点,所以没有一个普遍的延迟消息方案,最合适的方案需要根据数据规模和业务需求的实际情况来确定。

  • 在线直播教学时,在课程开始前15分钟通知所有学生准备上课。
  • 订单提交成功后一小时内未付款,订单需及时关闭并释放相应商品的库存。
  • 当用户未登录超过15天时,将召回推送发送给用户。
  • 提交工单后24小时以上未处理,并向相关负责人发出催促处理提醒。

针对延迟消息,本文分享了五个实现方案。让我们逐一讨论各种方案的总体实现和优缺点。

一、Redis

Redis中有一个有序的集合(Sorted Set)在有序集合中的数据结构中,所有元素都是基于它的 Score 排序。我们可以把消息的预期时间戳作为Score,定期任务可以不断读取Score大于当前时间的元素。基本流程如下:

  1. 调用API,传输执行时间、消息体等数据。
  2. 在Redis的String结构中,生成唯一的key,序列化消息体数据。
  3. 将key和执行时间戳存储在Redis的有序集合结构中,而不是存储特定的信息数据,而是存储唯一的key。
  4. 定时任务不断读取时间戳最小的消息。
  5. 如果时间戳小于当前时间,将key放入Redis作为队列的List结构中。
  6. 另一个定时任务不断地从队列中读取需要消费的信息。
  7. 根据key获取消息体数据,消费消息。
  8. 如果消费信息成功,删除key对应的消息数据。
  9. 如果消费信息失败,重新存储key和时间戳(加60秒)。

具体方案如下:

延迟消息的五种实现方案_有序集合

为了避免在有序集合中存储过多的延迟信息、存储操作和查询操作速度较慢的问题,可以建立多个有序集合,并通过哈希算法将信息路由集中到不同的有序集合中。

优点

简单实用,落地快。

缺点
  • 单个有序集合不能支持太多的数据。
  • 不断阅读定期任务可能会导致不必要的请求。

因此,Redis方案不是一个非常成熟的方案,而是一个支持小新闻快速实施的方案。

二、RabbitMQ

RabitMQ本身并不支持延迟消息功能,一般的做法是通过最大的生存时间(Time-To-Live)和死信交换机(Dead Letter Exchanges)模拟延迟消息功能的两个特性。如果消息超过最大生存时间,不被消费,就会变成死信,重新送到死信交换机,然后死信交换机按照绑定规则转发到相应的死信队列。监控队列可以重新消耗消息。

然而,RabitMQ的3.5.8版本结束后,我们可以使用官方推荐的rabbitmq delayed message exchange插件很容易实现延迟消息的功能。

安装插件

先在官方插件列表页面下载rabbitmq__delayed_message_exchang插件,然后复制到RabitMQ每个节点的plugins目录中。使用命令启用插件:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

一旦插件启用,我们就可以开始使用它了。

使用示例

安装插件后,将生成Exchange类型,支持延迟交付机制:x-delayed-message。收到这类消息后,消息不会立即发送到目标队列,而是存储在mnesia表中,然后在检测到可交付时间后发送到目标队列。

使用延迟消息时,需要先声明一个x-delayed-message交换器类型:

Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct");channel.exchangeDeclare("one-more-exchange", "x-delayed-message", true, false, args);

在header中添加延迟消息x-delay,毫秒表示延迟:

byte[] messageBodyBytes = "This is a delayed message".getBytes();AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder();headers = new HashMap<String, Object>();headers.put("x-delay", 5000);props.headers(headers);channel.basicPublish("one-more-exchange", "", props.build(), messageBodyBytes);

优点

大品牌中间件,可靠稳定。

缺点

由于master单节点,性能瓶颈和吞吐量有限。

三、ActiveMQ

ActiveMQ开始支持5.4及以上版本的持久延迟信息功能,甚至支持Cron表达式。默认情况下,如果需要修改配置文件activemq,则不会打开该功能.xml,将schedulerSupport属性设置为broker节点的true,例如:

<broker xmlns="http://activemq.apache.org/schema/core" schedulerSupport="true"></broker>

当服务端打开延迟消息功能时,客户端可以利用以下属性发送延迟消息:

  • AMQ_SCHEDULED_DELAY:该消息延迟发送时间,单位为毫秒。
  • AMQ_SCHEDULED_PERIOD:单位每次重新发送消息的时间间隔为毫秒。
  • AMQ_SCHEDULED_REPEAT:重新发送消息的次数。
  • AMQ_SCHEDULED_CRON:使用Cron表达式设置发送消息的时机。

使用示例
  1. 发送消息延迟60秒:

MessageProducer producer = session.createProducer(destination);TextMessage message = session.createTextMessage("This is a delayed message");message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 60 * 1000);producer.send(message);

  1. 每次间隔10秒,消息延迟60秒,重复发送5次:

MessageProducer producer = session.createProducer(destination);TextMessage message = session.createTextMessage("This is a delayed message");message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 60 * 1000);message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 10 * 1000);message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 4);producer.send(message);

  1. 每天凌晨3点,使用Cron表达式发送一条消息:

MessageProducer producer = session.createProducer(destination);TextMessage message = session.createTextMessage("This is a delayed message");message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, " 0 3 * * *");producer.send(message);

需要注意的是,Cron表达式由五位组成,分别表示分钟(059)、小时(023)、日(131)、月(112)、星期(0~6,表示星期日至星期六)。

  1. Cron表达式的优先级高于其他参数。如果在设置Cron表达式的同时设置其他参数,则将在每次Cron执行时应用其他参数。例如,该消息延迟60秒,重复5次,间隔10秒,每小时发送一系列信息:

MessageProducer producer = session.createProducer(destination);TextMessage message = session.createTextMessage("test msg");message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *");message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 60 * 1000);message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 10 * 1000);message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 4);producer.send(message);

优点

大品牌中间件,可靠稳定,甚至支持Cron表达式。

缺点

由于master单节点,性能瓶颈和吞吐量有限。

四、RocketMQ

在RocketMQ中,支持延迟信息,但不支持任何时间精度的延迟信息,只支持特定级别的延迟信息。如果你想支持任何时间精度,你不能避免在Broker层面进行新闻排序,然后涉及持久性考虑,那么新闻排名将不可避免地产生巨大的性能成本。

消息延迟水平分别为1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,共18个级别。发送消息时,可以设置消息延迟级别,设置消息延迟级别时有以下三种情况:

  1. 如果设置消息延迟等级为0,则该消息为非延迟消息。
  2. 设置消息延迟级别大于或等于1,小于或等于18。如果设置消息延迟级别等于1,则延迟1s;设置新闻延迟等级为2,则延迟5s,以此类推。
  3. 如果新闻延迟级别大于18,则新闻延迟级别为18,如果新闻延迟级别等于20,则延迟2h。
使用示例

首先,为消费延迟写一个消费者信息:

public class Consumer {    public static void main(String[] args) throws MQClientException {        SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS");        // 实例消费者        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OneMoreGroup");        // 设置Nameserver的地址        consumer.setNamesrvAddr("localhost:9876");        // 订阅一个或多个Topic,标签过滤需要消费的消息        consumer.subscribe("OneMoreTopic", "*");        // 注册回调实现类处理从broker拉回来的消息        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {            System.out.printf("%s %s Receive New Messages:%n"                    , sdf.format(new Date())                    , Thread.currentThread().getName());            for (MessageExt msg : msgs) {                System.out.printf("\tMsg Id: %s%n", msg.getMsgId());                System.out.printf("\tBody: %s%n", new String(msg.getBody()));            }            // 这个消息已经成功消费了            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;        });        // 启动消费者实例        consumer.start();        System.out.println("Consumer Started.");    }}

另一位制作延迟信息的制作人用于发送延迟信息:

public class DelayProducer {    public static void main(String[] args) throws Exception {        SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS");        // Producer        DefaultMQProducer producer = new DefaultMQProducer("OneMoreGroup");        // 设置Nameserver的地址        producer.setNamesrvAddr("localhost:9876");        // 启动Producer实例        producer.start();        Message msg = new Message("OneMoreTopic"                , "DelayMessage", "This is a delay message.".getBytes());        //"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"        //设置消息延迟级别为3,也就是说,延迟10s。        msg.setDelayTimeLevel(3);        // 将消息发送到Broker        SendResult sendResult = producer.send(msg);        // 通过sendresult返回消息是否成功送达        System.out.printf("%s Send Status: %s, Msg Id: %s %n"                , sdf.format(new Date())                , sendResult.getSendStatus()                , sendResult.getMsgId());        // 如果不发消息,关闭Producer实例。        producer.shutdown();    }}

生产者运行后,会发送延迟消息:/p>

10:37:14.992 Send Status: SEND_OK, Msg Id: C0A8006DAB018B4AC216EDB60000000000000

10秒后,消费者收到了这条延迟消息:

10:37:25.026 ConsumeMessageThread_1 Receive New Messages:Msg Id: B6000Body,C006DAB018B4AC216E: This is a delay message.

优点

分布式、高吞吐量、高性能、高可靠性。

缺点

不能自定义延迟时间,只支持18个特定级别的延迟。

五、定制RocketMQ

上面提到的不支持定制延迟时间的RocketMQ是开源版,但是阿里云商业版的RocketMQ是支持的,可能是因为业务需求弱或者考虑业务因素,原因不得而知。如果可能的话,可以直接去阿里云;如果可能的话,可以修改开源版RocketMQ的源码,满足自己的需求。知己知彼,百战不殆,先看看RocketMQ开源版,如何支持18个时间级别:

原理分析

RocketMQ源码的版本号为4.7.1.不同版本的源码略有差异。

CommitLog

在Commitlog中,对延迟消息进行了一些处理:

// 延迟级别大于0,即延迟消息if (msg.getDelayTimeLevel() > 0) {    // 如果当前延迟水平大于最大延迟水平,则判断当前延迟水平    // 将当前延迟水平设置为最大延迟水平。    if (msg.getDelayTimeLevel() > this.defaultMessageStore            .getScheduleMessageService().getMaxDelayLevel()) {        msg.setDelayTimeLevel(this.defaultMessageStore                .getScheduleMessageService().getMaxDelayLevel());    }    // 获取延迟消息的主题,    // RMQ_其中RMQ__SYS_SCHEDULESCHEDULE__TOPIC值TOPIC_XXXX    topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;    // 队列ID根据延迟级别获取延迟消息,    // 事实上,队列ID的延迟级别减少了1    queueId = ScheduleMessageService.delaylevel2QueueId(msg.getDelayTimeLevel());    // 备份真正的主题和队列ID    MessageAccessor.putProperty(msg            , MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());    MessageAccessor.putProperty(msg            , MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));    msg.setPropertiesString(MessageDecoder.messageproperties2String(msg.getProperties()));    // 设置延迟消息的主题和队列ID    msg.setTopic(topic);    msg.setQueueId(queueId);}

可以看出,每一个延迟消息的主题都被暂时改为SCHEDULE_TOPIC_XXXX,并根据延迟级别延迟消息更改新的队列ID。接下来,ScheduleMessageservice将处理延迟消息。

ScheduleMessageService

Schedulemessageservice由defaultmessagestore初始化,包括构建对象和调用load方法。最后,执行ScheduleMessageService。start方法:

public void start() {    // 使用Atomicbolean确保start方法只能有效执行一次    if (started.compareAndSet(false, true)) {        this.timer = new Timer("ScheduleMessageTimerThread", true);        // 所有延迟级别遍历        for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {            // key是延迟级别            Integer level = entry.getKey();            // value对应于延迟级别的毫秒数            Long timeDelay = entry.getValue();            // 根据延迟级别获得相应队列的偏移            Long offset = this.offsetTable.get(level);            // 若偏移量为null,则设置为0            if (null == offset) {                offset = 0L;            }            if (timeDelay != null) {                // 为每个延迟级别创建定时任务,                // 第一次启动任务延迟为FIRST_DELAY_TIME,也就是1秒                this.timer.schedule(                        new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);            }        }        // 每隔flushDelayoffsetinterval延迟10秒后执行一次任务。        // 其中,flushdelayoffsetinterval默认配置也是10秒        this.timer.scheduleAtFixedRate(new TimerTask() {            @Override            public void run() {                try {                    // 每个队列消费的持续偏移                    if (started.get()) ScheduleMessageService.this.persist();                } catch (Throwable e) {                    log.error("scheduleAtFixedRate flush exception", e);                }            }        }, 10000, this.defaultMessageStore        .getMessageStoreConfig().getFlushDelayOffsetInterval());    }}

遍历所有延迟级别,根据延迟级别获得相应队列的偏移。如果没有偏移,则设置为0。然后为每个延迟级别创建一个定时任务。第一个启动任务延迟1秒,第二个和以后的启动任务延迟是延迟级别的相应延迟时间。

然后,为每个队列的消费偏移创建了一个定时任务。持久频率由flushdelayofsetinterval属性配置,默认为10秒。

定时任务

ScheduleMessageServicestart方法实施后,每个延迟级别都会创建自己的定时任务。这里定时任务的具体实现在deliverdelayedmesagetimertask类中。它的核心代码是executeontimeup方法。我们来看看主要部分:

// 根据主题和队列ID获取消息队列ConsumeQueue cq =        ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(                TopicValidator.RMQ_SYS_SCHEDULE_TOPIC                , delaylevel2QueueId(delayLevel));

如果没有获得相应的消息队列,则在DELAY_FOR_A_WHILE(默认为100)毫秒后执行任务。如果获得,继续执行以下操作:

// SelectMapedbuferesulttersultter根据消费偏移量从消息队列中获取所有有效消息 bufferCQ = cq.getIndexBuffer(this.offset);

如果你没有得到有效的信息,那就在DELAY_FOR_A_WHILE(默认为100)毫秒后执行任务。如果获得,继续执行以下操作:

// 所有新闻for遍历 (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {    // 获取消息的物理偏移量    long offsetPy = bufferCQ.getByteBuffer().getLong();    // 获取消息的物理长度    int sizePy = bufferCQ.getByteBuffer().getInt();    long tagsCode = bufferCQ.getByteBuffer().getLong();        // 省略部分代码...    long now = System.currentTimeMillis();    // 计算消息应该消耗的时间    long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);// 计算下一条消息的偏移量    nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE)long countdown = deliverTimestamp - now;    // 省略部分代码...}

如果当前消息不到消费时间,那就是countdown毫秒后执行任务。在消费时间内,继续执行以下操作:

// Messageexttt根据消息的物理偏移量和大小获取消息 msgExt =    ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(            offsetPy, sizePy);

若获得消息,则继续执行以下操作:

// 新消息的重建包括:/// 1.删除消息的延迟级别// 2.恢复真实的消息主题和队列Idmessageeextbrokeriner msgInner = this.messageTimeup(msgExt);

// 将消息重新发送到PutMessageresultteresulter putMessageResult =        ScheduleMessageService.this.writeMessageStore                .putMessage(msgInner);

消费者可以立即消费,消除消息的延迟水平,恢复真实的消息主题和队列ID,并将消息发送到真实的消息队列。

由于篇幅有限,源码的细节没有展开太多,感兴趣的朋友可以去GitHub下载源码仔细阅读。

定制化方案

通过以上对源码的分析,可以总结出延迟消息的实现步骤:

  1. 如果消息的延迟级别大于0,则表示该消息为延迟消息,修改该消息的主题为SCHEDULE_TOPIC_XXXX,队列ID为延迟级减1。
  2. SCHEDULE_TOPIC_XXXX队列中。
  3. 根据上次提取的偏移量,定时任务不断从队列中提取所有消息。
  4. 根据消息的物理偏移量和大小再次获取消息。
  5. 根据新闻属性重新创建新闻,删除延迟级别,恢复原主题和队列ID。
  6. 将消息重新发送到原主题队列,供消费者消费。

总结如下图所示:

延迟消息的五种实现方案_定时任务_02

在commitlog中,我们可以根据自定义的延迟时间选择最大的延迟级别,比如15分钟消费的消息,那么最大的延迟级别是10分钟。在ScheduleMessageservice中,判断消息是否真的是消费时间。如果是消费时间,恢复原主题和队列ID;如果没有时间消费,则选择最大延迟级别重新修改主题和队列ID。如下图:

延迟消息的五种实现方案_Redis_03

Commitlog中存储着新闻主体和元数据,只存储在Commitlog中的起始物理偏移、新闻大小和新闻标签的哈希值。虽然新闻需要重新放入队列,但空间浪费仍然相对有限。

优点

支持自定义延迟时间的分布式、高吞吐量、高性能、高可靠性。

缺点

定制RocketMQ不易维护,不能升级新版本。

总结

从延迟消息的概念和应用场景出发,我们逐一讨论了五种不同的实现方案:

  • 使用Redis的Sorted Set结构。
  • rabbitmq使用RabbitMQ delayed message exchange插件。
  • 使用ActiveMQ的5.4及以上版本的延迟消息功能。
  • 使用RocketMQ只支持特定级别的延迟消息。
  • 定制RocketMQ,通过重新计算延迟级别来实现自定义延迟。

以上每个方案都有自己的优缺点,所以没有一个普遍的延迟消息方案,最合适的方案需要根据数据规模和业务需求的实际情况来确定。