✨个人主页: 熬夜学编程的小林
💗系列专栏: 【C语言详解】 【数据结构详解】【C++详解】【Linux系统编程】
目录
1、引言
2、进程池的基本概念
3、管道在进程池中的应用
4、进程池的实现
4.1、master类定义
4.2、测试信道
4.3、通过channle控制子进程
4.3.1、普通版本
4.3.2、重定向版本
4.4、回收管道和子进程
4.5、修复bug
5、进程池完整代码
5.1、makefile
5.2、Task.hpp
5.3、ProcessPool.cc
1、引言
在现代的软件开发中,处理大量并发任务是一项常见的需求。进程池作为一种有效的并发处理模型,能够预先创建多个进程并管理它们,以响应不断到来的任务。本文将介绍如何使用C++和Linux系统下的管道(Pipes)机制来构建一个简易的进程池。
2、进程池的基本概念
进程池是一种技术,它预先创建并维护一定数量的进程,这些进程在需要时执行特定的任务。进程池减少了进程创建和销毁的开销,提高了系统资源的利用率,并简化了并发任务的管理。
3、管道在进程池中的应用
管道是Linux系统中用于进程间通信(IPC)的一种简单机制。它允许一个进程(写端)将数据写入管道,并由另一个进程(读端)读取。在本例中,我们将使用管道来从主进程向子进程发送任务命令。
4、进程池的实现
会用到的头文件
#include <iostream>
#include <string>
#include <vector>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <ctime>
#include <cstdlib>
进程池基本结构
4.1、master类定义
// master信道
class Channel
{
public:// 构造函数Channel(int wfd, pid_t id, const std::string &name): _wfd(wfd), _subprocessid(id), _name(name){}// 获取成员变量函数int GetWfd() const { return _wfd; }pid_t GetProcessId() const { return _subprocessid; }std::string GetName() const { return _name; }~Channel(){}private:int _wfd; // 写端fdpid_t _subprocessid; // 子进程pidstd::string _name; // 信道名字
};
4.2、测试信道
1、创建管道
2、创建子进程
3、构建一个channel名称
4、测试是否创建成功
void work(int fd)
{while(true){sleep(1);}
}
int main(int argc,char* argv[])
{// 通过命令行传参创建几个子进程,不传个数直接报错if(argc != 2){std::cerr << "Usage: " << argv[0] << "processnum" << std::endl;return 1;}int num = std::stoi(argv[1]);std::vector<Channel> channels;// 1.创建信道和子进程for(int i = 0;i < num;i++){// 1.创建管道int pipefd[2] = {0};int n = pipe(pipefd);// n < 0创建失败if(n < 0) exit(0);// 2.创建子进程pid_t id = fork();// child -- readif(id == 0){// 关闭写端close(pipefd[1]);//work(pipefd[0]);close(pipefd[0]);exit(0);}// 3.构建一个channel名称std::string channel_name = "channel-" + std::to_string(i);// father -- writeclose(pipefd[0]);channels.push_back(Channel(pipefd[1],id,channel_name));}// for testfor(auto& channel : channels){std::cout << "====================================" << std::endl;std::cout << "channel_name->" << channel.GetName() << std::endl;std::cout << "channel_wfd->" << channel.GetWfd() << std::endl;std::cout << "channel_id->" << channel.GetProcessId() << std::endl;}sleep(100);return 0;
}
将创建信道和子进程封装成函数
// 创建信道和子进程
void CreateChannelAndSub(int num, std::vector<Channel> *channels)
{// Bugfor (int i = 0; i < num; i++){// 1.创建管道int pipefd[2] = {0};int n = pipe(pipefd);// n < 0创建失败if (n < 0)exit(0);// 2.创建子进程pid_t id = fork();// child -- readif (id == 0){// 关闭写端close(pipefd[1]);//work(pipefd[0]);close(pipefd[0]);exit(0);}// 3.构建一个channel名称std::string channel_name = "channel-" + std::to_string(i);// father -- writeclose(pipefd[0]);channels->push_back(Channel(pipefd[1], id, channel_name));}
}
4.3、通过channle控制子进程
4.3.1、普通版本
1、选择一个任务
2、选择一个信道和进程
3、发送任务
4、测试
1、创建一些任务(此处使用函数指针)
// 任务个数
#define TaskNum 3typedef void (*task_t)();//函数指针重命名 void Print()
{std::cout << "I am a Print task" << std::endl;
}
void DownLoad()
{std::cout << "I am a Download task" << std::endl;
}
void Flush()
{std::cout << "I am a Flush task" << std::endl;
}// 创建任务数组
task_t tasks[TaskNum];
2、调用任务的相关函数
// 加载任务
void LoadTask()
{srand(time(nullptr) ^ getpid() ^1777);tasks[0] = Print;tasks[1] = DownLoad;tasks[2] = Flush;
}// 通过数组下标执行某一条任务
void ExecuteTask(int number)
{if(number < 0 || number > 2) return;tasks[number]();
}// 通过随机数选择任务编号
int SelectTask()
{return rand() % TaskNum;
}
3、选择信道函数(轮询方式)
从第一个信道开始,依次进行使用。
// 0 1 2 channelnum
int NextChannel(int channelnum)
{static int next = 0;int channel = next;next++;next %= channelnum;return channel;
}
4、发送任务函数
发送任务实质是向写端写入执行任务的编号。
void SendTaskCommand(const Channel &channel, int taskcommand)
{write(channel.GetWfd(), &taskcommand, sizeof(taskcommand));
}
5、控制子进程主体函数
// 控制一次子进程
void ctrlProcessOnce(std::vector<Channel> &channels)
{sleep(1);// a.选择一个任务int taskcommand = SelectTask();// b.选择一个信道和进程int channel_index = NextChannel(channels.size());// c.发送任务SendTaskCommand(channels[channel_index], taskcommand);std::cout << std::endl;std::cout << "taskcommand: " << taskcommand << " channel: "<< channels[channel_index].GetName() << " sub process: " << channels[channel_index].GetProcessId() << std::endl;
}
// 通过times参数控制执行多少次,不传参则一直执行
void ctrlProcess(std::vector<Channel> &channels, int times = -1)
{if (times > 0){while (times--){ctrlProcessOnce(channels);}}else{while (true){ctrlProcessOnce(channels);}}
}
6、主函数
int main(int argc, char *argv[])
{if (argc != 2){std::cerr << "Usage: " << argv[0] << "processnum" << std::endl;return 1;}int num = std::stoi(argv[1]);// 加载任务LoadTask();std::vector<Channel> channels;// 1.创建信道和子进程CreateChannelAndSub(num, &channels);// 2.通过channle控制子进程ctrlProcess(channels);return 0;
}
4.3.2、重定向版本
我们可以将管道的读端,重定向到标准输入,然后从标准输入读取数据。
work()函数
void work()
{while (true){int command = 0;// 从标准输入读数据int n = read(0, &command, sizeof(command));if (n == sizeof(int)){std::cout << "pid is : " << getpid() << " handler task" << std::endl;ExecuteTask(command);}else if (n == 0){std::cout << "sub process : " << getpid() << " quit" << std::endl;break;}}
}
创建信道和子进程
// 创建信道和子进程(回调方式)
void CreateChannelAndSub(int num, std::vector<Channel> *channels,task_t task)
{// Bugfor (int i = 0; i < num; i++){// 1.创建管道int pipefd[2] = {0};int n = pipe(pipefd);// n < 0创建失败if (n < 0)exit(0);// 2.创建子进程pid_t id = fork();// child -- readif (id == 0){// 关闭写端close(pipefd[1]);dup2(pipefd[0], 0); // 将管道的读端,重定向到标准输入task();close(pipefd[0]);exit(0);}// 3.构建一个channel名称std::string channel_name = "channel-" + std::to_string(i);// father -- writeclose(pipefd[0]);channels->push_back(Channel(pipefd[1], id, channel_name));}
}
主函数
int main(int argc, char *argv[])
{if (argc != 2){std::cerr << "Usage: " << argv[0] << "processnum" << std::endl;return 1;}int num = std::stoi(argv[1]);// 加载任务LoadTask();std::vector<Channel> channels;// 1.创建信道和子进程,回调版本,调用什么函数,传什么函数名CreateChannelAndSub(num, &channels,work);// 2.通过channle控制子进程,执行5次ctrlProcess(channels,5);// 3. 回收管道和子进程. a. 关闭所有的写端 b. 回收子进程CleanUpChannel(channels);return 0;
}
可以实现一样的效果,但是我们可以将任务单独放在一个文件中,想要调用哪个任务,实现一个任务函数,传这个任务函数名即可。
4.4、回收管道和子进程
1、关闭所有的写端
2、回收子进程
1、关闭所有写端
在结构体内部实现关闭写端函数即可,即关闭写文件描述符即可。
void CloseChannel()
{close(_wfd);
}
2、回收子进程
通过父进程把子进程回收即可,即waitpid()。
void Wait()
{pid_t rid = waitpid(_subprocessid, nullptr, 0);if (rid > 0){std::cout << "wait " << rid << " success" << std::endl;}
}
3、清理信道函数
void CleanUpChannel(std::vector<Channel> &channels)
{for (auto &channel : channels){channel.CloseChannel();}for (auto &channel : channels){channel.Wait();}
}
4、主函数
int main(int argc, char *argv[])
{if (argc != 2){std::cerr << "Usage: " << argv[0] << "processnum" << std::endl;return 1;}int num = std::stoi(argv[1]);// 加载任务LoadTask();std::vector<Channel> channels;// 1.创建信道和子进程CreateChannelAndSub(num, &channels);// 2.通过channle控制子进程,执行5次ctrlProcess(channels,5);// 3. 回收管道和子进程. a. 关闭所有的写端 b. 回收子进程CleanUpChannel(channels);return 0;
}
对上面的清理函数做一个小小的修改!!!
关闭写端之后就回收子进程。
void CleanUpChannel(std::vector<Channel> &channels)
{for (auto &channel : channels){channel.CloseChannel();channel.Wait();}
}
现象是执行完5次之后,子进程阻塞了,没有被关闭。
为什么会出现上面的情况,子进程阻塞了呢???
因为关闭写端文件描述符,只是关闭了父进程的文件描述符(即引用计数-1),也就是父进程没有在写数据了,但是子进程还在读取数据,读进程就会阻塞。
通过上面我们知道,从第一个到最后一个,管道的引用计数是越来越少的,最后一个管道的引用计数为1,当我们逆向关闭管道的时候,也能够正确回收子进程。
void CleanUpChannel(std::vector<Channel> &channels)
{// 逆向关闭管道int num = channels.size() - 1;while(num >=0){channels[num].CloseChannel();channels[num--].Wait();}
}
4.5、修复bug
从上面的代码测试可以知道,如果我们关闭写端文件描述符之后再回收子进程,会出现阻塞情况,那么如何解决这个bug呢?
其实很简单,当我们创建第二个子进程的时候,我们就已经保存了前面所有打开的写端描述符,因此我们可以再第二次创建子进程之后,每次都先关闭写端文件描述符。
创建信道和子进程
void CreateChannelAndSub(int num, std::vector<Channel> *channels)
{// Bugfor (int i = 0; i < num; i++){// 1.创建管道int pipefd[2] = {0};int n = pipe(pipefd);// n < 0创建失败if (n < 0)exit(0);// 2.创建子进程pid_t id = fork();// child -- readif (id == 0){// fix bugif(!channels->empty()){// 第二次之后,关闭子进程开始创建的写端管道for(auto &channel : *channels) channel.CloseChannel();}// 关闭写端close(pipefd[1]);//work(pipefd[0]);close(pipefd[0]);exit(0);}// 3.构建一个channel名称std::string channel_name = "channel-" + std::to_string(i);// father -- writeclose(pipefd[0]);channels->push_back(Channel(pipefd[1], id, channel_name));}
}
5、进程池完整代码
5.1、makefile
makefile
processpool:ProcessPool.ccg++ -o $@ $^ -std=c++11
.PHONY:clean
clean:rm -rf processpool
5.2、Task.hpp
.hpp可以将函数声明定义放在一个文件。
Task.hpp
#pragma once#include <iostream>
#include <ctime>
#include <cstdlib>
#include <sys/types.h>
#include <unistd.h>// 任务个数
#define TaskNum 3typedef void (*task_t)();//函数指针重命名 void Print()
{std::cout << "I am a Print task" << std::endl;
}
void DownLoad()
{std::cout << "I am a Download task" << std::endl;
}
void Flush()
{std::cout << "I am a Flush task" << std::endl;
}// 创建任务数组
task_t tasks[TaskNum]; // 加载任务
void LoadTask()
{srand(time(nullptr) ^ getpid() ^1777);tasks[0] = Print;tasks[1] = DownLoad;tasks[2] = Flush;
}// 通过数组下标执行某一条任务
void ExecuteTask(int number)
{if(number < 0 || number > 2) return;tasks[number]();
}// 通过随机数选择任务编号
int SelectTask()
{return rand() % TaskNum;
}void work()
{while (true){int command = 0;// 从标准输入读数据int n = read(0, &command, sizeof(command));if (n == sizeof(int)){std::cout << "pid is : " << getpid() << " handler task" << std::endl;ExecuteTask(command);}else if (n == 0){std::cout << "sub process : " << getpid() << " quit" << std::endl;break;}}
}
5.3、ProcessPool.cc
ProcessPool.cc
#include <iostream>
#include <string>
#include <vector>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include "Task.hpp"// void work(int fd)
// {
// while(true)
// {
// sleep(1);
// }
// }// void work(int rfd)
// {
// while (true)
// {
// int command = 0;
// int n = read(rfd, &command, sizeof(command));
// if (n == sizeof(int))
// {
// std::cout << "pid is : " << getpid() << " handler task" << std::endl;
// ExecuteTask(command);
// }
// else if (n == 0)
// {
// std::cout << "sub process : " << getpid() << " quit" << std::endl;
// break;
// }
// }
// }
// master信道
class Channel
{
public:Channel(int wfd, pid_t id, const std::string &name): _wfd(wfd), _subprocessid(id), _name(name){}int GetWfd() const { return _wfd; }pid_t GetProcessId() const { return _subprocessid; }std::string GetName() const { return _name; }void CloseChannel(){close(_wfd);}void Wait(){pid_t rid = waitpid(_subprocessid, nullptr, 0);if (rid > 0){std::cout << "wait " << rid << " success" << std::endl;}}~Channel(){}private:int _wfd; // 写端fdpid_t _subprocessid; // 子进程pidstd::string _name; // 信道名字
};// 形参类型和命名规范
// const &: 输出
// & : 输入输出型参数
// * : 输出型参数
// // 创建信道和子进程
// void CreateChannelAndSub(int num, std::vector<Channel> *channels)
// {
// // Bug
// for (int i = 0; i < num; i++)
// {
// // 1.创建管道
// int pipefd[2] = {0};
// int n = pipe(pipefd);
// // n < 0创建失败
// if (n < 0)
// exit(0);// // 2.创建子进程
// pid_t id = fork();// // child -- read
// if (id == 0)
// {
// // fix bug
// if(!channels->empty())
// {
// // 第二次之后,关闭子进程开始创建的写端管道
// for(auto &channel : *channels)
// channel.CloseChannel();
// }
// // 关闭写端
// close(pipefd[1]);
// //
// work(pipefd[0]);
// close(pipefd[0]);
// exit(0);
// }
// // 3.构建一个channel名称
// std::string channel_name = "channel-" + std::to_string(i);
// // father -- write
// close(pipefd[0]);
// channels->push_back(Channel(pipefd[1], id, channel_name));
// }
// }// 创建信道和子进程(回调方式)
void CreateChannelAndSub(int num, std::vector<Channel> *channels,task_t task)
{// Bugfor (int i = 0; i < num; i++){// 1.创建管道int pipefd[2] = {0};int n = pipe(pipefd);// n < 0创建失败if (n < 0)exit(0);// 2.创建子进程pid_t id = fork();// child -- readif (id == 0){// fix bugif(!channels->empty()){// 第二次之后,关闭子进程开始创建的写端管道for(auto &channel : *channels) channel.CloseChannel();}// 关闭写端close(pipefd[1]);dup2(pipefd[0], 0); // 将管道的读端,重定向到标准输入task();close(pipefd[0]);exit(0);}// 3.构建一个channel名称std::string channel_name = "channel-" + std::to_string(i);// father -- writeclose(pipefd[0]);channels->push_back(Channel(pipefd[1], id, channel_name));}
}// 0 1 2 channelnum
int NextChannel(int channelnum)
{static int next = 0;int channel = next;next++;next %= channelnum;return channel;
}void SendTaskCommand(const Channel &channel, int taskcommand)
{write(channel.GetWfd(), &taskcommand, sizeof(taskcommand));
}// 控制一次子进程
void ctrlProcessOnce(std::vector<Channel> &channels)
{sleep(1);// a.选择一个任务int taskcommand = SelectTask();// b.选择一个信道和进程int channel_index = NextChannel(channels.size());// c.发送任务SendTaskCommand(channels[channel_index], taskcommand);std::cout << std::endl;std::cout << "taskcommand: " << taskcommand << " channel: "<< channels[channel_index].GetName() << " sub process: " << channels[channel_index].GetProcessId() << std::endl;
}// void ctrlProcess(std::vector<Channel> &channels)
// {
// while (true)
// {
// sleep(1);
// // a.选择一个任务
// int taskcommand = SelectTask();
// // b.选择一个信道和进程
// int channel_index = NextChannel(channels.size());
// // c.发送任务
// SendTaskCommand(channels[channel_index], taskcommand);
// std::cout << std::endl;
// std::cout << "taskcommand: " << taskcommand << " channel: "
// << channels[channel_index].GetName() << " sub process: " << channels[channel_index].GetProcessId() << std::endl;
// }
// }void ctrlProcess(std::vector<Channel> &channels, int times = -1)
{if (times > 0){while (times--){ctrlProcessOnce(channels);}}else{while (true){ctrlProcessOnce(channels);}}
}void CleanUpChannel(std::vector<Channel> &channels)
{// 逆向关闭管道int num = channels.size() - 1;while(num >=0){channels[num].CloseChannel();channels[num--].Wait();}// for (auto &channel : channels)// {// channel.CloseChannel();// channel.Wait();// }// for (auto &channel : channels)// {// channel.Wait();// }
}int main(int argc, char *argv[])
{if (argc != 2){std::cerr << "Usage: " << argv[0] << "processnum" << std::endl;return 1;}int num = std::stoi(argv[1]);// 加载任务LoadTask();std::vector<Channel> channels;// 1.创建信道和子进程,普通版本//CreateChannelAndSub(num, &channels);// 1.创建信道和子进程,回调版本,调用什么函数,传什么函数名CreateChannelAndSub(num, &channels,work);// 2.通过channle控制子进程,执行5次ctrlProcess(channels,5);// 3. 回收管道和子进程. a. 关闭所有的写端 b. 回收子进程CleanUpChannel(channels);return 0;
}// ./processpool 5 测试代码一
// int main(int argc,char* argv[])
// {
// // 通过命令行传参创建几个子进程,不传个数直接报错
// if(argc != 2)
// {
// std::cerr << "Usage: " << argv[0] << "processnum" << std::endl;
// return 1;
// }
// int num = std::stoi(argv[1]);// std::vector<Channel> channels;
// // 1.创建信道和子进程
// for(int i = 0;i < num;i++)
// {
// // 1.创建管道
// int pipefd[2] = {0};
// int n = pipe(pipefd);
// // n < 0创建失败
// if(n < 0) exit(0);// // 2.创建子进程
// pid_t id = fork();// // child -- read
// if(id == 0)
// {
// // 关闭写端
// close(pipefd[1]);
// //
// work(pipefd[0]);
// close(pipefd[0]);
// exit(0);
// }
// // 3.构建一个channel名称
// std::string channel_name = "channel-" + std::to_string(i);
// // father -- write
// close(pipefd[0]);
// channels.push_back(Channel(pipefd[1],id,channel_name));
// }
// // for test
// for(auto& channel : channels)
// {
// std::cout << "====================================" << std::endl;
// std::cout << "channel_name->" << channel.GetName() << std::endl;
// std::cout << "channel_wfd->" << channel.GetWfd() << std::endl;
// std::cout << "channel_id->" << channel.GetProcessId() << std::endl;
// }
// sleep(100);
// return 0;
// }