目录
1. 项目介绍:
2. 技术选择;
3. 第三方库介绍;
4. 项目功能;
5. 模块功能;
6. 项目实现:
1. 项目介绍:
RPC是远程过程调用, 像调用本地接口一样调用远程接口, 进行完成业务处理, 计算任务等, 一个完整的RPC包括: 序列化协议, 通信协议, 连接复用, 服务注册, 服务发现, 服务订阅和通知, 负载均衡, 服务监控, 同步调用, 异步调用.
2. 技术选择:
采用远程调用call接口, 传递接口函数参数来调用接口, 网络协议的参数和返回值采用的是实验Json实现, 网络传输采用muduo库, 序列化和反序列化采用Json.
3. 第三方库介绍:
3.1 Jsoncpp库:
(1) 序列化: 将数据对象全部一种形式存放在value里面. 可以包含任何类型int, string, 等等.
StreamWriterBuilder(是StreamWriter的继承类)的newStreamWriter创建写入序列化工厂类, 将val里面的数据序列化找出以字符串形式给body.
下面是使用到接口的函数.
class JSON_API StreamWriter {virtual int write(Value const& root, std::ostream* sout) = 0; }class JSON_API StreamWriterBuilder : public StreamWriter::Factory {virtual StreamWriter* newStreamWriter() const; }
bool serialize(const Json::Value& val, std::string& body)
{std::stringstream ss;Json::StreamWriterBuilder swb;std::unique_ptr<Json::StreamWriter> sw( swb.newStreamWriter());int ret = sw->write(val, &ss);if(ret != 0){std::cout << "json serialize failed\n" << std::endl;}body = ss.str();return true;
}
(2) 反序列化: 将序列化好的数据, 又从序列化Value里面取出想要的对应数据.
CharReaderBuilder(是CharReader继承类)的newCharReader是读取反序列化的工厂类.
parse是进行将body中的数据进行分割, 再划分具体key和value的数据. 之后就可以根据key找到value了.
bool unserialize(std::string& body, Json::Value& val)
{Json::CharReaderBuilder crb;std::unique_ptr<Json::CharReader> cr(crb.newCharReader());std::string errs;bool ret = cr->parse(body.c_str(), body.c_str() + body.size(), &val, &errs);if(ret == false){std::cout << "json unserialize failed\n" << std::endl;return false;}return true;
}
(3) 序列化和反序列化实验: Json::Value 就是创建Json对象, 如果是数组类型, 就需要使用append进行插入.
int main()
{const char* name = "张三";int age = 19;const char* sex = "男";float score[3] = {89, 79.5, 98};Json::Value student;student["姓名"] = name;student["年龄"] = age;student["性别"] = sex;student["成绩"].append(score[0]);student["成绩"].append(score[1]);student["成绩"].append(score[2]);Json::Value fav;fav["书籍"] = "水浒传";fav["运动"] = "打羽毛球";student["爱好"] = fav;std::string body;serialize(student, body);std::cout << body << std::endl;std::string str = R"({"姓名": "王五", "年龄": 18, "成绩": [19, 23, 45.5]})";Json::Value stu;bool ret = unserialize(str, stu);if(ret == false)return -1;std::cout << "姓名: " << stu["姓名"] << std::endl;std::cout << "年龄: " << stu["年龄"] << std::endl;int sz = stu["成绩"].size();for(int i = 0; i < sz; i++){std::cout << "成绩: " << stu["成绩"][i].asFloat() << std::endl;}return 0;
}
3.2 Muduo库:
主要使用Muduo库来编写server和client的, 这里演示一个英汉翻译的程序.
(1) server端:
主要调用Muduo库里面EventLoop和TcpServer数据的构造, server服务器绑定port是本地主机, 以及9090端口, 再调用setConnectionCallback连接回调和setMessageCallback信息回调, onConnection是判断是否正常连接, onMessage对连接后信息的处理, 这里是进行英汉翻译, retrieveAllAsString是返回全部获取的字符串类型的数据. 最后还有给conn->send传递回去进行翻译成中文的数据.
开启服务器端就两点: _start进行开始服务器, loop进行事件的死循环监控.
#include <muduo/net/TcpServer.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/TcpConnection.h>
#include <muduo/net/Buffer.h>
#include <iostream>
#include <string>
#include <unordered_map>//翻译服务器:
class DictServer
{
public:DictServer(int port):_server(&_baseloop, muduo::net::InetAddress("0.0.0.0", 9090), "DictServer", muduo::net::TcpServer::kReusePort){_server.setConnectionCallback(std::bind(&DictServer::onConnection, this, std::placeholders::_1));_server.setMessageCallback(std::bind(&DictServer::onMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));}void start(){_server.start();//打开服务器机械能监听;_baseloop.loop(); //开始死循环事务监听;}private:void onConnection(const muduo::net::TcpConnectionPtr& conn){if(conn->connected()){std::cout << "链接建立成功!" << std::endl;}else{std::cout << "链接断开" << std::endl;}}void onMessage(const muduo::net::TcpConnectionPtr& conn, muduo::net::Buffer* buf, muduo::Timestamp){static std::unordered_map<std::string, std::string> dict_map = {{"hello", "你好"},{"world", "世界"},{"love", "爱"}};//获取全部字符串;std::string msg = buf->retrieveAllAsString();std::string res;//查找对应单词;auto it = dict_map.find(msg);if(it != dict_map.end()){res = it->second;}else{res = "未知单词";}conn->send(res);}
private://服务器muduo::net::EventLoop _baseloop;muduo::net::TcpServer _server;
};int main()
{DictServer server(9090);server.start();return 0;
}
(2) 客户端: 客户端有TcpConnectionPtr进行连接管理, TcpClient进行客户端创建服务,
EventLoop进行事件监控, EventLoopThread 对全部事件进行监控, CountDownLatch进行同步控制(因为客户端和服务器都是异步操作, 不允许没有建立连接成功就发送数据),
setConnectionCallback是连接回调函数, setMessageCallback是信息回调函数.
#include <muduo/net/TcpServer.h>
#include <muduo/net/TcpClient.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/TcpConnection.h>
#include <muduo/net/Buffer.h>
#include <iostream>
#include <string>
#include <unordered_map>
#include <muduo/base/CountDownLatch.h>
#include <muduo/net/EventLoopThread.h>class DictClient
{
public:DictClient(const std::string& sip, int sport):_downlatch(1),//初始化计数为1._baseloop(_loopthread.startLoop()),_client(_baseloop, muduo::net::InetAddress(sip, sport), "DictClient"){_client.setConnectionCallback(std::bind(&DictClient::onConnection, this, std::placeholders::_1));_client.setMessageCallback(std::bind(&DictClient::onMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_client.connect();//这里是异步操作, 可能连接没有建立成功, 需要等待之后被唤醒._downlatch.wait();}bool send(const std::string& msg){if(_conn->connected() == false){std::cout << "连接已断开!" << std::endl;return false;}_conn->send(msg);return true;}void onConnection(const muduo::net::TcpConnectionPtr& conn){if(conn->connected()){std::cout << "链接建立成功!" << std::endl;_downlatch.countDown();//计数为0被唤醒._conn = conn;}else{std::cout << "链接断开" << std::endl;_conn.reset();//重置.}}void onMessage(const muduo::net::TcpConnectionPtr& conn, muduo::net::Buffer* buf, muduo::Timestamp){std::string res = buf->retrieveAllAsString();std::cout << res << std::endl;}private:muduo::net::TcpConnectionPtr _conn;muduo::CountDownLatch _downlatch;muduo::net::EventLoopThread _loopthread;muduo::net::EventLoop* _baseloop;muduo::net::TcpClient _client;
};int main()
{DictClient client("127.0.0.1", 9090);while(1){std::string msg;std::cin >> msg;client.send(msg);}return 0;
}
实验结果: 客户端和服务器端的测试结果.
3.3 C++异步操作:
std::futrue: 是可以获取一个异步的结果,std::future::get()会阻塞获取结果, 直到异步完成.
(1)std::async关联异步任务, std::launch::deferred表示延迟调用, 直到调用get或者wait就执行任务. std::launch::async: 在自己创建的线程上执行;
异步会先调用函数执行, 等待阻塞之后在进行返回结果; 但是同步就是等待阻塞之后再进行函数执行和返回值返回的.
#include <iostream>
#include <future>int Add(int x, int y)
{std::cout << "into Add" << std::endl;return x + y;
}int main()
{//进行异步非阻塞调用. 异步先调用Add函数再返回结果.//std::future<int> res = std::async(std::launch::async, Add, 11, 22);//同步的话就是先休眠再调用Add再返回结果.std::future<int> res = std::async(std::launch::deferred, Add, 11, 22);//休眠一秒.std::this_thread::sleep_for(std::chrono::seconds(1));std::cout << "-------------------" << std::endl;std::cout << res.get() << std::endl; //获取异步结果.return 0;
}
(2) std::packaged_task: 将和任务绑定在一起;
实现使用包装器封装任务; get_future: 获取任务对象; 使用线程来执行任务, 最后使用get调用最后任务结果.
#include <iostream>
#include <future>
#include <memory>
#include <thread>int Add(int x, int y)
{std::cout << "into Add" << std::endl;return x + y;
}int main()
{//封装任务;auto task = std::make_shared<std::packaged_task<int(int, int)>>(Add);//获取future对象;std::future<int> res = task->get_future();//执行任务;std::thread thr([task](){(*task)(11, 22);});//获取任务结果:std::cout << res.get() << std::endl;thr.join();return 0;
}
(3) promise: 可以让本来等待的future就绪.
先创建promise对象, get_futrue获取future对象, 使用线程完成任务调用, 保持结果, 最后使用get获取任务结果.
#include <iostream>
#include <future>
#include <memory>
#include <thread>int Add(int x, int y)
{std::cout << "into Add" << std::endl;return x + y;
}int main()
{std::promise<int> pro;std::future<int> res = pro.get_future();std::thread thr([&pro](){int sum = Add(11, 22);pro.set_value(sum);});std::cout << res.get() << std::endl;thr.join();return 0;
}
4. 项目功能:
(1) 项目主要完成就是使用远程接口调用, 完成某种任务, 再将结果返回即可. 如果采用一对一进行客户端和服务器的设计, 那么服务器一旦挂了后面客户端都无法连接. 所以还得采用分布式架构, 这样才能将不同业务或者同一个业务拆分不同结点来执行, 解决高并发的问题, 提高性能. 这里采用的是采用注册中心, 首先客户端先在注册中心进行注册对应的服务, 找到关联的服务器, 于是就可以根据客户端想要提高什么服务, 特定服务器就为它提供.
(2) 还有就是主题的订阅转发, 除了单纯的消息的转发功能, 其次就是主题对订阅了某种服务的客户端再会主动发送数据消息给其他订阅该服务器这个消息. 注册中心也可以采用分布式架构,一旦注册中心下线, 还有备用注册中心提供服务,
(3) 整体RPC的功能就是:
RPC调用, 服务注册和发现, 服务上下线, 消息的发布和订阅.
5. 模块功能:
5.1 服务端模块划分:
a.接收客户端请求, 提供rpc服务, b. 提供服务注册和发现, 上线和下线的通知. c. 接收客户端请求, 进行主题的创建, 删除, 订阅, 取消功能.
主要接口: Network: 网络通信模块, 采用Muduo库来实现网络通信功能.
Protocol: 应⽤层通信协议模块; 解析数据, 防止粘包问题, 读取完整一条数据. 主要采用LV格式来处理粘包问题.
Length: 消息长度字段; MType: 消息类型 IDLength: ID字段长度; MID:ID, body: 正文数据.
Dispatcher: 消息分发处理模块; 对不同消息类型调用不同业务处理函数.
得到消息在onMessage里面进行应用层数据解析, 得到数据载荷(LV格式), 使用分发模块, map(消息类型, 回调函数) 进行根据不同类型数据调用不同处理回调函数.
RpcRouter: 远端调⽤路由功能模块;
提供rpc回调接口, 识别客户端需要的服务在返回结果. 主要根据请求方法名称以及请求参数信息, 这部分数据在body里面, 就需要进行序列化和反序列化操作.
PublishSubscibe: 发布订阅功能模块;
对发布请求进行处理, 提供一个回调函数给Dispatcher. 包含主题创建, 删除, 订阅, 取消订阅, 发布消息.
主题管理: 收到一条消息必须转发给关联过的其他服务器. 订阅者管理: 订阅者需要保持自己订阅的主题名称. 还有订阅取消, 创建, 主题上下线的服务.
Registry-Discovery:服务注册/发现/上线/下线功能模块;
针对服务注册与发现请求的处理, 服务注册: 告诉中转中心, 自己可以提供什么服务.
服务发现: 查询中转中心那个可以提供指定服务. 服务上线/下线: 那个服务可以提供和不能再提供服务.
Server:基于以上模块整合⽽出的服务端模块.
5.2 客户端模块划分:
和服务器大差不差,
1.Protocol:应用层通信协议模块 2. Network:网络通信模块 3. Dispatcher:消息分发处理模块 (这些和服务端一样);
4. Requestor:请求管理模块, 针对每个客户端进行管理, 提供合适服务的接口进行处理,
使用ID来标识每个客户端的管理, 这样就不会弄错客户端并且提供相关服务. 而且还有利于进行异步操作.
5. RpcCaller:远端调用功能模块 向用户提供rpc接口, 采用同步调用, 异步调用, 回调调用.
6. Publish-Subscribe:发布订阅功能模块 ;
这里有两个角色, 消息发布者,要先创建主题再发布消息. 消息订阅者, 不同消息进行不同处理.
7. Registry-Discovery:服务注册/发现/上线/下线功能模块
包含注册者和发现者: 关注上下线的服务.
8. Client:基于以上模块整合而出的客户端模块
5.3 抽象层:
(1) BaseBuffer: 缓冲区读取数据处理;
(2) BaseConnection: 对连接进行管理;
(3) BaseMessage: 数据处理;
(4) BaseServer: 服务器抽象;
(5) BaseProtocol: 应用层协议抽象;
5.4 具象层:
5.5 业务层:
(1) RPC:
(2) 发布订阅:
(3) 服务发现注册:
6. 项目实现:
6.1 零碎接口实现:
(1) 日志宏:
错误日志进打印工作;
DBUG: bug消息; DINF: 普通消息; DERR: 错误信息; DEFAULT: 是默认采用数据类型, DBUG. 采用不定参数获取不同类型的信息, 使用#define (...) 以及 给出数据类型和数据, 就可以打印出数据时间, 所在文件以及所在行.
strftime是将时间结构体转发成时间字符串类型.
#pragma once
#include <stdio.h>
#include <time.h>
#include <jsoncpp/json/json.h>
#include <sstream>
#include <iostream>
#include <memory>
#include <string>
#include <random>
#include <chrono>
#include <sstream>
#include <atomic>
#include <iomanip>namespace RPC
{// #define LOG(msg) printf("%s\n", msg)// #define LOG(format, msg) printf(format "\n", msg)// #define LOG(format, msg) printf("[%s:%d]" format "\n", __FILE__, __LINE__, msg)// #def ine LOG(format, msg){\
// time_t t = time(NULL);\
// struct tm* it = localtime(&t);\
// char time_tmp[32] = {0};\
// strftime(time_tmp, 31, "%m-%d %T", it);\
// printf("[%s][%s:%d] " format "\n", time_tmp, __FILE__, __LINE__, msg);\
// }// 不定参数...;##连接字符串;// #def ine LOG(format, ...){\
// time_t t = time(NULL);\
// struct tm* it = localtime(&t);\
// char time_tmp[32] = {0};\
// strftime(time_tmp, 31, "%m-%d %T", it);\
// printf("[%s][%s:%d] " format "\n", time_tmp, __FILE__, __LINE__, ##__VA_ARGS__);\
// }#define LDBG 0
#define LINF 1
#define LERR 2
#define LDEFAULT LDBG#define LOG(level, format, ...) \{ \if (level >= LDEFAULT) \{ \time_t t = time(NULL); \struct tm *it = localtime(&t); \char time_tmp[32] = {0}; \strftime(time_tmp, 31, "%m-%d %T", it); \fprintf(stdout, "[%s][%s:%d] " format "\n", time_tmp, __FILE__, __LINE__, ##__VA_ARGS__); \} \}#define DLOG(format, ...) LOG(LDBG, format, ##__VA_ARGS__)
#define ILOG(format, ...) LOG(LINF, format, ##__VA_ARGS__)
#define ELOG(format, ...) LOG(LERR, format, ##__VA_ARGS__)
}
(2) Json实现:
就是之前demo里面序列化和反序列化的操作.
class JSON{public:static bool serialize(const Json::Value &val, std::string &body){std::stringstream ss;Json::StreamWriterBuilder swb;std::unique_ptr<Json::StreamWriter> sw(swb.newStreamWriter());int ret = sw->write(val, &ss);if (ret != 0){ELOG("json serialize failed");return false;}body = ss.str();return true;}static bool unserialize(const std::string& body, Json::Value &val){Json::CharReaderBuilder crb;std::unique_ptr<Json::CharReader> cr(crb.newCharReader());std::string errs;bool ret = cr->parse(body.c_str(), body.c_str() + body.size(), &val, &errs);if (ret == false){ELOG("json unserialize failed : %s", errs.c_str());return false;}return true;}};
(3) UUID实现:
主要对客户端提供服务进行标识客户端的, 保证进行异步处理时候正常传递信息以及进行函数调用.
random_device构造随机数对象, distribution就是限定范围, setw: 设置输出宽度, setfill: 不足补0; hex十六进制输出. fetch_add: 原子的添加数据.
class UUID{public:std::string uuid(){std::stringstream ss;// 1. 构造随机数对象;std::random_device rd;// 2. 随机数种子进行构造伪随机数对象;std::mt19937_64 generator(rd());// 限定范围;std::uniform_int_distribution<int> distribution(0, 255);for (int i = 0; i < 8; i++){if (i == 4 || i == 6)ss << "-";ss << std::setw(2) << std::setfill('0') << std::hex << distribution(generator);}static std::atomic<size_t> seq(1);size_t cur = seq.fetch_add(1);for (int i = 7; i >= 0; i--){if (i == 5)ss << "-";ss << std::setw(2) << std::setfill('0') << std::hex << ((cur >> i * 8) & 0xFF);}return ss.str();}};
6.2 消息类型字段定义:
(1) 请求字段宏定义:
消息ID, 消息字段类型,
消息正文: a. Rpc请求: 方法名称, 方法参数, b.发布订阅相关请求: 主题名称, 操作类型, 主题消息; c. 服务操作相关请求: 方法名称, 操作类型, 主机消息(ip和port), d.响应码, e.RPC响应:响应结果.
(1) 请求正文字段:
//请求正文字段宏定义;#define KEY_METHOD "method"//请求方法;#define KEY_PARAMS "parameters"//请求参数;#define KEY_TOPIC_KEY "topic_key" //主题名称;#define KEY_TOPIC_MSG "topic_msg" //主题消息;#define KEY_OPTYPE "optype"//操作类型#define KEY_HOST "host"//主机信息#define KEY_HOST_IP "ip"//主机ip#define KEY_HOST_PORT "port"//主机端口号#define KEY_RCODE "rcode"//RPC响应码#define KEY_RESULT "result"//响应结果
(2) 消息类型字段:
包括rpc的响应和请求, 主题的响应和请求, 以及服务操作的响应和请求.
enum class MType{REQ_RPC = 0,RSP_RPC,REQ_TOPIC,RSP_TOPIC,REQ_SERVICE,RSP_SERVICE};
(3) 响应状态码: 还包含对响应码错误的返回描述.
enum class RCode {RCODE_OK = 0,RCODE_PARSE_FAILED,RCODE_ERROR_MSGTYPE,RCODE_INVALID_MSG,RCODE_DISCONNECTED,RCODE_INVALID_PARAMS,RCODE_NOT_FOUND_SERVICE,RCODE_INVALID_OPTYPE,RCODE_NOT_FOUND_TOPIC,RCODE_INTERNAL_ERROR};static std::string errReason(RCode code){static std::unordered_map<RCode, std::string> err_map = {{RCode::RCODE_OK, "成功处理!"},{RCode::RCODE_PARSE_FAILED, "消息解析失败!"},{RCode::RCODE_ERROR_MSGTYPE, "消息类型错误!"},{RCode::RCODE_INVALID_MSG, "无效消息!"},{RCode::RCODE_DISCONNECTED, "连接已断开!"},{RCode::RCODE_INVALID_PARAMS, "无效的Rpc参数!"},{RCode::RCODE_NOT_FOUND_SERVICE, "没有找到对应的服务"},{RCode::RCODE_INVALID_OPTYPE, "无效的操作类型!"},{RCode::RCODE_NOT_FOUND_TOPIC, "没有找到对应的主题"},{RCode::RCODE_INTERNAL_ERROR, "内部错误"}};auto it = err_map.find(code);if(it != err_map.end()){return "未知错误";}return it->second;}
(4) Rpc请求类型:
包含同步, 异步, 回调函数.
//1. 同步请求, 2. 异步请求, 3. 回调请求.enum class RType{REQ_SYNC = 0,//同步;REQ_ASYNC,//异步;REQ_CALLBACK//回调;};
(5) 主题类型定义:
//主题类型定义;enum class TopicOpType{TOPIC_CREATE = 0,//主题创建TOPIC_REMOVE,//主题删除TOPIC_SUBSCRIBE,//主题订阅TOPIC_CANCEL,//主题取消TOPIC_PUBLISH//主题发布};
(6) 服务类型字段定义:
//服务操作类型定义;enum class ServiceOpType{SERVICE_REGISTRY = 0,//服务注册SERVICE_DISCOVERY,//服务发现SERVICE_ONLINE,//服务上线SERVICE_OFFLINE,//服务下线SERVICE_UNKNOW//服务未知;};