Linux知识点 – 高级IO(一)
文章目录
- Linux知识点 -- 高级IO(一)
- 一、5种IO模型
- 1.IO再理解
- 2.阻塞IO
- 3.非阻塞轮询式IO
- 4.信号驱动IO
- 5.IO多路转接
- 6.异步IO
- 7.同步通信vs异步通信
- 8.阻塞vs非阻塞
- 二、非阻塞IO
- 1.设置非阻塞的方法
- 2.非阻塞IO实现
- 三、IO多路转接 -- select
- 1.select接口
- 2.select实现
- 3.select的优缺点
一、5种IO模型
1.IO再理解
通信的本质就是IO;
关于IO的效率问题(以读取为例):
- 当我们read/recv的时候,如果底层缓冲区没有数据,read/recv会进行阻塞;
- 当我们read/recv的时候,如果底层缓冲区有数据,read/recv会进行拷贝;
因此,IO可以理解为等 + 数据拷贝;
低效的IO:单位时间,大部分的IO类接口其实都在等;
高效的IO:单位时间,让IO接口等的比重降低;
2.阻塞IO
IO接口在缓冲区数据准备好之前,会一直阻塞,等待数据的就绪;是最普通且最常见的IO模型;
3.非阻塞轮询式IO
如果内核还未将数据准备好,系统调用依然会直接返回,并且返回EWOULEBLOCK错误码,表示数据还未准备好,该进程不会阻塞等待数据;
非阻塞IO往往需要程序员循环的方式反复尝试读写文件描述符,这个过程称为轮询;这对CPU来说是较大的浪费,一般只有特定场景下才使用;
4.信号驱动IO
进程调用sigaction检查信号的状态,然后立即返回,当内核将数据准备好的时候,使用SIGIO信号通知进程,进程再调用IO接口进行IO操作;
5.IO多路转接
IO多路转接是指IO接口能够同时等待多个文件描述符的就绪状态;
6.异步IO
进程在调用了IO接口后,若无数据准备好,就立即返回,在内核将数据准备好之后,直接拷贝到缓冲区中,通过信号通知该进程,拷贝完毕;
如果一个进程(线程)全程参与了IO(等+拷贝),我们就称之为同步IO;
7.同步通信vs异步通信
- 所谓同步,就是在发出一个调用时,在没有得到结果之前,该调用就不返回,但是一旦调用返回,就得到返回值了;换句话说,就是由调用者主动等待这个调用的结果;
- 异步则是相反,调用在发出之后,这个调用就直接返回了,所以没有返回结果;换句话说,当一个异步过程调用发出后,调用者不会立刻得到结果;而是在调用发出后,被调用者通过状态、通知来通知调用者,或通过回调函数处理这个调用;
8.阻塞vs非阻塞
阻塞和非阻塞关注的是程序在等待调用结果(消息, 返回值)时的状态;
- 阻塞调用是指调用结果返回之前,当前线程会被挂起,调用线程只有在得到结果之后才会返回;
- 非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程;
二、非阻塞IO
1.设置非阻塞的方法
在设置IO接口的状态或网络套接字状态的时候,有一个NONBLOCK状态,这就是非阻塞状态;
有两种方式设置套接字为非阻塞:
- 打开套接字的时候,就指定为非阻塞接口;
- 使用统一的接口进行非阻塞设置:fcntl
**fcntl接口可以对文件描述符设置非阻塞模式;
fd为想设置的文件描述符,cmd参数对该fd进行设置的命令;fcntl函数有5种功能:
设置非阻塞状态使用第三种命令F_GETFL或F_SETFL:设置fd的状态标记;
2.非阻塞IO实现
由于标准输入的文件描述符是默认阻塞状态的,因此可以用它来进行实验,代码如下:
阻塞IO
#include <iostream>
#include <cstring>
#include <ctime>
#include <cassert>
#include <cerrno>
#include <fcntl.h>
#include <unistd.h>using namespace std;int main()
{char buffer[1024];while(true){sleep(1);ssize_t s = read(0, buffer, sizeof(buffer) - 1);if(s > 0){buffer[s] = 0;cout << "echo# " << buffer << " errno[---]: " << errno << " errstring: " << strerror(errno) << endl;}}return 0;
}
运行结果:
当我们不从键盘输入数据的时候,进程就会一直阻塞;
非阻塞IO
- 使用fcntl接口,设置文件描述符为非阻塞时,需要先从底层获取该fd的文件读写标志位,再对该标志位加上非阻塞的标志;
- 非阻塞的时候,我们是以出错的形式返回,告知上层数据没有就绪;
我们如何甄别是真的出错了,还是仅仅是数据没有就绪呢?需要通过errno的错误返回值来判别;
如果errno的值时EWOULDBLOCK或EAGAIN,就代表底层数据没就绪;
如果errno的值时EINTR,就代表当前IO可能被中断;
#include <iostream>
#include <cstring>
#include <ctime>
#include <cassert>
#include <cerrno>
#include <fcntl.h>
#include <unistd.h>using namespace std;//将文件描述符设置为非阻塞
bool SetNonBlock(int fd)
{int fl = fcntl(fd, F_GETFL); // 在底层获取当前fd对应的文件读写标志位if(fl < 0){return false;}fcntl(fd, F_SETFL, fl | O_NONBLOCK); // 设置非阻塞return true;
}int main()
{SetNonBlock(0); // 设置标准输入为非阻塞,只要设置一次,后续就都是非阻塞了char buffer[1024];while(true){sleep(1);errno = 0;// 非阻塞的时候,我们是以出错的形式返回,告知上层数据没有就绪:// 我们如何甄别是真的出错了,还是仅仅是数据没有就绪呢?// 数据就绪了的话,我们就正常读取就行ssize_t s = read(0, buffer, sizeof(buffer) - 1);//出错,不仅仅是错误返回值,errno变量也会被设置,表明出错原因if(s > 0){buffer[s] = 0;cout << "echo# " << buffer << " errno[---]: " << errno << " errstring: " << strerror(errno) << endl;}else{// 如果失败的errno值是11,就代表其实没错,只不过是底层数据没就绪if(errno == EWOULDBLOCK || errno == EAGAIN){cout << "当前0号fd的数据没有就绪,请下次再来试一试" << endl;continue;}else if(errno == EINTR){cout << "当前IO可能被中断,请下次再来试一试" << endl;continue;}else{//进行差错处理}}}return 0;
}
运行结果:
可以看到,设置套接字为非阻塞后,当进程检测到缓冲区没有数据就绪时,进程不会阻塞,而是会一直循环执行,并轮询检测缓冲区,直到数据就绪;
三、IO多路转接 – select
1.select接口
系统提供select函数来实现多路复用输入/输出模型:
- select系统调用是用来让我们的程序监视多个文件描述符的状态变化的;
- 程序会停在select这里等待,直到被监视的文件描述符有一个或多个发生了状态改变;
select解决的问题是等的问题,帮助用户一次等待多个文件sock;当某些sock就绪了,select就要通知用户,就绪的sock有哪些,然后用户再调用recv/recvfrom/read接口进行数据读取;
参数:
- nfds:需要select等待的最大文件描述符值 + 1;
后面四个参数全都是输入输出型参数;
-
readfds,writefds,exceptfds这三个参数:
在输入时,用户告诉内核,需要帮忙关心哪些sock的哪一种事件;
在输出时,内核告诉用户,内核所关心的sock中,哪些sock上的哪类时间已经就绪了;
这三个参数都是fd_set类型的,这是一种位图结构,代表文件描述符集,需要使用匹配的方法对fd_set类型进行操作:
-
timeout:
类型是struct timeval结构体,可以用于获取时间:
两个成员分别是单位为秒和微妙的值;
根据timeout参数能选择slect的等待方式:- 阻塞式:设为nullptr
- 非阻塞式:设为{0, 0}
- 一定时间内返回:设置timeout中的时间,比如设为{5, 0},select在5s内进行阻塞等待,时间一到,立马返回;
此时timeout参数也有输出性,等待时间内如果有fd就绪,timeout可以输出距离下一次timeout还剩余多长时间;
-
返回值:若返回值为0,代表timeout返回;若返回值为-1,代表select错误;其他返回值代表select返回成功;
以readfds参数为例,分析一下select过程:
- readfds参数作为输入时,用户告诉内核,在readfds的比特位中,比特位的位置表示文件描述符的值,比特位的内容表示是否关心该fd的可读取状态;
- readfds参数作为输出时,内核告诉用户,用户让内核关心的多个fd有结果了,比特位的位置表示文件描述符的值,比特位的内容表示该fd的读取是否就绪;若已就绪,后续用户可以直接读取该fd指向文件的内容,而不会被阻塞;
2.select实现
Log.hpp
#pragma once#include <iostream>
#include <cstdio>
#include <cstdarg>
#include <ctime>
#include <string>// 日志是有日志级别的
#define DEBUG 0
#define NORMAL 1
#define WARNING 2
#define ERROR 3
#define FATAL 4const char *gLevelMap[] = {"DEBUG","NORMAL","WARNING","ERROR","FATAL"
};#define LOGFILE "./http.log"// 完整的日志功能,至少: 日志等级 时间 支持用户自定义(日志内容, 文件行,文件名)
void logMessage(int level, const char *format, ...)
{
#ifndef DEBUG_SHOWif(level == DEBUG) return;
#endif// va_list ap;// va_start(ap, format);// while()// int x = va_arg(ap, int);// va_end(ap); //ap=nullptrchar stdBuffer[1024]; //标准部分time_t timestamp = time(nullptr);// struct tm *localtime = localtime(×tamp);snprintf(stdBuffer, sizeof stdBuffer, "[%s] [%ld] ", gLevelMap[level], timestamp);char logBuffer[1024]; //自定义部分va_list args;va_start(args, format);// vprintf(format, args);vsnprintf(logBuffer, sizeof logBuffer, format, args);va_end(args);//FILE *fp = fopen(LOGFILE, "a");printf("%s%s\n", stdBuffer, logBuffer);//fprintf(fp, "%s%s\n", stdBuffer, logBuffer);//fclose(fp);
}
Sock.hpp
#pragma once#include <iostream>
#include <string>
#include <cstring>
#include <cerrno>
#include <cassert>
#include <unistd.h>
#include <memory>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <ctype.h>
#include "Log.hpp"class Sock
{
private:const static int gbacklog = 20;public:Sock() {}static int Socket(){int listensock = socket(AF_INET, SOCK_STREAM, 0);if (listensock < 0){logMessage(FATAL, "create socket error, %d:%s", errno, strerror(errno));exit(2);}logMessage(NORMAL, "create socket success, listensock: %d", listensock);return listensock;}static void Bind(int sock, uint16_t port, std::string ip = "0.0.0.0"){struct sockaddr_in local;memset(&local, 0, sizeof local);local.sin_family = AF_INET;local.sin_port = htons(port);inet_pton(AF_INET, ip.c_str(), &local.sin_addr);if (bind(sock, (struct sockaddr *)&local, sizeof(local)) < 0){logMessage(FATAL, "bind error, %d:%s", errno, strerror(errno));exit(3);}}static void Listen(int sock){if (listen(sock, gbacklog) < 0){logMessage(FATAL, "listen error, %d:%s", errno, strerror(errno));exit(4);}logMessage(NORMAL, "init server success");}// 一般经验// const std::string &: 输入型参数// std::string *: 输出型参数// std::string &: 输入输出型参数static int Accept(int listensock, std::string *ip, uint16_t *port){struct sockaddr_in src;socklen_t len = sizeof(src);int servicesock = accept(listensock, (struct sockaddr *)&src, &len);if (servicesock < 0){logMessage(ERROR, "accept error, %d:%s", errno, strerror(errno));return -1;}if(port) *port = ntohs(src.sin_port);if(ip) *ip = inet_ntoa(src.sin_addr);return servicesock;}static bool Connect(int sock, const std::string &server_ip, const uint16_t &server_port){struct sockaddr_in server;memset(&server, 0, sizeof(server));server.sin_family = AF_INET;server.sin_port = htons(server_port);server.sin_addr.s_addr = inet_addr(server_ip.c_str());if(connect(sock, (struct sockaddr*)&server, sizeof(server)) == 0) return true;else return false;}~Sock() {}
};
main.cc
#include "selectServer.hpp"
#include<memory>int main()
{// 1. fd_set是一个固定大小位图,直接决定了select能同时关心的fd的个数是有上限的!// std::cout << sizeof(fd_set) * 8 << std::endl;std::unique_ptr<SelectServer> svr(new SelectServer);svr->Start();return 0;
}
selectServer.hpp
这段代码只是完成了用select接口同时等待多个文件描述符就绪,文件描述符就绪后的读取工作还未完成;
#ifndef __SELECT_SVR_H__
#define __SELECT_SVR_H__#include <iostream>
#include <string>
#include <vector>
#include <sys/select.h>
#include <sys/time.h>
#include "Log.hpp"
#include "Sock.hpp"using namespace std;class SelectServer
{
public:SelectServer(const uint16_t &port = 8080): _port(port){_listensock = Sock::Socket();Sock::Bind(_listensock, _port);Sock::Listen(_listensock);logMessage(DEBUG, "%s", "create base socket success");}void Start(){fd_set rfds;FD_ZERO(&rfds);// 将rfds清零while(true){//struct timeval timeout = {0, 0};// 如何看待listensock? 获取新连接,我们把它依旧看做成为IO,input事件,如果没有连接到来呢?阻塞//不能直接调用accept了FD_SET(_listensock, &rfds); // 将listensock添加到读文件描述符集中//int n = select(_listensock + 1, &rfds, nullptr, nullptr, &timeout);int n = select(_listensock + 1, &rfds, nullptr, nullptr, nullptr);switch(n){case 0:logMessage(DEBUG, "%s", "timeout");break;case -1:logMessage(WARNING, "select error: %d : %s", errno, strerror(errno));break;default://select成功logMessage(DEBUG, "%s", "get a new link event");HandlerEvent(rfds);//对就绪的fd进行处理break;}}}void HandlerEvent(const fd_set& rfds){string clientip;uint16_t clientport = 0;if(FD_ISSET(_listensock, &rfds)){//listensock上面的读事件就绪了,表示可以读取了//获取新连接了int sock = Sock::Accept(_listensock, &clientip, &clientport); // 在这里进行accept是不会阻塞的if(sock < 0){logMessage(WARNING, "%s", "accept error");return;}logMessage(DEBUG, "get a new link success : [%s:%d] : %d", clientip.c_str(), clientport, sock);}}private:uint16_t _port;int _listensock;
};
#endif
运行结果:
能够成功获取链接,但是此时还不能对该fd进行读取;
- 因为我们不清楚该sock上面数据什么时候到来,此时只是建立连接成功 ,recv、read就有可能先被阻塞(IO = 等+数据拷贝);
- 得到新连接的时候,此时我们应该考虑的是,将新的sock托管给select,让select帮我们进行检测sock上是否有新的数据;
- 有了数据select,读事件就绪,select就会通知我,我们在进行读取,此时我们就不会被阻塞了;
但是我们在Start中调用了HandlerEvent方法来获取连接,获取成功后如果还需要重新向select中添加新的fd,就很困难,因此需要更新编写代码的模式;
- nfds: 随着我们获取的sock越来越多,随着我们添加到select的sock越来越多,注定了nfds每一次都可能要变化,我们需要对它动态计算;
- rfds/writefds/exceptfds:都是输入输出型参数,输入输出不一定以一样的,所以注定了我们每一次都要对rfds进行重新添加;
- 这就注定了我们必须自己将合法的文件描述符需要单独全部保存起来,用来支持:1. 更新最大fd; 2.更新位图结构;
select的一般代码编写模式:
- 需要有一个第三方的数组,用于保存所有的合法文件描述符;
- 在每一次循环中,都对该数组进行以下操作:
- 遍历数组,更新出max fd;
- 遍历数组,添加所有需关心的fd到fd_set位图中;
- 调用select进行事件检测;
- 遍历数组,找到就绪的事件,完成对应的动作:
对于listensock进行accept;
对于普通sock进行recv;
完整的selectServer.hpp代码
#ifndef __SELECT_SVR_H__
#define __SELECT_SVR_H__#include <iostream>
#include <string>
#include <vector>
#include <sys/select.h>
#include <sys/time.h>
#include "Log.hpp"
#include "Sock.hpp"#define BITS 8
#define NUM (sizeof(fd_set) * BITS) // fd_set能够管理的fd的最大值
#define FD_NONE -1 // 文件描述符初始化状态
using namespace std;class SelectServer
{
public:SelectServer(const uint16_t &port = 8080): _port(port){_listensock = Sock::Socket();Sock::Bind(_listensock, _port);Sock::Listen(_listensock);logMessage(DEBUG, "%s", "create base socket success");// 初始化数组for (int i = 0; i < NUM; i++){_fd_array[i] = FD_NONE;}// 规定:_fd_array[0] = _listensock_fd_array[0] = _listensock;}void Start(){while (true){// struct timeval timeout = {0, 0};// 如何看待listensock? 获取新连接,我们把它依旧看做成为IO,input事件,如果没有连接到来呢?阻塞// 不能直接调用accept了// FD_SET(_listensock, &rfds); // 将listensock添加到读文件描述符集中// int n = select(_listensock + 1, &rfds, nullptr, nullptr, &timeout);// 1. nfds: 随着我们获取的sock越来越多,随着我们添加到select的sock越来越多,注定了nfds每一次都可能要变化,我们需要对它动态计算// 2. rfds/writefds/exceptfds:都是输入输出型参数,输入输出不一定以一样的,所以注定了我们每一次都要对rfds进行重新添加// 3. timeout: 都是输入输出型参数,每一次都要进行重置,前提是你要的话// 1,2 => 注定了我们必须自己将合法的文件描述符需要单独全部保存起来 用来支持:1. 更新最大fd 2.更新位图结构DebugPrint();fd_set rfds;FD_ZERO(&rfds); // 将rfds清零int maxfd = _listensock;// 将_fd_array中的需要关注的fd更新到rfds中for (int i = 0; i < NUM; i++){if (_fd_array[i] == FD_NONE){continue;}FD_SET(_fd_array[i], &rfds);if (maxfd < _fd_array[i]){maxfd = _fd_array[i];}}// rfds未来一定有两类sock:listensock和普通sockint n = select(maxfd + 1, &rfds, nullptr, nullptr, nullptr);switch (n){case 0:logMessage(DEBUG, "%s", "timeout");break;case -1:logMessage(WARNING, "select error: %d : %s", errno, strerror(errno));break;default:// select成功logMessage(DEBUG, "%s", "get a new link event");HandlerEvent(rfds); // 对就绪的fd进行处理break;}}}~SelectServer(){if (_listensock >= 0){close(_listensock);}}private: fd_set 是一个集合,里面可能会存在多个sock,不同种的sock需要进行不同的处理,不能在这个函数中只有一种处理void HandlerEvent(const fd_set &rfds){for (int i = 0; i < NUM; i++){// 1.去掉不合法fdif (_fd_array[i] == FD_NONE){continue;}// 2.合法fd也不一定就绪了if (FD_ISSET(_fd_array[i], &rfds)){// 指定的fd,读事件就绪// 读事件就绪:连接事件到来,acceptif (_fd_array[i] == _listensock){Accepter(); // listensock需要进行accept}else{Recver(i); // 普通sock进行recv}}}}void Accepter(){string clientip;uint16_t clientport = 0;// listensock上面的读事件就绪了,表示可以读取了// 获取新连接了int sock = Sock::Accept(_listensock, &clientip, &clientport); // 在这里进行accept是不会阻塞的if (sock < 0){logMessage(WARNING, "%s", "accept error");return;}logMessage(DEBUG, "get a new link success : [%s:%d] : %d", clientip.c_str(), clientport, sock);// read / recv? 不能!为什么不能?我们不清楚该sock上面数据什么时候到来,此时只是建立连接成功 ,recv、read就有可能先被阻塞,IO = 等+数据拷贝// 谁可能最清楚呢?select!// 得到新连接的时候,此时我们应该考虑的是,将新的sock托管给select,让select帮我们进行检测sock上是否有新的数据// 有了数据select,读事件就绪,select就会通知我,我们在进行读取,此时我们就不会被阻塞了// 要将sock添加 给 select, 其实我们只要将fd放入到数组中即可!int pos = 1;for (; pos < NUM; pos++){if (_fd_array[pos] == FD_NONE) // 找出_fd_array中未设置合法fd的位置{break;}}if (pos == NUM) // 数组满了{logMessage(WARNING, "%s:%d", "select server already full,close: %d", sock);close(sock);}else{_fd_array[pos] = sock; // 将sock加入_fd_array数组}}void Recver(int pos){// 读事件就绪:INPUT事件到来,recv,readlogMessage(DEBUG, "message in, get IO event: %d", _fd_array[pos]);// 此时select已经帮我们进行了事件检测,fd上的数据一定是就绪的,即 本次 不会被阻塞// 这样读取有bug吗?有的,你怎么保证以读到了一个完整报文呢?char buffer[1024];int n = recv(_fd_array[pos], buffer, sizeof(buffer) - 1, 0);if (n > 0){buffer[n] = 0;logMessage(DEBUG, "client[%d]# %s", _fd_array[pos], buffer);}else if (n == 0) // 对端关闭连接{logMessage(DEBUG, "client[%d] quit, me too...", _fd_array[pos]);// 1.我们也要关闭不需要的fdclose(_fd_array[pos]);// 2.不要让select帮我关心当前的fd了_fd_array[pos] = FD_NONE;}else{logMessage(WARNING, "%d sock recv error, %d : %s", _fd_array[pos], errno, strerror(errno));// 1.我们也要关闭不需要的fdclose(_fd_array[pos]);// 2.不要让select帮我关心当前的fd了_fd_array[pos] = FD_NONE;}}void DebugPrint(){cout << "_fd_array[]: ";for (int i = 0; i < NUM; i++){if (_fd_array[i] == FD_NONE)continue;cout << _fd_array[i] << " ";}cout << endl;}private:uint16_t _port;int _listensock;int _fd_array[NUM]; // 第三方数组,用来保存有所得合法fd
};
#endif
运行结果:
可以看出select服务器可以同时关心多个fd的事件,是一个高并发的服务器;
3.select的优缺点
优点:
- 效率高
- 应用场景:有大量的连接,但是只有少量是活跃的,省资源;
缺点:
- 为了维护第三方数组,select服务器会充满大量的遍历,OS底层帮我们关心fd的时候,也要遍历;
- 每一次都要对select输出参数进行重新设定;
- 能够同时管理的fd的个数是有上限;
- 因为几乎每一个参数都是输入输出型的,select一定会频繁的进行用户到内核,内核到用户的参数数据拷贝;
- 编码比较复杂;