一.TCP状态时序理解
1.TCP状态理解
**CLOSED:**表示初始状态。
**LISTEN:**该状态表示服务器端的某个SOCKET处于监听状态,可以接受连接。
**SYN_SENT:**这个状态与SYN_RCVD遥相呼应,当客户端SOCKET执行CONNECT连接时,它首先发送SYN报文,随即进入到了SYN_SENT状态,并等待服务端的发送三次握手中的第2个报文。SYN_SENT状态表示客户端已发送SYN报文。
SYN_RCVD: 该状态表示接收到SYN报文,在正常情况下,这个状态是服务器端的SOCKET在建立TCP连接时的三次握手会话过程中的一个中间状态,很短暂。此种状态时,当收到客户端的ACK报文后,会进入到ESTABLISHED状态。
**ESTABLISHED:**表示连接已经建立。
FIN_WAIT_1: FIN_WAIT_1和FIN_WAIT_2状态的真正含义都是表示等待对方的FIN报文。区别是:
FIN_WAIT_1状态是当socket在ESTABLISHED状态时,想主动关闭连接,向对方发送了FIN报文,此时该socket进入到FIN_WAIT_1状态。
FIN_WAIT_2状态是当对方回应ACK后,该socket进入到FIN_WAIT_2状态,正常情况下,对方应马上回应ACK报文,所以FIN_WAIT_1状态一般较难见到,而FIN_WAIT_2状态可用netstat看到。
**FIN_WAIT_2:主动关闭链接的一方,发出FIN收到ACK以后进入该状态
。称之为半连接或半关闭状态。**该状态下的socket只能接收数据,不能发。
TIME_WAIT: 表示收到了对方的FIN报文,并发送出了ACK报文,等2MSL后即可回到CLOSED可用状态。如果FIN_WAIT_1状态下,收到对方同时带 FIN标志和ACK标志的报文时,可以直接进入到TIME_WAIT状态,而无须经过FIN_WAIT_2状态。
CLOSING: 这种状态较特殊,属于一种较罕见的状态。正常情况下,当你发送FIN报文后,按理来说是应该先收到(或同时收到)对方的 ACK报文,再收到对方的FIN报文。但是CLOSING状态表示你发送FIN报文后,并没有收到对方的ACK报文,反而却也收到了对方的FIN报文。什么情况下会出现此种情况呢?如果双方几乎在同时close一个SOCKET的话,那么就出现了双方同时发送FIN报文的情况,也即会出现CLOSING状态,表示双方都正在关闭SOCKET连接。
CLOSE_WAIT: 此种状态表示在等待关闭。当对方关闭一个SOCKET后发送FIN报文给自己,系统会回应一个ACK报文给对方,此时则进入到CLOSE_WAIT状态。接下来呢,察看是否还有数据发送给对方,如果没有可以 close这个SOCKET,发送FIN报文给对方,即关闭连接。所以在CLOSE_WAIT状态下,需要关闭连接。
LAST_ACK: 该状态是被动关闭一方在发送FIN报文后,最后等待对方的ACK报文。当收到ACK报文后,即可以进入到CLOSED可用状态。
2.端口复用
在server的TCP连接没有完全断开之前不允许重新监听是不合理的。因为,TCP连接没有完全断开指的是connfd(127.0.0.1:6666)没有完全断开,而我们重新监听的是lis-tenfd(0.0.0.0:6666),虽然是占用同一个端口,但IP地址不同,connfd对应的是与某个客户端通讯的一个具体的IP地址,而listenfd对应的是wildcard address。解决这个问题的方法是使用setsockopt()设置socket描述符的选项SO_REUSEADDR为1,表示允许创建端口号相同但IP地址不同的多个socket描述符。
在server代码的socket()和bind()调用之间插入如下代码:
int opt = 1;setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
半关闭:
通信双方中,只有一端关闭通信。 --- FIN_WAIT_2close(cfd);shutdown(int fd, int how); how: SHUT_RD 关读端SHUT_WR 关写端SHUT_RDWR 关读写shutdown在关闭多个文件描述符应用的文件时,采用全关闭方法。close,只关闭一个。
3.多路IO转接思想
上次学习的并发服务器技术,分别包括多进程并发服务器和多线程并发服务器,解决了有多个客户端来连接服务器的问题,但是在效率方面,如果没有客户端来连接,那么服务器一直在while
循环里面阻塞监听(accept
),非常影响效率。那么针对这种情况,我们可以在中间建立一个“秘书”,客户端想连接服务器就先通知秘书,秘书再通知服务器,这样服务器就不用一直阻塞在accept
监听里面啦。
这个“秘书”有三种方式实现:select
、poll
和epoll
。通过这种中间“秘书”方式的构建服务器,就是多路IO转接服务器,也叫多任务IO服务器,该类服务器实现的主旨思想是,不再由应用程序自己监视客户端连接,取而代之由内核替应用程序监视文件。
二.select
多路IO转接服务器
1.函数分析
int select(int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, struct timeval *timeout);nfds: 监控的文件描述符集里最大文件描述符加1,因为此参数会告诉内核检测前多少个文件描述符的状态readfds: 监控有读数据到达文件描述符集合,传入传出参数writefds: 监控写数据到达文件描述符集合,传入传出参数exceptfds: 监控异常发生达文件描述符集合,如带外数据到达异常,传入传出参数timeout: 定时阻塞监控时间,3种情况1.NULL,永远等下去2.设置timeval,等待固定时间3.设置timeval里时间均为0,检查描述字后立即返回,轮询struct timeval {long tv_sec; /* seconds */long tv_usec; /* microseconds */};
知识点1:文件描述符
在Unix-like
系统中,文件描述符是一个广泛的概念,它用于表示几乎所有类型的文件、管道、目录以及特殊类型的文件,如socket
套接字。因此,socket
套接字确实是文件描述符的一种,它被用来在进程间或不同计算机之间进行网络通信。总之一句话,文件描述符大于套接字,就相当于套接字是文件描述符的一种。
在Linux操作系统中,我们使用位图的方式描述文件描述符,而0、1、2三个文件描述符,已经被系统调用,所以一般3用来描述服务器的监听套接字,后面4、5、6……才用来与客户端建立连接。那么在进行select
函数传参时,需要注意,传递与客户端通信的最大文件描述符+1,才能保证循环遍历所有的套接字。
知识点2:文件描述集合
readfds
、writefds
、exceptfds
三个文件描述符集合,他们都是传入传出参数。在传入时,readfds
里面表示的是需要监控有数据到达需要读的文件描述符;在传出时,其表示在需要监控有数据到达的文件描述符中,监测发生了数据到达事件的文件描述符。同理,在传入时,writefds
里面表示的是需要监控有写数据达到文件的描述符;在传出时,其表示在需要监控有写数据到达的文件描述符中,监测发生了写数据到达事件的文件描述符。同理,在传入时,exceptfds
里面表示的是需要监控是否有异常事件发生的文件描述符;在传出时,其表示在需要监控有异常事件发生的文件描述符中,检测到发生了异常事件的文件描述符。
知识点3:定时阻塞监控时间
struct timeval {long tv_sec; /* 秒:seconds */long tv_usec; /* 微妙:microseconds */};
在其作为参数传递给select
函数时,其有三种不同的含义。
知识点4:文件描述符集合操作函数
void FD_CLR(int fd, fd_set *set); //把文件描述符集合里fd清0int FD_ISSET(int fd, fd_set *set); //测试文件描述符集合里fd是否置1void FD_SET(int fd, fd_set *set); //把文件描述符集合里fd位置1void FD_ZERO(fd_set *set); //把文件描述符集合里所有位清0
知识点5:返回值
> 0
:所有监听集合(3个)中,满足对应事件的总个数0
:没有满足事件的文件描述符-1
:报错errno
2.思路分析
案例练习:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <ctype.h>#define ser_port 9527int main()
{int maxfd = 0;int lfd,cfd;int ret,nread,nwrite;char buf[BUFSIZ];lfd = socket(AF_INET,SOCK_STREAM,0);if(lfd == -1){perror("socket");return -1;}maxfd = lfd;struct sockaddr_in ser_addr,clt_addr;socklen_t clt_addr_len = sizeof(clt_addr);ser_addr.sin_family = AF_INET;ser_addr.sin_port = htons(ser_port);ser_addr.sin_addr.s_addr = htonl(INADDR_ANY);bind(lfd,(struct sockaddr *)&ser_addr,sizeof(ser_addr));listen(lfd,128);fd_set rset,allset;FD_ZERO(&allset);FD_SET(lfd,&allset);while(1){rset = allset; //每次都从新设置的监控集开始ret = select(maxfd+1,&rset,NULL,NULL,NULL);if(ret > 0){if(FD_ISSET(lfd,&rset)) //说明有新的客户端请求建立连接{cfd = accept(lfd,(struct sockaddr *)&clt_addr,&clt_addr_len);FD_SET(cfd,&allset);if(maxfd < cfd)maxfd = cfd;if(--ret == 0) //select只有一个返回,说明是lfd,不需要执行后续的操作continue;}}for(int i = lfd+1; i <= maxfd; i++){if(FD_ISSET(i,&rset)){nread = read(i,buf,sizeof(buf));if(nread == 0) //检测到客户端关闭{close(i);FD_CLR(i,&allset);}else if(nread > 0){for(int j = 0;j<nread;j++){buf[j] = toupper(buf[j]);}write(i,buf,nread);write(STDIN_FILENO,buf,nread);}}}}close(lfd);return 0;
}
3.优缺点
-
缺点:
- 监听上限受文件描述符限制, 最大
1024
- 检测满足条件的fd, 自己添加业务逻辑提高小。 提高了编码难度。
- 监听上限受文件描述符限制, 最大
-
优点:跨平台,win、linux、macOS、Unix、类Unix、mips
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <ctype.h>#include "wrap.h"#define SERV_PORT 6666int main(int argc, char *argv[])
{int i, j, n, maxi;int nready, client[FD_SETSIZE]; /* 自定义数组client, 防止遍历1024个文件描述符 FD_SETSIZE默认为1024 */int maxfd, listenfd, connfd, sockfd;char buf[BUFSIZ], str[INET_ADDRSTRLEN]; /* #define INET_ADDRSTRLEN 16 */struct sockaddr_in clie_addr, serv_addr;socklen_t clie_addr_len;fd_set rset, allset; /* rset 读事件文件描述符集合 allset用来暂存 */listenfd = Socket(AF_INET, SOCK_STREAM, 0);int opt = 1;setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));bzero(&serv_addr, sizeof(serv_addr));serv_addr.sin_family= AF_INET;serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);serv_addr.sin_port= htons(SERV_PORT);Bind(listenfd, (struct sockaddr *)&serv_addr, sizeof(serv_addr));Listen(listenfd, 128);maxfd = listenfd; /* 起初 listenfd 即为最大文件描述符 */maxi = -1; /* 将来用作client[]的下标, 初始值指向0个元素之前下标位置 */for (i = 0; i < FD_SETSIZE; i++)client[i] = -1; /* 用-1初始化client[] */FD_ZERO(&allset);FD_SET(listenfd, &allset); /* 构造select监控文件描述符集 */while (1) { rset = allset; /* 每次循环时都从新设置select监控信号集 */nready = select(maxfd+1, &rset, NULL, NULL, NULL); //2 1--lfd 1--connfdif (nready < 0)perr_exit("select error");if (FD_ISSET(listenfd, &rset)) { /* 说明有新的客户端链接请求 */clie_addr_len = sizeof(clie_addr);connfd = Accept(listenfd, (struct sockaddr *)&clie_addr, &clie_addr_len); /* Accept 不会阻塞 */printf("received from %s at PORT %d\n",inet_ntop(AF_INET, &clie_addr.sin_addr, str, sizeof(str)),ntohs(clie_addr.sin_port));for (i = 0; i < FD_SETSIZE; i++)if (client[i] < 0) { /* 找client[]中没有使用的位置 */client[i] = connfd; /* 保存accept返回的文件描述符到client[]里 */break;}if (i == FD_SETSIZE) { /* 达到select能监控的文件个数上限 1024 */fputs("too many clients\n", stderr);exit(1);}FD_SET(connfd, &allset); /* 向监控文件描述符集合allset添加新的文件描述符connfd */if (connfd > maxfd)maxfd = connfd; /* select第一个参数需要 */if (i > maxi)maxi = i; /* 保证maxi存的总是client[]最后一个元素下标 */if (--nready == 0)continue;} for (i = 0; i <= maxi; i++) { /* 检测哪个clients 有数据就绪 */if ((sockfd = client[i]) < 0)continue;if (FD_ISSET(sockfd, &rset)) {if ((n = Read(sockfd, buf, sizeof(buf))) == 0) { /* 当client关闭链接时,服务器端也关闭对应链接 */Close(sockfd);FD_CLR(sockfd, &allset); /* 解除select对此文件描述符的监控 */client[i] = -1;} else if (n > 0) {for (j = 0; j < n; j++)buf[j] = toupper(buf[j]);Write(sockfd, buf, n);Write(STDOUT_FILENO, buf, n);}if (--nready == 0)break; /* 跳出for, 但还在while中 */}}}Close(listenfd);return 0;
}
三.poll
多路IO转接服务器
1.函数分析
#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);struct pollfd {int fd; /* 文件描述符 */short events; /* 监控的事件 */short revents; /* 监控事件中满足条件返回的事件 */};POLLIN 普通或带外优先数据可读,即POLLRDNORM | POLLRDBANDPOLLOUT 普通或带外数据可写POLLERR 发生错误nfds 监控数组中有多少文件描述符需要被监控timeout 毫秒级等待-1:阻塞等,#define INFTIM -1 Linux中没有定义此宏0:立即返回,不阻塞进程>0:等待指定毫秒数,如当前系统时间精度不够毫秒,向上取值
2.思路分析
3.read函数返回值补充
read 函数返回值:
>0
:实际读到的字节数=0
: socket中,表示对端关闭。close()-1
:- 如果
errno == EINTR
被异常终端。 需要重启。 - 如果
errno == EAGIN
或EWOULDBLOCK
以非阻塞方式读数据,但是没有数据。 需要,再次读。 - 如果
errno == ECONNRESET
说明连接被 重置。 需要close()
,移除监听队列。 - 其他错误。
- 如果
4.优缺点
优点:
- 自带数组结构,可以将 监听事件集合 和 返回事件集合 分离。
- 拓展 监听上限,超出 1024限制。
缺点:
- 不能跨平台。
Linux
- 无法直接定位满足监听事件的文件描述符, 编码难度较大。
5.突破1024文件描述符限制
可以使用cat命令查看一个进程可以打开的socket描述符上限:
cat /proc/sys/fs/file-max
方式一:如有需要,可以通过修改配置文件的方式修改该上限值。
sudo vi /etc/security/limits.conf
在文件尾部写入以下配置,soft
软限制,hard
硬限制,可以在这里直接修改。如下图所示。
* soft nofile 65536 --> 设置默认值, 可以直接借助命令修改。 【注销用户,使其生效】* hard nofile 100000 --> 命令修改上限。
方式二:可以通过命令的方式修改
ulimit -n num
这里的num
不能超过硬件限制
四.epoll实现多路转接服务器
1.epoll相关函数
-
创建一个epoll句柄,参数size用来告诉内核监听的文件描述符的个数,跟内存大小有关。
int epoll_create(int size); 创建一棵监听红黑树size:创建的红黑树的监听节点数量。(仅供内核参考。)返回值:指向新创建的红黑树的根节点的 fd。 失败: -1 errno
-
控制某个epoll监控的文件描述符上的事件:注册、修改、删除。
#include <sys/epoll.h>int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)epfd: 为epoll_creat的句柄op: 表示动作,用3个宏来表示:EPOLL_CTL_ADD (注册新的fd到epfd),EPOLL_CTL_MOD (修改已经注册的fd的监听事件),EPOLL_CTL_DEL (从epfd删除一个fd);event: 告诉内核需要监听的事件struct epoll_event {__uint32_t events; /* Epoll events */epoll_data_t data; /* User data variable */};typedef union epoll_data {void *ptr;int fd;uint32_t u32;uint64_t u64;} epoll_data_t;EPOLLIN : 表示对应的文件描述符可以读(包括对端SOCKET正常关闭)EPOLLOUT: 表示对应的文件描述符可以写EPOLLPRI: 表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来)EPOLLERR: 表示对应的文件描述符发生错误EPOLLHUP: 表示对应的文件描述符被挂断;EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)而言的EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里
-
等待所监控文件描述符上有事件的产生,类似于
select()
调用。int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout); 阻塞监听。epfd:epoll_create 函数的返回值。 epfdevents:传出参数,【数组】, 满足监听条件的 哪些 fd 结构体。maxevents:数组 元素的总个数。 1024struct epoll_event evnets[1024]timeout:-1: 阻塞0: 不阻塞>0: 超时时间 (毫秒)返回值:> 0: 满足监听的 总个数。 可以用作循环上限。0: 没有fd满足监听事件-1:失败。 errno
2.思路分析
基础版:
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <errno.h>
#include <ctype.h>#include "wrap.h"#define MAXLINE 8192
#define SERV_PORT 8000#define OPEN_MAX 5000int main(int argc, char *argv[])
{int i, listenfd, connfd, sockfd;int n, num = 0;ssize_t nready, efd, res;char buf[MAXLINE], str[INET_ADDRSTRLEN];socklen_t clilen;struct sockaddr_in cliaddr, servaddr;listenfd = Socket(AF_INET, SOCK_STREAM, 0);int opt = 1;setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); //端口复用bzero(&servaddr, sizeof(servaddr));servaddr.sin_family = AF_INET;servaddr.sin_addr.s_addr = htonl(INADDR_ANY);servaddr.sin_port = htons(SERV_PORT);Bind(listenfd, (struct sockaddr *) &servaddr, sizeof(servaddr));Listen(listenfd, 20);efd = epoll_create(OPEN_MAX); //创建epoll模型, efd指向红黑树根节点if (efd == -1)perr_exit("epoll_create error");struct epoll_event tep, ep[OPEN_MAX]; //tep: epoll_ctl参数 ep[] : epoll_wait参数tep.events = EPOLLIN; tep.data.fd = listenfd; //指定lfd的监听时间为"读"res = epoll_ctl(efd, EPOLL_CTL_ADD, listenfd, &tep); //将lfd及对应的结构体设置到树上,efd可找到该树if (res == -1)perr_exit("epoll_ctl error");for ( ; ; ) {/*epoll为server阻塞监听事件, ep为struct epoll_event类型数组, OPEN_MAX为数组容量, -1表永久阻塞*/nready = epoll_wait(efd, ep, OPEN_MAX, -1); if (nready == -1)perr_exit("epoll_wait error");for (i = 0; i < nready; i++) {if (!(ep[i].events & EPOLLIN)) //如果不是"读"事件, 继续循环continue;if (ep[i].data.fd == listenfd) { //判断满足事件的fd是不是lfd clilen = sizeof(cliaddr);connfd = Accept(listenfd, (struct sockaddr *)&cliaddr, &clilen); //接受链接printf("received from %s at PORT %d\n", inet_ntop(AF_INET, &cliaddr.sin_addr, str, sizeof(str)), ntohs(cliaddr.sin_port));printf("cfd %d---client %d\n", connfd, ++num);tep.events = EPOLLIN; tep.data.fd = connfd;res = epoll_ctl(efd, EPOLL_CTL_ADD, connfd, &tep); //加入红黑树if (res == -1)perr_exit("epoll_ctl error");} else { //不是lfd, sockfd = ep[i].data.fd;n = Read(sockfd, buf, MAXLINE);if (n == 0) { //读到0,说明客户端关闭链接res = epoll_ctl(efd, EPOLL_CTL_DEL, sockfd, NULL); //将该文件描述符从红黑树摘除if (res == -1)perr_exit("epoll_ctl error");Close(sockfd); //关闭与该客户端的链接printf("client[%d] closed connection\n", sockfd);} else if (n < 0) { //出错perror("read n < 0 error: ");res = epoll_ctl(efd, EPOLL_CTL_DEL, sockfd, NULL); //摘除节点Close(sockfd);} else { //实际读到了字节数for (i = 0; i < n; i++)buf[i] = toupper(buf[i]); //转大写,写回给客户端Write(STDOUT_FILENO, buf, n);Writen(sockfd, buf, n);}}}}Close(listenfd);Close(efd);return 0;
}
3.ET模式和LT模式
EPOLL
事件有两种模型:
-
Edge Triggered (ET)
边缘触发只有数据到来才触发,不管缓存区中是否还有数据。 -
Level Triggered (LT)
水平触发只要有数据都会触发。
案例:
#include <stdio.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <errno.h>
#include <unistd.h>#define MAXLINE 10int main(int argc, char *argv[])
{int efd, i;int pfd[2];pid_t pid;char buf[MAXLINE], ch = 'a';pipe(pfd);pid = fork();if (pid == 0) {close(pfd[0]);while (1) {//aaaa\nfor (i = 0; i < MAXLINE/2; i++)buf[i] = ch;buf[i-1] = '\n';ch++;//bbbb\nfor (; i < MAXLINE; i++)buf[i] = ch;buf[i-1] = '\n';ch++;write(pfd[1], buf, sizeof(buf));sleep(2);}close(pfd[1]);} else if (pid > 0) bian{struct epoll_event event;struct epoll_event resevent[10];int res, len;close(pfd[1]);efd = epoll_create(10);/* event.events = EPOLLIN; */event.events = EPOLLIN | EPOLLET; /* ET 边沿触发 ,默认是水平触发 */event.data.fd = pfd[0];epoll_ctl(efd, EPOLL_CTL_ADD, pfd[0], &event);while (1) {res = epoll_wait(efd, resevent, 10, -1);printf("res %d\n", res);if (resevent[0].data.fd == pfd[0]) {len = read(pfd[0], buf, MAXLINE/2); /*这里因为只读取了一半,即aaaa\n,剩下的一半bbbb\n在水平触发时依旧可以wait返回继续读取,边沿触发时则需要等待下次子进程写事件才会读取*/write(STDOUT_FILENO, buf, len);}}close(pfd[0]);close(efd);} else {perror("fork");exit(-1);}return 0;
}
如果是想在网络socket模型中使用ET或LT模式,可以在如下代码中修改:
struct epoll_event event;
event.events = EPOLLIN | EPOLLET; /* ET 边沿触发 ,默认是水平触发 */
event.data.fd = connfd;
epoll_ctl(efd, EPOLL_CTL_ADD, connfd, &event);
即修改epoll_ctl函数的第四个参数即可。
在ET模式下,epoll
只能使用非阻塞的模式:
#include <stdio.h>
#include <string.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <fcntl.h>#define MAXLINE 10
#define SERV_PORT 8000int main(void)
{struct sockaddr_in servaddr, cliaddr;socklen_t cliaddr_len;int listenfd, connfd;char buf[MAXLINE];char str[INET_ADDRSTRLEN];int efd, flag;listenfd = socket(AF_INET, SOCK_STREAM, 0);bzero(&servaddr, sizeof(servaddr));servaddr.sin_family = AF_INET;servaddr.sin_addr.s_addr = htonl(INADDR_ANY);servaddr.sin_port = htons(SERV_PORT);bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr));listen(listenfd, 20);///struct epoll_event event;struct epoll_event res_event[10];int res, len;efd = epoll_create(10);event.events = EPOLLIN | EPOLLET; /* ET 边沿触发,默认是水平触发 *///event.events = EPOLLIN;printf("Accepting connections ...\n");cliaddr_len = sizeof(cliaddr);connfd = accept(listenfd, (struct sockaddr *)&cliaddr, &cliaddr_len);printf("received from %s at PORT %d\n",inet_ntop(AF_INET, &cliaddr.sin_addr, str, sizeof(str)),ntohs(cliaddr.sin_port));flag = fcntl(connfd, F_GETFL); /* 修改connfd为非阻塞读 */flag |= O_NONBLOCK;fcntl(connfd, F_SETFL, flag);event.data.fd = connfd;epoll_ctl(efd, EPOLL_CTL_ADD, connfd, &event); //将connfd加入监听红黑树while (1) {printf("epoll_wait begin\n");res = epoll_wait(efd, res_event, 10, -1); //最多10个, 阻塞监听printf("epoll_wait end res %d\n", res);if (res_event[0].data.fd == connfd) {while ((len = read(connfd, buf, MAXLINE/2)) >0 ) //非阻塞读, 轮询write(STDOUT_FILENO, buf, len);}}return 0;
}
重点,使用fcntl
函数设置为非阻塞读模式。
4.优缺点
- 优点:高效,突破1024文件描述符。
- 缺点:不能跨平台,Linux。
5.epoll反应堆模型
epoll
反应堆是libevent
库的核心实现思想,libevent
是写多并发服务器必须了解的库,其可以跨平台支持。之所以叫反应堆,是形容他比较快,之所以快,是因为大量的函数回调。
epoll 反应堆模型:
epoll ET模式 + 非阻塞、轮询 + void *ptr。原来: socket、bind、listen -- epoll_create 创建监听 红黑树 -- 返回 epfd -- epoll_ctl() 向树上添加一个监听fd -- while(1)---- epoll_wait 监听 -- 对应监听fd有事件产生 -- 返回 监听满足数组。 -- 判断返回数组元素 -- lfd满足 -- Accept -- cfd 满足 -- read() --- 小->大 -- write回去。反应堆:不但要监听 cfd 的读事件、还要监听cfd的写事件。socket、bind、listen -- epoll_create 创建监听 红黑树 -- 返回 epfd -- epoll_ctl() 向树上添加一个监听fd -- while(1)---- epoll_wait 监听 -- 对应监听fd有事件产生 -- 返回 监听满足数组。 -- 判断返回数组元素 -- lfd满足 -- Accept -- cfd 满足 -- read() --- 小->大 -- cfd从监听红黑树上摘下 -- EPOLLOUT -- 回调函数 -- epoll_ctl() -- EPOLL_CTL_ADD 重新放到红黑上监听写事件-- 等待 epoll_wait 返回 -- 说明 cfd 可写 -- write回去 -- cfd从监听红黑树上摘下 -- EPOLLIN -- epoll_ctl() -- EPOLL_CTL_ADD 重新放到红黑上监听读事件 -- epoll_wait 监听
反应堆代码:
/**epoll基于非阻塞I/O事件驱动*/
#include <stdio.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <stdlib.h>
#include <time.h>#define MAX_EVENTS 1024 //监听上限数
#define BUFLEN 4096
#define SERV_PORT 8080void recvdata(int fd, int events, void *arg);
void senddata(int fd, int events, void *arg);/* 描述就绪文件描述符相关信息 */struct myevent_s {int fd; //要监听的文件描述符int events; //对应的监听事件void *arg; //泛型参数void (*call_back)(int fd, int events, void *arg); //回调函数int status; //是否在监听:1->在红黑树上(监听), 0->不在(不监听)char buf[BUFLEN];int len;long last_active; //记录每次加入红黑树 g_efd 的时间值
};int g_efd; //全局变量, 保存epoll_create返回的文件描述符
struct myevent_s g_events[MAX_EVENTS+1]; //自定义结构体类型数组. +1-->listen fd/*将结构体 myevent_s 成员变量 初始化*/void eventset(struct myevent_s *ev, int fd, void (*call_back)(int, int, void *), void *arg)
{ev->fd = fd;ev->call_back = call_back;ev->events = 0;ev->arg = arg;ev->status = 0;memset(ev->buf, 0, sizeof(ev->buf));ev->len = 0;ev->last_active = time(NULL); //调用eventset函数的时间return;
}/* 向 epoll监听的红黑树 添加一个 文件描述符 *///eventadd(efd, EPOLLIN, &g_events[MAX_EVENTS]);
void eventadd(int efd, int events, struct myevent_s *ev)
{struct epoll_event epv = {0, {0}};int op;epv.data.ptr = ev;epv.events = ev->events = events; //EPOLLIN 或 EPOLLOUTif (ev->status == 0) { //已经在红黑树 g_efd 里op = EPOLL_CTL_ADD; //将其加入红黑树 g_efd, 并将status置1ev->status = 1;}if (epoll_ctl(efd, op, ev->fd, &epv) < 0) //实际添加/修改printf("event add failed [fd=%d], events[%d]\n", ev->fd, events);elseprintf("event add OK [fd=%d], op=%d, events[%0X]\n", ev->fd, op, events);return ;
}/* 从epoll 监听的 红黑树中删除一个 文件描述符*/void eventdel(int efd, struct myevent_s *ev)
{struct epoll_event epv = {0, {0}};if (ev->status != 1) //不在红黑树上return ;//epv.data.ptr = ev;epv.data.ptr = NULL;ev->status = 0; //修改状态epoll_ctl(efd, EPOLL_CTL_DEL, ev->fd, &epv); //从红黑树 efd 上将 ev->fd 摘除return ;
}/* 当有文件描述符就绪, epoll返回, 调用该函数 与客户端建立链接 */void acceptconn(int lfd, int events, void *arg)
{struct sockaddr_in cin;socklen_t len = sizeof(cin);int cfd, i;if ((cfd = accept(lfd, (struct sockaddr *)&cin, &len)) == -1) {if (errno != EAGAIN && errno != EINTR) {/* 暂时不做出错处理 */}printf("%s: accept, %s\n", __func__, strerror(errno));return ;}do {for (i = 0; i < MAX_EVENTS; i++) //从全局数组g_events中找一个空闲元素if (g_events[i].status == 0) //类似于select中找值为-1的元素break; //跳出 forif (i == MAX_EVENTS) {printf("%s: max connect limit[%d]\n", __func__, MAX_EVENTS);break; //跳出do while(0) 不执行后续代码}int flag = 0;if ((flag = fcntl(cfd, F_SETFL, O_NONBLOCK)) < 0) { //将cfd也设置为非阻塞printf("%s: fcntl nonblocking failed, %s\n", __func__, strerror(errno));break;}/* 给cfd设置一个 myevent_s 结构体, 回调函数 设置为 recvdata */eventset(&g_events[i], cfd, recvdata, &g_events[i]); eventadd(g_efd, EPOLLIN, &g_events[i]); //将cfd添加到红黑树g_efd中,监听读事件} while(0);printf("new connect [%s:%d][time:%ld], pos[%d]\n", inet_ntoa(cin.sin_addr), ntohs(cin.sin_port), g_events[i].last_active, i);return ;
}void recvdata(int fd, int events, void *arg)
{struct myevent_s *ev = (struct myevent_s *)arg;int len;len = recv(fd, ev->buf, sizeof(ev->buf), 0); //读文件描述符, 数据存入myevent_s成员buf中eventdel(g_efd, ev); //将该节点从红黑树上摘除if (len > 0) {ev->len = len;ev->buf[len] = '\0'; //手动添加字符串结束标记printf("C[%d]:%s\n", fd, ev->buf);eventset(ev, fd, senddata, ev); //设置该 fd 对应的回调函数为 senddataeventadd(g_efd, EPOLLOUT, ev); //将fd加入红黑树g_efd中,监听其写事件} else if (len == 0) {close(ev->fd);/* ev-g_events 地址相减得到偏移元素位置 */printf("[fd=%d] pos[%ld], closed\n", fd, ev-g_events);} else {close(ev->fd);printf("recv[fd=%d] error[%d]:%s\n", fd, errno, strerror(errno));}return;
}void senddata(int fd, int events, void *arg)
{struct myevent_s *ev = (struct myevent_s *)arg;int len;len = send(fd, ev->buf, ev->len, 0); //直接将数据 回写给客户端。未作处理eventdel(g_efd, ev); //从红黑树g_efd中移除if (len > 0) {printf("send[fd=%d], [%d]%s\n", fd, len, ev->buf);eventset(ev, fd, recvdata, ev); //将该fd的 回调函数改为 recvdataeventadd(g_efd, EPOLLIN, ev); //从新添加到红黑树上, 设为监听读事件} else {close(ev->fd); //关闭链接printf("send[fd=%d] error %s\n", fd, strerror(errno));}return ;
}/*创建 socket, 初始化lfd */void initlistensocket(int efd, short port)
{struct sockaddr_in sin;int lfd = socket(AF_INET, SOCK_STREAM, 0);fcntl(lfd, F_SETFL, O_NONBLOCK); //将socket设为非阻塞memset(&sin, 0, sizeof(sin)); //bzero(&sin, sizeof(sin))sin.sin_family = AF_INET;sin.sin_addr.s_addr = INADDR_ANY;sin.sin_port = htons(port);bind(lfd, (struct sockaddr *)&sin, sizeof(sin));listen(lfd, 20);/* void eventset(struct myevent_s *ev, int fd, void (*call_back)(int, int, void *), void *arg); */eventset(&g_events[MAX_EVENTS], lfd, acceptconn, &g_events[MAX_EVENTS]);/* void eventadd(int efd, int events, struct myevent_s *ev) */eventadd(efd, EPOLLIN, &g_events[MAX_EVENTS]);return ;
}int main(int argc, char *argv[])
{unsigned short port = SERV_PORT;if (argc == 2)port = atoi(argv[1]); //使用用户指定端口.如未指定,用默认端口g_efd = epoll_create(MAX_EVENTS+1); //创建红黑树,返回给全局 g_efd if (g_efd <= 0)printf("create efd in %s err %s\n", __func__, strerror(errno));initlistensocket(g_efd, port); //初始化监听socketstruct epoll_event events[MAX_EVENTS+1]; //保存已经满足就绪事件的文件描述符数组 printf("server running:port[%d]\n", port);int checkpos = 0, i;while (1) {/* 超时验证,每次测试100个链接,不测试listenfd 当客户端60秒内没有和服务器通信,则关闭此客户端链接 */long now = time(NULL); //当前时间for (i = 0; i < 100; i++, checkpos++) { //一次循环检测100个。 使用checkpos控制检测对象if (checkpos == MAX_EVENTS)checkpos = 0;if (g_events[checkpos].status != 1) //不在红黑树 g_efd 上continue;long duration = now - g_events[checkpos].last_active; //客户端不活跃的世间if (duration >= 60) {close(g_events[checkpos].fd); //关闭与该客户端链接printf("[fd=%d] timeout\n", g_events[checkpos].fd);eventdel(g_efd, &g_events[checkpos]); //将该客户端 从红黑树 g_efd移除}}/*监听红黑树g_efd, 将满足的事件的文件描述符加至events数组中, 1秒没有事件满足, 返回 0*/int nfd = epoll_wait(g_efd, events, MAX_EVENTS+1, 1000);if (nfd < 0) {printf("epoll_wait error, exit\n");break;}for (i = 0; i < nfd; i++) {/*使用自定义结构体myevent_s类型指针, 接收 联合体data的void *ptr成员*/struct myevent_s *ev = (struct myevent_s *)events[i].data.ptr; if ((events[i].events & EPOLLIN) && (ev->events & EPOLLIN)) { //读就绪事件ev->call_back(ev->fd, events[i].events, ev->arg);//lfd EPOLLIN }if ((events[i].events & EPOLLOUT) && (ev->events & EPOLLOUT)) { //写就绪事件ev->call_back(ev->fd, events[i].events, ev->arg);}}}/* 退出前释放所有资源 */return 0;
}
6.保活机制
客户端和服务器通信,服务器怎么知道客户端掉线?
- 心跳包
- 乒乓包
- 设置TCP属性(探测分节)
另外推荐一个查看代码的工具:
五.线程池
1.线程池思想
struct threadpool_t {pthread_mutex_t lock; /* 用于锁住本结构体 */ pthread_mutex_t thread_counter; /* 记录忙状态线程个数de琐 -- busy_thr_num */pthread_cond_t queue_not_full; /* 当任务队列满时,添加任务的线程阻塞,等待此条件变量 */pthread_cond_t queue_not_empty; /* 任务队列里不为空时,通知等待任务的线程 */pthread_t *threads; /* 存放线程池中每个线程的tid。数组 */pthread_t adjust_tid; /* 存管理线程tid */threadpool_task_t *task_queue; /* 任务队列(数组首地址) */int min_thr_num; /* 线程池最小线程数 */int max_thr_num; /* 线程池最大线程数 */int live_thr_num; /* 当前存活线程个数 */int busy_thr_num; /* 忙状态线程个数 */int wait_exit_thr_num; /* 要销毁的线程个数 */int queue_front; /* task_queue队头下标 */int queue_rear; /* task_queue队尾下标 */int queue_size; /* task_queue队中实际任务数 */int queue_max_size; /* task_queue队列可容纳任务数上限 */int shutdown; /* 标志位,线程池使用状态,true或false */
};typedef struct {void *(*function)(void *); /* 函数指针,回调函数 */void *arg; /* 上面函数的参数 */} threadpool_task_t; /* 各子线程任务结构体 */
2.搭建线程池步骤
线程池模块分析:
1. main(); 创建线程池。向线程池中添加任务。 借助回调处理任务。销毁线程池。2. pthreadpool_create();创建线程池结构体 指针。初始化线程池结构体 { N 个成员变量 }创建 N 个任务线程。创建 1 个管理者线程。失败时,销毁开辟的所有空间。(释放)3. threadpool_thread()进入子线程回调函数。接收参数 void *arg --》 pool 结构体加锁 --》lock --》 整个结构体锁判断条件变量 --》 wait -------------------1704. adjust_thread()循环 10 s 执行一次。进入管理者线程回调函数接收参数 void *arg --》 pool 结构体加锁 --》lock --》 整个结构体锁获取管理线程池要用的到 变量。 task_num, live_num, busy_num根据既定算法,使用上述3变量,判断是否应该 创建、销毁线程池中 指定步长的线程。5. threadpool_add ()总功能:模拟产生任务。 num[20]设置回调函数, 处理任务。 sleep(1) 代表处理完成。内部实现:加锁初始化 任务队列结构体成员。 回调函数 function, arg利用环形队列机制,实现添加任务。 借助队尾指针挪移 % 实现。唤醒阻塞在 条件变量上的线程。解锁6. 从 3. 中的wait之后继续执行,处理任务。加锁获取 任务处理回调函数,及参数利用环形队列机制,实现处理任务。 借助队头指针挪移 % 实现。唤醒阻塞在 条件变量 上的 server。解锁加锁 改忙线程数++解锁执行处理任务的线程加锁 改忙线程数——解锁7. 创建 销毁线程管理者线程根据 task_num, live_num, busy_num 根据既定算法,使用上述3变量,判断是否应该 创建、销毁线程池中 指定步长的线程。如果满足 创建条件pthread_create(); 回调 任务线程函数。 live_num++如果满足 销毁条件wait_exit_thr_num = 10; signal 给 阻塞在条件变量上的线程 发送 假条件满足信号 跳转至 --170 wait阻塞线程会被 假信号 唤醒。判断: wait_exit_thr_num > 0 pthread_exit();
3.代码
threadpool.c
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <signal.h>
#include <errno.h>
#include "threadpool.h"#define DEFAULT_TIME 10 /*10s检测一次*/
#define MIN_WAIT_TASK_NUM 10 /*如果queue_size > MIN_WAIT_TASK_NUM 添加新的线程到线程池*/
#define DEFAULT_THREAD_VARY 10 /*每次创建和销毁线程的个数*/
#define true 1
#define false 0typedef struct {void *(*function)(void *); /* 函数指针,回调函数 */void *arg; /* 上面函数的参数 */
} threadpool_task_t; /* 各子线程任务结构体 *//* 描述线程池相关信息 */struct threadpool_t {pthread_mutex_t lock; /* 用于锁住本结构体 */ pthread_mutex_t thread_counter; /* 记录忙状态线程个数de琐 -- busy_thr_num */pthread_cond_t queue_not_full; /* 当任务队列满时,添加任务的线程阻塞,等待此条件变量 */pthread_cond_t queue_not_empty; /* 任务队列里不为空时,通知等待任务的线程 */pthread_t *threads; /* 存放线程池中每个线程的tid。数组 */pthread_t adjust_tid; /* 存管理线程tid */threadpool_task_t *task_queue; /* 任务队列(数组首地址) */int min_thr_num; /* 线程池最小线程数 */int max_thr_num; /* 线程池最大线程数 */int live_thr_num; /* 当前存活线程个数 */int busy_thr_num; /* 忙状态线程个数 */int wait_exit_thr_num; /* 要销毁的线程个数 */int queue_front; /* task_queue队头下标 */int queue_rear; /* task_queue队尾下标 */int queue_size; /* task_queue队中实际任务数 */int queue_max_size; /* task_queue队列可容纳任务数上限 */int shutdown; /* 标志位,线程池使用状态,true或false */
};void *threadpool_thread(void *threadpool);void *adjust_thread(void *threadpool);int is_thread_alive(pthread_t tid);
int threadpool_free(threadpool_t *pool);//threadpool_create(3,100,100);
threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size)
{int i;threadpool_t *pool = NULL; /* 线程池 结构体 */do {if((pool = (threadpool_t *)malloc(sizeof(threadpool_t))) == NULL) { printf("malloc threadpool fail");break; /*跳出do while*/}pool->min_thr_num = min_thr_num;pool->max_thr_num = max_thr_num;pool->busy_thr_num = 0;pool->live_thr_num = min_thr_num; /* 活着的线程数 初值=最小线程数 */pool->wait_exit_thr_num = 0;pool->queue_size = 0; /* 有0个产品 */pool->queue_max_size = queue_max_size; /* 最大任务队列数 */pool->queue_front = 0;pool->queue_rear = 0;pool->shutdown = false; /* 不关闭线程池 *//* 根据最大线程上限数, 给工作线程数组开辟空间, 并清零 */pool->threads = (pthread_t *)malloc(sizeof(pthread_t)*max_thr_num); if (pool->threads == NULL) {printf("malloc threads fail");break;}memset(pool->threads, 0, sizeof(pthread_t)*max_thr_num);/* 给 任务队列 开辟空间 */pool->task_queue = (threadpool_task_t *)malloc(sizeof(threadpool_task_t)*queue_max_size);if (pool->task_queue == NULL) {printf("malloc task_queue fail");break;}/* 初始化互斥琐、条件变量 */if (pthread_mutex_init(&(pool->lock), NULL) != 0|| pthread_mutex_init(&(pool->thread_counter), NULL) != 0|| pthread_cond_init(&(pool->queue_not_empty), NULL) != 0|| pthread_cond_init(&(pool->queue_not_full), NULL) != 0){printf("init the lock or cond fail");break;}/* 启动 min_thr_num 个 work thread */for (i = 0; i < min_thr_num; i++) {pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool); /*pool指向当前线程池*/printf("start thread 0x%x...\n", (unsigned int)pool->threads[i]);}pthread_create(&(pool->adjust_tid), NULL, adjust_thread, (void *)pool); /* 创建管理者线程 */return pool;} while (0);threadpool_free(pool); /* 前面代码调用失败时,释放poll存储空间 */return NULL;
}/* 向线程池中 添加一个任务 */
//threadpool_add(thp, process, (void*)&num[i]); /* 向线程池中添加任务 process: 小写---->大写*/int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg)
{pthread_mutex_lock(&(pool->lock));/* ==为真,队列已经满, 调wait阻塞 */while ((pool->queue_size == pool->queue_max_size) && (!pool->shutdown)) {pthread_cond_wait(&(pool->queue_not_full), &(pool->lock));}if (pool->shutdown) {pthread_cond_broadcast(&(pool->queue_not_empty));pthread_mutex_unlock(&(pool->lock));return 0;}/* 清空 工作线程 调用的回调函数 的参数arg */if (pool->task_queue[pool->queue_rear].arg != NULL) {pool->task_queue[pool->queue_rear].arg = NULL;}/*添加任务到任务队列里*/pool->task_queue[pool->queue_rear].function = function;pool->task_queue[pool->queue_rear].arg = arg;pool->queue_rear = (pool->queue_rear + 1) % pool->queue_max_size; /* 队尾指针移动, 模拟环形 */pool->queue_size++;/*添加完任务后,队列不为空,唤醒线程池中 等待处理任务的线程*/pthread_cond_signal(&(pool->queue_not_empty));pthread_mutex_unlock(&(pool->lock));return 0;
}/* 线程池中各个工作线程 */
void *threadpool_thread(void *threadpool)
{threadpool_t *pool = (threadpool_t *)threadpool;threadpool_task_t task;while (true) {/* Lock must be taken to wait on conditional variable *//*刚创建出线程,等待任务队列里有任务,否则阻塞等待任务队列里有任务后再唤醒接收任务*/pthread_mutex_lock(&(pool->lock));/*queue_size == 0 说明没有任务,调 wait 阻塞在条件变量上, 若有任务,跳过该while*/while ((pool->queue_size == 0) && (!pool->shutdown)) { printf("thread 0x%x is waiting\n", (unsigned int)pthread_self());pthread_cond_wait(&(pool->queue_not_empty), &(pool->lock));/*清除指定数目的空闲线程,如果要结束的线程个数大于0,结束线程*/if (pool->wait_exit_thr_num > 0) {pool->wait_exit_thr_num--;/*如果线程池里线程个数大于最小值时可以结束当前线程*/if (pool->live_thr_num > pool->min_thr_num) {printf("thread 0x%x is exiting\n", (unsigned int)pthread_self());pool->live_thr_num--;pthread_mutex_unlock(&(pool->lock));pthread_exit(NULL);}}}/*如果指定了true,要关闭线程池里的每个线程,自行退出处理---销毁线程池*/if (pool->shutdown) {pthread_mutex_unlock(&(pool->lock));printf("thread 0x%x is exiting\n", (unsigned int)pthread_self());pthread_detach(pthread_self());pthread_exit(NULL); /* 线程自行结束 */}/*从任务队列里获取任务, 是一个出队操作*/task.function = pool->task_queue[pool->queue_front].function;task.arg = pool->task_queue[pool->queue_front].arg;pool->queue_front = (pool->queue_front + 1) % pool->queue_max_size; /* 出队,模拟环形队列 */pool->queue_size--;/*通知可以有新的任务添加进来*/pthread_cond_broadcast(&(pool->queue_not_full));/*任务取出后,立即将 线程池琐 释放*/pthread_mutex_unlock(&(pool->lock));/*执行任务*/ printf("thread 0x%x start working\n", (unsigned int)pthread_self());pthread_mutex_lock(&(pool->thread_counter)); /*忙状态线程数变量琐*/pool->busy_thr_num++; /*忙状态线程数+1*/pthread_mutex_unlock(&(pool->thread_counter));(*(task.function))(task.arg); /*执行回调函数任务*///task.function(task.arg); /*执行回调函数任务*//*任务结束处理*/ printf("thread 0x%x end working\n", (unsigned int)pthread_self());pthread_mutex_lock(&(pool->thread_counter));pool->busy_thr_num--; /*处理掉一个任务,忙状态数线程数-1*/pthread_mutex_unlock(&(pool->thread_counter));}pthread_exit(NULL);
}/* 管理线程 */
void *adjust_thread(void *threadpool)
{int i;threadpool_t *pool = (threadpool_t *)threadpool;while (!pool->shutdown) {sleep(DEFAULT_TIME); /*定时 对线程池管理*/pthread_mutex_lock(&(pool->lock));int queue_size = pool->queue_size; /* 关注 任务数 */int live_thr_num = pool->live_thr_num; /* 存活 线程数 */pthread_mutex_unlock(&(pool->lock));pthread_mutex_lock(&(pool->thread_counter));int busy_thr_num = pool->busy_thr_num; /* 忙着的线程数 */pthread_mutex_unlock(&(pool->thread_counter));/* 创建新线程 算法: 任务数大于最小线程池个数, 且存活的线程数少于最大线程个数时 如:30>=10 && 40<100*/if (queue_size >= MIN_WAIT_TASK_NUM && live_thr_num < pool->max_thr_num) {pthread_mutex_lock(&(pool->lock)); int add = 0;/*一次增加 DEFAULT_THREAD 个线程*/for (i = 0; i < pool->max_thr_num && add < DEFAULT_THREAD_VARY&& pool->live_thr_num < pool->max_thr_num; i++) {if (pool->threads[i] == 0 || !is_thread_alive(pool->threads[i])) {pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);add++;pool->live_thr_num++;}}pthread_mutex_unlock(&(pool->lock));}/* 销毁多余的空闲线程 算法:忙线程X2 小于 存活的线程数 且 存活的线程数 大于 最小线程数时*/if ((busy_thr_num * 2) < live_thr_num && live_thr_num > pool->min_thr_num) {/* 一次销毁DEFAULT_THREAD个线程, 隨機10個即可 */pthread_mutex_lock(&(pool->lock));pool->wait_exit_thr_num = DEFAULT_THREAD_VARY; /* 要销毁的线程数 设置为10 */pthread_mutex_unlock(&(pool->lock));for (i = 0; i < DEFAULT_THREAD_VARY; i++) {/* 通知处在空闲状态的线程, 他们会自行终止*/pthread_cond_signal(&(pool->queue_not_empty));}}}return NULL;
}int threadpool_destroy(threadpool_t *pool)
{int i;if (pool == NULL) {return -1;}pool->shutdown = true;/*先销毁管理线程*/pthread_join(pool->adjust_tid, NULL);for (i = 0; i < pool->live_thr_num; i++) {/*通知所有的空闲线程*/pthread_cond_broadcast(&(pool->queue_not_empty));}for (i = 0; i < pool->live_thr_num; i++) {pthread_join(pool->threads[i], NULL);}threadpool_free(pool);return 0;
}int threadpool_free(threadpool_t *pool)
{if (pool == NULL) {return -1;}if (pool->task_queue) {free(pool->task_queue);}if (pool->threads) {free(pool->threads);pthread_mutex_lock(&(pool->lock));pthread_mutex_destroy(&(pool->lock));pthread_mutex_lock(&(pool->thread_counter));pthread_mutex_destroy(&(pool->thread_counter));pthread_cond_destroy(&(pool->queue_not_empty));pthread_cond_destroy(&(pool->queue_not_full));}free(pool);pool = NULL;return 0;
}int threadpool_all_threadnum(threadpool_t *pool)
{int all_threadnum = -1; // 总线程数pthread_mutex_lock(&(pool->lock));all_threadnum = pool->live_thr_num; // 存活线程数pthread_mutex_unlock(&(pool->lock));return all_threadnum;
}int threadpool_busy_threadnum(threadpool_t *pool)
{int busy_threadnum = -1; // 忙线程数pthread_mutex_lock(&(pool->thread_counter));busy_threadnum = pool->busy_thr_num; pthread_mutex_unlock(&(pool->thread_counter));return busy_threadnum;
}int is_thread_alive(pthread_t tid)
{int kill_rc = pthread_kill(tid, 0); //发0号信号,测试线程是否存活if (kill_rc == ESRCH) {return false;}return true;
}/*测试*/ #if 1/* 线程池中的线程,模拟处理业务 */
void *process(void *arg)
{printf("thread 0x%x working on task %d\n ",(unsigned int)pthread_self(),(int)arg);sleep(1); //模拟 小---大写printf("task %d is end\n",(int)arg);return NULL;
}int main(void)
{/*threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size);*/threadpool_t *thp = threadpool_create(3,100,100); /*创建线程池,池里最小3个线程,最大100,队列最大100*/printf("pool inited");//int *num = (int *)malloc(sizeof(int)*20);int num[20], i;for (i = 0; i < 20; i++) {num[i] = i;printf("add task %d\n",i);/*int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg) */threadpool_add(thp, process, (void*)&num[i]); /* 向线程池中添加任务 */}sleep(10); /* 等子线程完成任务 */threadpool_destroy(thp);return 0;
}#endif