Linux使用匿名管道实现进程池得以高效通信

                                               🎬慕斯主页修仙—别有洞天

                                              ♈️今日夜电波:Nonsense—Sabrina Carpenter

                                                                0:50━━━━━━️💟──────── 2:43
                                                                    🔄   ◀️   ⏸   ▶️    ☰  

                                      💗关注👍点赞🙌收藏您的每一次鼓励都是对我莫大的支持😍


目录

思路梳理

匿名管道知识回忆

匿名管道实现进程池思路

池化技术怎么提高效率?

具体实现

进程以及管道的创建操作

分发任务操作

回收资源操作

解决上述所提到Bug

总体代码及代码效果

Makefile

Task.hpp

mulpipe.cpp

实现效果


思路梳理

匿名管道知识回忆

        上一篇的文章中,详细介绍了管道的知识点,下面还是复习一下对于匿名管道相关的知识点。如下是匿名管道的一个示例图,这表现了两个“有血缘关系”的两个进程之间通过匿名管道进行通信的过程,我们通过控制struct files *fd_array[]中读或者写的struct files的开关来实现两个进程间的通信:

        我们主要通过如下的接口创建匿名管道来实现以上的匿名管道通信:

#include <unistd.h>
功能:创建一无名管道
原型
int pipe(int fd[2]);
参数
fd:文件描述符数组,其中fd[0]表示读端, fd[1]表示写端
返回值:成功返回0,失败返回错误代码

匿名管道实现进程池思路

        说大白话(●—●):我们使用一个父进程创建很多的管道,再创建对应数量的进程,然后这些管道分别与其他的进程进行直接的连接。这样我们就提前创建和联系好了一定数量的进程。我们想让父进程向其中一个子进程发消息就可通过选择管道直接发消息,而如果不发消息其它进程只会等待。通过信息的发送,我们可以就通过选择管道从而让子进程分别执行对应的任务。大致的图解如下:

池化技术怎么提高效率?

        池化技术通过资源共享和优化来提高效率,具体表现在以下几个方面:

  • 资源复用:池化技术通过重用已创建的资源,减少了频繁创建和销毁资源的开销。例如,线程池中的线程可以被多个任务重复使用,这样可以避免每次任务执行时都创建新线程的开销。
  • 减少等待时间:池化技术可以减少请求的等待时间。因为资源是预先分配好的,当有新的请求到来时,可以立即使用池中的资源,而不需要等待资源的创建过程。
  • 提高响应速度:由于资源已经准备好,池化技术可以快速响应请求,提高了处理速度。这对于需要快速响应的系统来说尤其重要。
  • 统一管理:池化技术提供了对资源的集中管理,这有助于监控系统资源的使用情况,及时回收不再使用的资源,避免资源浪费。
  • 优化性能:在大数据处理等场景中,池化技术可以通过合并多个请求来减少数据处理的时间和空间复杂度,从而提高数据处理的性能。

具体实现

进程以及管道的创建操作

        创建一个channel类用来来存储管道的读文件描述符ctrlfd以及进程描述符workerid,完成初始化操作以及后续的销毁操作。

static int number = 1;//标识对应的进程和管道class channel
{
public:channel(int fd, pid_t id) : ctrlfd(fd), workerid(id){name = "channel-" + std::to_string(number++);}public:int ctrlfd;pid_t workerid;std::string name;
};

        再根据先描述在组织的原则,我们在主函数使用一个std::vector<channel> channels来管理上面的结构体,可以根据需求进行增删查改等等操作。在完成这些预备操作后,创建对应数量的进程以及管道。

        特别注意:如下函数中的std::vector<int> old;以及如下代码是为了解决父子进程继承而产生的一些bug,这个将在最后解释:

        	std::vector<int> old;if(!old.empty()){for(auto fd : old){close(fd);}PrintFd(old);}old.push_back(pipefd[1]);

        如下函数的操作为创建管道以及进程,可以先忽略上述所提到的解决bug的代码,通过循环创建对应的子进程、关闭父子进程对应的读写文件,并且存储到vector<channel> *c(也就是主函数的中channels),下面的work()函数为子进程要做的工作,可以理解了后面的分发任务操作再来理解。

const int num = 5;//全局定义创建管道以及进程数
void CreateChannels(std::vector<channel> *c)
{std::vector<int> old;for (int i = 0; i < num; i++){// 1. 定义并创建管道int pipefd[2];int n = pipe(pipefd);assert(n == 0);(void)n;// 2. 创建进程pid_t id = fork();assert(id != -1);// 3. 构建单向通信信道if (id == 0) // child{if(!old.empty()){for(auto fd : old){close(fd);}PrintFd(old);}close(pipefd[1]);//关闭写dup2(pipefd[0], 0);//重定向写入Work();//子进程工作exit(0); // 会自动关闭自己打开的所有的fd}// fatherclose(pipefd[0]);c->push_back(channel(pipefd[1], id));old.push_back(pipefd[1]);// childid, pipefd[1]}
}

        因为前面我们已经重定向了管道的写入作为子进程的写入,接下来通过read就会读取对应的操作,需要注意的是:我们是通过一个int类型的变量来控制要完成的任务,后续再task.hpp中会定义对应要完成的任务。通过read的返回值来判断是要执行任务还是退出,我们在管道的知识中知道,read会等待write,也就是等待数据的输入。当写进程退出,读进程会跟着退出,我们根据以上特性来判断是要执行任务还是等待任务还是退出进程:

void Work()
{while (true){int code = 0;ssize_t n = read(0, &code, sizeof(code));if (n == sizeof(code)){if (!init.CheckSafe(code))continue;init.RunTask(code);}else if (n == 0){break;}else{// do nothing}}std::cout << "child quit" << std::endl;
}

分发任务操作

        当我们创建好管道以及进程后,可以注意到接下来的都是没有经过 if (id == 0) 控制下的程序,也就是说接下来的程序都是父进程运行的程序,因此我们就可以让父进程来分配任务使得子进程完成任务。接下来,我们创建一个task.hpp来模拟创建的任务以及对应的封装、任务分发等等操作如下:

#pragma once#include <iostream>
#include <functional>
#include <vector>
#include <ctime>
#include <unistd.h>
#include <stdlib.h>// using task_t = std::function<void()>;
typedef std::function<void()> task_t;void Download()
{std::cout << "我是一个下载任务"<< " 处理者: " << getpid() << std::endl;
}void PrintLog()
{std::cout << "我是一个打印日志的任务"<< " 处理者: " << getpid() << std::endl;
}void PushVideoStream()
{std::cout << "这是一个推送视频流的任务"<< " 处理者: " << getpid() << std::endl;
}// void ProcessExit()
// {
//     exit(0);
// }class Init
{
public:// 任务码const static int g_download_code = 0;const static int g_printlog_code = 1;const static int g_push_videostream_code = 2;// 任务集合std::vector<task_t> tasks;public:Init(){tasks.push_back(Download);tasks.push_back(PrintLog);tasks.push_back(PushVideoStream);srand(time(nullptr) ^ getpid());}bool CheckSafe(int code){if (code >= 0 && code < tasks.size())return true;elsereturn false;}void RunTask(int code){return tasks[code]();}int SelectTask(){return rand() % tasks.size();}std::string ToDesc(int code){switch (code){case g_download_code:return "Download";case g_printlog_code:return "PrintLog";case g_push_videostream_code:return "PushVideoStream";default:return "Unknow";}}
};Init init; // 定义对象

        传入主函数用于管理的channels,flag用于控制进程执行完任务后是否需要退出(1表示要退出,0表示不退出),num为要执行任务的次数,需要注意的是num是要在flag为1的前提下才能有效的,如果不传则程序只会执行一次。在选择完任务以及进程后,通过write来向指定的管道写入。具体读写操作可看:Linux进程间通信(IPC)机制之一:管道(Pipes)详解:匿名管道的特性与情况

void SendCommand(const std::vector<channel> &c, bool flag, int num = -1)
{int pos = 0;while (true){// 1. 选择任务int command = init.SelectTask();// 2. 选择信道(进程)const auto &channel = c[pos++];pos %= c.size();// debugstd::cout << "send command " << init.ToDesc(command) << "[" << command << "]"<< " in "<< channel.name << " worker is : " << channel.workerid << std::endl;// 3. 发送任务write(channel.ctrlfd, &command, sizeof(command));// 4. 判断是否要退出if (!flag){num--;if (num <= 0)break;}sleep(1);}std::cout << "SendCommand done..." << std::endl;
}

回收资源操作

        通过close关闭对应的管道,waitpid等待子进程的结束。

const int num = 5;//全局定义创建管道以及进程数
void ReleaseChannels(std::vector<channel> c)
{// version 2// int num = c.size() - 1;// for (; num >= 0; num--)// {//     close(c[num].ctrlfd);//     waitpid(c[num].workerid, nullptr, 0);// }// version 1for (const auto &channel : c){close(channel.ctrlfd);waitpid(channel.workerid, nullptr, 0);}// for (const auto &channel : c)// {//     pid_t rid = waitpid(channel.workerid, nullptr, 0);//     if (rid == channel.workerid)//     {//         std::cout << "wait child: " << channel.workerid << " success" << std::endl;//     }// }
}

解决上述所提到Bug

        如下代码:

        	std::vector<int> old;if(!old.empty()){for(auto fd : old){close(fd);}PrintFd(old);}old.push_back(pipefd[1]);

        这是个什么Bug呢?如果不加上上这段代码,那么关闭进程及管道就必须从最后面生成的进程和管道向前关闭。为啥呢?这是因为当我们父进程依次创建子进程,其中的对于管道的读写操作struct file也被继承了下来,也就是说有着上一个生成的子进程会被下一个子进程的写操作struct file指向,而下下个生成的子进程会指向前面两个子进程,以此类推...因此,我们需要在创建的时候关闭新创建子进程对应的写操作。

        具体的子进程继承写操作的struct file例子如下:

总体代码及代码效果

Makefile

processpool:mulpipe.cppg++ -o $@ $^ -std=c++11
.PHONY:clean
clean:rm -f processpool

Task.hpp

#pragma once#include <iostream>
#include <functional>
#include <vector>
#include <ctime>
#include <unistd.h>
#include <stdlib.h>// using task_t = std::function<void()>;
typedef std::function<void()> task_t;void Download()
{std::cout << "我是一个下载任务"<< " 处理者: " << getpid() << std::endl;
}void PrintLog()
{std::cout << "我是一个打印日志的任务"<< " 处理者: " << getpid() << std::endl;
}void PushVideoStream()
{std::cout << "这是一个推送视频流的任务"<< " 处理者: " << getpid() << std::endl;
}// void ProcessExit()
// {
//     exit(0);
// }class Init
{
public:// 任务码const static int g_download_code = 0;const static int g_printlog_code = 1;const static int g_push_videostream_code = 2;// 任务集合std::vector<task_t> tasks;public:Init(){tasks.push_back(Download);tasks.push_back(PrintLog);tasks.push_back(PushVideoStream);srand(time(nullptr) ^ getpid());}bool CheckSafe(int code){if (code >= 0 && code < tasks.size())return true;elsereturn false;}void RunTask(int code){return tasks[code]();}int SelectTask(){return rand() % tasks.size();}std::string ToDesc(int code){switch (code){case g_download_code:return "Download";case g_printlog_code:return "PrintLog";case g_push_videostream_code:return "PushVideoStream";default:return "Unknow";}}
};Init init; // 定义对象

mulpipe.cpp

#include <iostream>
#include <string>
#include <vector>
#include <cassert>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include "Task.hpp"const int num = 5;
static int number = 1;class channel
{
public:channel(int fd, pid_t id) : ctrlfd(fd), workerid(id){name = "channel-" + std::to_string(number++);}public:int ctrlfd;pid_t workerid;std::string name;
};void Work()
{while (true){int code = 0;ssize_t n = read(0, &code, sizeof(code));if (n == sizeof(code)){if (!init.CheckSafe(code))continue;init.RunTask(code);}else if (n == 0){break;}else{// do nothing}}std::cout << "child quit" << std::endl;
}void PrintFd(const std::vector<int> &fds)
{std::cout << getpid() << " close fds: ";for(auto fd : fds){std::cout << fd << " ";}std::cout << std::endl;
}// 传参形式:
// 1. 输入参数:const &
// 2. 输出参数:*
// 3. 输入输出参数:&
void CreateChannels(std::vector<channel> *c)
{// bugstd::vector<int> old;for (int i = 0; i < num; i++){// 1. 定义并创建管道int pipefd[2];int n = pipe(pipefd);assert(n == 0);(void)n;// 2. 创建进程pid_t id = fork();assert(id != -1);// 3. 构建单向通信信道if (id == 0) // child{if(!old.empty()){for(auto fd : old){close(fd);}PrintFd(old);}close(pipefd[1]);//关闭写dup2(pipefd[0], 0);//重定向写入Work();exit(0); // 会自动关闭自己打开的所有的fd}// fatherclose(pipefd[0]);c->push_back(channel(pipefd[1], id));old.push_back(pipefd[1]);// childid, pipefd[1]}
}void PrintDebug(const std::vector<channel> &c)
{for (const auto &channel : c){std::cout << channel.name << ", " << channel.ctrlfd << ", " << channel.workerid << std::endl;}
}void SendCommand(const std::vector<channel> &c, bool flag, int num = -1)
{int pos = 0;while (true){// 1. 选择任务int command = init.SelectTask();// 2. 选择信道(进程)const auto &channel = c[pos++];pos %= c.size();// debugstd::cout << "send command " << init.ToDesc(command) << "[" << command << "]"<< " in "<< channel.name << " worker is : " << channel.workerid << std::endl;// 3. 发送任务write(channel.ctrlfd, &command, sizeof(command));// 4. 判断是否要退出if (!flag){num--;if (num <= 0)break;}sleep(1);}std::cout << "SendCommand done..." << std::endl;
}
void ReleaseChannels(std::vector<channel> c)
{// version 2// int num = c.size() - 1;// for (; num >= 0; num--)// {//     close(c[num].ctrlfd);//     waitpid(c[num].workerid, nullptr, 0);// }// version 1for (const auto &channel : c){close(channel.ctrlfd);waitpid(channel.workerid, nullptr, 0);}// for (const auto &channel : c)// {//     pid_t rid = waitpid(channel.workerid, nullptr, 0);//     if (rid == channel.workerid)//     {//         std::cout << "wait child: " << channel.workerid << " success" << std::endl;//     }// }
}
int main()
{std::vector<channel> channels;// 1. 创建信道,创建进程CreateChannels(&channels);// 2. 开始发送任务const bool g_always_loop = true;// SendCommand(channels, g_always_loop);SendCommand(channels, !g_always_loop, 10);// 3. 回收资源,想让子进程退出,并且释放管道,只要关闭写端ReleaseChannels(channels);return 0;
}

实现效果


                   感谢你耐心的看到这里ღ( ´・ᴗ・` )比心,如有哪里有错误请踢一脚作者o(╥﹏╥)o! 

                                       

                                                                        给个三连再走嘛~  

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/247573.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ubuntu 22.04 安装mysql-8.0.34

ubuntu 22.04 安装mysql-8.0.34 1、基础安装配置 更新软件包&#xff1a; sudo apt update查看可用软件包&#xff1a; sudo apt search mysql-server安装最新版本&#xff1a; sudo apt install -y mysql-server或者&#xff0c;安装指定版本&#xff1a; sudo apt inst…

如何实现无公网ip远程SSH连接家中本地的树莓派

文章目录 如何通过 SSH 连接到树莓派步骤1. 在 Raspberry Pi 上启用 SSH步骤2. 查找树莓派的 IP 地址步骤3. SSH 到你的树莓派步骤 4. 在任何地点访问家中的树莓派4.1 安装 Cpolar4.2 cpolar进行token认证4.3 配置cpolar服务开机自启动4.4 查看映射到公网的隧道地址4.5 ssh公网…

qemu + vscode图形化调试linux kernel

一、背景 使用命令行连接gdb 在调试时&#xff0c;虽然可以通过tui enable 显示源码&#xff0c;但还是存在设置断点麻烦&#xff08;需要对着源码设置&#xff09;&#xff0c;terminal显示代码不方便&#xff0c;不利于我们学习&#xff1b;另外在gdb 下p命令显示结构体内容…

【command】使用nr简化npm run命令

参考文章 添加 alias nrnpm run通过alias启动命令可以帮助我们节省运行项目输入命令的时间 $ cd ~ $ vim .bash_profile $ source ~/.bashrc

ChatGPT与文心一言:智能回复与语言准确性的较量

在当今数字化时代&#xff0c;随着人们对智能化技术的需求不断增长&#xff0c;智能回复工具也成为了日常生活中不可或缺的一部分。ChatGPT和文心一言作为两个备受瞩目的智能回复工具&#xff0c;在智能回复、语言准确性以及知识库丰富度等方面各有卓越之处。 本文将对这两者进…

14.5 Flash查询和添加数据库数据

14.5 Flash查询和添加数据库数据 在Flash与数据库通讯的实际应用中&#xff0c;如何实现用户的登录与注册是经常遇到的一个问题。登录实际上就是ASP根据Flash提供的数据查询数据库的过程&#xff0c;而注册则是ASP将Flash提供的数据写入数据库的过程。 1.启动Access2003&…

LeetCode 热题 100 | 子串

目录 1 560. 和为 K 的子数组 2 239. 滑动窗口最大值 3 76. 最小覆盖子串 菜鸟做题第二周&#xff0c;语言是 C 1 560. 和为 K 的子数组 题眼&#xff1a;“子数组是数组中元素的连续非空序列。” 解决本问题的关键就在于如何翻译问题。子数组 s 的和可以看作数组 i 的…

C++ 类与对象(上)

目录 本节目标 1.面向过程和面向对象初步认识 2.类的引入 3.类的定义 4.类的访问限定符及封装 4.1 访问限定符 4.2 封装 5. 类的作用域 6. 类的实例化 7.类对象模型 7.1 如何计算类对象的大小 7.2 类对象的存储方式猜测 7.3 结构体内存对齐规则 8.this指针 8.1 thi…

Swift 周报 第四十六期

文章目录 前言卖不动了&#xff1f;iPhone 15 系列跌破 5000 元大关StoreKit 和审核指南更新将你的 App 提交到 Apple Vision Pro 的 App Store 提案通过的提案 Swift论坛推荐博文话题讨论关于我们 前言 本期是 Swift 编辑组整理周报的第四十六期&#xff0c;每个模块已初步成…

用C语言实现贪吃蛇游戏!!!

前言 大家好呀&#xff0c;我是Humble&#xff0c;不知不觉在CSND分享自己学过的C语言知识已经有三个多月了&#xff0c;从开始的C语言常见语法概念说到C语言的数据结构今天用C语言实现贪吃蛇已经有30余篇博客的内容&#xff0c;也希望这些内容可以帮助到各位正在阅读的小伙伴…

Spark3内核源码与优化

文章目录 一、Spark内核原理1、Spark 内核概述1.1 简介1.2 Spark 核心组件1.3 Spark 通用运行流程概述 2、Spark 部署模式2.1 YARN Cluster 模式(重点)2.2 YARN Client 模式2.3 Standalone Cluster 模式2.4 Standalone Client 模式 3、Spark 通讯架构3.1 Spark 通信架构概述3.2…

Springboot响应数据详解

功能接口 Controller下每一个暴露在外的方法都是一个功能接口 功能接口的请求路径是RequestMapping定义的路径&#xff0c;浏览器需要请求该功能则需要发出该路径下的请求。 RestController RestControllerControllerResponseBody(响应数据的注解) ResponseBody 类型&#…

视频号下载提取器:如何轻松获取视频号的视频

在数字化的世界中&#xff0c;我们每天重复刷着形形色色的短视频&#xff0c;你们知道他们每天接收到的媒体内容就是经过不断的处理和编辑呈现在我们观看的产物。 其中&#xff0c;视频内容由于其生动形象的表现形式&#xff0c;已经成为人们获取信息、娱乐和学习的重要途径。然…

[GXYCTF2019]BabyUpload1

尝试各种文件&#xff0c;黑名单过滤后缀ph&#xff0c;content-type限制image/jpeg 内容过滤<?&#xff0c;木马改用<script languagephp>eval($_POST[cmdjs]);</script> 上传.htaccess将上传的文件当作php解析 蚁剑连接得到flag

城市开发区视频系统建设方案:打造视频基座、加强图像数据治理

一、背景需求 随着城市建设的步伐日益加快&#xff0c;开发区已经成为了我国工业化、城镇化和对外开放的重要载体。自贸区、开发区和产业园的管理工作自然也变得至关重要。在城市经开区的展览展示馆、进出口商品展示交易中心等地&#xff0c;数千路监控摄像头遍布各角落&#…

嵌入式第十二天!(指针数组、指针和二维数组的关系、二级指针)

1. 指针数组&#xff1a; int *a[5]; char *str[5]; 指针数组主要用来操作字符串数组&#xff0c;通常将指针数组的每个元素存放字符串的首地址实现对多个字符串的操作。 二维数组主要用来存储字符串数组&#xff0c;通过每行存储一个字符串&#xff0c;多行存储多个字符串所组…

Phoncent博客GPT写作工具

对于许多人来说&#xff0c;写作并不是一件轻松的事情。有时候&#xff0c;我们可能会遇到写作灵感枯竭、写作思路混乱、语言表达困难等问题。为了解决这些问题&#xff0c;Phoncent博客推出了一款创新的工具——GPT写作工具&#xff0c;它利用了GPT技术&#xff0c;为用户提供…

linux安装docker-compose

前言 如果你的docker版本是23&#xff0c;请移步到linux安装新版docker&#xff08;23&#xff09;和docker-compose这篇博客 查看docker版本命令&#xff1a; docker --version今天安装docker-compose的时候&#xff0c;找了很多教程&#xff0c;但是本地一直报错&#xff0…

掌握使用 React 和 Ant Design 的个人博客艺术之美

文章目录 前言在React的海洋中起航安装 Create React App安装Ant Design 打造个性化的博客风格通过路由实现多页面美化与样式定制部署与分享总结 前言 在当今数字时代&#xff0c;个人博客成为表达观点、分享经验和展示技能的独特平台。在这个互联网浪潮中&#xff0c;选择使用…

每日一道面试题:Java中序列化与反序列化

写在开头 哈喽大家好&#xff0c;在高铁上码字的感觉是真不爽啊&#xff0c;小桌板又拥挤&#xff0c;旁边的小朋友也比较的吵闹&#xff0c;影响思绪&#xff0c;但这丝毫不影响咱学习的劲头&#xff01;哈哈哈&#xff0c;在这喧哗的车厢中&#xff0c;思考着这样的一个问题…