上一篇中讲解了在进程池文件传输的过程如何实现零拷贝,具体的方法包括使用mmap,sendfile,splice等等。
【Linux C | 网络编程】进程池零拷贝传输的实现详解(四)
这篇内容主要讲解进程池如何退出。
1.进程池的简单退出
processData_t *workerList;//需要改成全局变量
int workerNum;
void sigFunc(int signum){printf("signum = %d\n", signum);for(int i = 0; i < workerNum; ++i){kill(workerList[i].pid,SIGUSR1);}for(int i = 0; i < workerNum; ++i){wait(NULL);}puts("process pool is over!");exit(0);
}
int main(){
//..makeChild(workerList,workerNum);signal(SIGUSR1,sigFunc);//注意fork和signal的顺序
}
2.使用管道通知工作进程终止
异步执行与同步执行的区别
-
异步执行:异步操作是指当一个任务开始执行后,控制流可以继续执行下一个任务,而不需要等待当前任务完成。异步操作通常会通过回调函数、事件驱动机制或者Promise等方式来处理结果或通知任务的完成状态。在异步操作中,调用者通常不会立即等待操作的完成。
-
同步执行:同步操作是指一个任务开始执行后,调用者会一直等待任务完成,然后才能继续执行下一个任务。同步操作的执行顺序是按照代码的顺序依次执行的,直到当前任务完成。
异步拉起同步的应用场景和解释
异步拉起同步通常用来描述这样一种情况:
-
异步任务的启动:首先,某个操作或任务以异步的方式启动,这意味着调用者可以在任务启动后继续执行其他操作,而不必等待任务完成。
-
同步任务的结束:随后,在异步任务执行完毕后,系统或程序需要在某个点上进行同步操作,即等待这个异步任务完成并获取其结果,然后才能继续执行依赖于该结果的下一步操作。
进程池退出中的异步拉起同步的具体流程
-
异步提交任务:主进程使用进程池异步地提交多个任务给子进程执行。这些任务可以是函数、方法或者其他可调用对象。
-
任务执行:进程池管理器负责分配任务给空闲的子进程,并在子进程完成任务后收集返回的结果。
-
等待任务完成:一旦主进程不再提交新任务到进程池,它需要等待所有已提交的任务都完成。这时候,主进程通常会发出一个信号或者事件,表示进程池应该开始进入退出状态。
-
异步转同步:进程池收到退出信号后,开始等待所有任务完成。此时,进程池中的子进程继续处理它们的任务,主进程则通过某种机制(例如等待事件或轮询任务状态)来异步地等待所有子进程的任务完成。
-
进程池退出:一旦所有任务完成,进程池中的子进程会被优雅地退出。主进程也可以在这个时候执行一些清理工作,例如关闭文件、释放资源等。
简单点总结就是:
信号 + 匿名管道 =》 异步拉起同步
1. SIGUSR1 父进程获取信号后,在信号处理函数中,通过匿名管道进入epoll
2. 在epoll循环中,有两种方式让子进程退出
A. 在父进程中,直接调用kill函数,给子进程发送SIGUSR1信号(粗暴)
B. 在父进程中,通过sendFd函数,通知子进程退出(温和)在子进程中,通过recvFd函数,获取到退出标志位,然后退出,可以确保每一个任务都能够执行完毕。
粗暴的退出代码:
#include "process_pool.h"//退出使用的匿名管道
int exitPipe[2];void sighandler(int signum)
{printf("signum %d is coming.\n", signum);int one = 1;//父进程收到信号,往管道写端写入数据write(exitPipe[1], &one, sizeof(one));
}int main(int argc, char ** argv)
{//ip port processnumARGS_CHECK(argc, 4);int processNum = atoi(argv[3]);process_data * pProcess = calloc(processNum, sizeof(process_data));//让父子进程都忽略掉SIGPIPE信号//signal(SIGPIPE, SIG_IGN);//创建N个子进程makeChild(pProcess, processNum);//makechild函数之后,都是父进程的操作//在父进程中注册信号处理函数signal(SIGUSR1, sighandler);//只在父进程中创建退出的管道pipe(exitPipe);//创建监听的服务器int listenfd = tcpInit(argv[1], atoi(argv[2]));//创建epoll的实例int epfd = epoll_create1(0);ERROR_CHECK(epfd, -1, "epfd");//epoll监听ListenfdepollAddReadEvent(epfd, listenfd);//epoll监听进程池退出的管道读端exitPipe[0],epollAddReadEvent(epfd, exitPipe[0]);///epoll监听父子进程间通信的管道for(int i = 0; i < processNum; ++i) {epollAddReadEvent(epfd, pProcess[i].pipefd);}//定义保存就绪的文件描述符的数组struct epoll_event eventArr[10] = {0};int nready = 0;while(1){nready = epoll_wait(epfd, eventArr, sizeof(eventArr), -1);for(int i = 0; i < nready; ++i) {int fd = eventArr[i].data.fd; //新客户端到来if(fd == listenfd) {struct sockaddr_in clientaddr;socklen_t len = sizeof(clientaddr);int peerfd = accept(listenfd, (struct sockaddr*)&clientaddr, &len);ERROR_CHECK(peerfd, -1, "accept");printf("client %s:%d connected.\n",inet_ntoa(clientaddr.sin_addr), ntohs(clientaddr.sin_port));//将peerfd发送给一个空闲的子进程for(int j = 0; j < processNum; ++j) {if(pProcess[j].status == FREE) {sendFd(pProcess[j].pipefd, peerfd);pProcess[j].status = BUSY;break;}}//如果要断开与客户端的连接,这里还得执行一次close(peerfd);} else if(fd == exitPipe[0]) { //匿名管道读端有数据,说明父进程收到退出的信号,循环给子进程发出退出信号int howmany = 0;read(fd, &howmany, sizeof(howmany));//处理进程池退出的情况//第一种方式: 父进程给子进程发送SIGUSR1信号for(int j = 0; j < processNum; ++j) {kill(pProcess[j].pid, SIGUSR1);}for(int j = 0; j < processNum; ++j) {wait(NULL);}goto end;} else {//管道发生了事件: 子进程已经执行完任务了int howmany = 0;read(fd, &howmany, sizeof(howmany));for(int j = 0; j < processNum; ++j) {if(pProcess[j].pipefd == fd) {pProcess[j].status = FREE;printf("child %d is not busy.\n", pProcess[j].pid);break;}}}}}
end:printf("exit process pool.\n");free(pProcess);close(exitPipe[0]);close(exitPipe[1]);close(listenfd);close(epfd);return 0;
}
3.优雅退出
int sendFd(int pipeFd, int fdToSend, int exitFlag){struct msghdr hdr;bzero(&hdr,sizeof(struct msghdr));struct iovec iov[1];iov[0].iov_base = &exitFlag;iov[0].iov_len = sizeof(int);hdr.msg_iov = iov;hdr.msg_iovlen = 1;//...
}
int recvFd(int pipeFd, int *pFd, int *exitFlag){struct msghdr hdr;bzero(&hdr,sizeof(struct msghdr));struct iovec iov[1];iov[0].iov_base = exitFlag;iov[0].iov_len = sizeof(int);hdr.msg_iov = iov;hdr.msg_iovlen = 1;//.....
}