内窥镜消息队列发送消息原理
目的
有一个多线程的Java应用程序,使用消息队列来处理命令
时序图
@startumlactor User
participant "sendCmdWhiteBalance()" as Controller
participant CommandConsumer
participant MessageQueueUser -> Controller: 调用sendCmdWhiteBalance()Controller -> MessageQueue: 将白平衡命令加入队列
activate MessageQueueController -> CommandConsumer: 等待命令完成
activate CommandConsumerCommandConsumer -> MessageQueue: 从队列中获取命令
MessageQueue -> CommandConsumer: 返回命令
CommandConsumer -> CommandConsumer: 执行命令
CommandConsumer -> Controller: 命令执行完成
deactivate CommandConsumerController -> User: 返回成功响应@enduml
线程安全和性能优化
使用阻塞队列(ArrayBlockingQueue):
使用ArrayBlockingQueue是因为它是一种线程安全的队列数据结构,它在多线程环境中提供了同步。
它的核心原理是基于锁和条件变量,它能够安全地协调不同线程之间的操作,确保数据的正确性。这是在多线程编程中一种重要的工具,它避免了显式的锁管理,降低了错误的可能性,提高了代码的可维护性。
使用blockingQueue.take():
调用blockingQueue.take()的主要目的是防止线程空转(busy-waiting)。
在没有消息进入队列的情况下,如果你使用简单的轮询(如poll()或isEmpty()检查),线程可能会在空循环中浪费大量的CPU时间。而take()方法会使线程在队列为空时进入阻塞状态,直到队列中有数据可以被取出。这降低了线程的 CPU 消耗,有效地减少了开销,使得线程能够更有效地等待新消息。
Java代码
下面是我的代码
@Component
@Slf4j
public class QueueJob {@PostConstructpublic void QueueJob() {log.info("消费者开启****************************************************************************************************");if (System.getProperty("os.name") != null && System.getProperty("os.name").toLowerCase().startsWith("windows")) {return;}log.info("消费者开启");// 创建消息队列 (MessageQueue) 实例和消费者线程 (CommandConsumer)CommandConsumer commandConsumer = new CommandConsumer(messageQueue);commandConsumer.start();}}@Slf4j
public class CommandConsumer extends Thread {private final MessageQueue messageQueue;public CommandConsumer(MessageQueue messageQueue) {log.info("放入参数");this.messageQueue = messageQueue;}@Overridepublic void run() {log.info("进入消费者模块中***********************************");while (true) {/*** 初始化完成后在执行命令* @author lst* @date 2023/8/8 9:18*/if (!initStatus.get()) {continue;}log.info("run****************************************");CommandQueue commandQueue;try {commandQueue = messageQueue.takeFromQueue();} catch (InterruptedException e) {throw new RuntimeException(e);}if (commandQueue != null) {// 调用 sendMsg 方法发送指令,等待响应成功try {log.info("sendMsg***********************************");sendMsg(commandQueue.getCommand());log.info("success***********************************");} catch (Exception e) {catchSendMsgException(commandQueue.getMethod(), e);} finally {handleCountDownLatch(commandQueue.getMethod());}}}}@Slf4j
public class MessageQueue {/*** 顺序发送* 有应答才能发送下一条指令** @author lst* @date 2023/8/7 13:45* @param null* @return null*/private final BlockingQueue<CommandQueue> blockingQueue = new ArrayBlockingQueue<>(30, true);public void addToQueue(CommandQueue c) {log.info("添加队列中" + c.toString());if (!blockingQueue.offer(c)) {throw new IllegalStateException("队列已满,无法接受新的指令");}}public CommandQueue takeFromQueue() throws InterruptedException {return blockingQueue.take();}public int getQueueFreeSize() {return blockingQueue.remainingCapacity();}
}基于上面我调用白平衡接口,会先将白平衡命令加入到阻塞队列中,阻塞队列中while循环持续消费@Operation(summary = "白平衡")@ConcurrentControl@GetMapping("sendCmdWhiteBalance")public CommonResult sendCmdWhiteBalance() throws InterruptedException {String key = WHITEBALANCE;messageQueue.addToQueue(new CommandQueue("sendCmdWhiteBalance", key));cmdWhiteBalanceCountDownLatch.await();cmdWhiteBalanceCountDownLatch = new CountDownLatch(1);if (cmdWhiteBalanceCountDownLatchException != null) {Exception e = cmdWhiteBalanceCountDownLatchException;cmdWhiteBalanceCountDownLatchException = null;throw new RuntimeException(e);}return CommonResult.success();}