我用ChatGPT,给RabbitMQ加了个连接池

上次我把 RabbitMQ 集成到项目中,但是每次使用 RabbitMQ 时都去 New 一个连接,导致并发起不来,所以这次我们就给 RabbitMQ 加一个连接池。

为了偷个懒,我直接用 ChatGPT 教我加。

01 ChatGPT 生成的代码

我输入的关键字如下:

 

复制代码

rabbitmq 实现一个连接池 java

下面是 GhatGPT 4.0 给我的回复:

 

java

复制代码

以下是在Java中使用连接池创建RabbitMQ连接的示例: 首先,需要添加pika客户端依赖项,例如: <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.9.0</version> </dependency> 创建一个RabbitMQ连接类: import com.rabbitmq.client.Connection; public class RabbitMQConnection { private Connection connection; public RabbitMQConnection(String host, int port, String username, String password) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setPort(port); factory.setUsername(username); factory.setPassword(password); try { connection = factory.newConnection(); } catch (IOException | TimeoutException e) { e.printStackTrace(); } } public Connection getConnection() { return connection; } public void close() { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } 创建一个RabbitMQ连接池类: import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; public class RabbitMQConnectionPool { private final BlockingQueue<RabbitMQConnection> pool; public RabbitMQConnectionPool(String host, int port, String username, String password, int poolSize) { pool = new LinkedBlockingQueue<>(poolSize); for (int i = 0; i < poolSize; i++) { pool.add(new RabbitMQConnection(host, port, username, password)); } } public RabbitMQConnection getConnection() throws InterruptedException { return pool.take(); } public void returnConnection(RabbitMQConnection connection) { pool.add(connection); } public void close() { pool.forEach(RabbitMQConnection::close); } } 创建一个消费者类: import com.rabbitmq.client.*; public class RabbitMQConsumer implements Runnable { private final RabbitMQConnectionPool connectionPool; private final String queueName; public RabbitMQConsumer(RabbitMQConnectionPool connectionPool, String queueName) { this.connectionPool = connectionPool; this.queueName = queueName; } @Override public void run() { try { Connection connection = connectionPool.getConnection().getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(queueName, false, false, false, null); channel.basicConsume(queueName, true, (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("Received message: " + message); }, consumerTag -> {}); } catch (InterruptedException | IOException e) { e.printStackTrace(); } } } 创建一个生产者类: import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.MessageProperties; public class RabbitMQProducer { private final RabbitMQConnectionPool connectionPool; private final String queueName; public RabbitMQProducer(RabbitMQConnectionPool connectionPool, String queueName) { this.connectionPool = connectionPool; this.queueName = queueName; } public void send(String message) { try { Connection connection = connectionPool.getConnection().getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(queueName, false, false, false, null); channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); System.out.println("Sent message: " + message); channel.close(); connection.close(); } catch (InterruptedException|IOException e) { e.printStackTrace(); } } } 创建一个RabbitMQ连接池和多个消费者来消费队列: public class Main { public static void main(String[] args) throws InterruptedException { RabbitMQConnectionPool connectionPool = new RabbitMQConnectionPool("localhost", 5672, "guest", "guest", 10); RabbitMQProducer producer = new RabbitMQProducer(connectionPool, "hello"); producer.send("Hello, world!"); RabbitMQConsumer consumer1 = new RabbitMQConsumer(connectionPool, "hello"); Thread thread1 = new Thread(consumer1); thread1.start(); RabbitMQConsumer consumer2 = new RabbitMQConsumer(connectionPool, "hello"); Thread thread2 = new Thread(consumer2); thread2.start(); thread1.join(); thread2.join(); connectionPool.close(); } } 该示例代码创建了大小为10的连接池,并创建了两个消费者线程来消费队列,并使用一个生产者向队列发送消息。每个消费者从连接池获取连接,并消费队列中的消息,当完成任务后,将连接还回连接池。在实际部署中,您需要根据您的应用程序需求选择合适大小的池,并调整代码以使用正确的参数。

我靠,这代码写的真牛逼,我需要做的事情,就是把里面连接池的部分,集成到我的项目中。

02 给 RabbitMQ 加连接池

2.1 加入连接池

之前我们给 ConnectionFactory 加了个单例工厂,具体可以参考我上一篇文章,由于我们有连接池,所以这个单例工厂就直接废弃了。

 

typescript

复制代码

/** * @author Louzai * @date 2023/5/10 */ public class RabbitmqConnection { private Connection connection; public RabbitmqConnection(String host, int port, String userName, String password, String virtualhost) { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(host); connectionFactory.setPort(port); connectionFactory.setUsername(userName); connectionFactory.setPassword(password); connectionFactory.setVirtualHost(virtualhost); try { connection = connectionFactory.newConnection(); } catch (IOException | TimeoutException e) { e.printStackTrace(); } } /** * 获取链接 * * @return */ public Connection getConnection() { return connection; } /** * 关闭链接 * */ public void close() { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } }

这个代码就是 ChatGPT 给我写的,我是直接 Copy 过来,然后稍微改动了一下。

 

arduino

复制代码

public class RabbitmqConnectionPool { private static BlockingQueue<RabbitmqConnection> pool; public static void initRabbitmqConnectionPool(String host, int port, String userName, String password, String virtualhost, Integer poolSize) { pool = new LinkedBlockingQueue<>(poolSize); for (int i = 0; i < poolSize; i++) { pool.add(new RabbitmqConnection(host, port, userName, password, virtualhost)); } } public static RabbitmqConnection getConnection() throws InterruptedException { return pool.take(); } public static void returnConnection(RabbitmqConnection connection) { pool.add(connection); } public static void close() { pool.forEach(RabbitmqConnection::close); } }

2.2 RabbitMQ 发送消费消息

RabbitMQ 发送消息:从连接池拿到连接 -> 创建通道 -> 声明交换机 -> 发送消息 -> 将连接归还连接池。

这里的逻辑基本和之前的一样,只是之前是 New 一个连接,现在是直接从连接池拿到连接,然后最后多了一步归还连接的操作。

 

typescript

复制代码

@Override public void publishMsg(String exchange, BuiltinExchangeType exchangeType, String toutingKey, String message) { try { //创建连接 RabbitmqConnection rabbitmqConnection = RabbitmqConnectionPool.getConnection(); Connection connection = rabbitmqConnection.getConnection(); //创建消息通道 Channel channel = connection.createChannel(); // 声明exchange中的消息为可持久化,不自动删除 channel.exchangeDeclare(exchange, exchangeType, true, false, null); // 发布消息 channel.basicPublish(exchange, toutingKey, null, message.getBytes()); System.out.println("Publish msg:" + message); channel.close(); RabbitmqConnectionPool.returnConnection(rabbitmqConnection); } catch (InterruptedException | IOException | TimeoutException e) { e.printStackTrace(); } }

RabbitMQ 消费消息:从连接池拿到连接 -> 创建通道 -> 确定消息队列 -> 绑定队列到交换机 -> 接受并消费消息 -> 将连接归还连接池。

同上,这里的逻辑基本和之前的一样,只是多了一个拿连接和归还连接的过程。

 

java

复制代码

@Override public void consumerMsg(String exchange, String queueName, String routingKey) { try { //创建连接 RabbitmqConnection rabbitmqConnection = RabbitmqConnectionPool.getConnection(); Connection connection = rabbitmqConnection.getConnection(); //创建消息信道 final Channel channel = connection.createChannel(); //消息队列 channel.queueDeclare(queueName, true, false, false, null); //绑定队列到交换机 channel.queueBind(queueName, exchange, routingKey); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("Consumer msg:" + message); // 获取Rabbitmq消息,并保存到DB // 说明:这里仅作为示例,如果有多种类型的消息,可以根据消息判定,简单的用 if...else 处理,复杂的用工厂 + 策略模式 notifyService.saveArticleNotify(JsonUtil.toObj(message, UserFootDO.class), NotifyTypeEnum.PRAISE); channel.basicAck(envelope.getDeliveryTag(), false); } }; // 取消自动ack channel.basicConsume(queueName, false, consumer); RabbitmqConnectionPool.returnConnection(rabbitmqConnection); } catch (InterruptedException | IOException e) { e.printStackTrace(); } }

这个代码,其实 ChatGPT 写的有问题,你再回过头去看 ChatGPT 写的代码,发现连接取出,但是没有归还,那会出现什么问题呢?

这里给大家分析一下,由于我们的连接池用的是 BlockingQueue,连接池大小是 5,如果连接全部取出并都不归还,当第 6 个请求过来后,请求就卡住了,导致界面操作会被阻塞,请求完全没有反应。

不要问我怎么知道,因为我是踩坑过来的。

2.3 代码仓库

为了方便大家学习功能演变的过程,每个模块都会单独开个分支,连接池的分支和仓库如下:

  • 代码仓库:github.com/itwanger/pa…
  • 代码分支:feature/rabbitmq_connection_pool_20230511

如果需要运行 RabbitMQ,下面的配置需要改成 true,因为代码默认是 false。

3 实际效果

我们是把技术派的“点赞”功能消息,通过 RabbitMQ 方式处理,我们多次点击“点赞”按钮,触发 RammitMQ 消息发送。

可以通过日志,也可以看到发送和消费过的消息。

最后就是大家关心的连接池个数,打开 RabbitMQ 后台,发现永远只有 5 个连接,和我们的连接池大小一致,符合预期。

再看看打开的 Channel,由于每次都关闭,所以也没有了,也符合预期。

这里抛个疑问,每次新开一个 Channel,用完后关闭,是不是也很耗时?是否需要给 Channel 也搞一个连接池呢?可以评论区告诉我哈~~

4 后记

如果用 ChatGPT 3.5,给的结果就不一样,需要加入更多关键字,如果需要达到 GhatGPT 4.0 给的结果,你需要给 ChatGPT 3.5 以下关键字。

 

复制代码

rabbitmq 用BlockingQueue实现一个连接池 java

再回到 RabbitMQ 本身,上一篇文章只是告诉大家 RabbitMQ 的基本原理,以及如何集成 RabbitMQ,这篇文章主要教大家加连接池,其实这个示例,里面可以加的东西还很多。

学习嘛,就是边学边玩,后面有空,我想再加点东西,主要是想把 RabbitMQ 的消费方式,由阻塞改成非阻塞方式,

作者:二进制狂人
链接:https://juejin.cn/post/7254037028051943481
来源:稀土掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/29596.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ClickPrompt

太长不看版&#xff1a; GitHub&#xff1a;github.com/prompt-engi…在线地址&#xff1a;www.clickprompt.org/ 欢迎来添加你的 AI Prompt 和使用心得。 再上个截图&#xff1a; ClickPrompt&#xff1a;简化你的 Prompt 设计之旅 上周末&#xff0c;为了更好地记录与分享…

线下嗨聊 AIGC两整天!为什么我推荐你参加稀土开发者大会?

&#x1f914; 若问今年谁最火&#xff1f; ChatGPT 答曰&#xff1a;AIGC 还有我&#xff01; 要问技术圈今年最火的话题是什么&#xff1f;无外乎这些关键词&#xff1a;AIGC、ChatGPT、某端已死、各行各业✖️ChatGPT…… 究其原因&#xff0c;是因为 AI 这些年在不断更新…

nodejs 获取微信小程序二维码

getQRCode(accessToken) {const { ctx } this;let parameters;let url https://api.weixin.qq.com/wxa/getwxacodeunlimit?access_token${accessToken};await axios({method: "post",url: url,responseEncoding: "base64", //重点data: {page: "pa…

我要开始在B站分享视频教程了

目录 前言一、精品网站分享ChatGPT菜鸟在线编辑器在线音乐网站 二、常用工具知识学习编程前端 视图文关于俺 前言 大家好&#xff0c;我是MIKE笔记&#xff0c;一名在CSDN经常分享技术文章和教程的博主。如果你也喜欢我的文章&#xff0c;那么我想向你推荐我的B站频道&#xf…

chatgpt赋能python:如何用Python做兼职的SEO

如何用Python做兼职的SEO SEO是指优化网站内容和结构&#xff0c;以便在搜索引擎排名中获得更高的排名。在今天的数字时代中&#xff0c;SEO对于许多企业和网站来说非常重要。Python作为一种简单易学的编程语言&#xff0c;可以用来帮助优化和管理网站。在本文中&#xff0c;我…

[经验教程]iPhone苹果手机Siri怎么设置?

iPhone苹果手机Siri怎么设置&#xff1f; 1、在iPhone苹果手机上打开手机设置&#xff1b; 2、在iPhone苹果手机设置内找到Siri与搜索&#xff0c;并点击进入&#xff1b; 3、进入后&#xff0c;在Siri与搜索内设置&#xff1b; 原文来源&#xff1a;https://www.caochai.com/…

siri快捷指令_入门的最佳Siri快捷方式

siri快捷指令 The new Shortcuts app that was introduced in iOS 12 lets you create shortcuts to automate any number of tasks. Here are a handful of good ones to get you started, with the hopes that it gets your creative juices flowing. iOS 12中引入的新的快捷…

练习四级英语听力推荐用哪个APP?

练习四级英语听力推荐用哪个APP&#xff1f;大学英语四级想必是大多数同学都要去追求的一个目标&#xff0c;今天给大家推荐几个对学习英语和四级考试帮助非常大的软件&#xff0c;希望能帮助到每一个面临英语四级考试的同学。 有道词典 大家都知道对于英语四级考试来说&#…

English Learning - L2-12 英音地道语音语调 音的同化 爆破技巧 2023.04.4 周二

English Learning - L2-12 英音地道语音语调 音的同化 爆破技巧 2023.04.4 周二 音的同化因声带影响发生同化因发音部位的影响而发生同化 失去爆破摆拍爆破&#xff08;爆破音&#xff0b;爆破音&#xff09;谦让型爆破爆破音摩擦音 与 爆破音破擦音爆破音鼻辅音/舌边音 爆破综…

如何看懂“听力图”?

听力图是一张形象地“描绘”患者的听力状况的“阴晴”表&#xff0c;它标记了患者的听力的情况&#xff0c;是判断听力是否健康的重要途径之一。 当我们拿到听力检查报告时&#xff0c;我们应该看些什么&#xff0c;关注些什么内容&#xff0c;我们能从检查单中得到怎样的信息…

Lecture 5 Part of Speech Tagging

目录 POS application: Information Extraction 词性应用&#xff1a;信息提取 POS Open Class 开放类词性Problem of word classes: Ambiguity 词类问题&#xff1a;模糊性Tagsets 标记集Penn Treebank Tags:Derived Tags: 衍生标签Tagged Text Example 标记文本示例Reasons f…

英语听力练习

生词积累生词所在句子 blast an explosion, such as that caused by dynamite(dynamite:a powerful explosive used especially in mining) The blast hit a bus about to set off for the Ugandan capital Kampala. collapse the act of falling down or fa…

如何做代码抽象设计,多种设计模式的应用【四种抽象方式,干货附上代码】

代码这种东西&#xff0c;机器不管你写的怎么样&#xff0c;肯定能识别&#xff0c; 人就不一样了&#xff0c;前人拉屎后人踩屎&#xff0c;一坨认栽&#xff0c;n坨就有点过分了哈 一般写代码也不用太高大上&#xff0c;很多接口还是很简单的&#xff0c;如果把各种设计模式搞…

我开发了一个AI网站,速速围观,文末全是福利

目录 前言 AI大侠能提供什么功能 AI聊天 AI写作 AI编程 AI绘画 以及其它近10种AI工具 AI大侠后续功能计划 SQL优化 共建计划 写到最后 大家好&#xff0c;我是大侠&#xff0c;AI领域的专业博主 前言 最近有很多小伙伴问大侠&#xff1a; 有没有国内免费安全可用…

chatgpt赋能python:如何将Python写的游戏添加联机功能

如何将Python写的游戏添加联机功能 Python是一种流行的编程语言&#xff0c;广泛用于游戏开发。有许多游戏是单机版&#xff0c;但联机游戏可以为玩家们提供更多的娱乐和社交互动。本文将介绍如何为你的Python游戏添加联机功能&#xff0c;以便玩家们可以在Internet上与其他玩…

带你了解一下关于ChatGPT的快速进化过程

ChatGPT是一种基于GPT-3的聊天机器人&#xff0c;它可以理解自然语言&#xff0c;并生成符合人类对话语义的响应。在过去几年中&#xff0c;随着深度学习技术和NLP领域的快速发展&#xff0c;ChatGPT也在不断地进化和发展。 ChatGPT早期的版本是建立在GPT-3的基础上&#xff0…

BUUCTF-Reverse —— 第二页的题目集合

[MRCTF2020]hello_world_go 用go语言编写的程序&#xff0c;go语言编写的程序用的是静态链接的方法所以程序都很大&#xff0c;而且反汇编后的伪代码也很麻烦。 因为是elf文件&#xff0c;动态调试elf文件的话&#xff0c;可以用IDA连linux虚拟机&#xff0c;也可以使用gdb动…

前端技术搭建俄罗斯方块(内含源码)

The sand accumulates to form a pagoda ✨ 写在前面✨ 功能介绍✨ 页面搭建✨ 样式设置✨ 逻辑部分 ✨ 写在前面 上周我们实通过前端基础实现了扫雷游戏&#xff0c;今天还是继续按照我们原定的节奏来带领大家完成俄罗斯方块游戏&#xff0c;功能也比较简单简单&#xff0c;也…

订餐系统之微信点餐

经过几天的开发、测试微信点餐demo终于完成了&#xff0c;特在此分享下&#xff0c;不好的地方请大家多指正下哈&#xff01;一开始&#xff0c;就想这东西出来这么久了&#xff0c;网上应该有很多现成的东西&#xff0c;于是&#xff0c;baidu、google了半天&#xff0c;基本没…

点餐系统测试

一、设计测试用例 二、提交BUG a)BUG 一 标题:兼容性差&#xff0c;只能在Chrome浏览器和火狐浏览器上使用 1.版本号&#xff1a;V0009 2.测试环境&#xff1a;Chrome 浏览器 版本号 96.0.4664.45 火狐 浏览器 版本号 97.0.1 操作系统&#xff1a;win10 3.测试数据…