Fanout消息模型
* 广播模型:* 一个交换机绑定多个队列* 每个队列都有一个消费者* 每个消费者消费自己队列中的消息,每个队列的信息是一样的
生产者
package com. example. demo02. mq. fanout ; import com. example. demo02. mq. util. ConnectionUtils ;
import com. rabbitmq. client. BuiltinExchangeType ;
import com. rabbitmq. client. Channel ;
import com. rabbitmq. client. Connection ; import java. io. IOException ;
public class FanoutSender { public static void main ( String [ ] args) throws Exception { Connection connection = ConnectionUtils . getConnection ( ) ; Channel channel = connection. createChannel ( ) ; channel. exchangeDeclare ( "fanout.exchange" , BuiltinExchangeType . FANOUT , false ) ; String msg = "fanout message" ; channel. basicPublish ( "fanout.exchange" , "" , null , msg. getBytes ( ) ) ; channel. close ( ) ; connection. close ( ) ; }
}
消费者1
package com. example. demo02. mq. fanout ; import com. example. demo02. mq. util. ConnectionUtils ;
import com. rabbitmq. client. * ; import java. io. IOException ;
public class FanoutReceiver1 { public static void main ( String [ ] args) throws Exception { Connection connection = ConnectionUtils . getConnection ( ) ; Channel channel = connection. createChannel ( ) ; channel. exchangeDeclare ( "fanout.exchange" , BuiltinExchangeType . FANOUT , false ) ; channel. queueDeclare ( "fanout.queue1" , false , false , false , null ) ; channel. queueBind ( "fanout.queue1" , "fanout.exchange" , "" ) ; Consumer consumer = new DefaultConsumer ( channel) { @Override public void handleDelivery ( String consumerTag, Envelope envelope, AMQP. BasicProperties properties, byte [ ] body) throws IOException { System . out. println ( "Fanout1接收到的消息是:" + new String ( body) ) ; channel. basicAck ( envelope. getDeliveryTag ( ) , false ) ; } } ; channel. basicConsume ( "fanout.queue1" , false , consumer) ; }
}
消费者2
package com. example. demo02. mq. fanout ; import com. example. demo02. mq. util. ConnectionUtils ;
import com. rabbitmq. client. * ; import java. io. IOException ;
public class FanoutReceiver2 { public static void main ( String [ ] args) throws Exception { Connection connection = ConnectionUtils . getConnection ( ) ; Channel channel = connection. createChannel ( ) ; channel. exchangeDeclare ( "fanout.exchange" , BuiltinExchangeType . FANOUT , false ) ; channel. queueDeclare ( "fanout.queue2" , false , false , false , null ) ; channel. queueBind ( "fanout.queue2" , "fanout.exchange" , "" ) ; Consumer consumer = new DefaultConsumer ( channel) { @Override public void handleDelivery ( String consumerTag, Envelope envelope, AMQP. BasicProperties properties, byte [ ] body) throws IOException { System . out. println ( "Fanout2接收到的消息是:" + new String ( body) ) ; channel. basicAck ( envelope. getDeliveryTag ( ) , false ) ; } } ; channel. basicConsume ( "fanout.queue2" , false , consumer) ; }
}
结果