Linux网络编程5——多路IO转接

一.TCP状态时序理解

1.TCP状态理解

在这里插入图片描述

**CLOSED:**表示初始状态。

**LISTEN:**该状态表示服务器端的某个SOCKET处于监听状态,可以接受连接。

**SYN_SENT:**这个状态与SYN_RCVD遥相呼应,当客户端SOCKET执行CONNECT连接时,它首先发送SYN报文,随即进入到了SYN_SENT状态,并等待服务端的发送三次握手中的第2个报文。SYN_SENT状态表示客户端已发送SYN报文。

SYN_RCVD: 该状态表示接收到SYN报文,在正常情况下,这个状态是服务器端的SOCKET在建立TCP连接时的三次握手会话过程中的一个中间状态,很短暂。此种状态时,当收到客户端的ACK报文后,会进入到ESTABLISHED状态。

**ESTABLISHED:**表示连接已经建立。

FIN_WAIT_1: FIN_WAIT_1和FIN_WAIT_2状态的真正含义都是表示等待对方的FIN报文。区别是:

FIN_WAIT_1状态是当socket在ESTABLISHED状态时,想主动关闭连接,向对方发送了FIN报文,此时该socket进入到FIN_WAIT_1状态。

FIN_WAIT_2状态是当对方回应ACK后,该socket进入到FIN_WAIT_2状态,正常情况下,对方应马上回应ACK报文,所以FIN_WAIT_1状态一般较难见到,而FIN_WAIT_2状态可用netstat看到。

**FIN_WAIT_2:主动关闭链接的一方,发出FIN收到ACK以后进入该状态。称之为半连接或半关闭状态。**该状态下的socket只能接收数据,不能发。

TIME_WAIT: 表示收到了对方的FIN报文,并发送出了ACK报文,等2MSL后即可回到CLOSED可用状态。如果FIN_WAIT_1状态下,收到对方同时带 FIN标志和ACK标志的报文时,可以直接进入到TIME_WAIT状态,而无须经过FIN_WAIT_2状态。

CLOSING: 这种状态较特殊,属于一种较罕见的状态。正常情况下,当你发送FIN报文后,按理来说是应该先收到(或同时收到)对方的 ACK报文,再收到对方的FIN报文。但是CLOSING状态表示你发送FIN报文后,并没有收到对方的ACK报文,反而却也收到了对方的FIN报文。什么情况下会出现此种情况呢?如果双方几乎在同时close一个SOCKET的话,那么就出现了双方同时发送FIN报文的情况,也即会出现CLOSING状态,表示双方都正在关闭SOCKET连接。

CLOSE_WAIT: 此种状态表示在等待关闭。当对方关闭一个SOCKET后发送FIN报文给自己,系统会回应一个ACK报文给对方,此时则进入到CLOSE_WAIT状态。接下来呢,察看是否还有数据发送给对方,如果没有可以 close这个SOCKET,发送FIN报文给对方,即关闭连接。所以在CLOSE_WAIT状态下,需要关闭连接。

LAST_ACK: 该状态是被动关闭一方在发送FIN报文后,最后等待对方的ACK报文。当收到ACK报文后,即可以进入到CLOSED可用状态。

2.端口复用

在server的TCP连接没有完全断开之前不允许重新监听是不合理的。因为,TCP连接没有完全断开指的是connfd(127.0.0.1:6666)没有完全断开,而我们重新监听的是lis-tenfd(0.0.0.0:6666),虽然是占用同一个端口,但IP地址不同,connfd对应的是与某个客户端通讯的一个具体的IP地址,而listenfd对应的是wildcard address。解决这个问题的方法是使用setsockopt()设置socket描述符的选项SO_REUSEADDR为1,表示允许创建端口号相同但IP地址不同的多个socket描述符。

在server代码的socket()和bind()调用之间插入如下代码:

	int opt = 1;setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

半关闭:

通信双方中,只有一端关闭通信。  --- FIN_WAIT_2close(cfd);shutdown(int fd, int how);	how: 	SHUT_RD	关读端SHUT_WR	关写端SHUT_RDWR 关读写shutdown在关闭多个文件描述符应用的文件时,采用全关闭方法。close,只关闭一个。

3.多路IO转接思想

​ 上次学习的并发服务器技术,分别包括多进程并发服务器和多线程并发服务器,解决了有多个客户端来连接服务器的问题,但是在效率方面,如果没有客户端来连接,那么服务器一直在while循环里面阻塞监听(accept),非常影响效率。那么针对这种情况,我们可以在中间建立一个“秘书”,客户端想连接服务器就先通知秘书,秘书再通知服务器,这样服务器就不用一直阻塞在accept监听里面啦。

​ 这个“秘书”有三种方式实现:selectpollepoll。通过这种中间“秘书”方式的构建服务器,就是多路IO转接服务器,也叫多任务IO服务器,该类服务器实现的主旨思想是,不再由应用程序自己监视客户端连接,取而代之由内核替应用程序监视文件。

二.select多路IO转接服务器

1.函数分析

int select(int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, struct timeval *timeout);nfds: 		监控的文件描述符集里最大文件描述符加1,因为此参数会告诉内核检测前多少个文件描述符的状态readfds:	监控有读数据到达文件描述符集合,传入传出参数writefds:	监控写数据到达文件描述符集合,传入传出参数exceptfds:	监控异常发生达文件描述符集合,如带外数据到达异常,传入传出参数timeout:	定时阻塞监控时间,3种情况1.NULL,永远等下去2.设置timeval,等待固定时间3.设置timeval里时间均为0,检查描述字后立即返回,轮询struct timeval {long tv_sec; /* seconds */long tv_usec; /* microseconds */};

知识点1:文件描述符

​ 在Unix-like系统中,文件描述符是一个广泛的概念,它用于表示几乎所有类型的文件、管道、目录以及特殊类型的文件,如socket套接字。因此,socket套接字确实是文件描述符的一种,它被用来在进程间或不同计算机之间进行网络通信。总之一句话,文件描述符大于套接字,就相当于套接字是文件描述符的一种。

在这里插入图片描述

​ 在Linux操作系统中,我们使用位图的方式描述文件描述符,而0、1、2三个文件描述符,已经被系统调用,所以一般3用来描述服务器的监听套接字,后面4、5、6……才用来与客户端建立连接。那么在进行select函数传参时,需要注意,传递与客户端通信的最大文件描述符+1,才能保证循环遍历所有的套接字。

知识点2:文件描述集合

readfdswritefdsexceptfds三个文件描述符集合,他们都是传入传出参数。在传入时,readfds里面表示的是需要监控有数据到达需要读的文件描述符;在传出时,其表示在需要监控有数据到达的文件描述符中,监测发生了数据到达事件的文件描述符。同理,在传入时,writefds里面表示的是需要监控有写数据达到文件的描述符;在传出时,其表示在需要监控有写数据到达的文件描述符中,监测发生了写数据到达事件的文件描述符。同理,在传入时,exceptfds里面表示的是需要监控是否有异常事件发生的文件描述符;在传出时,其表示在需要监控有异常事件发生的文件描述符中,检测到发生了异常事件的文件描述符。

知识点3:定时阻塞监控时间

	struct timeval {long tv_sec; /* 秒:seconds */long tv_usec; /* 微妙:microseconds */};

在其作为参数传递给select函数时,其有三种不同的含义。

知识点4:文件描述符集合操作函数

	void FD_CLR(int fd, fd_set *set); 	//把文件描述符集合里fd清0int FD_ISSET(int fd, fd_set *set); 	//测试文件描述符集合里fd是否置1void FD_SET(int fd, fd_set *set); 	//把文件描述符集合里fd位置1void FD_ZERO(fd_set *set); 			//把文件描述符集合里所有位清0

在这里插入图片描述

知识点5:返回值

  • > 0 :所有监听集合(3个)中,满足对应事件的总个数
  • 0:没有满足事件的文件描述符
  • -1:报错errno

2.思路分析

在这里插入图片描述

案例练习:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <ctype.h>#define ser_port 9527int main()
{int maxfd = 0;int lfd,cfd;int ret,nread,nwrite;char buf[BUFSIZ];lfd = socket(AF_INET,SOCK_STREAM,0);if(lfd == -1){perror("socket");return -1;}maxfd = lfd;struct sockaddr_in ser_addr,clt_addr;socklen_t clt_addr_len = sizeof(clt_addr);ser_addr.sin_family = AF_INET;ser_addr.sin_port = htons(ser_port);ser_addr.sin_addr.s_addr = htonl(INADDR_ANY);bind(lfd,(struct sockaddr *)&ser_addr,sizeof(ser_addr));listen(lfd,128);fd_set rset,allset;FD_ZERO(&allset);FD_SET(lfd,&allset);while(1){rset = allset;     //每次都从新设置的监控集开始ret = select(maxfd+1,&rset,NULL,NULL,NULL);if(ret > 0){if(FD_ISSET(lfd,&rset))           //说明有新的客户端请求建立连接{cfd = accept(lfd,(struct sockaddr *)&clt_addr,&clt_addr_len);FD_SET(cfd,&allset);if(maxfd < cfd)maxfd = cfd;if(--ret == 0)         //select只有一个返回,说明是lfd,不需要执行后续的操作continue;}}for(int i = lfd+1; i <= maxfd; i++){if(FD_ISSET(i,&rset)){nread = read(i,buf,sizeof(buf));if(nread == 0)    //检测到客户端关闭{close(i);FD_CLR(i,&allset);}else if(nread > 0){for(int j = 0;j<nread;j++){buf[j] = toupper(buf[j]);}write(i,buf,nread);write(STDIN_FILENO,buf,nread);}}}}close(lfd);return 0;
}

3.优缺点

  • 缺点:

    • 监听上限受文件描述符限制, 最大 1024
    • 检测满足条件的fd, 自己添加业务逻辑提高小。 提高了编码难度。
  • 优点:跨平台,win、linux、macOS、Unix、类Unix、mips

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <ctype.h>#include "wrap.h"#define SERV_PORT 6666int main(int argc, char *argv[])
{int i, j, n, maxi;int nready, client[FD_SETSIZE];                 /* 自定义数组client, 防止遍历1024个文件描述符  FD_SETSIZE默认为1024 */int maxfd, listenfd, connfd, sockfd;char buf[BUFSIZ], str[INET_ADDRSTRLEN];         /* #define INET_ADDRSTRLEN 16 */struct sockaddr_in clie_addr, serv_addr;socklen_t clie_addr_len;fd_set rset, allset;                            /* rset 读事件文件描述符集合 allset用来暂存 */listenfd = Socket(AF_INET, SOCK_STREAM, 0);int opt = 1;setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));bzero(&serv_addr, sizeof(serv_addr));serv_addr.sin_family= AF_INET;serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);serv_addr.sin_port= htons(SERV_PORT);Bind(listenfd, (struct sockaddr *)&serv_addr, sizeof(serv_addr));Listen(listenfd, 128);maxfd = listenfd;                                           /* 起初 listenfd 即为最大文件描述符 */maxi = -1;                                                  /* 将来用作client[]的下标, 初始值指向0个元素之前下标位置 */for (i = 0; i < FD_SETSIZE; i++)client[i] = -1;                                         /* 用-1初始化client[] */FD_ZERO(&allset);FD_SET(listenfd, &allset);                                  /* 构造select监控文件描述符集 */while (1) {   rset = allset;                                          /* 每次循环时都从新设置select监控信号集 */nready = select(maxfd+1, &rset, NULL, NULL, NULL);  //2  1--lfd  1--connfdif (nready < 0)perr_exit("select error");if (FD_ISSET(listenfd, &rset)) {                        /* 说明有新的客户端链接请求 */clie_addr_len = sizeof(clie_addr);connfd = Accept(listenfd, (struct sockaddr *)&clie_addr, &clie_addr_len);       /* Accept 不会阻塞 */printf("received from %s at PORT %d\n",inet_ntop(AF_INET, &clie_addr.sin_addr, str, sizeof(str)),ntohs(clie_addr.sin_port));for (i = 0; i < FD_SETSIZE; i++)if (client[i] < 0) {                            /* 找client[]中没有使用的位置 */client[i] = connfd;                         /* 保存accept返回的文件描述符到client[]里 */break;}if (i == FD_SETSIZE) {                              /* 达到select能监控的文件个数上限 1024 */fputs("too many clients\n", stderr);exit(1);}FD_SET(connfd, &allset);                            /* 向监控文件描述符集合allset添加新的文件描述符connfd */if (connfd > maxfd)maxfd = connfd;                                 /* select第一个参数需要 */if (i > maxi)maxi = i;                                       /* 保证maxi存的总是client[]最后一个元素下标 */if (--nready == 0)continue;} for (i = 0; i <= maxi; i++) {                               /* 检测哪个clients 有数据就绪 */if ((sockfd = client[i]) < 0)continue;if (FD_ISSET(sockfd, &rset)) {if ((n = Read(sockfd, buf, sizeof(buf))) == 0) {    /* 当client关闭链接时,服务器端也关闭对应链接 */Close(sockfd);FD_CLR(sockfd, &allset);                        /* 解除select对此文件描述符的监控 */client[i] = -1;} else if (n > 0) {for (j = 0; j < n; j++)buf[j] = toupper(buf[j]);Write(sockfd, buf, n);Write(STDOUT_FILENO, buf, n);}if (--nready == 0)break;                                          /* 跳出for, 但还在while中 */}}}Close(listenfd);return 0;
}

三.poll多路IO转接服务器

1.函数分析

#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);struct pollfd {int fd; /* 文件描述符 */short events; /* 监控的事件 */short revents; /* 监控事件中满足条件返回的事件 */};POLLIN			普通或带外优先数据可读,即POLLRDNORM | POLLRDBANDPOLLOUT			普通或带外数据可写POLLERR 		发生错误nfds 			监控数组中有多少文件描述符需要被监控timeout 		毫秒级等待-1:阻塞等,#define INFTIM -1 				Linux中没有定义此宏0:立即返回,不阻塞进程>0:等待指定毫秒数,如当前系统时间精度不够毫秒,向上取值

在这里插入图片描述

2.思路分析

在这里插入图片描述

3.read函数返回值补充

read 函数返回值:

  • >0:实际读到的字节数
  • =0: socket中,表示对端关闭。close()
  • -1
    • 如果 errno == EINTR 被异常终端。 需要重启。
    • 如果errno == EAGINEWOULDBLOCK 以非阻塞方式读数据,但是没有数据。 需要,再次读。
    • 如果 errno == ECONNRESET 说明连接被 重置。 需要 close(),移除监听队列。
    • 其他错误。

4.优缺点

优点:

  • 自带数组结构,可以将 监听事件集合 和 返回事件集合 分离。
  • 拓展 监听上限,超出 1024限制。

缺点:

  • 不能跨平台。 Linux
  • 无法直接定位满足监听事件的文件描述符, 编码难度较大。

5.突破1024文件描述符限制

可以使用cat命令查看一个进程可以打开的socket描述符上限:

	cat /proc/sys/fs/file-max

方式一:如有需要,可以通过修改配置文件的方式修改该上限值。

	sudo vi /etc/security/limits.conf

​ 在文件尾部写入以下配置,soft软限制,hard硬限制,可以在这里直接修改。如下图所示。

	* soft nofile 65536      --> 设置默认值, 可以直接借助命令修改。 【注销用户,使其生效】* hard nofile 100000    --> 命令修改上限。

方式二:可以通过命令的方式修改

ulimit -n num

这里的num不能超过硬件限制

四.epoll实现多路转接服务器

1.epoll相关函数

  1. 创建一个epoll句柄,参数size用来告诉内核监听的文件描述符的个数,跟内存大小有关。

    	int epoll_create(int size);						创建一棵监听红黑树size:创建的红黑树的监听节点数量。(仅供内核参考。)返回值:指向新创建的红黑树的根节点的 fd。 失败: -1 errno
    
  2. 控制某个epoll监控的文件描述符上的事件:注册、修改、删除。

    	#include <sys/epoll.h>int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)epfd:	为epoll_creat的句柄op:		表示动作,用3个宏来表示:EPOLL_CTL_ADD (注册新的fd到epfd)EPOLL_CTL_MOD (修改已经注册的fd的监听事件)EPOLL_CTL_DEL (从epfd删除一个fd);event:	告诉内核需要监听的事件struct epoll_event {__uint32_t events; /* Epoll events */epoll_data_t data; /* User data variable */};typedef union epoll_data {void *ptr;int fd;uint32_t u32;uint64_t u64;} epoll_data_t;EPOLLIN :	表示对应的文件描述符可以读(包括对端SOCKET正常关闭)EPOLLOUT:	表示对应的文件描述符可以写EPOLLPRI:	表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来)EPOLLERR:	表示对应的文件描述符发生错误EPOLLHUP:	表示对应的文件描述符被挂断;EPOLLET: 	将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)而言的EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里
    
  3. 等待所监控文件描述符上有事件的产生,类似于select()调用。

    int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout); 	 阻塞监听。epfd:epoll_create 函数的返回值。 epfdevents:传出参数,【数组】, 满足监听条件的 哪些 fd 结构体。maxevents:数组 元素的总个数。 1024struct epoll_event evnets[1024]timeout:-1: 阻塞0: 不阻塞>0: 超时时间 (毫秒)返回值:> 0: 满足监听的 总个数。 可以用作循环上限。0: 没有fd满足监听事件-1:失败。 errno
    

2.思路分析

在这里插入图片描述

基础版:

#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <errno.h>
#include <ctype.h>#include "wrap.h"#define MAXLINE 8192
#define SERV_PORT 8000#define OPEN_MAX 5000int main(int argc, char *argv[])
{int i, listenfd, connfd, sockfd;int  n, num = 0;ssize_t nready, efd, res;char buf[MAXLINE], str[INET_ADDRSTRLEN];socklen_t clilen;struct sockaddr_in cliaddr, servaddr;listenfd = Socket(AF_INET, SOCK_STREAM, 0);int opt = 1;setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));      //端口复用bzero(&servaddr, sizeof(servaddr));servaddr.sin_family = AF_INET;servaddr.sin_addr.s_addr = htonl(INADDR_ANY);servaddr.sin_port = htons(SERV_PORT);Bind(listenfd, (struct sockaddr *) &servaddr, sizeof(servaddr));Listen(listenfd, 20);efd = epoll_create(OPEN_MAX);               //创建epoll模型, efd指向红黑树根节点if (efd == -1)perr_exit("epoll_create error");struct epoll_event tep, ep[OPEN_MAX];       //tep: epoll_ctl参数  ep[] : epoll_wait参数tep.events = EPOLLIN; tep.data.fd = listenfd;           //指定lfd的监听时间为"读"res = epoll_ctl(efd, EPOLL_CTL_ADD, listenfd, &tep);    //将lfd及对应的结构体设置到树上,efd可找到该树if (res == -1)perr_exit("epoll_ctl error");for ( ; ; ) {/*epoll为server阻塞监听事件, ep为struct epoll_event类型数组, OPEN_MAX为数组容量, -1表永久阻塞*/nready = epoll_wait(efd, ep, OPEN_MAX, -1); if (nready == -1)perr_exit("epoll_wait error");for (i = 0; i < nready; i++) {if (!(ep[i].events & EPOLLIN))      //如果不是"读"事件, 继续循环continue;if (ep[i].data.fd == listenfd) {    //判断满足事件的fd是不是lfd            clilen = sizeof(cliaddr);connfd = Accept(listenfd, (struct sockaddr *)&cliaddr, &clilen);    //接受链接printf("received from %s at PORT %d\n", inet_ntop(AF_INET, &cliaddr.sin_addr, str, sizeof(str)), ntohs(cliaddr.sin_port));printf("cfd %d---client %d\n", connfd, ++num);tep.events = EPOLLIN; tep.data.fd = connfd;res = epoll_ctl(efd, EPOLL_CTL_ADD, connfd, &tep);      //加入红黑树if (res == -1)perr_exit("epoll_ctl error");} else {                                                    //不是lfd, sockfd = ep[i].data.fd;n = Read(sockfd, buf, MAXLINE);if (n == 0) {                                           //读到0,说明客户端关闭链接res = epoll_ctl(efd, EPOLL_CTL_DEL, sockfd, NULL);  //将该文件描述符从红黑树摘除if (res == -1)perr_exit("epoll_ctl error");Close(sockfd);                                      //关闭与该客户端的链接printf("client[%d] closed connection\n", sockfd);} else if (n < 0) {                                     //出错perror("read n < 0 error: ");res = epoll_ctl(efd, EPOLL_CTL_DEL, sockfd, NULL);  //摘除节点Close(sockfd);} else {                                                //实际读到了字节数for (i = 0; i < n; i++)buf[i] = toupper(buf[i]);                       //转大写,写回给客户端Write(STDOUT_FILENO, buf, n);Writen(sockfd, buf, n);}}}}Close(listenfd);Close(efd);return 0;
}

3.ET模式和LT模式

EPOLL事件有两种模型:

  • Edge Triggered (ET) 边缘触发只有数据到来才触发,不管缓存区中是否还有数据。

  • Level Triggered (LT) 水平触发只要有数据都会触发。

案例:

#include <stdio.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <errno.h>
#include <unistd.h>#define MAXLINE 10int main(int argc, char *argv[])
{int efd, i;int pfd[2];pid_t pid;char buf[MAXLINE], ch = 'a';pipe(pfd);pid = fork();if (pid == 0) {close(pfd[0]);while (1) {//aaaa\nfor (i = 0; i < MAXLINE/2; i++)buf[i] = ch;buf[i-1] = '\n';ch++;//bbbb\nfor (; i < MAXLINE; i++)buf[i] = ch;buf[i-1] = '\n';ch++;write(pfd[1], buf, sizeof(buf));sleep(2);}close(pfd[1]);} else if (pid > 0) bian{struct epoll_event event;struct epoll_event resevent[10];int res, len;close(pfd[1]);efd = epoll_create(10);/* event.events = EPOLLIN; */event.events = EPOLLIN | EPOLLET;		/* ET 边沿触发 ,默认是水平触发 */event.data.fd = pfd[0];epoll_ctl(efd, EPOLL_CTL_ADD, pfd[0], &event);while (1) {res = epoll_wait(efd, resevent, 10, -1);printf("res %d\n", res);if (resevent[0].data.fd == pfd[0]) {len = read(pfd[0], buf, MAXLINE/2);  /*这里因为只读取了一半,即aaaa\n,剩下的一半bbbb\n在水平触发时依旧可以wait返回继续读取,边沿触发时则需要等待下次子进程写事件才会读取*/write(STDOUT_FILENO, buf, len);}}close(pfd[0]);close(efd);} else {perror("fork");exit(-1);}return 0;
}

如果是想在网络socket模型中使用ET或LT模式,可以在如下代码中修改:

struct epoll_event event;
event.events = EPOLLIN | EPOLLET;		/* ET 边沿触发 ,默认是水平触发 */
event.data.fd = connfd;
epoll_ctl(efd, EPOLL_CTL_ADD, connfd, &event);

即修改epoll_ctl函数的第四个参数即可。


在ET模式下,epoll只能使用非阻塞的模式:

#include <stdio.h>
#include <string.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <fcntl.h>#define MAXLINE 10
#define SERV_PORT 8000int main(void)
{struct sockaddr_in servaddr, cliaddr;socklen_t cliaddr_len;int listenfd, connfd;char buf[MAXLINE];char str[INET_ADDRSTRLEN];int efd, flag;listenfd = socket(AF_INET, SOCK_STREAM, 0);bzero(&servaddr, sizeof(servaddr));servaddr.sin_family = AF_INET;servaddr.sin_addr.s_addr = htonl(INADDR_ANY);servaddr.sin_port = htons(SERV_PORT);bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr));listen(listenfd, 20);///struct epoll_event event;struct epoll_event res_event[10];int res, len;efd = epoll_create(10);event.events = EPOLLIN | EPOLLET;     /* ET 边沿触发,默认是水平触发 *///event.events = EPOLLIN;printf("Accepting connections ...\n");cliaddr_len = sizeof(cliaddr);connfd = accept(listenfd, (struct sockaddr *)&cliaddr, &cliaddr_len);printf("received from %s at PORT %d\n",inet_ntop(AF_INET, &cliaddr.sin_addr, str, sizeof(str)),ntohs(cliaddr.sin_port));flag = fcntl(connfd, F_GETFL);          /* 修改connfd为非阻塞读 */flag |= O_NONBLOCK;fcntl(connfd, F_SETFL, flag);event.data.fd = connfd;epoll_ctl(efd, EPOLL_CTL_ADD, connfd, &event);      //将connfd加入监听红黑树while (1) {printf("epoll_wait begin\n");res = epoll_wait(efd, res_event, 10, -1);        //最多10个, 阻塞监听printf("epoll_wait end res %d\n", res);if (res_event[0].data.fd == connfd) {while ((len = read(connfd, buf, MAXLINE/2)) >0 )    //非阻塞读, 轮询write(STDOUT_FILENO, buf, len);}}return 0;
}

重点,使用fcntl函数设置为非阻塞读模式。

4.优缺点

  • 优点:高效,突破1024文件描述符。
  • 缺点:不能跨平台,Linux。

5.epoll反应堆模型

epoll反应堆是libevent库的核心实现思想,libevent是写多并发服务器必须了解的库,其可以跨平台支持。之所以叫反应堆,是形容他比较快,之所以快,是因为大量的函数回调。

epoll 反应堆模型:

epoll ET模式 + 非阻塞、轮询 + void *ptr。原来:	socket、bind、listen -- epoll_create 创建监听 红黑树 --  返回 epfd -- epoll_ctl() 向树上添加一个监听fd -- while(1)---- epoll_wait 监听 -- 对应监听fd有事件产生 -- 返回 监听满足数组。 -- 判断返回数组元素 -- lfd满足 -- Accept -- cfd 满足 -- read() --- 小->大 -- write回去。反应堆:不但要监听 cfd 的读事件、还要监听cfd的写事件。socket、bind、listen -- epoll_create 创建监听 红黑树 --  返回 epfd -- epoll_ctl() 向树上添加一个监听fd -- while(1)---- epoll_wait 监听 -- 对应监听fd有事件产生 -- 返回 监听满足数组。 -- 判断返回数组元素 -- lfd满足 -- Accept -- cfd 满足 -- read() --- 小->大 -- cfd从监听红黑树上摘下 -- EPOLLOUT -- 回调函数 -- epoll_ctl() -- EPOLL_CTL_ADD 重新放到红黑上监听写事件-- 等待 epoll_wait 返回 -- 说明 cfd 可写 -- write回去 -- cfd从监听红黑树上摘下 -- EPOLLIN -- epoll_ctl() -- EPOLL_CTL_ADD 重新放到红黑上监听读事件 -- epoll_wait 监听

反应堆代码:

/**epoll基于非阻塞I/O事件驱动*/
#include <stdio.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <stdlib.h>
#include <time.h>#define MAX_EVENTS  1024                                    //监听上限数
#define BUFLEN 4096
#define SERV_PORT   8080void recvdata(int fd, int events, void *arg);
void senddata(int fd, int events, void *arg);/* 描述就绪文件描述符相关信息 */struct myevent_s {int fd;                                                 //要监听的文件描述符int events;                                             //对应的监听事件void *arg;                                              //泛型参数void (*call_back)(int fd, int events, void *arg);       //回调函数int status;                                             //是否在监听:1->在红黑树上(监听), 0->不在(不监听)char buf[BUFLEN];int len;long last_active;                                       //记录每次加入红黑树 g_efd 的时间值
};int g_efd;                                                  //全局变量, 保存epoll_create返回的文件描述符
struct myevent_s g_events[MAX_EVENTS+1];                    //自定义结构体类型数组. +1-->listen fd/*将结构体 myevent_s 成员变量 初始化*/void eventset(struct myevent_s *ev, int fd, void (*call_back)(int, int, void *), void *arg)
{ev->fd = fd;ev->call_back = call_back;ev->events = 0;ev->arg = arg;ev->status = 0;memset(ev->buf, 0, sizeof(ev->buf));ev->len = 0;ev->last_active = time(NULL);                       //调用eventset函数的时间return;
}/* 向 epoll监听的红黑树 添加一个 文件描述符 *///eventadd(efd, EPOLLIN, &g_events[MAX_EVENTS]);
void eventadd(int efd, int events, struct myevent_s *ev)
{struct epoll_event epv = {0, {0}};int op;epv.data.ptr = ev;epv.events = ev->events = events;       //EPOLLIN 或 EPOLLOUTif (ev->status == 0) {                                          //已经在红黑树 g_efd 里op = EPOLL_CTL_ADD;                 //将其加入红黑树 g_efd, 并将status置1ev->status = 1;}if (epoll_ctl(efd, op, ev->fd, &epv) < 0)                       //实际添加/修改printf("event add failed [fd=%d], events[%d]\n", ev->fd, events);elseprintf("event add OK [fd=%d], op=%d, events[%0X]\n", ev->fd, op, events);return ;
}/* 从epoll 监听的 红黑树中删除一个 文件描述符*/void eventdel(int efd, struct myevent_s *ev)
{struct epoll_event epv = {0, {0}};if (ev->status != 1)                                        //不在红黑树上return ;//epv.data.ptr = ev;epv.data.ptr = NULL;ev->status = 0;                                             //修改状态epoll_ctl(efd, EPOLL_CTL_DEL, ev->fd, &epv);                //从红黑树 efd 上将 ev->fd 摘除return ;
}/*  当有文件描述符就绪, epoll返回, 调用该函数 与客户端建立链接 */void acceptconn(int lfd, int events, void *arg)
{struct sockaddr_in cin;socklen_t len = sizeof(cin);int cfd, i;if ((cfd = accept(lfd, (struct sockaddr *)&cin, &len)) == -1) {if (errno != EAGAIN && errno != EINTR) {/* 暂时不做出错处理 */}printf("%s: accept, %s\n", __func__, strerror(errno));return ;}do {for (i = 0; i < MAX_EVENTS; i++)                                //从全局数组g_events中找一个空闲元素if (g_events[i].status == 0)                                //类似于select中找值为-1的元素break;                                                  //跳出 forif (i == MAX_EVENTS) {printf("%s: max connect limit[%d]\n", __func__, MAX_EVENTS);break;                                                      //跳出do while(0) 不执行后续代码}int flag = 0;if ((flag = fcntl(cfd, F_SETFL, O_NONBLOCK)) < 0) {             //将cfd也设置为非阻塞printf("%s: fcntl nonblocking failed, %s\n", __func__, strerror(errno));break;}/* 给cfd设置一个 myevent_s 结构体, 回调函数 设置为 recvdata */eventset(&g_events[i], cfd, recvdata, &g_events[i]);   eventadd(g_efd, EPOLLIN, &g_events[i]);                         //将cfd添加到红黑树g_efd中,监听读事件} while(0);printf("new connect [%s:%d][time:%ld], pos[%d]\n", inet_ntoa(cin.sin_addr), ntohs(cin.sin_port), g_events[i].last_active, i);return ;
}void recvdata(int fd, int events, void *arg)
{struct myevent_s *ev = (struct myevent_s *)arg;int len;len = recv(fd, ev->buf, sizeof(ev->buf), 0);            //读文件描述符, 数据存入myevent_s成员buf中eventdel(g_efd, ev);        //将该节点从红黑树上摘除if (len > 0) {ev->len = len;ev->buf[len] = '\0';                                //手动添加字符串结束标记printf("C[%d]:%s\n", fd, ev->buf);eventset(ev, fd, senddata, ev);                     //设置该 fd 对应的回调函数为 senddataeventadd(g_efd, EPOLLOUT, ev);                      //将fd加入红黑树g_efd中,监听其写事件} else if (len == 0) {close(ev->fd);/* ev-g_events 地址相减得到偏移元素位置 */printf("[fd=%d] pos[%ld], closed\n", fd, ev-g_events);} else {close(ev->fd);printf("recv[fd=%d] error[%d]:%s\n", fd, errno, strerror(errno));}return;
}void senddata(int fd, int events, void *arg)
{struct myevent_s *ev = (struct myevent_s *)arg;int len;len = send(fd, ev->buf, ev->len, 0);                    //直接将数据 回写给客户端。未作处理eventdel(g_efd, ev);                                //从红黑树g_efd中移除if (len > 0) {printf("send[fd=%d], [%d]%s\n", fd, len, ev->buf);eventset(ev, fd, recvdata, ev);                     //将该fd的 回调函数改为 recvdataeventadd(g_efd, EPOLLIN, ev);                       //从新添加到红黑树上, 设为监听读事件} else {close(ev->fd);                                      //关闭链接printf("send[fd=%d] error %s\n", fd, strerror(errno));}return ;
}/*创建 socket, 初始化lfd */void initlistensocket(int efd, short port)
{struct sockaddr_in sin;int lfd = socket(AF_INET, SOCK_STREAM, 0);fcntl(lfd, F_SETFL, O_NONBLOCK);                                            //将socket设为非阻塞memset(&sin, 0, sizeof(sin));                                               //bzero(&sin, sizeof(sin))sin.sin_family = AF_INET;sin.sin_addr.s_addr = INADDR_ANY;sin.sin_port = htons(port);bind(lfd, (struct sockaddr *)&sin, sizeof(sin));listen(lfd, 20);/* void eventset(struct myevent_s *ev, int fd, void (*call_back)(int, int, void *), void *arg);  */eventset(&g_events[MAX_EVENTS], lfd, acceptconn, &g_events[MAX_EVENTS]);/* void eventadd(int efd, int events, struct myevent_s *ev) */eventadd(efd, EPOLLIN, &g_events[MAX_EVENTS]);return ;
}int main(int argc, char *argv[])
{unsigned short port = SERV_PORT;if (argc == 2)port = atoi(argv[1]);                           //使用用户指定端口.如未指定,用默认端口g_efd = epoll_create(MAX_EVENTS+1);                 //创建红黑树,返回给全局 g_efd if (g_efd <= 0)printf("create efd in %s err %s\n", __func__, strerror(errno));initlistensocket(g_efd, port);                      //初始化监听socketstruct epoll_event events[MAX_EVENTS+1];            //保存已经满足就绪事件的文件描述符数组 printf("server running:port[%d]\n", port);int checkpos = 0, i;while (1) {/* 超时验证,每次测试100个链接,不测试listenfd 当客户端60秒内没有和服务器通信,则关闭此客户端链接 */long now = time(NULL);                          //当前时间for (i = 0; i < 100; i++, checkpos++) {         //一次循环检测100个。 使用checkpos控制检测对象if (checkpos == MAX_EVENTS)checkpos = 0;if (g_events[checkpos].status != 1)         //不在红黑树 g_efd 上continue;long duration = now - g_events[checkpos].last_active;       //客户端不活跃的世间if (duration >= 60) {close(g_events[checkpos].fd);                           //关闭与该客户端链接printf("[fd=%d] timeout\n", g_events[checkpos].fd);eventdel(g_efd, &g_events[checkpos]);                   //将该客户端 从红黑树 g_efd移除}}/*监听红黑树g_efd, 将满足的事件的文件描述符加至events数组中, 1秒没有事件满足, 返回 0*/int nfd = epoll_wait(g_efd, events, MAX_EVENTS+1, 1000);if (nfd < 0) {printf("epoll_wait error, exit\n");break;}for (i = 0; i < nfd; i++) {/*使用自定义结构体myevent_s类型指针, 接收 联合体data的void *ptr成员*/struct myevent_s *ev = (struct myevent_s *)events[i].data.ptr;  if ((events[i].events & EPOLLIN) && (ev->events & EPOLLIN)) {           //读就绪事件ev->call_back(ev->fd, events[i].events, ev->arg);//lfd  EPOLLIN  }if ((events[i].events & EPOLLOUT) && (ev->events & EPOLLOUT)) {         //写就绪事件ev->call_back(ev->fd, events[i].events, ev->arg);}}}/* 退出前释放所有资源 */return 0;
}

6.保活机制

客户端和服务器通信,服务器怎么知道客户端掉线?

  • 心跳包
  • 乒乓包
  • 设置TCP属性(探测分节)

另外推荐一个查看代码的工具:

在这里插入图片描述

五.线程池

1.线程池思想

在这里插入图片描述


struct threadpool_t {pthread_mutex_t lock;               /* 用于锁住本结构体 */    pthread_mutex_t thread_counter;     /* 记录忙状态线程个数de琐 -- busy_thr_num */pthread_cond_t queue_not_full;      /* 当任务队列满时,添加任务的线程阻塞,等待此条件变量 */pthread_cond_t queue_not_empty;     /* 任务队列里不为空时,通知等待任务的线程 */pthread_t *threads;                 /* 存放线程池中每个线程的tid。数组 */pthread_t adjust_tid;               /* 存管理线程tid */threadpool_task_t *task_queue;      /* 任务队列(数组首地址) */int min_thr_num;                    /* 线程池最小线程数 */int max_thr_num;                    /* 线程池最大线程数 */int live_thr_num;                   /* 当前存活线程个数 */int busy_thr_num;                   /* 忙状态线程个数 */int wait_exit_thr_num;              /* 要销毁的线程个数 */int queue_front;                    /* task_queue队头下标 */int queue_rear;                     /* task_queue队尾下标 */int queue_size;                     /* task_queue队中实际任务数 */int queue_max_size;                 /* task_queue队列可容纳任务数上限 */int shutdown;                       /* 标志位,线程池使用状态,true或false */
};typedef struct {void *(*function)(void *);          /* 函数指针,回调函数 */void *arg;                          /* 上面函数的参数 */} threadpool_task_t;                    /* 各子线程任务结构体 */

2.搭建线程池步骤

线程池模块分析:

1. main();		创建线程池。向线程池中添加任务。 借助回调处理任务。销毁线程池。2. pthreadpool_create();创建线程池结构体 指针。初始化线程池结构体 {  N 个成员变量 }创建 N 个任务线程。创建 1 个管理者线程。失败时,销毁开辟的所有空间。(释放)3. threadpool_thread()进入子线程回调函数。接收参数 void *arg  --》 pool 结构体加锁 --》lock --》 整个结构体锁判断条件变量 --》 wait  -------------------1704. adjust_thread()循环 10 s 执行一次。进入管理者线程回调函数接收参数 void *arg  --》 pool 结构体加锁 --》lock --》 整个结构体锁获取管理线程池要用的到 变量。	task_num, live_num, busy_num根据既定算法,使用上述3变量,判断是否应该 创建、销毁线程池中 指定步长的线程。5. threadpool_add ()总功能:模拟产生任务。   num[20]设置回调函数, 处理任务。  sleep(1) 代表处理完成。内部实现:加锁初始化 任务队列结构体成员。   回调函数 function, arg利用环形队列机制,实现添加任务。 借助队尾指针挪移 % 实现。唤醒阻塞在 条件变量上的线程。解锁6.  从 3. 中的wait之后继续执行,处理任务。加锁获取 任务处理回调函数,及参数利用环形队列机制,实现处理任务。 借助队头指针挪移 % 实现。唤醒阻塞在 条件变量 上的 server。解锁加锁 改忙线程数++解锁执行处理任务的线程加锁 改忙线程数——解锁7. 创建 销毁线程管理者线程根据 task_num, live_num, busy_num  根据既定算法,使用上述3变量,判断是否应该 创建、销毁线程池中 指定步长的线程。如果满足 创建条件pthread_create();   回调 任务线程函数。		live_num++如果满足 销毁条件wait_exit_thr_num = 10;  signal 给 阻塞在条件变量上的线程 发送 假条件满足信号    跳转至  --170 wait阻塞线程会被 假信号 唤醒。判断: wait_exit_thr_num  > 0 pthread_exit();          

3.代码

threadpool.c

#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <signal.h>
#include <errno.h>
#include "threadpool.h"#define DEFAULT_TIME 10                 /*10s检测一次*/
#define MIN_WAIT_TASK_NUM 10            /*如果queue_size > MIN_WAIT_TASK_NUM 添加新的线程到线程池*/ 
#define DEFAULT_THREAD_VARY 10          /*每次创建和销毁线程的个数*/
#define true 1
#define false 0typedef struct {void *(*function)(void *);          /* 函数指针,回调函数 */void *arg;                          /* 上面函数的参数 */
} threadpool_task_t;                    /* 各子线程任务结构体 *//* 描述线程池相关信息 */struct threadpool_t {pthread_mutex_t lock;               /* 用于锁住本结构体 */    pthread_mutex_t thread_counter;     /* 记录忙状态线程个数de琐 -- busy_thr_num */pthread_cond_t queue_not_full;      /* 当任务队列满时,添加任务的线程阻塞,等待此条件变量 */pthread_cond_t queue_not_empty;     /* 任务队列里不为空时,通知等待任务的线程 */pthread_t *threads;                 /* 存放线程池中每个线程的tid。数组 */pthread_t adjust_tid;               /* 存管理线程tid */threadpool_task_t *task_queue;      /* 任务队列(数组首地址) */int min_thr_num;                    /* 线程池最小线程数 */int max_thr_num;                    /* 线程池最大线程数 */int live_thr_num;                   /* 当前存活线程个数 */int busy_thr_num;                   /* 忙状态线程个数 */int wait_exit_thr_num;              /* 要销毁的线程个数 */int queue_front;                    /* task_queue队头下标 */int queue_rear;                     /* task_queue队尾下标 */int queue_size;                     /* task_queue队中实际任务数 */int queue_max_size;                 /* task_queue队列可容纳任务数上限 */int shutdown;                       /* 标志位,线程池使用状态,true或false */
};void *threadpool_thread(void *threadpool);void *adjust_thread(void *threadpool);int is_thread_alive(pthread_t tid);
int threadpool_free(threadpool_t *pool);//threadpool_create(3,100,100);  
threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size)
{int i;threadpool_t *pool = NULL;          /* 线程池 结构体 */do {if((pool = (threadpool_t *)malloc(sizeof(threadpool_t))) == NULL) {  printf("malloc threadpool fail");break;                                      /*跳出do while*/}pool->min_thr_num = min_thr_num;pool->max_thr_num = max_thr_num;pool->busy_thr_num = 0;pool->live_thr_num = min_thr_num;               /* 活着的线程数 初值=最小线程数 */pool->wait_exit_thr_num = 0;pool->queue_size = 0;                           /* 有0个产品 */pool->queue_max_size = queue_max_size;          /* 最大任务队列数 */pool->queue_front = 0;pool->queue_rear = 0;pool->shutdown = false;                         /* 不关闭线程池 *//* 根据最大线程上限数, 给工作线程数组开辟空间, 并清零 */pool->threads = (pthread_t *)malloc(sizeof(pthread_t)*max_thr_num); if (pool->threads == NULL) {printf("malloc threads fail");break;}memset(pool->threads, 0, sizeof(pthread_t)*max_thr_num);/* 给 任务队列 开辟空间 */pool->task_queue = (threadpool_task_t *)malloc(sizeof(threadpool_task_t)*queue_max_size);if (pool->task_queue == NULL) {printf("malloc task_queue fail");break;}/* 初始化互斥琐、条件变量 */if (pthread_mutex_init(&(pool->lock), NULL) != 0|| pthread_mutex_init(&(pool->thread_counter), NULL) != 0|| pthread_cond_init(&(pool->queue_not_empty), NULL) != 0|| pthread_cond_init(&(pool->queue_not_full), NULL) != 0){printf("init the lock or cond fail");break;}/* 启动 min_thr_num 个 work thread */for (i = 0; i < min_thr_num; i++) {pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);   /*pool指向当前线程池*/printf("start thread 0x%x...\n", (unsigned int)pool->threads[i]);}pthread_create(&(pool->adjust_tid), NULL, adjust_thread, (void *)pool);     /* 创建管理者线程 */return pool;} while (0);threadpool_free(pool);      /* 前面代码调用失败时,释放poll存储空间 */return NULL;
}/* 向线程池中 添加一个任务 */
//threadpool_add(thp, process, (void*)&num[i]);   /* 向线程池中添加任务 process: 小写---->大写*/int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg)
{pthread_mutex_lock(&(pool->lock));/* ==为真,队列已经满, 调wait阻塞 */while ((pool->queue_size == pool->queue_max_size) && (!pool->shutdown)) {pthread_cond_wait(&(pool->queue_not_full), &(pool->lock));}if (pool->shutdown) {pthread_cond_broadcast(&(pool->queue_not_empty));pthread_mutex_unlock(&(pool->lock));return 0;}/* 清空 工作线程 调用的回调函数 的参数arg */if (pool->task_queue[pool->queue_rear].arg != NULL) {pool->task_queue[pool->queue_rear].arg = NULL;}/*添加任务到任务队列里*/pool->task_queue[pool->queue_rear].function = function;pool->task_queue[pool->queue_rear].arg = arg;pool->queue_rear = (pool->queue_rear + 1) % pool->queue_max_size;       /* 队尾指针移动, 模拟环形 */pool->queue_size++;/*添加完任务后,队列不为空,唤醒线程池中 等待处理任务的线程*/pthread_cond_signal(&(pool->queue_not_empty));pthread_mutex_unlock(&(pool->lock));return 0;
}/* 线程池中各个工作线程 */
void *threadpool_thread(void *threadpool)
{threadpool_t *pool = (threadpool_t *)threadpool;threadpool_task_t task;while (true) {/* Lock must be taken to wait on conditional variable *//*刚创建出线程,等待任务队列里有任务,否则阻塞等待任务队列里有任务后再唤醒接收任务*/pthread_mutex_lock(&(pool->lock));/*queue_size == 0 说明没有任务,调 wait 阻塞在条件变量上, 若有任务,跳过该while*/while ((pool->queue_size == 0) && (!pool->shutdown)) {  printf("thread 0x%x is waiting\n", (unsigned int)pthread_self());pthread_cond_wait(&(pool->queue_not_empty), &(pool->lock));/*清除指定数目的空闲线程,如果要结束的线程个数大于0,结束线程*/if (pool->wait_exit_thr_num > 0) {pool->wait_exit_thr_num--;/*如果线程池里线程个数大于最小值时可以结束当前线程*/if (pool->live_thr_num > pool->min_thr_num) {printf("thread 0x%x is exiting\n", (unsigned int)pthread_self());pool->live_thr_num--;pthread_mutex_unlock(&(pool->lock));pthread_exit(NULL);}}}/*如果指定了true,要关闭线程池里的每个线程,自行退出处理---销毁线程池*/if (pool->shutdown) {pthread_mutex_unlock(&(pool->lock));printf("thread 0x%x is exiting\n", (unsigned int)pthread_self());pthread_detach(pthread_self());pthread_exit(NULL);     /* 线程自行结束 */}/*从任务队列里获取任务, 是一个出队操作*/task.function = pool->task_queue[pool->queue_front].function;task.arg = pool->task_queue[pool->queue_front].arg;pool->queue_front = (pool->queue_front + 1) % pool->queue_max_size;       /* 出队,模拟环形队列 */pool->queue_size--;/*通知可以有新的任务添加进来*/pthread_cond_broadcast(&(pool->queue_not_full));/*任务取出后,立即将 线程池琐 释放*/pthread_mutex_unlock(&(pool->lock));/*执行任务*/ printf("thread 0x%x start working\n", (unsigned int)pthread_self());pthread_mutex_lock(&(pool->thread_counter));                            /*忙状态线程数变量琐*/pool->busy_thr_num++;                                                   /*忙状态线程数+1*/pthread_mutex_unlock(&(pool->thread_counter));(*(task.function))(task.arg);                                           /*执行回调函数任务*///task.function(task.arg);                                              /*执行回调函数任务*//*任务结束处理*/ printf("thread 0x%x end working\n", (unsigned int)pthread_self());pthread_mutex_lock(&(pool->thread_counter));pool->busy_thr_num--;                                       /*处理掉一个任务,忙状态数线程数-1*/pthread_mutex_unlock(&(pool->thread_counter));}pthread_exit(NULL);
}/* 管理线程 */
void *adjust_thread(void *threadpool)
{int i;threadpool_t *pool = (threadpool_t *)threadpool;while (!pool->shutdown) {sleep(DEFAULT_TIME);                                    /*定时 对线程池管理*/pthread_mutex_lock(&(pool->lock));int queue_size = pool->queue_size;                      /* 关注 任务数 */int live_thr_num = pool->live_thr_num;                  /* 存活 线程数 */pthread_mutex_unlock(&(pool->lock));pthread_mutex_lock(&(pool->thread_counter));int busy_thr_num = pool->busy_thr_num;                  /* 忙着的线程数 */pthread_mutex_unlock(&(pool->thread_counter));/* 创建新线程 算法: 任务数大于最小线程池个数, 且存活的线程数少于最大线程个数时 如:30>=10 && 40<100*/if (queue_size >= MIN_WAIT_TASK_NUM && live_thr_num < pool->max_thr_num) {pthread_mutex_lock(&(pool->lock));  int add = 0;/*一次增加 DEFAULT_THREAD 个线程*/for (i = 0; i < pool->max_thr_num && add < DEFAULT_THREAD_VARY&& pool->live_thr_num < pool->max_thr_num; i++) {if (pool->threads[i] == 0 || !is_thread_alive(pool->threads[i])) {pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);add++;pool->live_thr_num++;}}pthread_mutex_unlock(&(pool->lock));}/* 销毁多余的空闲线程 算法:忙线程X2 小于 存活的线程数 且 存活的线程数 大于 最小线程数时*/if ((busy_thr_num * 2) < live_thr_num  &&  live_thr_num > pool->min_thr_num) {/* 一次销毁DEFAULT_THREAD个线程, 隨機10個即可 */pthread_mutex_lock(&(pool->lock));pool->wait_exit_thr_num = DEFAULT_THREAD_VARY;      /* 要销毁的线程数 设置为10 */pthread_mutex_unlock(&(pool->lock));for (i = 0; i < DEFAULT_THREAD_VARY; i++) {/* 通知处在空闲状态的线程, 他们会自行终止*/pthread_cond_signal(&(pool->queue_not_empty));}}}return NULL;
}int threadpool_destroy(threadpool_t *pool)
{int i;if (pool == NULL) {return -1;}pool->shutdown = true;/*先销毁管理线程*/pthread_join(pool->adjust_tid, NULL);for (i = 0; i < pool->live_thr_num; i++) {/*通知所有的空闲线程*/pthread_cond_broadcast(&(pool->queue_not_empty));}for (i = 0; i < pool->live_thr_num; i++) {pthread_join(pool->threads[i], NULL);}threadpool_free(pool);return 0;
}int threadpool_free(threadpool_t *pool)
{if (pool == NULL) {return -1;}if (pool->task_queue) {free(pool->task_queue);}if (pool->threads) {free(pool->threads);pthread_mutex_lock(&(pool->lock));pthread_mutex_destroy(&(pool->lock));pthread_mutex_lock(&(pool->thread_counter));pthread_mutex_destroy(&(pool->thread_counter));pthread_cond_destroy(&(pool->queue_not_empty));pthread_cond_destroy(&(pool->queue_not_full));}free(pool);pool = NULL;return 0;
}int threadpool_all_threadnum(threadpool_t *pool)
{int all_threadnum = -1;                 // 总线程数pthread_mutex_lock(&(pool->lock));all_threadnum = pool->live_thr_num;     // 存活线程数pthread_mutex_unlock(&(pool->lock));return all_threadnum;
}int threadpool_busy_threadnum(threadpool_t *pool)
{int busy_threadnum = -1;                // 忙线程数pthread_mutex_lock(&(pool->thread_counter));busy_threadnum = pool->busy_thr_num;    pthread_mutex_unlock(&(pool->thread_counter));return busy_threadnum;
}int is_thread_alive(pthread_t tid)
{int kill_rc = pthread_kill(tid, 0);     //发0号信号,测试线程是否存活if (kill_rc == ESRCH) {return false;}return true;
}/*测试*/ #if 1/* 线程池中的线程,模拟处理业务 */
void *process(void *arg)
{printf("thread 0x%x working on task %d\n ",(unsigned int)pthread_self(),(int)arg);sleep(1);                           //模拟 小---大写printf("task %d is end\n",(int)arg);return NULL;
}int main(void)
{/*threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size);*/threadpool_t *thp = threadpool_create(3,100,100);   /*创建线程池,池里最小3个线程,最大100,队列最大100*/printf("pool inited");//int *num = (int *)malloc(sizeof(int)*20);int num[20], i;for (i = 0; i < 20; i++) {num[i] = i;printf("add task %d\n",i);/*int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg) */threadpool_add(thp, process, (void*)&num[i]);   /* 向线程池中添加任务 */}sleep(10);                                          /* 等子线程完成任务 */threadpool_destroy(thp);return 0;
}#endif

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/971.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Linux网络编程】数据链路层 | MAC帧 | ARP协议

前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到网站 &#x1f308;个人主页&#xff1a; 南桥几晴秋 &#x1f308;C专栏&#xff1a; 南桥谈C &#x1f308;C语言专栏&#xff1a; C语言学习系…

React Fiber框架中的Render渲染阶段——workLoop(performUnitOfWork【beginWork与completeWork】)

触发渲染过程——renderRoot renderRoot 是一个函数&#xff0c;用于触发渲染工作。它通常会调用并递归地执行一系列的渲染任务&#xff0c;直到完成整个更新过程。这个过程包括执行 Fiber 树中的 beginWork 和 completeWork&#xff0c;以及渲染新状态或 DOM。 function ren…

STM32裸机开发转FreeRTOS教程

目录 1. 简介2. RTOS设置&#xff08;1&#xff09;分配内存&#xff08;2&#xff09;查看任务剩余空间&#xff08;3&#xff09;使用osDelay 3. 队列的使用&#xff08;1&#xff09;创建队列&#xff08;1&#xff09;直接传值和指针传值&#xff08;2&#xff09;发送/接收…

Elasticsearch快速入门

Elasticsearch是由elastic公司开发的一套搜索引擎技术&#xff0c;它是elastic技术栈中的一部分,提供核心的数据存储、搜索、分析功能 elasticsearch之所以有如此高性能的搜索表现&#xff0c;正是得益于底层的倒排索引技术。那么什么是倒排索引呢&#xff1f; Elasticsearch…

新版AndroidStudio通过系统快捷创建带BottomNavigationView的项目踩坑记录

选择上面这个玩意创建的项目 坑点1 &#xff1a;配置的写法和不一样了 镜像的写法&#xff1a; 新的settings.gradle.kts中配置镜像的代码&#xff1a; pluginManagement {repositories {mavenCentral()google {content {includeGroupByRegex("com\\.android.*")…

Unity 自定义批量打包工具

打包配置项 using UnityEngine; using System.Collections.Generic;namespace MYTOOL.Build {/// <summary>/// 批量打包配置文件/// </summary>[CreateAssetMenu]public class BatchBuildProfile : ScriptableObject{public List<BuildTask> tasks new Li…

【JVM-2.3】深入解析JVisualVM:Java性能监控与调优利器

在Java应用的开发和运维过程中&#xff0c;性能监控与调优是不可或缺的环节。无论是排查内存泄漏、分析CPU瓶颈&#xff0c;还是优化线程使用&#xff0c;开发者都需要借助一些强大的工具来辅助诊断。JVisualVM 正是这样一款由Oracle提供的免费工具&#xff0c;它集成了多种性能…

基于大语言模型的组合优化

摘要&#xff1a;组合优化&#xff08;Combinatorial Optimization, CO&#xff09;对于提高工程应用的效率和性能至关重要。随着问题规模的增大和依赖关系的复杂化&#xff0c;找到最优解变得极具挑战性。在处理现实世界的工程问题时&#xff0c;基于纯数学推理的算法存在局限…

计算机网络 (40)域名系统DNS

前言 计算机网络域名系统DNS&#xff08;Domain Name System&#xff09;是互联网的基础技术之一&#xff0c;它负责将人类可读的域名转换为计算机用来通信的数字IP地址。 一、基本概念 DNS的主要目的是将域名解析或翻译为IP地址&#xff0c;使得用户可以通过简单易记的域名来访…

说一说mongodb组合索引的匹配规则

一、背景 有一张1000多万条记录的大表&#xff0c;需要做归档至历史表&#xff0c;出现了大量慢查询。 查询条件是 "classroomId": {$in: ["xxx", "xxx", ..... "xxx","xxx", "xxx" ] }耗时近5秒&#xff0c;且…

C# OpenCV机器视觉:转速测量

在一个看似平常却又暗藏神秘能量的日子里&#xff0c;阿杰正在他那充满科技感的实验室里&#xff0c;对着一堆奇奇怪怪的仪器发呆。突然&#xff0c;手机铃声如一道凌厉的剑气划破寂静&#xff0c;原来是工厂的赵厂长打来的紧急电话&#xff1a;“阿杰啊&#xff0c;咱们工厂新…

【RedisStack】Linux安装指南

【RedisStack】Linux安装指南.md 前言下载解压创建启动文件设置密码把密码设置到环境变量启动/停止相关命令测试&验证官网资料参考资料 前言 Redis Stack是使用Redis的最佳起点。我们将我们必须提供的最好的技术捆绑在一起&#xff0c;形成一个易于使用的软件包。Redis St…

2025-微服务—SpringCloud-1~3

2025-微服务—SpringCloud 第一章、从Boot和Cloud版本选型开始说起1、Springboot版本2、Springcloud版本3、Springcloud Alibaba4、本次讲解定稿版 第二章 关于Cloud各种组件的停更/升级/替换1、微服务介绍2、SpringCloud是什么&#xff1f;能干吗&#xff1f;产生背景&#xf…

深度学习-卷积神经网络反向传播梯度公式推导

这篇文章非常棒&#xff0c;单样本单通道的反向传播梯度公式推导我都理解了。为了防止找不到原网页&#xff0c;所以特复制于此 参考&#xff1a; https://zhuanlan.zhihu.com/p/640697443

MongoDB实践

MongoDB 是什么&#xff1f;— MongoDB 手册 v8.0 现在有一个名为city的集合&#xff0c;里面的结构如下图 一、增删改查操作 1.查询find db.getCollection("city").find({})db.city.find({})db.city.find({city:"广州" });db.city.find({city_id:17,ci…

mycat介绍与操作步骤

文章目录 1.分库分表2.mycat 入门2.1 概述2.2 案例&#xff1a;水平分表1&#xff09;准备工作2&#xff09;配置3&#xff09;启动并测试 3.mycat 配置详解3.1 schema.xml3.2 rule.xml3.3 server.xml 4.mycat 分片&#xff1a;垂直拆分1&#xff09;准备工作2&#xff09;配置…

苹果手机(IOS系统)出现安全延迟进行中如何关闭?

苹果手机&#xff08;IOS系统&#xff09;出现安全延迟进行中如何关闭&#xff1f; 一、设置二、隐私与安全性三、失窃设备保护关闭 一、设置 二、隐私与安全性 三、失窃设备保护关闭

线形回归与小批量梯度下降实例

1、准备数据集 import numpy as np import matplotlib.pyplot as pltfrom torch.utils.data import DataLoader from torch.utils.data import TensorDataset######################################################################### #################准备若干个随机的x和…

【Unity3D日常开发】Unity3D中打开Window文件对话框打开文件(PC版)

推荐阅读 CSDN主页GitHub开源地址Unity3D插件分享QQ群&#xff1a;398291828小红书小破站 大家好&#xff0c;我是佛系工程师☆恬静的小魔龙☆&#xff0c;不定时更新Unity开发技巧&#xff0c;觉得有用记得一键三连哦。 一、前言 这篇文章继续讲如何使用Unity3D打开Window文…

iOS 逆向学习 - Inter-Process Communication:进程间通信

iOS 逆向学习 - Inter-Process Communication&#xff1a;进程间通信 一、进程间通信概要二、iOS 进程间通信机制详解1. URL Schemes2. Pasteboard3. App Groups 和 Shared Containers4. XPC Services 三、不同进程间通信机制的差异四、总结 一、进程间通信概要 进程间通信&am…