目录
1.POM文件添加依赖及yml配置
2.RocketmqUtil
3.生产者(异步发送示例)
4.消费者
5.测试
1.POM文件添加依赖及yml配置
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.2</version></dependency>
rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    group: My_Group
    send-message-timeout: 3000
    retry-times-when-send-failed: 3
    retry-times-when-send-async-failed: 3
2.RocketmqUtil
package com.kaying.marketing.platform.common.util.rocketMq;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;/*** @Description: RocketMQ消息的生产者* @Author: hwk*/@Component
@Slf4j
public class RocketMqUtil {@Autowiredprivate RocketMQTemplate rocketMqTemplate;public void sendMsg(String topic,String data) {rocketMqTemplate.convertAndSend(topic,data);log.info("【RocketMQ】发送同步消息:{}", data);}public void asyncSend(String topic, String tag, String data,Integer messageDelayLevel) {rocketMqTemplate.asyncSend(topic + ":" + tag, MessageBuilder.withPayload(data).build(), new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {// 消息发送成功log.error("消息发送成功"+sendResult);}@Overridepublic void onException(Throwable throwable) {// 消息发送异常log.error("异步发送消息异常。topic:" + topic + ";tag:" + tag + ";mqMsg" + data, throwable);}},3000L,// messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 1d// messageDelayLevel = 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19messageDelayLevel);}/*** 发送同步消息:消息响应后发送下一条消息** @param topic 消息主题* @param tag 消息tag* @param key 业务号* @param data 消息内容*/public void sendSyncMsg(String topic, String tag, String key, String data) {//消息Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();//主题String destination = topic + ":" + tag;SendResult sendResult = rocketMqTemplate.syncSend(destination, message);log.info("【RocketMQ】发送同步消息:{}", sendResult);}/*** 发送异步消息:异步回调通知消息发送的状况** @param topic 消息主题* @param tag 消息tag* @param key 业务号* @param data 消息内容*/public void sendAsyncMsg(String topic, String tag, String key, String data) {//消息Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();//主题String destination = topic + ":" + tag;rocketMqTemplate.asyncSend(destination, message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("【RocketMQ】发送异步消息:{}", sendResult);}@Overridepublic void onException(Throwable e) {log.info("【RocketMQ】发送异步消息异常:{}", e.getMessage());}});}/*** 发送单向消息:消息发送后无响应,可靠性差,效率高** @param topic 消息主题* @param tag 消息tag* @param key 业务号* @param data 消息内容*/public void sendOneWayMsg(String topic, 
String tag, String key, String data) {//消息Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();//主题String destination = topic + ":" + tag;rocketMqTemplate.sendOneWay(destination, message);}/*** 同步延迟消息** @param topic 主题* @param tag 标签* @param key 业务号* @param data 消息体* @param timeout 发送消息的过期时间* @param delayLevel 延迟等级-----固定等级:1到18分别对应1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h**/public void sendSyncDelayMsg(String topic, String tag, String key, String data, long timeout, int delayLevel) {// messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 1d// messageDelayLevel = 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19//消息Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();//主题String destination = topic + ":" + tag;SendResult sendResult = rocketMqTemplate.syncSend(destination, message, timeout, delayLevel);log.info("【RocketMQ】发送同步延迟消息:{}", sendResult);}/*** 异步延迟消息** @param topic 主题* @param tag 标签* @param key 业务号* @param data 消息体* @param timeout 发送消息的过期时间* @param delayLevel 延迟等级-----固定等级:1到18分别对应1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h*/public void sendAsyncDelayMsg(String topic, String tag, String key, String data, long timeout, int delayLevel) {//消息Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();//主题String destination = topic + ":" + tag;rocketMqTemplate.asyncSend(destination, message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("【RocketMQ】发送异步延迟消息:{}", sendResult);}@Overridepublic void onException(Throwable e) {log.info("【RocketMQ】发送异步延迟消息异常:{}", e.getMessage());}}, timeout, delayLevel);}/*** 同步顺序消息** @param topic 主题* @param tag 标签* @param key 业务号* @param data 消息体*/public void sendSyncOrderlyMsg(String topic, String tag, String key, String data) {//消息Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, 
key).build();//主题String destination = topic + ":" + tag;SendResult sendResult = rocketMqTemplate.syncSendOrderly(destination, message, key);log.info("【RocketMQ】发送同步顺序消息:{}", sendResult);}/*** 异步顺序消息** @param topic 主题* @param tag 标签* @param key 业务号* @param data 消息体*/public void sendAsyncOrderlyMsg(String topic, String tag, String key, String data) {//消息Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();//主题String destination = topic + ":" + tag;rocketMqTemplate.asyncSendOrderly(destination, message, key, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("【RocketMQ】发送异步顺序消息:{}", sendResult);}@Overridepublic void onException(Throwable e) {log.info("【RocketMQ】发送异步顺序消息异常:{}", e.getMessage());}});}
}
3.生产者(异步发送示例)
//异步发送消息代码示例
rocketMqUtil.sendAsyncMsg(RocketConstant.TEST_TOPIC1, RocketConstant.TEST_TAG1, UUID.randomUUID().toString(), "测试消息一");
4.消费者
简单的负载均衡消费的示例(指定topic和tag,相同的组即为负载均衡消费)
也可以指定不同的topic和不同的tag进行消息区分
注意:如果本地环境和线上环境连接同一个MQ,并且使用相同的消费者组,本地实例也会参与负载均衡、分走一部分消息,表现为线上消息"丢失"。
@RocketMQMessageListener(consumerGroup = "1",topic = RocketConstant.TEST_TOPIC1,selectorExpression = RocketConstant.TEST_TAG1)@Servicepublic class RocketConsumerTag1 implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {String orderNo = message;log.info("tag1,接收:{}", orderNo);}}@RocketMQMessageListener(consumerGroup ="1",topic = RocketConstant.TEST_TOPIC1,selectorExpression = RocketConstant.TEST_TAG1)@Servicepublic class RocketConsumerTag2 implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {String orderNo = message;log.info("tag2,接收:{}", orderNo);}}