简单模式
我们以最普通的方式去理解,并没有整合Springboot的那种
这是最简单的模式,一个生产者,一个消费者,一个队列
测试
1、 导包,没整合,不需要编写配置
2、需要生产者消费者
- 导包
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.10.0</version>
</dependency>
- Producer
public class Producer {public static void main(String[] args) {//ip port//创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();//创建连接工程connectionFactory.setHost("47.120.50.213");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");connectionFactory.setVirtualHost("/");//创建连接connectionConnection connection = null;Channel channel = null;try {connection = connectionFactory.newConnection("producer");//通过连接获取通道Channelchannel = connection.createChannel();//通过创建交换机,声明队列,绑定关系,路由key,发送接收消息String queueName = "queue";/*** 队列的名称* 是否要持久化* 排他性,是否独占独立* 是否自动删除,在最后一个消费者消费完后* 携带附属参数*/channel.queueDeclare(queueName,false,false,false,null);String message = "hello world";//发送消息到消息队列channel.basicPublish("",queueName,null,message.getBytes());System.out.println("消息发送成功");} catch (IOException e) {throw new RuntimeException(e);} catch (TimeoutException e) {throw new RuntimeException(e);}finally {//关闭连接if(channel != null && channel.isOpen()){try {channel.close();} catch (IOException e) {throw new RuntimeException(e);} catch (TimeoutException e) {throw new RuntimeException(e);}}if(connection != null && connection.isOpen()){try {connection.close();} catch (IOException e) {throw new RuntimeException(e);}}}}
}
- Consumer
public class Consumer {public static void main(String[] args) {//ip port//创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();//创建连接工程connectionFactory.setHost("47.120.50.213");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");connectionFactory.setVirtualHost("/");//创建连接connectionConnection connection = null;Channel channel = null;try {connection = connectionFactory.newConnection("producer");//通过连接获取通道Channelchannel = connection.createChannel();//第一个是消息队列的名字channel.basicConsume("queue", true, new DeliverCallback() {@Overridepublic void handle(String s, Delivery message) throws IOException {System.out.println("收到的消息的是"+new String(message.getBody(),"UTF-8"));}},new CancelCallback() {@Overridepublic void handle(String s) throws IOException {System.out.println("接收消息失败");}});System.out.println("开始接收消息");System.in.read();} catch (IOException e) {throw new RuntimeException(e);} catch (TimeoutException e) {throw new RuntimeException(e);}finally {//关闭连接if(channel != null && channel.isOpen()){try {channel.close();} catch (IOException e) {throw new RuntimeException(e);} catch (TimeoutException e) {throw new RuntimeException(e);}}if(connection != null && connection.isOpen()){try {connection.close();} catch (IOException e) {throw new RuntimeException(e);}}}}
}
总结
代码流程
-
上述消息没有设置为持久化
-
没持久化,消息创建了依旧存在,除非服务器重启,就会删除
-
持久化,服务器重启后都不会删除
-
发送消息
-
channel.queueDeclare(queueName,false,false,false,null); String message = "hello world"; //发送消息到消息队列 channel.basicPublish("",queueName,null,message.getBytes());
-
-
接收消息
-
channel.basicConsume("queue", true, new DeliverCallback() {@Overridepublic void handle(String s, Delivery message) throws IOException {System.out.println("收到的消息的是"+new String(message.getBody(),"UTF-8"));} },new CancelCallback() {@Overridepublic void handle(String s) throws IOException {System.out.println("接收消息失败");} } );
-
问题
1、连接超时
这里可能是NO access ,点击admin修改
命令方式给用户分配权限
rabbitmqctl set_permissions -p / admin '*' '.*' '.*' 给用户分配权限
发现并没有解决问题
- 访问的端口时5672,因为15672是给web访问的所以需要访问5672
- 需要开通安全组与端口号,即5672,15672都需要开启
#开启端口
[root@iZf8zhsqf64x47n1tpdy6oZ rabbitmq]# firewall-cmd --zone=public --add-port=15672/tcp --permanent
#重启防火墙
firewall-cmd --reload
#需要开启远程安全组
思考
为什么基于channel而不是连接??????
一个应用有多个线程需要从rabbitmq中消费,或是生产消息,那么必然会建立很多个connection ,也就是多个tcp连接,对操作系统而言,建立和销毁tcp连接是很昂贵的开销
,如果遇到使用高峰,性能瓶颈也随之显现,rabbitmq采用类似nio的做法,连接tcp连接复用,不仅可以减少性能开销,同时也便于管理