生产者
package com.qf.mq2302.routing;import com.qf.mq2302.utils.MQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;public class EmitLog {public static final String EXCHANGE_NAME="emitlogs";public static void main(String[] args) throws Exception {Connection connection = MQUtils.getConnection();Channel channel = connection.createChannel();//创建一个路由模式的交换机,默认创出来,不持久化,不自动删除,不是内部交换机channel.exchangeDeclare(EXCHANGE_NAME,"direct");String msg="hello routing!!";//准备routingKeyString routingKey="info";//发送消息channel.basicPublish(EXCHANGE_NAME,routingKey,null,msg.getBytes("utf-8"));channel.close();connection.close();}}
消费者1号
package com.qf.mq2302.routing;import com.qf.mq2302.utils.MQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;import java.io.IOException;public class ReceiveError {private static final String EXCHANGE_NAME="emitlogs";public static void main(String[] args) throws Exception {Connection connection = MQUtils.getConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME,"direct");//该消费者创建一个自己独占的队列,绑定到指定交换机接收消息。String queueName = channel.queueDeclare().getQueue();//准备号要绑定时使用的routingkeyString routingKey = "error";//绑定该队列到交换机channel.queueBind(queueName,EXCHANGE_NAME,routingKey);//设置预留消息队列,也就是,RabbitMQ发过来,我可以存几个。当确认一个就会又发过来一个,// 但是这些相当于线程池里的线程,然后每个线程又去开辟一个新的线程去执行,回调方法,// 当回调方法确认完事,才会释放当前这个线程,然后去队列里在消费一个过来。channel.basicQos(1);//autoAck :false不自动确认,需要手动确认,如果手动不确认,就会按照 channel.basicQos(1);的数量,给多少就消费多少,不会再给你发了。channel.basicConsume(queueName, false, new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery message) throws IOException {byte[] body = message.getBody();String msg = new String(body, "utf-8");System.out.println(msg);//手动ACKchannel.basicAck(message.getEnvelope().getDeliveryTag(),false);}},consumerTag -> {});}}
消费者2号
package com.qf.mq2302.routing;import com.qf.mq2302.utils.MQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;import java.io.IOException;public class ReceiveIOther {private static final String EXCHANGE_NAME="emitlogs";public static void main(String[] args) throws Exception {Connection connection = MQUtils.getConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME,"direct");//该消费者创建一个自己独占的队列,绑定到指定交换机接收消息。String queueName = channel.queueDeclare().getQueue();//准备号要绑定时使用的routingkeyString routingKey1 = "error";String routingKey2 = "info";String routingKey3 = "warn";//绑定该队列到交换机channel.queueBind(queueName,EXCHANGE_NAME,routingKey1);channel.queueBind(queueName,EXCHANGE_NAME,routingKey2);channel.queueBind(queueName,EXCHANGE_NAME,routingKey3);channel.basicQos(1);channel.basicConsume(queueName, false, new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery message) throws IOException {byte[] body = message.getBody();//获取routingKeyString routingKey = message.getEnvelope().getRoutingKey();String msg = new String(body, "utf-8");System.out.println(msg);//手动ACKchannel.basicAck(message.getEnvelope().getDeliveryTag(),false);}},consumerTag -> {});}}