管道
管道是Unix中最古老的进程间通信的形式。
我们把从一个进程连接到另一个进程的一个数据流称为一个“管道”
匿名管道
#include <unistd.h>
功能:创建一无名管道
原型
int pipe(int fd[2]);
参数
fd:文件描述符数组,其中fd[0]表示读端, fd[1]表示写端
返回值:成功返回0,失败返回错误代码
管道文件时一种内存级文件,没有名称,故为匿名管道
父进程打开文件然后fork创建子进程,子进程会继承父进程文件描述符表中的内容。
然后两个进程即可看到同一份文件。
一般而言管道只能用来进行单向的数据通信。
为什么让父进程以读和写的方式打开同一个文件?
为了让子进程看到读写端。
int fds[2];int n=pipe(fds);
代码实现
//1.创建管道文件打开读写端int fds[2];int n=pipe(fds);assert(n==0);//2.forkpid_t id=fork();assert(id>=0);if(id==0){//子进程写入close(fds[0]);const char* s="子进程,正在向父进程发消息";int cnt=0;while(true){cnt++;char buffer[1024];snprintf(buffer,sizeof buffer,"child->parent say: %s[%d][%d]",s,cnt,getpid());write(fds[1],buffer,sizeof(buffer));//系统接口不需要考虑'\0'sleep(1);}close(fds[1]);exit(0);}close(fds[1]);while(true){char buffer[1024];ssize_t s=read(fds[0],buffer,sizeof(buffer)-1);if(s>0)//s代表读到的字节数buffer[s]=0;cout<<"Get Message # "<<buffer<<"| mypid: "<<getpid()<<endl;}n=waitpid(id,nullptr,0);assert(n==id);close(fds[0]);
命名管道
命名管道可以从命令行上创建,命令行方法是使用下面这个命令:
mkfifo filename
命名管道也可以从程序里创建,相关函数有:
int mkfifo(const char *filename,mode_t mode);
创建命名管道:
int main(int argc, char *argv[])
{mkfifo("p2", 0644);return 0;
}
comm.hpp
#pragma once#include<iostream>
#include<cassert>
#include<cstdio>
#include<cstring>
#include<string>
#include<unistd.h>
#include<sys/wait.h>
#include<sys/types.h>
#include<sys/stat.h>
#include<cerrno>
#include<fcntl.h>
using namespace std;
#define NAMED_PIPE "/tmp/mypipe.test"bool createFifo(const string& path)
{int n=mkfifo(path.c_str(),0666);if(n==0) return true;else{cout<<"errno: "<<errno<<" err string: "<<strerror(errno)<<endl;return false;}
}
void removeFifo(const string& path)
{int n=unlink(path.c_str());assert(n==0);
}
server.cc
#include"comm.hpp"using namespace std;int main()
{bool r=createFifo(NAMED_PIPE);assert(r);cout<<"server begin"<<endl;int rfd=open(NAMED_PIPE,O_RDONLY);cout<<"server end"<<endl;if(rfd<0) exit(1);//readchar buffer[1024];while (true){ssize_t s=read(rfd,buffer,sizeof(buffer)-1);if(s>0){buffer[s]=0;cout<<"client->server# "<<buffer;}else if(s==0){cout<<"client quit , me too"<<endl;break;}else{cout<<"err string--------"<<strerror(errno)<<endl;break;}}close(rfd);removeFifo(NAMED_PIPE);return 0;
}
clent.cc
#include"comm.hpp"using namespace std;int main()
{cout<<"client begin"<<endl;int wfd = open(NAMED_PIPE, O_WRONLY);cout<<"client end"<<endl;if (wfd < 0)exit(1);// writechar buffer[1024];while (true){cout << "Please Say# ";fgets(buffer, sizeof(buffer), stdin);if(strlen(buffer)>0) buffer[strlen(buffer)-1]=0;ssize_t n = write(wfd, buffer, strlen(buffer));assert(n == strlen(buffer));}close(wfd);return 0;
}
管道的读写特征
1. (读慢,写快)如果管道中没有了数据,读端在读,默认会直接阻塞当前正在读取的进程
2.(读快,写慢)管道是一个固定大小的缓冲区,写端写满时会阻塞,等待对方读取
3.(写端关闭,读到0)读端将数据读完后读到0结束进程。
4.读关闭,在写就没有意义了,OS会给写进程发送信号(13),将其终止。
管道的特征
1.管道的声明周期随进程
2.管道可以用来进行具有血缘关系的进程之间进行通信,常用于父子通信
3.管道是面向字节流的。按设置的最大字节数去读。
4.管道通信---半双工
sleep 1000 | sleep 2000
|:即为匿名管道
sleep的父进程为bash
综合案例
基于匿名管道的进程池设计
#include<iostream>
#include<vector>
#include<cassert>
#include<cstdio>
#include<cstring>
#include<ctime>
#include<string>
#include<unistd.h>
#include<sys/wait.h>
#include<sys/types.h>using namespace std;
#define PROCESS_NUM 5
#define MakeSeed() srand((unsigned long)time(nullptr))
typedef void(*func_t)();
void downLoadTask()
{cout<<getpid()<<" : downLoadTask()\n"<<endl;sleep(1);
}
void ioTask()
{cout<<getpid()<<" : ioTask()\n"<<endl;sleep(1);
}
void flushTask()
{cout<<getpid()<<" : flustTask()\n"<<endl;sleep(1);
}class subEp
{
public:subEp(pid_t subId,int writeFd):subId_(subId),writeFd_(writeFd){char nameBuffer[1024];snprintf(nameBuffer,sizeof nameBuffer,"process-%d[pid(%d)-fd(%d)]",num++,subId_,writeFd_);name_=nameBuffer;}
public:static int num;string name_;pid_t subId_;int writeFd_;
};
int subEp::num=0;void sendTask(const subEp& process,int taskNum)
{cout<<"send task num "<<taskNum<<" send to -> "<<process.name_<<endl;ssize_t n=write(process.writeFd_,&taskNum,sizeof(taskNum));assert(n==sizeof(int));
}
int recvTask(int readFd)
{int code=0;ssize_t s=read(readFd,&code,sizeof code);//assert(s==sizeof(int));if(s==4) return code;else if(s<=0) return -1;else return 0;
}
void createSubProcess(vector<subEp>* subs,vector<func_t>& funcMap)
{for(int i=0;i<PROCESS_NUM;++i){int fds[2];int n=pipe(fds);//父进程打开的文件是会被子进程共享的pid_t id=fork();if(id==0){//子进程close(fds[1]);while (true){//1.获取命令玛,如果没有,子进程应阻塞int commandCode = recvTask(fds[0]);//2.完成任务if(commandCode>0 && commandCode<funcMap.size()) funcMap[commandCode]();else if(commandCode==-1) break;;}exit(0);}close(fds[0]);subEp sub(id,fds[1]);subs->push_back(sub);}
}
void loadTaskFunc(vector<func_t>* out)
{assert(out);out->push_back(downLoadTask);out->push_back(ioTask);out->push_back(flushTask);
}
void loadBalanceContrl(const vector<subEp>& subs,const vector<func_t> &funcMap,int count)
{int processnum=subs.size();int tasknum=funcMap.size();//int cnt=subs.size();bool forever=(count==0)?true:false;while(true){//1.选择一个子进程--->vector<subEp> -> indexint subIdx=rand()%processnum;//2.选择一个任务---> vector<func_t>--->indexint taskIdx=rand()%tasknum;//3.任务发送给选择的进程sendTask(subs[subIdx],taskIdx);sleep(1);if(!forever){count--;if(count==0) break;}}//write quit -> read 0for(int i=0;i<processnum;++i) close(subs[i].writeFd_);
}
void waitProcess(vector<subEp> processes)
{int processnum=processes.size();for(int i=0;i<processnum;++i){waitpid(processes[i].subId_,nullptr,0);cout<<"wait sub process success ..."<<processes[i].subId_<<endl;}
}
int main()
{MakeSeed();//1.建立子进程并建立和子进程通信的信道vector<func_t> funcMap;loadTaskFunc(&funcMap);vector<subEp> subs;createSubProcess(&subs,funcMap);//2.父进程控制子进程int taskNum=3;loadBalanceContrl(subs,funcMap,taskNum);//3.回收子进程信息waitProcess(subs);return 0;
}