0302 多进程网络服务器架构
专栏内容:
- postgresql使用入门基础
- 手写数据库toadb
- 并发编程
个人主页:我的主页
管理社区:开源数据库
座右铭:天行健,君子以自强不息;地势坤,君子以厚德载物.
一、概述
在大规模数据处理中,会有大量的客户端接入同一台服务器,每个客户端都需要长时间提供服务。
服务器采用中心化的部署,而客户端往往分散在不同机器上,服务器与客户端之间跨网络通信,一般采用C/S架构。
而服务端的架构需要能应对大量并发客户端,同时可以给每个客户端独占的服务,这就用到了多任务的网络模型架构,下面我们来看看用多进程如何实现。
二、多路复用的网络模型
C/S架构中,处理大量的网络请求,需要一套基于多路复用的网络处理模型。
- 可以同时处理网络连接请求和网络数据传递;
- 减少程序的阻塞时间,避免无效的CPU消耗;
- 适应不同的并发规模;
以此为目标实现如下网络模型。
2.1 服务端网络监听
多路复用模型这里采用了epoll方式,如果自己的平台不支持,可以换为select或者poll的方式。
代码如下:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <errno.h> #define MAX_EVENTS 10
#define BUFFER_SIZE 1024
#define PORT 8080 // 设置文件描述符为非阻塞模式
int set_nonblocking(int fd) { int flags, s; flags = fcntl(fd, F_GETFL, 0); if (flags == -1) { perror("fcntl F_GETFL"); return -1; } flags |= O_NONBLOCK; s = fcntl(fd, F_SETFL, flags); if (s == -1) { perror("fcntl F_SETFL"); return -1; } return 0;
} int main() { int listen_fd, conn_fd, nfds, epoll_fd; struct sockaddr_in server_addr; struct epoll_event ev, events[MAX_EVENTS]; char buffer[BUFFER_SIZE]; ssize_t count; // 创建监听socket listen_fd = socket(AF_INET, SOCK_STREAM, 0); if (listen_fd == -1) { perror("socket"); exit(EXIT_FAILURE); } // 设置非阻塞模式 if (set_nonblocking(listen_fd) == -1) { close(listen_fd); exit(EXIT_FAILURE); } // 绑定地址和端口 server_addr.sin_family = AF_INET; server_addr.sin_addr.s_addr = INADDR_ANY; server_addr.sin_port = htons(PORT); if (bind(listen_fd, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) { perror("bind"); close(listen_fd); exit(EXIT_FAILURE); } // 开始监听 if (listen(listen_fd, SOMAXCONN) == -1) { perror("listen"); close(listen_fd); exit(EXIT_FAILURE); } // 创建epoll实例 epoll_fd = epoll_create1(0); if (epoll_fd == -1) { perror("epoll_create1"); close(listen_fd); exit(EXIT_FAILURE); } // 添加监听socket到epoll实例 ev.events = EPOLLIN; ev.data.fd = listen_fd; if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &ev) == -1) { perror("epoll_ctl: listen_fd"); close(listen_fd); close(epoll_fd); exit(EXIT_FAILURE); } // 主循环 while (1) { nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1); if (nfds == -1) { perror("epoll_wait"); exit(EXIT_FAILURE); } for (int n = 0; n < nfds; ++n) { if (events[n].data.fd == listen_fd) { // 新的连接 conn_fd = accept(listen_fd, NULL, NULL); if (conn_fd == -1) { perror("accept"); continue; } // 设置非阻塞模式 if (set_nonblocking(conn_fd) == -1) { close(conn_fd); continue; } // 添加新的连接socket到epoll实例 ev.events = EPOLLIN | EPOLLET; ev.data.fd = conn_fd; if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, conn_fd, &ev) == -1) { perror("epoll_ctl: conn_fd"); close(conn_fd); } } else { // 处理读事件 conn_fd = events[n].data.fd; while ((count = read(conn_fd, buffer, BUFFER_SIZE)) > 0) { // 处理接收到的数据(这里简单回显) write(conn_fd, buffer, count); } if (count == -1 && errno != EAGAIN) { // 出现错误或连接关闭 close(conn_fd); } else if (count == 0) { // 连接关闭 close(conn_fd); } // 从epoll实例中移除已关闭的socket if (count <= 0 && errno != EAGAIN) { ev.events = 0; epoll_ctl(epoll_fd, EPOLL_CTL_DEL, conn_fd, &ev); }} } } close(listen_fd); close(epoll_fd); return 0;
}
说明
- TCP服务端的基本步骤创建socket,设置为非阻塞模式,绑定IP与端口,开启监听;
- 这里服务端的socket需要设置为非阻塞模式,因为我们是在单进程中处理多个连接,每个连接不能阻塞等待;
- 然后加入到epoll监听池中,开始epoll事件的等待;这里只处理接收事件;
- 如果有服务端socket的接收事件,那么说明有客户端连接消息,进行accep,创建客户端连接的socket;
- 同样将客户端连接的socket设置为非阻塞,理由同上;加入epoll临听池中,同样也只处理接收事件;
- 如果有客户端连接的socket上的接收事件,那么说明客户端正在给服务端发消息;
- 收到客户端消息后,这里只是简单处理,原样再发给客户端;
- 如果客户端关闭或出错,将客户端连接关闭,并从epoll临听池中移除;
2.2 客户端测试
现在我们来编写一个简单的客户端模拟程序,测试一下多路复用的网络框架。
/** ex020302_client.c*/#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h> #define SERVER_IP "127.0.0.1"
#define SERVER_PORT 4808
#define BUFFER_SIZE 1024 #define CLIENT_SEND_CNT 20int main()
{ int sockfd; struct sockaddr_in server_addr; char buffer[BUFFER_SIZE] = {0}; const char *message = "Hello, Server!"; // 创建套接字 if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { perror("socket creation failed"); exit(EXIT_FAILURE); } // 配置服务器地址信息 server_addr.sin_family = AF_INET; server_addr.sin_port = htons(SERVER_PORT); // 将IP地址从字符串转换为二进制形式 if (inet_pton(AF_INET, SERVER_IP, &server_addr.sin_addr) <= 0) { perror("Invalid address/ Address not supported"); close(sockfd); exit(EXIT_FAILURE); } // 连接到服务器 if (connect(sockfd, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) { perror("Connection Failed"); close(sockfd); exit(EXIT_FAILURE); } for(int i = 0; i < CLIENT_SEND_CNT; i++){// 发送消息到服务器 send(sockfd, message, strlen(message), 0); printf("Message sent: %s\n", message); // 接收服务器的响应 int bytes_received = recv(sockfd, buffer, BUFFER_SIZE - 1, 0); if (bytes_received < 0) { perror("Error in receiving"); } else if (bytes_received == 0) { printf("Server closed the connection\n"); } else { buffer[bytes_received] = '\0'; // 确保字符串以空字符结尾 printf("Message received from server: %s\n", buffer); } sleep(1);}// 关闭套接字 close(sockfd); return 0;
}
说明
- TCP客户端建立的基本步骤,创建socket,初始化服务端地址,连接服务器;
- 然后向服务端发送相同的消息;
- 每次发送完成后,等待接收消息;
2.3 客户端测试
可以看到,服务端处理客户端的请求时,都是按照接收到的顺序进行串行处理;
当客户端的数量达到成百上千时,对客户端的响应时间就会出现非常明显的延迟,
这种延迟会随着业务的复杂度而放大。
这时就需要充分利用多核CPU硬件资源,来进行并发任务的处理。
三、多进程服务处理
上面是在单个任务进程中处理了监听和大量任务连接的网络处理,各客户端连接的服务会相互影响,实际是串行化处理的。
要让大量的客户端能同时被响应,需要采用多任务的方式,那么在上面的网络模型基础上加入多进程,服务端为每个客户端连接准备一个独立的进程,这样就可以及时响应。
3.1 多进程架构
首先我们利用前面几个章节的介绍,来搭建一个多进程的代码架构,由主进程根据需要进行创建子进程,并且由主进程进行全局的控制。
/** ex020302_netprocess.c*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <errno.h> #define MAX_EVENTS 10
#define BUFFER_SIZE 1024
#define PORT 4808 void daemon_fork()
{int pid = -1;pid = fork();if(pid < 0){printf("fork error[%s]\n",strerror(errno));exit(-1);}else if(pid > 0){// parent exit.exit(0);}else {// child daemonreturn;}
}void subprocess(int sock)
{int pid = -1;pid = fork();if(pid < 0){printf("fork error[%s]\n",strerror(errno));exit(-1);}else if(pid > 0){// parent.close(sock);return;}else {// child close(listen_fd);processMsg(sock);exit(0);}
}
说明
- daemon服务程序函数,这个前一章节已经介绍过了,服务端以后台进程的方式运行;
- 子进程任务处理函数;这里创建的是任务子进程,并在子进程中调用消息处理函数;
- 这里需要注意的是,在子进程中要关闭服务端的socket,同时在父进程中要关闭客户端连接的socket; 因为父子进程会复制内存空间,但是在各自的进程中,已经不再需要;
3.2 并发网络处理模型
现在就可以将上面的多路复用网络处理放入多进程架构中,处理逻辑进行如下切分:
- 服务端监听socket初始化,多路复用器的初始化等,都放在主进程中,作为服务端网络初始化的一部分;
- 每个客户端连接的socket,以及它的读写消息处理逻辑,放在子进程中;这样每个客户端连接对应一个后台服务子进程;
- 创建子进程的时机,也就是在主进程中接收到新连接时,创建新连接成功后,就可以新建子进程进行处理;
- 而子进程的退出时间,就是客户端断开连接,或者处理出错时;
void initializeServerNet()
{struct sockaddr_in server_addr; // 创建监听socket listen_fd = socket(AF_INET, SOCK_STREAM, 0); if (listen_fd == -1) { perror("socket"); exit(EXIT_FAILURE); } // 绑定地址和端口 server_addr.sin_family = AF_INET; server_addr.sin_addr.s_addr = INADDR_ANY; server_addr.sin_port = htons(PORT); if (bind(listen_fd, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) { perror("bind"); close(listen_fd); exit(EXIT_FAILURE); }// 开始监听if (listen(listen_fd, SOMAXCONN) == -1){perror("listen");close(listen_fd);exit(EXIT_FAILURE);}
}void closeServerFd()
{close(listen_fd);
}void dispatchLoop()
{int conn_fd; // 主循环 while (1) { // 新的连接 conn_fd = accept(listen_fd, NULL, NULL); if (conn_fd == -1) { perror("accept"); sleep(1);continue; }subprocess(conn_fd);}
}void processMsg(int sock)
{char buffer[BUFFER_SIZE]; ssize_t count; printf("serv-process:%d start.\n");while ((count = read(sock, buffer, BUFFER_SIZE)) > 0){// 处理接收到的数据(这里简单回显)write(sock, buffer, count);}if (count == -1 && errno != EAGAIN){// 出现错误或连接关闭close(sock);}else if (count == 0){// 连接关闭close(sock);}printf("serv-process:%d exit.\n");
}
那么主程序实现如下:
void daemon_fork();
void subprocess(int sock);
void processMsg(int sock);void initializeServerNet();
void closeServerFd();
void dispatchLoop();int listen_fd;int main(int argc ,char *argv[])
{daemon_fork();initializeServerNet();dispatchLoop();closeServerFd();return 0;
}
- 在主进程中先进程服务端初始化;
- 然后就可以开始监听,并接收客户端的连接;
- 当有客户端连接时,就创建客户端连接,并启动子进程与该客户端进行网络通信;
- 子进程在客户端断开连接或出错时,就会退出;
2.3 客户端测试
可以看到将客户端发送次数调大后,开启的客户端越多,服务端启动的子进程也就会越多;
此时,可以看到服务端每个进程的CPU使用率并不是很高;
但是随着客户端数量越来越多,服务端进程数量超过CPU核数时,就会增加系统的负担;
四、总结
本文主要介绍了基于多进程架构的网络服务器的设计与实现,在多进程架构中每个客户端会有一个服务端的进程专门处理通信,增加了对客户端消息的响应效率,提升了并发处理能力。
结尾
非常感谢大家的支持,在浏览的同时别忘了留下您宝贵的评论,如果觉得值得鼓励,请点赞,收藏,我会更加努力!
作者邮箱:study@senllang.onaliyun.com
如有错误或者疏漏欢迎指出,互相学习。
注:未经同意,不得转载!