文章目录
-
简单的TCP网络程序
-
-
服务端创建套接字
-
服务端绑定
-
服务端监听
-
服务端获取连接
-
服务端处理请求
-
客户端创建套接字
-
客户端发起请求
-
服务器测试
-
单执行流服务器的弊端
-
-
多进程版的TCP网络程序
-
线程池版的TCP网络程序
简单的TCP网络程序
服务端创建套接字
我们将TCP服务器封装成一个类,当我们定义出一个服务器对象后需要马上对服务器进行初始化,而初始化TCP服务器要做的第一件事就是创 建套接字。
TCP服务器在调用socket函数创建套接字时,参数设置如下:
- 协议家族选择
AF_INET
,因为我们要进行的是网络通信。 - 创建套接字时所需的服务类型应该是
SOCK_STREAM
,因为我们编写的是TCP服务器,SOCK_STREAM
提供的就是一个有序的、可靠的、全双工的、基于连接的流式服务。 - 协议类型默认设置 为0即可。
如果创建套接字后获得的文件描述符是小于0的,说明套接字创建失败,此时也就没必要进行后续操作了,直接终止程序即可。
class TcpServer
{
public:void InitServer(){//创建套接字_sock = socket(AF_INET, SOCK_STREAM, 0);if (_ sock < 0){std::cerr << "socket error" << std::endl;exit(2);}}~TcpServer(){if (_sock >= 0){close(_sock);}}
private:int _sock; //套接字
};
说明一下:
- 实际TCP服务器创建套接字的做法与UDP服务器是一样的,只不过创建套接字时 TCP需要的是流式服务,而UDP需要的是用户数据报服务。
- 当析构服务器时,可以将服务器对应的文件描述符进行关闭。
服务端绑定
套接字创建完毕后我们实际只是在系统层面上打开了一个文件,该文件还没有与网络关联起来,因此创建完套接字后我们还需要调用bind函数进行绑定操作。
绑定的步骤如下:
- 定义一个
struct sockad dr_in
结构体,将服务器网络相关的属性信息填充到该结构体当中,比如协议家族、IP地址、端口号等。 - 填充服务器网络相关的属性信息时,协议家族对应就是
AF_INET
,端口号就是当前TCP服务器程序的端口号。在设置端口号时,需要调用htons函数将端口号由主机序列转为网络序列。 - 在设置服务器的IP地址时,我们可以设置为本地环回
12 7.0.0.1
,表示本地通信。也可以设置为公网IP地址,表示网络通信。 - 如果使用的是云服务器,那么在设置服务器的IP地址时,不需要显示绑定IP地址,直接将IP地址设置为
INADDR_ANY
即可,此时服务器就可以从本地任何一张网卡当中读取数据。此外,由于INADDR_ANY
本质就是0,因此在设置时不需要进行网络字节序的转换。 - 填充完服 务器网络相关的属性信息后,需要调用bind函数进行绑定。绑定实际就是将文件与网络关联起来,如果绑定失败也没必要进行后续操作了,直接终止程序即可。
由于TCP服务器初始化时需要服务器的端口号,因此在服务器类当中需要引入端口号,当实例化服务器对象时就需要给传入一个端口号。而由于我当前使用的是云服务器,因此在绑定TCP服务器的IP地址时不需要绑定公网IP地址,直 接绑定INADDR_ANY
即可,因此我这里没有在服务器类当中引入IP地址。
class TcpServer
{
public:TcpServer(int port): _sock(-1), _port(port){}void InitServer(){//创建套接字_sock = socket(AF _INET, SOCK_STREAM, 0);if (_sock < 0){std::cerr << "socket error" << std::endl;exit(2);}//绑定struct sockaddr_in local;memset( &local, '\0', sizeof(local));local.sin_family = AF_INET;local.sin_port = htons(_port);local.sin_addr.s_addr = INADDR_ANY;if (bind(_sock, (struct sockaddr*) &local, sizeof(local)) < 0){std::cerr << "bind error" << std::endl;exit(3);}}~TcpServer(){if (_sock >= 0){close(_sock);}}
private:int _sock; //监听套接字int _port; //端口号
};
当定义好struct sockaddr_in
结构体后,最好先 用memset函数对该结构体进行清空,也可以用bzero函数进行清空。bzero函数也可以对特定的一块内存区域进行清空,bzero函数的函数原型如下:
void bzero(void *s, size_t n);//过时且不安全,建议用memset
说明一下:
- TCP服务器绑定时的步骤与UDP服务器是完全一样的,没 有任何区别。
服务端监听
UDP服务器的初始化操作只有两步,第一步就是创建套接字,第二步就是绑定。而TCP服务器是面向连接的,客户端在正式向TCP服务器发送数据之前,需要先与TCP服务器建立连接,然后才能与服务器进行通信。
因此TCP服务器需要时刻注意是否有客户端发来连接请求,此时就需要将TCP服务器创建的套接字设置为监听状态。
list en函数
设置套接字为监听状态的函数叫做listen,该函数的函数原型如下:
int listen(int sockfd, int backlog);
参数说明:
- sockfd:需要设置为监听状态的套接字对应的文件描述符。
- backlog:全连接队列的最大长度。如果有多个客户端同时发来连接请求,此时未被服务器处理 的连接就会放入连接队列,该参数代表的就是这个全连接队列的最大长度,一般不要设置太大,设置为5或10即可。
返回值说明:
- 监听成功返回0,监听失败返回-1,同时错误码会被设置。
服务器监听
TCP服务器在创建完套接字和绑定后,需要再进一步将套接字设置为监听状态,监听是否有新的连接到来。如果监听失败也没必要进行后续操作了,因为监听失败也就意味 着TCP服务器无法接收客户端发来的连接请求,因此监听失败我们直接终止程序即可。
#define BACKLOG 5class TcpServer
{
public:void InitServer(){//创建套接字_listen_sock = socket(AF_INET, SOCK_STREAM, 0);if (_listen_sock < 0){std::cerr << "socket error" << std::endl;exit(2);}//绑定struct sockaddr_in local;memset( &local, '\0', sizeof(local));local.sin_family = AF_INET;local.sin_port = htons(_port);local.sin_addr.s_addr = INADDR_ANY;if (bind(_listen_sock, (struct sockaddr*) &local, sizeof(local)) < 0){std::cerr << "bind error" << std::endl;exit(3);}//监听if (listen(_listen_sock, BACKLOG) < 0){std::cerr << "listen error" << std::endl;exit(4);}}
private:int _listen_sock; //监听套接字int _port; //端口号
};
说明一下:
- 初始化TCP服务器时创建的套接字并不是普通的套接字,而应该叫做监听套接字。为了表明寓意,我们将代码中套接字的名字由sock改为listen
socket。 - 在初始化TCP服务器时,只有创建套接字成功、绑定成功、监听成功,此时TCP服务器的初始化才算完成。
服务端获取连接
TCP服务器初始化后就可以开始运行了,但TCP服务器在与客户端进行网络通信之前,服务器需要先获取到客户端的连接请求。
accept函数
获取连接的函数叫做accept,该函数的函数原型如下 :
int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);
参数说明:
- sockfd:特定的监听套接字,表示从该监听套接字中获取连接。
- addr:对端网络相关的属性信息,包括协议家族、IP地址、端口号等。
- addrlen:调用时传 入期望读取的addr结构体的长度,返回时代表实际读取到的addr结构体的长度,这是一个输入输出型参数。
返回值说明:
- 获取连接成功返回接收到的套接字的文件描述符,获取连接失败返回-1,同时错误码会被设置。
accept函数返回的套接字是什么?
调用accept函数获取连接时,是从监听套接字当中获取的。如果accept函数获取连接成功,此时 会返回接收到的套接字对应的文件描述符。
监听套接字与accept函数返回的套接字的作用:
- 监听套接字:用于获取客户端发来的连接请求。accept函数会不断从监听套接字当中获取新连接。
- accept函数返回的套接字:用于为本次accept获取到的连接提供服务。监听套接字的任务只是不断获取新连接,而真正为这些连接提供服务的套接字是ac cept函数返回的套接字,而不是监听套接字。
服务端获取连接
服务端在获取连接时需要注意:
- accept函数获取连接时可能会失败,但TCP服务器不会因为获取某个连接失败而退出,因此服务端获取连接失败后应该继续获取连接。
- 如果要将获取到的连接对应客户端的IP地址和端口号信息进行输出,需要调用inet_ntoa函数将整数IP转换成字符 串IP,调用ntohs函数将端口号由网络序列转换成主机序列。
- inet_ntoa函数在底层实际做了两个工作,一是将网络序列转换成主机序列,二是将主机序列的整数IP转换成字符串风格的点分十进制的IP。
class TcpServer
{
public:void Start(){for (;;)//死循环{//获取连接struct sockaddr_in peer;memset( &peer, '\0', sizeof(peer));socklen_t len = sizeof(peer);int sock = accept(_listen_sock, (struct sockaddr*) &peer, &len);if (sock < 0){std::cerr << "accept error, continue next" << std::endl;continue;}std::string client_ip = inet_ntoa(peer.sin_addr);int client_port = ntohs(peer.sin_port);std::cout " 服务端接收连接测试/*现在我们可以做一下简单的测试,看看当前服务器能否成功接收请求连接。在运行服务端时需要传入一个端口号作为服务端的端口号,然后我们用该端口号构造一个服务端对象,对服务端进行初始化后启动服务端即可。*/```cpp
void Usage(std::string proc)
{std::cout << "Usage: " << proc << " port" << std::endl;
}
int main(int argc, char* argv[])
{if (argc != 2){Usage(argv[0]);exit(1);}int port = atoi(argv[1]);TcpServer* svr = new TcpServer(port);svr->InitServ er();svr->Start();return 0;
}
编译代码后,以./tcp_server 端口号
的方式运行服务端。
# 运行命令示例
$ ./tcp_server 8081
# 输出示例
Server started on port 8081
服务端运行后,通过ne tstat
命令可以查看到一个程序名为tcp_server的服务程序,它绑定的端口就是8081,而由于服务器绑定的是INADDR_ANY
,因此该服务器的本地IP地址是0.0.0.0
,这就意味着该TCP服务器可以读取本地任何一张网卡里面的数据。此外,最重要的是当前该服务器所处的状态是LISTEN
状态,表明当前服务器可以接收外部的请求连接。
# 使用 netstat 查看监听状态
$ netstat -tuln | grep 8081
# 输出示例
tcp 0 0 0.0.0.0:8081 0.0.0.0:* LISTEN
虽然现在还没有编写客户端相关的代码,但是我们可以使用telnet
命令远程登录到该服务器,因为telnet
底层实际采用的就是TCP协议。
使用telnet
命令连接当前 TCP服务器后可以看到,此时服务器接收到了一个连接,为该连接提供服务的套接字对应的文件描述符就是4。因为0、1、2是默认打开的,其分别对应标准输入流、标准输出流和标准错误流,而3号文件描述符在初始化服务器时分配给了监听套接字,因此当第一个客户端发起连接请求时,为该客户端提供服务的套接字对应的文件描述符就是4。
# 使用 telnet 测试连接
$ telnet 127.0.0.1 8081
# 输出示例
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
如果此时我们再用其他窗口继续使用telnet
命令,向该TCP服务器发起请求连接,此时为该客户端提供服务的套接字对应的文件描述符就是5。
# 第二个 telnet 连接
$ telnet 127.0.0.1 8081
# 输出示例
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
当然,也可以直接用浏览器来访问这个TCP服务器,因为浏览器常见的应用层协议是http或https,其底层对应的也是TCP协议,因此浏览器也可以向当前这个TCP服务器发起请求连接。
# 使用浏览器访问
http://127.0.0.1:8081
# 输出示例(假设服务器有响应)
Hello, World!
说明一下:
- 至于这里为什么浏览器一次会向我们的TCP服务器发起两次请求这个问题,这里就不作讨论了,我们只是要证明当前TCP服务器能够正常接收外部的请求连接。
服务端处理请求
现在TCP服务器已经能够获取连接请求了, 下面当然就是要对获取到的连接进行处理。但此时为客户端提供服务的不是监听套接字,因为监听套接字获取到一个连接后会继续获取下一个请求连接,为对应客户端提供服务的套接字实际是accept函数返回的套接字,下面就将其称为“服务套接字”。
为了让通信双方都能看到对应的现象,我们这里就实现一个简单的回声TCP服务器,服务端在为客户端提供服务时就简单的将客户端发来的数据 进行输出,并且将客户端发来的数据重新发回给客户端即可。当客户端拿到服务端的响应数据后再将该数据进行打印输出,此时就能确保服务端和客户端能够正常通信了。
read函数
TCP服务器读取数据的函数叫做read,该函数的函数原型如下:
ssize_t read(int fd, void *buf, size_t count);
参 数说明:
- fd:特定的文件描述符,表示从该文件描述符中读取数据。
- buf:数据的存储位置,表示将读取到的数据存储到该位置。
- count:数据的个数,表示从该文件描述符中读取数据的字节数。
返回值说明:
- 如果返回值大于0,则表示本次实际读取到的字节个数。
- 如果返回值等于0,则表示对端已经把连接关闭了。
- 如果 返回值小于0,则表示读取时遇到了错误。
read返回值为0表示对端连接关闭
这实际和本地进程间通信中的管道通信是类似的,当使用管道进行通信时,可能会出现如下情况:
- 写端进程不写,读端进程一直读,此时读端进程就会被挂起,因为此时数据没有就绪。
- 读端进程不读,写端进程一直写,此时当管道被写满后写端进程就会被挂起,因为此时空间没有就绪。
- 写端进程将数据写完后将写端关闭,此时当读端进程将管道当中的数据读完后就会读到0。
- 读端进程将读端关闭,此时写端进程就会被操作系统杀掉,因为此时写端进程写入的数据不会被读取。
这里的写端就对应客户端,如果客户端将连接关闭了,那么此时服务端将套接字当中的信息读完后就会读取到0,因此如果服务端调用read函数后得到的返回值为0,此时服务端就不必再 为该客户端提供服务了。
write函数
TCP服务器写入数据的函数叫做write,该函数的函数原型如下:
ssize_t write(int fd, const void *buf, size_t count);
参数说明:
- fd:特定的文件描述符,表示将数据写入该文件描述符对应的套接字。
- buf:需要写入 的数据。
- count:需要写入数据的字节个数。
返回值说明:
- 写入成功返回实际写入的字节数,写入失败返回-1,同时错误码会被设置。
当服务端调用read函数收到客户端的数据后,就可以再调用write函数将该数据再响应给客户端。
服务端处理请求
需要注意的是,服务端读取数据是服务套接字中读取的,而写入数据的时候也是写入进服务套接字 的。也就是说这里为客户端提供服务的套接字,既可以读取数据也可以写入数据,这就是TCP全双工的通信的体现。
在从服务套接字中读取客户端发来的数据时,如果调用read函数后得到的返回值为0,或者读取出错了,此时就应该直接将服务套接字对应的文件描述符关闭。因为文件描述符本质就是数组的下标,因此文件描述符的资源是有限的,如果我们一直占用,那么可用的文件描述符就会越 来越少,因此服务完客户端后要及时关闭对应的文件描述符,否则会导致文件描述符泄漏。
class TcpServer
{
public:void Service(int sock, std::string client_ip, int client_port){char buffer[1024];while (true){ssize_t size = read(sock, buffer, sizeof(buffer)-1);if (size > 0){ //读取成功buffer[size] = '\0';std::cout << "get a new link->" << sock << " [" << client_ip << "]:" << client_port << std::endl;write(sock, buffer, size);}else if (size == 0){ //对端关闭连接std::cout << client_ip << ":" << client_port << " close!" << std::endl;break;}else{ //读取失败std::cerr << sock << " read error!" << std::endl;break;}}close(sock); //归还文件描述符std::cout << client_ip << ":" << client_port << " service done!" << std::endl;}void Start(){for (;;){//获取连接struct sockaddr_in peer;memset( &peer, '\0', sizeof(peer));socklen_t len = sizeof(peer);int sock = accept(_listen_sock, (struct sockaddr*) &peer, &len);if (sock < 0){std::cerr << "accept error, continue next" << std::endl;continue;}std::string client_ip = inet_ntoa(peer.sin_addr);int client_port = ntohs(peer.sin_port);std::cout << "get a new link [" << client_ip << "]:" << client_port << std::endl;//处理请求Service(sock, client_ip, client_port);}}
private:int _listen_sock; //监听套接字int _port; //端口号
};
客户端创建套接字
同样的,我们将客户端也封装成一个类,当我们定义出一个客户 端对象后也需要对其进行初始化,而初始化客户端唯一需要做的就是创建套接字。而客户端在调用[socket函数](https://so.csdn.net/so/search?q=socket%E5%87%BD%E6%95%B0 &spm=1001.2101.3001.7020)创建套接字时,参数设置与服务端创建套接字时是一样的。
客户端不需要进行绑定和监听:
- 服务端要进行绑定是因为服务端的IP地址和端口号必须要众所周知,不能随意改变。而客户端虽然也需要IP地址和端口号,但是客户端并不需要我们进行绑定操作,客户端连接服务端时系统会自动指定一个端口号给客户端。
- 服务 端需要进行监听是因为服务端需要通过监听来获取新连接,但是不会有人主动连接客户端,因此客户端是不需要进行监听操作的。
此外,客户端必须要知道它要连接的服务端的IP地址和端口号,因此客户端除了要有自己的套接字之外,还需要知道服务端的IP地址和端口号,这样客户端才能够通过套接字向指定服务器进行通信。
class TcpClient
{
public:TcpClient(std::string server_ip, int server_port): _sock(-1), _server_ip(server_ip), _server_port(server_port){}void InitClient(){//创建套接字_sock = socket(AF_INET, SOCK_STREAM, 0);if (_sock < 0){std::cerr << "socket error" << std::endl;exit(2);}}~TcpClient(){if (_sock >= 0){close(_sock);}}
private:int _sock; //套接字std::string _server_ip; //服务端IP地址int _server_port; //服务端端口号
};
客户端发起请求
客户端在连接到服务端后,可以通过 write
函数向服务端发送数据,并通过 read
函数读取服务端的响应。以下是完整的代码和解释:
-
客户端发送数据:
- 使用
std::cin
获取用户输入的消息。 - 调用
write
函数将消息写入套接字。
- 使用
-
客户端接收响应:
- 调用
read
函数从套接字中读取服务端的响应。 - 如果读取成功,打印服务端的响应;如果读取失败或服务端关闭连接,则退出循环。
- 调用
-
完整代码:
class TcpClient {
public:void Request() {std::string msg;char buffer[1024];while (true) {// 提示用户输入std::cout << "Please Enter# ";std::getline(std::cin, msg);// 发送消息到服务端if (write(_sock, msg.c_str(), msg.size()) < 0) {std::cerr << "Write failed!" << std::endl;break;}// 接收服务端的响应ssize_t size = read(_sock, buffer, sizeof(buffer) - 1);if (size > 0) { // 读取成功buffer[size] = '\0'; // 确保字符串以 NULL 结尾std::cout << "Server echo: " << buffer << std::endl;} else if (size == 0) { // 服务端关闭连接std::cout << "Server closed the connection!" << std::endl;break;} else { // 读取失败std::cerr << "Read failed!" << std::endl;break;}}}void Start() {struct sockaddr_in peer;memset(&peer, 0, sizeof(peer));peer.sin_family = AF_INET;peer.sin_port = htons(_server_port);peer.sin_addr.s_addr = inet_addr(_server_ip.c_str());// 连接到服务端if (connect(_sock, (struct sockaddr*)&peer, sizeof(peer)) == 0) {std::cout << "Connected to server successfully..." << std::endl;Request(); // 开始与服务端交互} else {std::cerr << "Failed to connect to server..." << std::endl;exit(3);}}private:int _sock; // 套接字std::string _server_ip; // 服务端 IP 地址int _server_port; // 服务端端口号
};
服务器测试
为了验证服务端和客户端的功能是否正常,我们需要进行以下步骤:
-
启动服务端:
- 编译并运行服务端程序,监听指定端口。
- 使用
netstat
命令查看服务端是否处于监听状态:
输出示例:netstat -tuln | grep 端口号
tcp 0 0 0.0.0.0:8080 0.0.0.0:* LISTEN
-
启动客户端:
- 编译并运行客户端程序,指定服务端的 IP 和端口:
./tcp_client 127.0.0.1 8080
- 客户端成功连接后,提示用户输入消息。
- 编译并运行客户端程序,指定服务端的 IP 和端口:
-
测试通信:
- 在客户端输入消息,观察服务端是否正确打印消息并回显。
- 在服务端输出示例:
Received from client [127.0.0.1]: Hello, Server! Sent response: Hello, Server!
- 在客户端输出示例:
Please Enter# Hello, Server! Server echo: Hello, Server!
-
测试多客户端:
- 启动多个客户端,分别向服务端发送消息。
- 验证服务端是否能够同时处理多个客户端的消息。
单执行流服务器的弊端
单执行流服务器的主要问题在于它一次只能为一个客户端提供服务,无法充分利用系统资源。以下是具体现象和原因分析:
-
现象:
- 当第一个客户端正在与服务端通信时,第二个客户端虽然可以成功连接,但其消息不会被服务端处理。
- 只有当第一个客户端断开连接后,服务端才会开始处理第二个客户端的消息。
-
原因:
- 单执行流服务器在调用
accept
获取连接后,会立即进入服务逻辑,阻塞在read
或write
操作上。 - 在此期间,服务端无法处理其他客户端的连接请求。
- 单执行流服务器在调用
-
底层机制:
- 当客户端发起连接请求时,操作系统会将该连接放入全连接队列(由
listen
函数的backlog
参数指定大小)。 - 如果服务端没有调用
accept
获取连接,这些连接会一直停留在队列中,直到队列满或超时。
- 当客户端发起连接请求时,操作系统会将该连接放入全连接队列(由
-
解决方案:
- 将单执行流服务器改为多执行流服务器,使用多进程或多线程技术。
- 多进程:每个客户端连接由一个子进程处理。
- 多线程:每个客户端连接由一个线程处理。
多进程版的TCP网络程序
多进程版服务器通过 fork
创建子进程为每个客户端提供服务,父进程继续监听新连接。以下是实现细节:
-
子进程继承文件描述符表:
- 子进程会继承父进程的文件描述符表,包括监听套接字和服务套接字。
- 父进程需要关闭子进程中不需要的文件描述符(如服务套接字),避免资源泄漏。
-
等待子进程退出:
- 如果父进程不等待子进程退出,子进程会变成僵尸进程。
- 解决方法:
- 捕捉
SIGCHLD
信号,将其处理动作设置为忽略。 - 或者让子进程创建孙子进程,父进程只等待子进程退出。
- 捕捉
-
代码实现:
class TcpServer {
public:void Start() {signal(SIGCHLD, SIG_IGN); // 忽略 SIGCHLD 信号,避免僵尸进程for (;;) {struct sockaddr_in peer;socklen_t len = sizeof(peer);int sock = accept(_listen_sock, (struct sockaddr*)&peer, &len);if (sock < 0) {std::cerr << "Accept error, continue next" << std::endl;continue;}std::string client_ip = inet_ntoa(peer.sin_addr);int client_port = ntohs(peer.sin_port);std::cout << "New connection from [" << client_ip << "]:" << client_port << std::endl;pid_t id = fork();if (id == 0) { // 子进程close(_listen_sock); // 关闭监听套接字Service(sock, client_ip, client_port); // 为客户端提供服务exit(0); // 子进程服务完成后退出}close(sock); // 父进程关闭服务套接字}}private:int _listen_sock; // 监听套接字int _port; // 端口号
};
**线程池版的TCP网络程序
任务类与Handler类的整合
在上一部分中,我们设计了Task
类和Handler
类,并提到通过仿函数的方式将业务逻辑解耦。接下来我们将详细说明如何将这些组件整合到服务器中。
-
任务类的设计:
Task
类封装了客户端的套接字、IP地址和端口号。- 提供一个
Run
方法,该方法调用Handler
类的仿函数来处理任务。
-
Handler类的设计:
Handler
类重载了()
操作符,定义了具体的业务逻辑。- 当前实现的是简单的回显服务,但可以轻松扩展为其他业务逻辑。
-
线程池的任务队列:
- 线程池中的线程不断从任务队列中取出任务并执行。
- 每个任务都是一个
Task
对象,其Run
方法会被调用。
以下是完整的代码实现:
1. ThreadPool
类
#define NUM 5template <typename T>
class ThreadPool {
private:bool IsEmpty() { return _task_queue.empty(); }void LockQueue() { pthread_mutex_lock(&_mutex); }void UnLockQueue() { pthread_mutex_unlock(&_mutex); }void Wait() { pthread_cond_wait(&_cond, &_mutex); }void WakeUp() { pthread_cond_signal(&_cond); }public:ThreadPool(int num = NUM) : _thread_num(num) {pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_cond, nullptr);}~ThreadPool() {pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_cond);}static void* Routine(void* arg) {pthread_detach(pthread_self());ThreadPool* self = (ThreadPool*)arg;while (true) {self->LockQueue();while (self->IsEmpty()) {self->Wait();}T task;self->Pop(task);self->UnLockQueue();task.Run(); // 执行任务}return nullptr;}void ThreadPoolInit() {for (int i = 0; i < _thread_num; ++i) {pthread_t tid;pthread_create(&tid, nullptr, Routine, this);}}void Push(const T& task) {LockQueue();_task_queue.push(task);UnLockQueue();WakeUp();}void Pop(T& task) {task = _task_queue.front();_task_queue.pop();}private:std::queue<T> _task_queue; // 任务队列int _thread_num; // 线程池中的线程数量pthread_mutex_t _mutex; // 互斥锁pthread_cond_t _cond; // 条件变量
};
2. Task
类
class Task {
public:Task() {}Task(int sock, const std::string& client_ip, int client_port): _sock(sock), _client_ip(client_ip), _client_port(client_port) {}void Run() {_handler(_sock, _client_ip, _client_port); // 调用仿函数处理任务}private:int _sock; // 客户端套接字std::string _client_ip; // 客户端IP地址int _client_port; // 客户端端口号Handler _handler; // 业务逻辑处理
};
3. Handler
类
class Handler {
public:void operator()(int sock, const std::string& client_ip, int client_port) {char buffer[1024];while (true) {ssize_t size = read(sock, buffer, sizeof(buffer) - 1);if (size > 0) { // 读取成功buffer[size] = '\0';std::cout << client_ip << ":" << client_port << "# " << buffer << std::endl;write(sock, buffer, size); // 回显消息} else if (size == 0) { // 客户端关闭连接std::cout << client_ip << ":" << client_port << " close!" << std::endl;break;} else { // 读取失败std::cerr << sock << " read error!" << std::endl;break;}}close(sock); // 关闭套接字std::cout << client_ip << ":" << client_port << " service done!" << std::endl;}
};
4. TcpServer
类
class TcpServer {
public:TcpServer(int port) : _listen_sock(-1), _port(port), _tp(nullptr) {}void InitServer() {// 创建监听套接字_listen_sock = socket(AF_INET, SOCK_STREAM, 0);if (_listen_sock < 0) {perror("Socket creation failed");exit(1);}// 绑定地址struct sockaddr_in local;memset(&local, 0, sizeof(local));local.sin_family = AF_INET;local.sin_port = htons(_port);local.sin_addr.s_addr = INADDR_ANY;if (bind(_listen_sock, (struct sockaddr*)&local, sizeof(local)) < 0) {perror("Bind failed");close(_listen_sock);exit(1);}// 监听if (listen(_listen_sock, SOMAXCONN) < 0) {perror("Listen failed");close(_listen_sock);exit(1);}// 初始化线程池_tp = new ThreadPool<Task>();}void Start() {_tp->ThreadPoolInit(); // 启动线程池while (true) {struct sockaddr_in peer;socklen_t len = sizeof(peer);int sock = accept(_listen_sock, (struct sockaddr*)&peer, &len);if (sock < 0) {perror("Accept failed");continue;}std::string client_ip = inet_ntoa(peer.sin_addr);int client_port = ntohs(peer.sin_port);std::cout << "New connection from " << client_ip << ":" << client_port << std::endl;// 构造任务并加入线程池Task task(sock, client_ip, client_port);_tp->Push(task);}}~TcpServer() {if (_listen_sock != -1) {close(_listen_sock);}if (_tp) {delete _tp;}}private:int _listen_sock; // 监听套接字int _port; // 服务器端口ThreadPool<Task>* _tp; // 线程池
};
运行步骤
-
编译代码:
g++ main.cpp server.cpp client.cpp -o tcp_program -pthread
-
启动服务器:
./tcp_program 8080
-
启动多个客户端:
在不同终端中运行:./tcp_program 127.0.0.1 8080
-
测试通信:
- 客户端输入消息,服务器会回显。
- 多个客户端同时连接时,线程池中的线程会分配任务,确保并发处理。