Direct消息模型
* 路由模型:
* 一个交换机可以绑定多个队列
* 生产者给交换机发送消息时,需要指定消息的路由键
* 消费者绑定队列到交换机时,需要指定所需要消费的信息的路由键
* 交换机会根据消息的路由键将消息转发到对应的队列* 缺点:
* 当消息很多的时候,需要指定的路由键也会很多,究极复杂。
生产者
package com. example. demo02. mq. direct ; import com. example. demo02. mq. util. ConnectionUtils ;
import com. rabbitmq. client. BuiltinExchangeType ;
import com. rabbitmq. client. Channel ;
import com. rabbitmq. client. Connection ; import java. io. IOException ;
public class DirectSender { public static void main ( String [ ] args) throws Exception { Connection connection = ConnectionUtils . getConnection ( ) ; Channel channel = connection. createChannel ( ) ; channel. exchangeDeclare ( "direct.exchange" , BuiltinExchangeType . DIRECT , false ) ; String msg1 = "{To DirectReceiver1: orderId:1001}" ; String msg2 = "{To DirectReceiver2: orderId:1002}" ; channel. basicPublish ( "direct.exchange" , "order.save" , null , msg1. getBytes ( ) ) ; channel. basicPublish ( "direct.exchange" , "order.update" , null , msg2. getBytes ( ) ) ; channel. close ( ) ; connection. close ( ) ; }
}
消费者1
package com. example. demo02. mq. direct ; import com. example. demo02. mq. util. ConnectionUtils ;
import com. rabbitmq. client. * ; import java. io. IOException ;
public class DirectReceiver1 { public static void main ( String [ ] args) throws Exception { Connection connection = ConnectionUtils . getConnection ( ) ; Channel channel = connection. createChannel ( ) ; channel. exchangeDeclare ( "direct.exchange" , BuiltinExchangeType . DIRECT , false ) ; channel. queueDeclare ( "direct.queue1" , false , false , false , null ) ; channel. queueBind ( "direct.queue1" , "direct.exchange" , "order.save" ) ; Consumer consumer = new DefaultConsumer ( channel) { @Override public void handleDelivery ( String consumerTag, Envelope envelope, AMQP. BasicProperties properties, byte [ ] body) throws IOException { System . out. println ( "DirectReceiver1接收到的新增订单消息是:" + new String ( body) ) ; channel. basicAck ( envelope. getDeliveryTag ( ) , false ) ; } } ; channel. basicConsume ( "direct.queue1" , false , consumer) ; }
}
消费者2
package com. example. demo02. mq. direct ; import com. example. demo02. mq. util. ConnectionUtils ;
import com. rabbitmq. client. * ; import java. io. IOException ;
public class DirectReceiver2 { public static void main ( String [ ] args) throws Exception { Connection connection = ConnectionUtils . getConnection ( ) ; Channel channel = connection. createChannel ( ) ; channel. exchangeDeclare ( "direct.exchange" , BuiltinExchangeType . DIRECT , false ) ; channel. queueDeclare ( "direct.queue2" , false , false , false , null ) ; channel. queueBind ( "direct.queue2" , "direct.exchange" , "order.update" ) ; Consumer consumer = new DefaultConsumer ( channel) { @Override public void handleDelivery ( String consumerTag, Envelope envelope, AMQP. BasicProperties properties, byte [ ] body) throws IOException { System . out. println ( "DirectReceiver2接收到的修改订单消息:" + new String ( body) ) ; channel. basicAck ( envelope. getDeliveryTag ( ) , false ) ; } } ; channel. basicConsume ( "direct.queue2" , false , consumer) ; }
}
结果