2.12日学习打卡----初学RocketMQ(三)

2.12日学习打卡

目录:

  • 2.12日学习打卡
  • 一. RocketMQ高级特性(续)
    • 消息重试
    • 延迟消息
    • 消息查询
  • 二.RocketMQ应用实战
    • 生产端发送同步消息
    • 发送异步消息
    • 单向发送消息
    • 顺序发送消息
    • 消费顺序消息
    • 全局顺序消息
    • 延迟消息
    • 事务消息
    • 消息查询

一. RocketMQ高级特性(续)

消息重试

生产端重试

例如由于网络原因导致生产者发送消息到MQ失败,即发送端没有收到Broker的ACK,导致最终Consumer无法消费消息,此时RocketMQ会自动进行重试。

// 同步发送消息,如果5秒内没有发送成功,则重试3次
DefaultMQProducer producer = new DefaultMQProducer("DefaultProducer");
producer.setRetryTimesWhenSendFailed(3);
producer.send(msg, 5000L);

消费端重试

同样的,由于网络原因,Broker发送消息给消费者后,没有受到消费端的ACK响应,所以Broker又会尝试将消息重新发送给Consumer,在实际开发过程中,我们更应该考虑的是消费端的重试。消费端的消息重试可以分为顺序消息的重试以及无序消息的重试。

  • 顺序消息重试
    对于顺序消息,当消费者消费消息失败后,消息队列 RocketMQ
    会自动不断进行消息重试(每次间隔时间为 1 秒),这时,应用
    会出现消息消费被阻塞的情况。因此,在使用顺序消息时,务必
    保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的
    发生

  • 无序消息重试
    对于无序消息(普通、定时、延时、事务消息),当消费者消费
    消息失败时,可以通过设置返回状态达到消息重试的结果。

    • 最大重试次数
      消息消费失败后,可被消息队列RocketMQ重复投递的最大
      次数。
      TCP协议无序消息重试时间间隔:
      在这里插入图片描述
    • 消费失败后重新配置方式
    • 需要在消息监听器接口的实现中明确进行配置(三种方式任选一种):
      • 返回 ConsumeConcurrentlyStatus.RECONSUME_LATER; (推荐)
      • 返回 Null
      • 抛出异常

    延迟消息

    Producer将消息发送到消息队列RocketMQ服务端,但并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费,该消息即延时消息。
    消息生产和消费有时间窗口要求,例如在电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条延时消息。这条消息将会在30分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。如支付未完成,则关闭订单。如已完成支付则忽略。通过消息触发一些定时任务,例如在某一固定时间点向用户发送提醒消息。
    定时消息会暂存在名为SCHEDULE_TOPIC_XXXX的topic中,并且会根据
    delayTimeLevel存入特定的queue,queueId = delayTimeLevel –1,即一个queue只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。broker会调度地消费SCHEDULE_TOPIC_XXXX,将消息写入真实的topic。

//
org/apache/rocketmq/store/config/MessageStore
Config.java
private String messageDelayLevel = "1s 5s 10s
30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h
2h";

代码测试
生产者

package com.jjy.produce;import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;public class DelayMessageProducer {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer=new DefaultMQProducer("producer_group");producer.setNamesrvAddr("192.168.66.100:9876");producer.start();Message message=null;for(int i=0; i<20;i++){message=new Message("tp_demo_3",("hello RocketMQ delayMessage -"+i).getBytes());//设置延迟时间0-17 0表示2秒 大于18都是2小时message.setDelayTimeLevel(i);producer.send(message);}producer.shutdown();}
}

消费者

package com.jjy.consumer;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.common.message.MessageExt;public class DelayMessageConsumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("transactionConsumer-grp");consumer.setNamesrvAddr("192.168.66.100:9876");System.out.println("==========================================");//设置消息重试次数consumer.setMaxReconsumeTimes(5);//设置可以批量处理consumer.setConsumeMessageBatchMaxSize(1);//订阅主题consumer.subscribe("tp_demo_3","*");consumer.setMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {System.out.println(System.currentTimeMillis()/1000);for(MessageExt message:list){System.out.println(message.getTopic()+"\t"+message.getQueueId()+"\t"+message.getDelayTimeLevel()+"\t"+new String(message.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});consumer.start();}
}

运行结果
在这里插入图片描述

消息查询

在这里插入图片描述

在实际开发中,经常需要查看MQ中消息的内容来排查问题。RocketMQ提供了三种消息查询的方式,分别是按Message ID、Message Key以及Unique Key查询。

//返回结果
SendResult [sendStatus=SEND_OK,
msgId=C0A801030D4B18B4AAC247DE4A0D0000,
offsetMsgId=C0A8010300002A9F000000000007BEE9,messageQueue=MessageQueue [topic=TopicA,
brokerName=broker-a, queueId=0],queueOffset=0]
  • 按MessageId查询消息
    Message Id 是消息发送后,在Broker端生成的,其包含了
    Broker的地址、偏移信息,并且会把Message Id作为结果的一
    部分返回。Message Id中属于精确匹配,代表唯一一条消息,
    查询效率更高。

  • 按照Message Key查询消息
    消息的key是开发人员在发送消息之前自行指定的,通常把具有
    业务含义,区分度高的字段作为消息的key,如用户id,订单id
    等。

  • 按照Unique Key查询消息
    除了开发人员指定的消息key,生产者在发送发送消息之前,会
    自动生成一个UNIQ_KEY,设置到消息的属性中,从逻辑上唯一
    代表一条消息

消息在消息队列RocketMQ中存储的时间默认为3天(不建议修
改),即只能查询从消息发送时间算起3天内的消息,三种查询方式
的特点和对比如下表所述:
在这里插入图片描述

二.RocketMQ应用实战

在这里插入图片描述

生产端发送同步消息

同步发送是指消息发送方发出数据后,同步等待,直到收到接收方发回响应之后才发下一个请求
在这里插入图片描述

package com.jjy.produce;import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;import java.io.UnsupportedEncodingException;public class SyncProducer {public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, MQBrokerException, RemotingException, InterruptedException {//实例化消息生产者DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");// 设置NameServer的地址producer.setNamesrvAddr("192.168.66.100:9876");// 启动Producer实例producer.start();for (int i = 0; i < 100; i++) {// 创建消息,并指定Topic,Tag和消息体Message msg = new Message("TopicTest" /* Topic */,"TagA" /* Tag */,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);// 发送消息到一个BrokerSendResult sendResult = producer.send(msg);// 通过sendResult返回消息是否成功送达System.out.printf("%s%n", sendResult);}// 如果不再发送消息,关闭Producer实例。producer.shutdown();}
}

运行结果

SendResult [sendStatus=SEND_OK, msgId=7F0000015A3863947C6B98F2DFEB0000, offsetMsgId=AC12FA2E00002A9F0000000000012E3A, messageQueue=MessageQueue [topic=TopicTest, brokerName=chenl346-vszbn, queueId=1], queueOffset=100]
SendResult [sendStatus=SEND_OK, msgId=7F0000015A3863947C6B98F2E0030001, offsetMsgId=AC12FA2E00002A9F0000000000012EF8, messageQueue=MessageQueue [topic=TopicTest, brokerName=chenl346-vszbn, queueId=2], queueOffset=101]
SendResult [sendStatus=SEND_OK, msgId=7F0000015A3863947C6B98F2E0060002, offsetMsgId=AC12FA2E00002A9F0000000000012FB6, messageQueue=MessageQueue [topic=TopicTest, brokerName=chenl346-vszbn, queueId=3], queueOffset=100]
SendResult [sendStatus=SEND_OK, msgId=7F0000015A3863947C6B98F2E0090003, offsetMsgId=AC12FA2E00002A9F0000000000013074, messageQueue=MessageQueue [topic=TopicTest, brokerName=chenl346-vszbn, queueId=0], queueOffset=99]
... ...

Message ID:消息的全局唯一标识(内部机制的ID生成是使用机器IP和消息偏移量的组成,所以有可能重复,如果是幂等性还是最好考虑Key),由消息队列MQ系统自动生成,唯一标识某条消息。

SendStatus:发送的标识。成功,失败等

Queue:相当于是Topic的分区;用于并行发送和接收消息

发送异步消息

异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。在这里插入图片描述
消息发送方在发送了一条消息后,不等接收方发回响应,接着进行第二条消息发送。发送方通过回调接口的方式接收服务器响应,并对响应结果进行处理

package com.jjy.produce;import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.CountDownLatch2;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;import java.io.UnsupportedEncodingException;
import java.util.concurrent.TimeUnit;public class AsyncProducer {public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException {// 实例化消息生产者ProducerDefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");// 设置NameServer的地址producer.setNamesrvAddr("192.168.66.100:9876");// 启动Producer实例producer.start();//消息失败重试次数producer.setRetryTimesWhenSendAsyncFailed(0);int messageCount = 100;// 根据消息数量实例化倒计时计算器final CountDownLatch2 countDownLatch = new CountDownLatch2(messageCount);//创建消息for(int i=0;i<messageCount;i++){final int index=i;Message msg=new Message("topic_demo","TagA","OrderID188","Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));// SendCallback接收异步返回结果的回调producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {countDownLatch.countDown();System.out.printf("%-10d OK %s %n", index,sendResult.getMsgId());}@Overridepublic void onException(Throwable e) {countDownLatch.countDown();System.out.printf("%-10d Exception %s %n", index, e);e.printStackTrace();}});}// 等待5scountDownLatch.await(5, TimeUnit.SECONDS);// 如果不再发送消息,关闭Producer实例。producer.shutdown();}
}

单向发送消息

这种方式主要用在不特别关心发送结果的场景,例如日志发送
在这里插入图片描述
单向(Oneway)发送特点为发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别

package com.jjy.produce;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;public class OnewayProducer {public static void main(String[] args) throws Exception{// 实例化消息生产者ProducerDefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");// 设置NameServer的地址producer.setNamesrvAddr("192.168.66.100:9876");// 启动Producer实例producer.start();for (int i = 0; i < 100; i++) {// 创建消息,并指定Topic,Tag和消息体Message msg = new Message("TopicTest" /* Topic */,"TagA" /* Tag */,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);// 发送单向消息,没有任何返回结果producer.sendOneway(msg);}// 如果不再发送消息,关闭Producer实例。producer.shutdown();}
}

消息发送时的权衡

发送方式发送TPS发送结果反馈可靠性使用场景
同步发送可靠邮件、短信、推送
异步发送可靠视频转码
单向发送最快可能丢失日志收集

顺序发送消息

一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。

package com.jjy.produce;import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;import java.util.List;public class OrderProducer {public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {DefaultMQProducer producer = new DefaultMQProducer("producer_grp_02");producer.setNamesrvAddr("192.168.66.100:9876");producer.start();// 获取指定主题的MQ列表final List<MessageQueue> messageQueues = producer.fetchPublishMessageQueues("tp_demo_11");Message message = null;MessageQueue messageQueue = null;for (int i = 0; i < 100; i++) {// 采用轮询的方式指定MQ,发送订单消息,保证同一个订单的消息按顺序// 发送到同一个MQmessageQueue = messageQueues.get(i % 8);//创建message对象//发送创建订单消息message = new Message("tp_demo_11", ("hello rocketmq order create - " + i).getBytes());producer.send(message, messageQueue);//发送付款订单消息message = new Message("tp_demo_11", ("hello rocketmq order pay - " + i).getBytes());producer.send(message, messageQueue);//发送订单送货消息message = new Message("tp_demo_11", ("hello rocketmq order delivery - " + i).getBytes());producer.send(message, messageQueue);}producer.shutdown();}
}

消费顺序消息

package com.jjy.consumer;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class OrderConsumer {public static void main(String[] args) throws MQClientException {//创建消费者对象DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_grp_02");consumer.setNamesrvAddr("192.168.66.100:9876");//订阅主题consumer.subscribe("tp_demo_11", "*");//最小消费线程数consumer.setConsumeThreadMin(1);//最大消费线程数consumer.setConsumeThreadMax(1);//一次拉取的消息数量consumer.setPullBatchSize(1);//一次消费的消息数量consumer.setConsumeMessageBatchMaxSize(1);// 使用有序消息监听器consumer.setMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,  ConsumeOrderlyContext context) {for (MessageExt msg : msgs) {System.out.println(msg.getTopic() + "\t" +msg.getQueueId() + "\t" +new String(msg.getBody()));}return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();}
}

全局顺序消息

生产者

package com.jjy.produce;import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;import java.util.List;public class GlobalOrderProducer {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("producer_grp_02");producer.setNamesrvAddr("192.168.66.100:9876");producer.start();Message message = null;for(int i=0;i<100;i++){message=new Message("tp_demo_11",("全局有序消息...."+i).getBytes());producer.send(message, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> list, Message message, Object o) {return list.get((Integer)0);}},1);}producer.shutdown();}
}

消费者

package com.jjy.consumer;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class GlobalConsumer {public static void main(String[] args) throws MQClientException {//创建消费者对象DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_grp_02");consumer.setNamesrvAddr("192.168.66.100:9876");//订阅主题consumer.subscribe("tp_demo_11", "*");//最小消费线程数consumer.setConsumeThreadMin(1);//最大消费线程数consumer.setConsumeThreadMax(1);//一次拉取的消息数量consumer.setPullBatchSize(1);//一次消费的消息数量consumer.setConsumeMessageBatchMaxSize(1);consumer.setMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {for (MessageExt msg : list) {System.out.println("消费线程=" + Thread.currentThread().getName() +", queueId=" + msg.getQueueId() + ", 消息内容:" + new String(msg.getBody()));}return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();}
}

延迟消息

生产者

package com.jjy.produce;import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;public class DelayMessageProducer {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer=new DefaultMQProducer("producer_group");producer.setNamesrvAddr("192.168.66.100:9876");producer.start();Message message=null;for(int i=0; i<20;i++){message=new Message("tp_demo_3",("hello RocketMQ delayMessage -"+i).getBytes());// 设置延迟时间级别0,18,0表示不延迟,18表示延迟2h,大于18的都是2hmessage.setDelayTimeLevel(i);producer.send(message);}producer.shutdown();}
}

消费者

package com.jjy.consumer;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.common.message.MessageExt;public class DelayMessageConsumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("transactionConsumer-grp");consumer.setNamesrvAddr("192.168.66.100:9876");System.out.println("==========================================");//设置消息重试次数consumer.setMaxReconsumeTimes(5);//设置可以批量处理consumer.setConsumeMessageBatchMaxSize(1);//订阅主题consumer.subscribe("tp_demo_3","*");consumer.setMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {System.out.println(System.currentTimeMillis()/1000);for(MessageExt message:list){System.out.println(message.getTopic()+"\t"+message.getQueueId()+"\t"+message.getDelayTimeLevel()+"\t"+new String(message.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});consumer.start();}
}

事务消息

生产者

package com.jjy.produce;import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;public class  TransactionProducer {public static void main(String[] args) throws MQClientException {TransactionListener listener=new TransactionListener() {@Overridepublic LocalTransactionState executeLocalTransaction(Message message, Object o) {// 当发送事务消息prepare(half)成功后,调用该方法执行本地事务System.out.println("执行本地事务......");// return LocalTransactionState.COMMIT_MESSAGE;//使用下面事务回滚 消费端无法接收到消息了try {Thread.sleep(100000);} catch (InterruptedException e) {e.printStackTrace();}return LocalTransactionState.ROLLBACK_MESSAGE;}@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt messageExt) {// 该方法用于获取本地事务执行的状态。System.out.println("检查本地事务的状态:" + messageExt);return LocalTransactionState.COMMIT_MESSAGE;}};//创建事务消息生产者TransactionMQProducer producer=new TransactionMQProducer("producer_grp_01");//设置事务监听器producer.setTransactionListener(listener);producer.setNamesrvAddr("192.168.66.100:9876");producer.start();Message message=null;message=new Message("tp_demo_11","hello translation message".getBytes());producer.sendMessageInTransaction(message,"{\" name\":\"zhansan\"}");}
}

消费者

package com.jjy.consumer;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class TransactionMsgConsumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("txconsumer_grp_12_01");consumer.setNamesrvAddr("192.168.66.100:9876");consumer.subscribe("tp_demo_11", "*");consumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {System.out.println(new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();}}

消息查询

package com.itbaizhan.consumer;public class QueryingMessageDemo {public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {//创建消费者对象DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_grp_01");//设置nameserver地址consumer.setNamesrvAddr("192.168.66.100:9876");//设置消息监听器consumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();//根据messageId查询消息MessageExt message = consumer.viewMessage("topic_springboot_demo_02", "C0A88B8000002A9F000000000000C8E8");System.out.println(message);System.out.println(message.getMsgId());consumer.shutdown();}
}

如果我的内容对你有帮助,请点赞,评论,收藏。创作不易,大家的支持就是我坚持下去的动力!
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/258445.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

红蓝对抗:网络安全领域的模拟实战演练

引言&#xff1a; 随着信息技术的快速发展&#xff0c;网络安全问题日益突出。为了应对这一挑战&#xff0c;企业和组织需要不断提升自身的安全防护能力。红蓝对抗作为一种模拟实战演练方法&#xff0c;在网络安全领域得到了广泛应用。本文将介绍红蓝对抗的概念、目的、过程和…

【精品】关于枚举的高级用法

枚举父接口 public interface BaseEnum {Integer getCode();String getLabel();/*** 根据值获取枚举** param code* param clazz* return*/static <E extends Enum<E> & BaseEnum> E getEnumByCode(Integer code, Class<E> clazz) {Objects.requireNonN…

ASCII编码的诞生:解决字符标准化与跨平台通信的需求

title: ASCII编码的诞生&#xff1a;解决字符标准化与跨平台通信的需求 date: 2024/2/17 14:27:01 updated: 2024/2/17 14:27:01 tags: ASCII编码标准化跨平台字符集兼容性简洁性影响力 在计算机的发展过程中&#xff0c;字符的表示和传输一直是一个重要的问题。为了实现字符的…

python-自动化篇-终极工具-用GUI自动控制键盘和鼠标-pyautogui

文章目录 用GUI自动控制键盘和鼠标pyautogui 模块鼠标——记忆宫殿屏幕位置——移动地图——pyautogui.size鼠标位置——自身定位——pyautogui.position()移动鼠标——pyautogui.moveTo拖动鼠标——滚动鼠标——scroll 键盘按下键盘释放键盘 开始与结束通过注销关闭所有程序 用…

linux系统zabbix监控分布式监控的部署

分布式监控 服务器安装分布式监控安装工具安装mysql导入数据结构配置proxy端浏览器配置 zabbix server端监控到大量zabbix agent端&#xff0c;这样会使zabbix server端压力过大&#xff0c;使用zabbix proxy进行分布式监控 服务器安装分布式监控 安装工具 rpm -Uvh https://…

HTML | DOM | 网页前端 | 常见HTML标签总结

文章目录 1.前端开发简单分类2.前端开发环境配置3.HTML的简单介绍4.常用的HTML标签介绍 1.前端开发简单分类 前端开发&#xff0c;这里是一个广义的概念&#xff0c;不单指网页开发&#xff0c;它的常见分类 网页开发&#xff1a;前端开发的主要领域&#xff0c;使用HTML、CS…

网络安全威胁,如何解决缓冲区溢出攻击

目录 一、什么是网络安全 二、什么是缓冲区 三、缓冲区溢出 四、缓冲区溢出攻击的类型 一、什么是网络安全 网络安全&#xff08;Network Security&#xff09;指的是保护计算机网络及其相关设备、系统和数据免受未经授权访问、破坏、篡改、窃取或滥用的威胁和攻击。随着网…

单片机学习笔记---LCD1602

LCD1602介绍 LCD1602&#xff08;Liquid Crystal Display&#xff09;液晶显示屏是一种字符型液晶显示模块&#xff0c;可以显示ASCII码的标准字符和其它的一些内置特殊字符&#xff08;比如日文的片假名&#xff09;&#xff0c;还可以有8个自定义字符 显示容量&#xff1a;…

linux系统zabbix工具监控web页面

web页面监控 内建key介绍浏览器配置浏览器页面查看方式 监控指定的站点的资源下载速度&#xff0c;及页面响应时间&#xff0c;还有响应代码&#xff1b; web Scenario&#xff1a; web场景&#xff08;站点&#xff09;web page &#xff1a;web页面&#xff0c;一个场景有多…

Excel TEXT函数格式化日期

一. 基本语法 ⏹Excel 的 TEXT 函数用于将数值或日期格式化为指定的文本格式 TEXT(value, format_text)二. 拼接路径案例 # 将当前单元格日期格式化 "ls -ld /data/jmw/01/"&TEXT(A2,"YYYYMMDD")&""# 此处的日期, 是名称管理器里面定…

深入解析鸿蒙系统的页面路由(Router)机制

鸿蒙系统以其独特的分布式架构和跨设备的统一体验而备受瞩目。在这个系统中&#xff0c;页面路由&#xff08;Router&#xff09;机制是连接应用各页面的关键组成部分。本文将深入探讨鸿蒙系统的页面路由&#xff0c;揭示其工作原理、特点以及在应用开发中的实际应用。 1. 实现…

【每日一题】06 排序链表

问题描述 给你链表的头结点 head &#xff0c;请将其按 升序 排列并返回 排序后的链表 。 求解 /*** Definition for singly-linked list.* struct ListNode {* int val;* struct ListNode *next;* };*/ struct ListNode* sortList(struct ListNode* head) {struct…

什么是 Flet?

什么是 Flet&#xff1f; Flet 是一个框架&#xff0c;允许使用您喜欢的语言构建交互式多用户 Web、桌面和移动应用程序&#xff0c;而无需前端开发经验。 您可以使用基于 Google 的 Flutter 的 Flet 控件为程序构建 UI。Flet 不只是“包装”Flutter 小部件&#xff0c;而是…

【数学建模】【2024年】【第40届】【MCM/ICM】【A题 七鳃鳗性别比与资源可用性】【解题思路】

我们通过将近半天的搜索数据&#xff0c;查到了美国五大湖中优势物种的食物网数据&#xff0c;以Eric伊利湖为例&#xff0c;共包含34各优势物种&#xff0c;相互之间的关系如下图所示&#xff1a; 一、题目 &#xff08;一&#xff09; 赛题原文 2024 MCM Problem A: Reso…

【设计模式】springboot3项目整合模板方法深入理解设计模式之模板方法(Template Method)

&#x1f389;&#x1f389;欢迎光临&#x1f389;&#x1f389; &#x1f3c5;我是苏泽&#xff0c;一位对技术充满热情的探索者和分享者。&#x1f680;&#x1f680; &#x1f31f;特别推荐给大家我的最新专栏《Spring 狂野之旅&#xff1a;底层原理高级进阶》 &#x1f680…

K8S集群实践之十:虚拟机部署阶段性总结

目录 1. 说明&#xff1a; 2. 安装准备 2.1 每个节点设置双网卡&#xff0c;一卡做网桥&#xff08;外部访问&#xff09;&#xff0c;一卡做NAT&#xff08;集群内网访问&#xff09; 2.2 准备一个可用的代理服务器 3. 由于虚拟机崩溃&#xff08;停电&#xff0c;宿主机…

探索Gorm - Golang流行的数据库ORM框架

&#x1f3f7;️个人主页&#xff1a;鼠鼠我捏&#xff0c;要死了捏的主页 &#x1f3f7;️系列专栏&#xff1a;Golang全栈-专栏 &#x1f3f7;️个人学习笔记&#xff0c;若有缺误&#xff0c;欢迎评论区指正 前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&…

每日五道java面试题之java基础篇(九)

目录&#xff1a; 第一题 你们项⽬如何排查JVM问题第二题 ⼀个对象从加载到JVM&#xff0c;再到被GC清除&#xff0c;都经历了什么过程&#xff1f;第三题 怎么确定⼀个对象到底是不是垃圾&#xff1f;第四题 JVM有哪些垃圾回收算法&#xff1f;第五题 什么是STW&#xff1f; 第…

Spring Resource

java.net.URL 类可用于访问带有各种URL前缀的资源&#xff0c;但是对于访问一些资源还是不够方便。比如不能从类路径或者相对于ServletContext来获取资源。而Spring 的Resource接口&#xff0c;则可以通过类路径等方式来访问资源。 1 Resource接口 图 Resource接口及方法 getI…

数据结构与算法:双向链表

朋友们大家好啊&#xff0c;在上节完成单链表的讲解后&#xff0c;我们本篇文章来对带头循环双向链表进行讲解 双向链表 双向链表、头节点和循环的介绍构建双向链表节点的构建初始化双向循环链表&#xff08;空链表&#xff09;销毁双向链表 链表的打印双向链表头尾的插与删尾插…