聊聊并发 生产者消费者模式

http://ifeve.com/producers-and-consumers-mode/

本文首发于InfoQ   作者:方腾飞  校对:张龙

在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。

为什么要使用生产者和消费者模式

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这种生产消费能力不均衡的问题,所以便有了生产者和消费者模式。

什么是生产者消费者模式

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

这个阻塞队列就是用来给生产者和消费者解耦的。纵观大多数设计模式,都会找一个第三者出来进行解耦,如工厂模式的第三者是工厂类,模板模式的第三者是模板类。在学习一些设计模式的过程中,如果先找到这个模式的第三者,能帮助我们快速熟悉一个设计模式。

生产者消费者模式实战

我和同事一起利用业余时间开发的Yuna工具中使用了生产者和消费者模式。首先我先介绍下Yuna工具,在阿里巴巴很多同事都喜欢通过邮件分享技术文章,因为通过邮件分享很方便,同学们在网上看到好的技术文章,复制粘贴发送就完成了分享,但是我们发现技术文章不能沉淀下来,对于新来的同学看不到以前分享的技术文章,大家也很难找到以前分享过的技术文章。为了解决这问题,我们开发了Yuna工具。Yuna取名自我非常喜欢的一款RPG游戏”最终幻想”中女主角的名字。

首先我们申请了一个专门用来收集分享邮件的邮箱,比如share@alibaba.com,同学将分享的文章发送到这个邮箱,让同学们每次都抄送到这个邮箱肯定很麻烦,所以我们的做法是将这个邮箱地址放在部门邮件列表里,所以分享的同学只需要象以前一样向整个部门分享文章就行,Yuna工具通过读取邮件服务器里该邮箱的邮件,把所有分享的邮件下载下来,包括邮件的附件,图片,和邮件回复,我们可能会从这个邮箱里下载到一些非分享的文章,所以我们要求分享的邮件标题必须带有一个关键字,比如[内贸技术分享],下载完邮件之后,通过confluence的web service接口,把文章插入到confluence里,这样新同事就可以在confluence里看以前分享过的文章,并且Yuna工具还可以自动把文章进行分类和归档。

为了快速上线该功能,当时我们花了三天业余时间快速开发了Yuna1.0版本。在1.0版本中我并没有使用生产者消费模式,而是使用单线程来处理,因为当时只需要处理我们一个部门的邮件,所以单线程明显够用,整个过程是串行执行的。在一个线程里,程序先抽取全部的邮件,转化为文章对象,然后添加全部的文章,最后删除抽取过的邮件。代码如下:

01public void extract() {
02        logger.debug("开始" + getExtractorName() + "。。");
03        //抽取邮件
04        List<Article> articles = extractEmail();
05        //添加文章
06        for (Article article : articles) {
07            addArticleOrComment(article);
08        }
09        //清空邮件
10        cleanEmail();
11        logger.debug("完成" + getExtractorName() + "。。");
12    }

Yuna工具在推广后,越来越多的部门使用这个工具,处理的时间越来越慢,Yuna是每隔5分钟进行一次抽取的,而当邮件多的时候一次处理可能就花了几分钟,于是我在Yuna2.0版本里使用了生产者消费者模式来处理邮件,首先生产者线程按一定的规则去邮件系统里抽取邮件,然后存放在阻塞队列里,消费者从阻塞队列里取出文章后插入到conflunce里。代码如下:

01public class QuickEmailToWikiExtractor extends AbstractExtractor {
02 
03private ThreadPoolExecutor      threadsPool;
04 
05private ArticleBlockingQueue<ExchangeEmailShallowDTO> emailQueue;
06 
07public QuickEmailToWikiExtractor() {
08        emailQueue= new ArticleBlockingQueue<ExchangeEmailShallowDTO>();
09        int corePoolSize = Runtime.getRuntime().availableProcessors() * 2;
10        threadsPool = new ThreadPoolExecutor(corePoolSize, corePoolSize, 10l, TimeUnit.SECONDS,
11                new LinkedBlockingQueue<Runnable>(2000));
12 
13    }
14 
15public void extract() {
16        logger.debug("开始" + getExtractorName() + "。。");
17        long start = System.currentTimeMillis();
18 
19        //抽取所有邮件放到队列里
20        new ExtractEmailTask().start();
21 
22        // 把队列里的文章插入到Wiki
23        insertToWiki();
24 
25        long end = System.currentTimeMillis();
26        double cost = (end - start) / 1000;
27        logger.debug("完成" + getExtractorName() + ",花费时间:" + cost + "秒");
28    }
29 
30    /**
31     * 把队列里的文章插入到Wiki
32     */
33    private void insertToWiki() {
34        //登录wiki,每间隔一段时间需要登录一次
35        confluenceService.login(RuleFactory.USER_NAME, RuleFactory.PASSWORD);
36 
37        while (true) {
38            //2秒内取不到就退出
39            ExchangeEmailShallowDTO email = emailQueue.poll(2, TimeUnit.SECONDS);
40            if (email == null) {
41                break;
42            }
43            threadsPool.submit(new insertToWikiTask(email));
44        }
45    }
46 
47     protected List<Article> extractEmail() {
48        List<ExchangeEmailShallowDTO> allEmails = getEmailService().queryAllEmails();
49        if (allEmails == null) {
50            return null;
51        }
52        for (ExchangeEmailShallowDTO exchangeEmailShallowDTO : allEmails) {
53            emailQueue.offer(exchangeEmailShallowDTO);
54        }
55        return null;
56    }
57 
58    /**
59     * 抽取邮件任务
60     *
61     * @author tengfei.fangtf
62     */
63    public class ExtractEmailTask extends Thread {
64        public void run() {
65            extractEmail();
66        }
67    }
68}

使用了生产者和消费者模式后,邮件的整体处理速度比以前要快了很多。

多生产者和多消费者场景

在多核时代,多线程并发处理速度比单线程处理速度更快,所以我们可以使用多个线程来生产数据,同样可以使用多个消费线程来消费数据。而更复杂的情况是,消费者消费的数据,有可能需要继续处理,于是消费者处理完数据之后,它又要作为生产者把数据放在新的队列里,交给其他消费者继续处理。如下图:

生产者消费者模式

我们在一个长连接服务器中使用了这种模式,生产者1负责将所有客户端发送的消息存放在阻塞队列1里,消费者1从队列里读消息,然后通过消息ID进行hash得到N个队列中的一个,然后根据编号将消息存放在到不同的队列里,每个阻塞队列会分配一个线程来消费阻塞队列里的数据。如果消费者2无法消费消息,就将消息再抛回到阻塞队列1中,交给其他消费者处理。

以下是消息总队列的代码;

01/**
02 * 总消息队列管理
03 *
04 * @author tengfei.fangtf
05 */
06public class MsgQueueManager implements IMsgQueue{
07 
08    private static final Logger              LOGGER
09 = LoggerFactory.getLogger(MsgQueueManager.class);
10 
11    /**
12     * 消息总队列
13     */
14    public final BlockingQueue<Message> messageQueue;
15 
16    private MsgQueueManager() {
17        messageQueue = new LinkedTransferQueue<Message>();
18    }
19 
20    public void put(Message msg) {
21        try {
22            messageQueue.put(msg);
23        catch (InterruptedException e) {
24            Thread.currentThread().interrupt();
25        }
26    }
27 
28    public Message take() {
29        try {
30            return messageQueue.take();
31        catch (InterruptedException e) {
32            Thread.currentThread().interrupt();
33        }
34        return null;
35    }
36 
37}

启动一个消息分发线程。在这个线程里子队列自动去总队列里获取消息。

01/**
02     * 分发消息,负责把消息从大队列塞到小队列里
03     *
04     * @author tengfei.fangtf
05     */
06    static class DispatchMessageTask implements Runnable {
07        @Override
08        public void run() {
09            BlockingQueue<Message> subQueue;
10            for (;;) {
11                //如果没有数据,则阻塞在这里
12                Message msg = MsgQueueFactory.getMessageQueue().take();
13                //如果为空,则表示没有Session机器连接上来,
14需要等待,直到有Session机器连接上来
15                while ((subQueue = getInstance().getSubQueue()) == null) {
16                    try {
17                        Thread.sleep(1000);
18                    catch (InterruptedException e) {
19                        Thread.currentThread().interrupt();
20                    }
21                }
22                //把消息放到小队列里
23                try {
24                    subQueue.put(msg);
25                catch (InterruptedException e) {
26                    Thread.currentThread().interrupt();
27                }
28            }
29        }
30    }

使用Hash算法获取一个子队列。

01/**
02     * 均衡获取一个子队列。
03     *
04     * @return
05     */
06    public BlockingQueue<Message> getSubQueue() {
07        int errorCount = 0;
08        for (;;) {
09            if (subMsgQueues.isEmpty()) {
10                return null;
11            }
12            int index = (int) (System.nanoTime() % subMsgQueues.size());
13            try {
14                return subMsgQueues.get(index);
15            catch (Exception e) {
16                //出现错误表示,在获取队列大小之后,队列进行了一次删除操作
17                LOGGER.error("获取子队列出现错误", e);
18                if ((++errorCount) < 3) {
19                    continue;
20                }
21            }
22        }
23    }

使用的时候我们只需要往总队列里发消息。

1//往消息队列里添加一条消息
2        IMsgQueue messageQueue = MsgQueueFactory.getMessageQueue();
3        Packet msg = Packet.createPacket(Packet64FrameType.
4TYPE_DATA, "{}".getBytes(), (short1);
5        messageQueue.put(msg);

线程池与生产消费者模式

Java中的线程池类其实就是一种生产者和消费者模式的实现方式,但是我觉得其实现方式更加高明。生产者把任务丢给线程池,线程池创建线程并处理任务,如果将要运行的任务数大于线程池的基本线程数就把任务扔到阻塞队列里,这种做法比只使用一个阻塞队列来实现生产者和消费者模式显然要高明很多,因为消费者能够处理直接就处理掉了,这样速度更快,而生产者先存,消费者再取这种方式显然慢一些。

我们的系统也可以使用线程池来实现多生产者消费者模式。比如创建N个不同规模的Java线程池来处理不同性质的任务,比如线程池1将数据读到内存之后,交给线程池2里的线程继续处理压缩数据。线程池1主要处理IO密集型任务,线程池2主要处理CPU密集型任务。

 

小结

本章讲解了生产者消费者模式,并给出了实例。读者可以在平时的工作中思考下哪些场景可以使用生产者消费者模式,我相信这种场景应该非常之多,特别是需要处理任务时间比较长的场景,比如上传附件并处理,用户把文件上传到系统后,系统把文件丢到队列里,然后立刻返回告诉用户上传成功,最后消费者再去队列里取出文件处理。比如调用一个远程接口查询数据,如果远程服务接口查询时需要几十秒的时间,那么它可以提供一个申请查询的接口,这个接口把要申请查询任务放数据库中,然后该接口立刻返回。然后服务器端用线程轮询并获取申请任务进行处理,处理完之后发消息给调用方,让调用方再来调用另外一个接口拿数据。

原创文章,转载请注明: 转载自并发编程网 – ifeve.com本文链接地址: 聊聊并发(十)生产者消费者模式

  • About 
  • Latest Posts

方 腾飞

花名清英,并发网(ifeve.com)创始人,畅销书《Java并发编程的艺术》作者,蚂蚁金服技术专家。目前工作于支付宝微贷事业部,关注互联网金融,并发编程和敏捷实践。微信公众号aliqinying。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/45027.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

阿里内贸团队敏捷实践-敏捷回顾

回顾review是敏捷开发中的一个必不可少的实践也是把整个敏捷开发过程连接成一个闭环的关键节点本文将阐述我们是如何做敏捷回顾的。 敏捷回顾最高指导原则 无论我们发现了什么考虑到当时的已知情况、个人的技术水平和能力、可用的资源以及手上的状况我们理解并坚信每个人对自己…

java 生产者消费者模式_聊聊并发(十)生产者消费者模式

本文首发于InfoQ 作者&#xff1a;方腾飞 校对&#xff1a;张龙 在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。 为什么要使用生产者和消费者模式 在线程世界里&#xff0c;生产…

基于Trtc的内贸站视频聊天服务【二】

基于Trtc的内贸站视频聊天服务【二】 上一节课和大家聊了一下web端视频聊天的技术演变和发展&#xff0c;需要满足web端视频聊天的基本条件。以及介绍了一下腾讯云提供的Trtc服务&#xff0c;大概说了下腾讯云的sdk。本节课就以实际开发内贸站视频聊天的项目&#xff08;Swan&…

谷歌外贸sem与百度内贸sem的不同

1&#xff0c;国内的话&#xff0c;不用在乎是否使用在家用&#xff0c;起订量问题一般不用特别注意&#xff0c;如果家用零售的话&#xff0c;大家会很自觉地想到淘宝&#xff0c;拼多多。但是进出口的话&#xff0c;必须是商用&#xff0c;批发&#xff0c;大批量货物类型&am…

外贸软件进出口内贸综合型管理解决方案

外贸公司综合型业务模式&#xff0c;指的是公司涉及自营、代理进出口业务、内贸业务、转口业务等等多业务模式&#xff0c;涉及的产品种类多&#xff0c;像这样的综合型外贸公司就需要通过信息化管理实现业财一体化&#xff0c;完善资金流向&#xff0c;简化工作流程&#xff0…

使用Azure OpenAI服务创建聊天机器人

创建聊天机器人步骤&#xff1a; 1、开通 Azure OpenAI 服务 在 Azure 国际版注册账号。注册后创建OpenAI 服务。申请提交后需要等待审核&#xff0c;审核通过后就可以对接接口了。 2、创建 Azure OpenAI 服务 当你的申请通过后&#xff0c;就可以到 Azure 上创建 OpenAI 服…

C#探索之路(9):深入理解C#代码编译的过程以及原理

C#探索之路(9)&#xff1a;深入理解C#代码编译的过程以及原理 文章目录 C#探索之路(9)&#xff1a;深入理解C#代码编译的过程以及原理一、前言&#xff1a;概念解析1、编译器&#xff1a;2、JIT是什么&#xff1f;3、AOT是什么&#xff1f;4、如何理解这个“基于运行时”的概念…

办公必备!不再被格式问题困扰,轻松搞定文档转换!

大家平时在工作中会需要将文档转换为其他格式吗&#xff1f; 日常工作中&#xff0c;经常碰到需要文件格式转换的情况&#xff0c;对于掌握了一些转换技能的朋友说&#xff0c;文件格式转换自然不在话下 对于不熟练的朋友来说&#xff0c;想要轻松转换文件格式&#xff0c;就…

微信dat文件用什么软件打开,如何转成jpg常用格式

微信dat文件是在PC端微信软件产生的图片文件&#xff0c;这个文件实际上是一个图片&#xff0c;微信把聊天过程中产生的图片进行了加密&#xff0c;更改了后缀存储为dat文件。 有时候删除了聊天记录或者被系统清理软件清理了&#xff0c;但还想查看曾经的微信聊天图片。这个时候…

PDF转Word怎么调整格式?这个方法轻松解决

在日常工作中&#xff0c;我们可能会遇到需要将PDF文件转换为Word文档的情况。虽然现在的PDF转Word工具越来越智能化&#xff0c;但仍然有一些格式调整需要我们手动进行。最近就有个小伙伴说接到了一份由客户提供的PDF文档&#xff0c;需要将其中的内容转换为Word文档&#xff…

新闻发布系统(java实现)+论文

java新闻发布系统以及论文&#xff0c;有需要联系QQ:1240952102 java源码以及论文 数据库脚本 以及开发工具齐全 只需安装即可使用 有需要联系QQ:1240952102

2020秋 英文科技论文写作与学术报告-期末

2020秋 英文科技论文写作与学术报告-期末 搜索答案不易&#xff0c;切勿白嫖

开题报告:基于java新冠疫苗在线预约系统 毕业设计论文开题报告模板

开发操作系统&#xff1a;windows10 4G内存 500G 开发环境&#xff1a;JDK1.8 Tomcat8 开发语言&#xff1a;Java 开发框架&#xff1a;springboot 模板引擎&#xff1a;Thymeleaf 开发工具&#xff1a;Idea 数据库&#xff1a;mysql8 数据库管理工具&#xff1a;nav…

大学计划《数字化转型赋能教育创新发展高峰论坛》成功举办

2023年4月8日&#xff0c;由航天科技控股集团股份有限公司&#xff08;简称“航天科技”&#xff09;主办&#xff0c;CFF上海与上海电子信息职业技术学院承办、智慧树网支持的《数字化转型赋能教育创新发展高峰论坛》线上会议顺利召开。此次会议邀请到了众多教育界专家、教学名…

可复现、开放科研、跨学科合作:数据驱动下的科研趋势及应用方案

信息技术的快速发展&#xff0c;催化了数据科学场景下科研组织提高科学研究的可复现性、实现开放科研、开展跨学科领域的交叉研究等协同诉求。本文剖析了此三类诉求的实现难点&#xff0c;并提供了系统化的解决方案。 欢迎进入ModelWhale 官网注册试用&#xff0c;个人专业版与…

教育信息化时代,如何打造中学理科信息化实验操作考场方案

近年来&#xff0c;我国考试招生制度不断改进完善&#xff0c;初步形成了相对完整的考试招生体系。但随着教育事业的逐步发展&#xff0c;国务院明确提出了改革考试形式和内容&#xff1a;完善中学学业水平考试&#xff0c;规范中考学生综合素质评价&#xff0c;加快推进中学院…

论文导读 | 社交网络上的信息传播预测

作者&#xff1a;北京大学苟向阳 编者按&#xff1a; 现代社交网络极大地促进了信息的生成和传播&#xff0c;也加剧了不同信息对用户注意力的竞争。 对于一条信息的传播范围进行预测&#xff0c;能够帮助运营者和用户提前发现潜在的热点&#xff0c;从而为其决策提供指导。 …

风变MTP管理课,助力职场乘风破浪

相信很多人在自己的职场规划中&#xff0c;最终都是奔着管理岗而去的。然而&#xff0c;管理人员也并不是那么容易做的&#xff0c;因为做了管理岗就意味着要有一定的领导力&#xff0c;要会管理员工&#xff0c;要学会把任务分发给员工&#xff0c;学会识人、用人等。所以对于…

基于jsp的新闻发布系统

新闻发布系统 下面就让我们来说一说基于jsp的新闻发布系统&#xff0c;其中使用的技术有JavaBean、fillter、数据库等&#xff0c;能够实现新闻的发布功能&#xff0c;在发布之后能够进行对每一条新闻的删除、修改、或者继续增加新的文章&#xff0c;最后还能够进行查询功能&am…

宋宇-课堂对话领域研究热点与 前沿趋势探究

好家伙。。。 看这种论文容易自闭&#xff0c;&#xff0c;&#xff0c;怎么能写这么好呢。。。 下次看看博士的论文吧还是。。 定义 课堂对话( classroom dialogue /discourse) 是师生间或者生生间围绕教育教 学目标的实现所形成的良性的交流活动。&#xff3b;1&#xff3d…