C++实现集群聊天服务器
JSON
Json是一种轻量级的数据交换模式(也叫做数据序列化方式)。Json采用完全独立于编程语言的文本格式来存储和表示数据。见解和清晰的层次结构使得Json称为理想的数据交换语言。易于阅读和编写。同时也易于支持机器解析和生成,有效地提升网络传出效率。
这里讲的网络传输,就涉及到序列化和反序列化。以客户端和服务器端通信为例,一般情况下,客户端给服务器端发送信息,发送的信息可能是字符串、整型等信息,需要先转化为字节流数据,这就是序列化;同样,服务器接收到客户端发来的字节流信息,需要转化成原始的数据格式,这就是反序列化。
JSON for Modern C++是一个C++下的JSON库,具有以下特点:直观的语法、仅需使用头文件json.hpp
依赖、C++11标准编写、类似于STL使用json、STL和json可以互相转化、严谨的测试(所有类都经过严格的单元测试)。
数据序列化
在网络中,常见的数据传输序列化格式有XML、Json、ProtoBuf,其中ProtoBuf最为常用,其数据压缩编码传输占用带宽小,同样的数据信息,是Json的1/10,XML的1/20,但是使用起来稍微比Json复杂。
Json使用
头文件引入和重命名#include"json.hpp" using json = nlohmann::json;
。然后就可以是使用json类似于对象的使用方法使用json了。
#include"json.hpp"
using json = nlohmann::json;
#include <iostream>
#include <string>
using namespace std;void func1()
{json js;js["from"] = "zhangsan";js["message_type"] = 2;js["to"] = "lisi";js["message"] = "Hi, what are you doing?";cout << js << endl;string str=js.dump();//序列化,转化成字符串格式cout<<str.c_str()<<endl;// 模拟从网络接收到json字符串,通过json::parse函数把json字符串字节流转化为json对象json js2 = json::parse(temp);cout << js2 << endl;
}
void func2()
{json js;js["id"] = {1, 2, 3, 4, 5};js["name"] = "zhang san";js["msg"]["zhang san"] = "hello world";js["msg"]["liu shuo"] = "hello china";js["msg"] = {{"zhang san", "hello world"}, {"liu shuo", "hello china"}};cout << js << endl;
}
int main()
{func1();func2();return 0;
}
key采用哈希表,是无顺序的结构。
json的序列化——json_obj.dump()
。json反序列化——json::parse(json_str)
。
cmake常规使用
首先给出一个代码样例,通过代码样例基本上可以看懂一些常用的cmake命令:
cmake_minimum_required(VERSION 3.0) #CMake最小版本project(main)#定义当前工程的名字# set表示创建一个变量,并初始化对应的值
set(CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS} -g) #配置编译选项# include_directories()#头文件搜索目录# link_directories() #库文件搜索目录# 设置需要编译的源文件列表,其实也就是定义一个SRC_LIST变量名
set(SRC_LIST ./muduo_server.cpp)# 设置可执行文件最终存储的目录
set(EXECUTABLE_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/bin)# 把指定目录下的所有源文件名字放入变量名SRC_LIST里面
# aux_source_directory(./ SRC_LIST)# 表示生成可执行文件server,由SRC_LIST变量所定义的源文件编译而来
add_executable(server ${SRC_LIST})# 表示这个server目标程序,需要连接 muduo_net muduo_base pthread 等库文件
target_link_libraries(server muduo_net muduo_base pthread)
一般C++开源项目标准目录结构如下图所示:
一般在build目录下进行cmake ..
进行编译,然后会在build目录下生成编译过程中的中间文件,其中会存在一个Makefile
文件,在执行make
命令来生成最终的可执行文件。
PROJECT_NAME
: 通过 project() 指定项目名称
PROJECT_SOURCE_DIR
: 工程的根目录
PROJECT_BINARY_DIR
: 执行 cmake 命令的目录
CMAKE_CURRENT_SOURCE_DIR
: 当前 CMakeList.txt 文件所在的目录
CMAKE_CURRENT_BINARY_DIR
: 编译目录,可使用 add subdirectory 来修改
EXECUTABLE_OUTPUT_PATH
: 二进制可执行文件输出位置
LIBRARY_OUTPUT_PATH
: 库文件输出位置
BUILD_SHARED_LIBS
: 默认的库编译方式 ( shared 或 static ) ,默认为 static
CMAKE_C_FLAGS
: 设置 C 编译选项
CMAKE_CXX_FLAGS
: 设置 C++ 编译选项
CMAKE_CXX_FLAGS_DEBUG
: 设置编译类型 Debug 时的编译选项
CMAKE_CXX_FLAGS_RELEASE
: 设置编译类型 Release 时的编译选项
CMAKE_GENERATOR
: 编译器名称
CMAKE_COMMAND
: CMake 可执行文件本身的全路径
CMAKE_BUILD_TYPE
: 工程编译生成的版本, Debug / Release
Muduo
muduo网络库给用户提供了两个主要的类:
- TcpServer:用于编写服务器程序的
- TcpClient:用于编写客户端程序的
epoll+线程池:
优点:能够把网络I/O的代码和业务代码区分开、用户的断开和连接,用户可读写事件
Muduo服务端
下面提供了muduo进行服务器I/O和worker线程分离的代码示例:
#include <muduo/net/TcpServer.h>
#include <muduo/net/EventLoop.h>
#include <iostream>
#include <functional>
#include<string>
using namespace std;
using namespace muduo;
using namespace muduo::net;// 通用模板
class ChatServer
{
public:ChatServer(EventLoop *loop,const InetAddress &listenAddr,const string &nameArg) : _tcpserver(loop, listenAddr, nameArg), _loop(loop){// 给服务器注册用户连接的创建和断开的回调_tcpserver.setConnectionCallback(bind(&ChatServer::onConnection, this, placeholders::_1));// 给服务器注册用户读写事件回调_tcpserver.setMessageCallback(bind(&ChatServer::onMessage, this, placeholders::_1, placeholders::_2, placeholders::_3));// 设置服务器线程数量,1个I/O线程,3个worker线程_tcpserver.setThreadNum(4);}//开启事件循环void start(){_tcpserver.start();}private:// 专门处理用户的连接创建和断开 epoll、listenfd、acceptvoid onConnection(const TcpConnectionPtr &conn){if(conn->connected()){cout << conn->peerAddress().toIpPort() << " -> " << conn->localAddress().toIpPort() << " state online;"<<endl;}else{cout << conn->peerAddress().toIpPort() << " -> " << conn->localAddress().toIpPort() << " state offline;" << endl;conn->shutdown();//关闭连接}}// 专门处理用户的读写事件void onMessage(const TcpConnectionPtr &conn, // 连接Buffer *buffer, // 缓冲区Timestamp time) // 接收到信息的时间信息{string buf = buffer->retrieveAllAsString();cout << "recv data: " << buf << " time: " << time.toString() << endl;conn->send(buf);}TcpServer _tcpserver;EventLoop *_loop;
};
int main()
{EventLoop loop;InetAddress addr("127.0.0.1", 6000);//本机地址ChatServer server(&loop,addr,"ChatServer");//将listenfd通过epoll_ctl ->(传递给) epollserver.start();//按照epoll_wait以阻塞方式等待新用户连接,已连接用户的读写事件等loop.loop();return 0;
}
编译命令:
server:muduo_server.cppg++ $+ -o $@ -lmuduo_net -lmuduo_base -lpthread -std=c++11
$+
:表示依赖项,这里表示muduo_server.cpp 。$@
:表示目标项,这里表示server。
基于muduo网络库开发服务器程序,大概步骤如下:
1、组合TcpServer对象
2、创建EventLoop事件循环对象的指针
3、明确TcpServer构造函数需要什么参数(TcpServer无默认构造函数),输出ChatServer构造函数
4、在当前服务器的构造函数中,注册处理连接的回调函数和处理事件的连接函数
5、设置合适的服务器端线程数量,muduo会自己分配I/O线程和worker线程(一般设置1个I/O,n-1个worker线程)
负载均衡器
一台服务器在32位Linx系统环境下,大致的并发量sockfd大约是1024个,大约支持20000个人进行同时聊天。使用ulimit -n
命令查看系统允许当前用户进程打开的文件数限制,一般情况下每个进程最多允许打开1024个文件,还需要去除给当前用户进程必然打开的标准输入、标准输出、标准错误、服务器监听、进程通信等文件,剩下可以给客户端socket连接的文件数大概只有1014个左右,也就是说,基于Linux的通信程序最多运行同时1024个TCP并发连接。Linux下高并发socket最大连接数所受的各种限制点击查看更多。
在实际环境中可能会存在多个服务器同时在后台运行,当开始通信时,需要选定聊天的服务器。
LVS:负载均衡器常使用的设备。
nginx负载均衡器:相当于把服务器串联起来,在用户连接服务器后,ngnix负载均衡器将对client分配服务器,如果一台服务器时支持2W用户的连接,那么三台服务器就支持6W用户的连接。
聊天服务器属于长连接的业务。
redis是基于发布-订阅模式,类似于设计模式的观察者模式。
nginx安装
nginx在1.9版本之前只支持HTTP协议的web服务器的负载均衡,之后的版本开始支持TCP长连接的负载均衡。但是,nginx默认情况下没有编译TCP负载均衡模块,需要使用--with-stream
进行激活。
进入nginx官网下载对应的nginx的压缩包。
我们使用的ubuntu系统,所以下载第二列的nginx-1.25.2
版本,下载后得到安装包。使用tar -zxvf nginx-1.25.2.tar.gz
进行解压。解压后目录里面存在auto CHANGES CHANGES.ru conf configure contrib html LICENSE man README src
等文件夹和文件。在执行./configure --with-stream
开启基于TCP的负载均衡。
安装过程中可能存在库文件的丢失,这里我遇到了zlib,PCRE等缺失。可以按照下方命令进行安装:
sudo apt install zlib1g
sudo apt install zlib1g-dev
sudo apt-get install libpcre3 libpcre3-dev
然后再执行命令(可能需要管理员权限):
./configure --with-stream
make
make install
nginx -s reload #重新加载配置文件,例如添加服务器配置
nginx -s stop #停止nginx服务
需要在nginx的配置文件加入以下内容(nginx配置文件在/usr/local/nginx/conf/nginx.conf
,可执行文件在/usr/local/nginx/sbin/nginx
)
stream {upstream MyServer {server 127.0.0.1:6000 weight=1 max_fails=3 fail_timeout=30s;server 127.0.0.1:6002 weight=1 max_fails=3 fail_timeout=30s;}server {proxy_connect_timeout 1s;listen 8000;proxy_pass MyServer;tcp_nodelay on;}
}
上图显示了一个客户端连接服务器(连接的是nginx提供的ip和port),nginx将会给每个服务器按照配置进行分发客户端相应,间接等于客户端直连服务器(还是需要通过负载均衡器nginx),不影响用户之间的通信(非跨服务器通信)。
配置之后需要进行重新加载配置nginx -s reload
。
redis安装
ubuntu安装redis非常简单:
sudo apt-get install redis-server
查看redis的运行
ps -ef | grep redis
netstat -tanp
默认运行在6379端口tcp 0 0 127.0.0.1:6379 0.0.0.0:* LISTEN 144028/redis-server
。
redis是一个强大的缓存服务器,支持多种数据结构,如字符串、list列表、set集合、map映射表等结构,支持数据的持久化存储(存储在硬盘中),经常被用于高并发的服务器环境设计中。
redis其实类似于mysql,是client/server设计的。redis本身支持事务处理,多线程对key自增自减是线程安全的。
redis-cli #启动redis客户端
key-value
root@xiehou--ubuntu:~# redis-cli
127.0.0.1:6379> get "abc"
(nil)
127.0.0.1:6379> set "abc" 122
OK
127.0.0.1:6379> get "abc"
"122"
redis的发布-订阅机制:发布-订阅模式包含了两种角色,分别是消息的发布者和消息的订阅者。订阅者可以定义一个或者多个频道channel,发布者可以指向向某个频道channel发送消息,所有订阅此频道的订阅者都会收到此消息。
订阅的命令是subscribe。进入订阅模式后,处于此状态的客户端不能使用除subscribe、unsubscribe、psubscribe和punsubscribe这四个属于发布订阅的命令之外,否则就会报错。
进入订阅状态后客户端可能收到3种类型的回复。每种类型的回复都包含3个值,第一个值是消息的
类型,根据消类型的不同,第二个和第三个参数的含义可能不同。消息类型的取值可能是以下3个:
- subscribe:表示订阅成功的反馈信息。第二个值是订阅成功的频道名称,第三个是当前客户端订阅的频道数量。
- message:表示接收到的消息,第二个值表示产生消息的频道名称,第三个值是消息的内容。
- unsubscribe:表示成功取消订阅某个频道。第二个值是对应的频道名称,第三个值是当前客户端订阅的频道数量,当此值为0时客户端会退出订阅状态,之后就可以执行其他非"发布/订阅"模式的命令了。
带输入参数的调试gdb --args ./chatserver 127.0.0.1 8000
。或者先运行gdb ./chatserver
,然后在run 127.0.0.1 8000
。在某个cpp文件中打断点break chatservice.cpp:23
。
项目地址:https://gitee.com/xiehou-design/ChatServer