rocketmq也有延迟消息,经典的应用场景:订单30分钟未支付,则取消的场景
其他博客提到从rocketmq5.0开始,支持自定义延迟时间,4.x只支持预定义延迟时间,安装rocketmq可参考RocketMq简介及安装、docker安装rocketmq、安装rocketmq可视化管理端
引入rocketmq5.x对应的客户端
引入支持rocketmq5.x的rocketmq-spring
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.3</version></dependency>
application.yml配置
rocketmq:name-server: ip:9876
生产者
延迟方法有2种,
- 延迟x时间执行,
syncSendDelayTimeMills
,延迟时间单位是毫秒、syncSendDelayTimeSeconds
,延迟时间单位是秒
- 指定未来的某个时间点执行
syncSendDeliverTimeMills
,某个时间点的时间戳
在实践中,我选择了延迟x时间执行
,因为在测试时发现,如果应用宿主机的时间与rocketmq宿主机的时间不同步时会出现问题,比如应用宿主机时间比rocketmq宿主机时间晚了2分钟,发送了个未来2分钟执行的消息,因为宿主机时间差原因,可能会刚发送就会立刻执行。
import com.alibaba.fastjson.JSONObject;
import org.apache.rocketmq.spring.core.RocketMQTemplate;@Autowiredprivate RocketMQTemplate rocketMQTemplate;pubic void sendDelayMsg(){//延迟时间,单位毫秒Integer delay = 5000;String msgBody= JSONObject.toJSONString(someObject);rocketMQTemplate.syncSendDelayTimeMills("myTopic:myTag",msgBody,delay);
}
消费者
import com.alibaba.fastjson.JSONObject;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
@RocketMQMessageListener(topic = "myTopic",selectorExpression = "myTag",consumerGroup = "myConsumerTag")
public class MyMsgListener implements RocketMQListener<String> {@Overridepublic void onMessage(String msgBody) {log.info("收到的消息是:{}", msgBody);final SomeClass someObj = JSONObject.parseObject(msgBody, SomeClass.class);}}