文章目录
- TCP客户端connect断线重连
- 1、为什么要断线重连
- 2、实现代码
TCP客户端connect断线重连
1、为什么要断线重连
客户端会面临服务器崩溃的情况,我们可以试着写一个客户端重连的代码,模拟并理解一些客户端行为,比如游戏客户端等.
考虑到下面2种情况:
- 服务器故障突然断开连接,过几秒又恢复连接,那么在这等待的几秒,客户端可以重新发起连接。
- 客户端wifi突然断了,也就是没网,然后过几秒又恢复了,客户端又可以重新发起连接了。
2、实现代码
代码主要实现细节:
枚举类型
Status
:
- 用于表示连接状态,包括
NEW
,CONNECTED
,CONNECTING
,DISCONNECTED
,CLOSE
。全局常量:
defaultsockfd
,默认套接字文件描述符为-1。
retryinterval
,重连间隔时间为1秒。
retryamxtimes
,重连最大次数为5次。
Connection
类:
构造函数初始化服务器IP、端口、状态、重连间隔和最大重连次数。
ConnectStatus()
方法返回当前连接状态。
Connect()
方法负责建立连接,并设置连接状态。
Process()
方法负责发送和接收数据,处理通信过程中的各种状态变化。
ReConnect()
方法处理重连逻辑,尝试在连接失败时进行多次重连。
DisConnect()
方法负责断开连接并重置套接字文件描述符。
TcpClient
类:
构造函数初始化连接对象。
Execute()
方法根据连接状态执行相应操作,包括建立连接、处理数据、重连和断开连接。
Usage()
函数:
- 用于输出使用说明。
main()
函数:
- 检查命令行参数,初始化
TcpClient
对象并执行连接逻辑。
TcpClient.cc
文件:#include <iostream> #include <string> #include <strings.h> #include <unistd.h>#include "Comm.hpp"enum class Status {NEW,CONNECTED,CONNECTING,DISCONNECTED,CLOSE };const int defaultsockfd = -1; const int retryinterval = 1; // 重连间隔时间 const int retryamxtimes = 5; // 重连最大次数class Connection { public:Connection(std::string serverip, uint16_t serverport): _sockfd(defaultsockfd),_serverip(serverip),_serverport(serverport),_status(Status::NEW),_retry_interval(retryinterval),_retry_max_times(retryamxtimes){}Status ConnectStatus(){return _status;}void Connect(){_sockfd = socket(AF_INET, SOCK_STREAM, 0);if (_sockfd < 0){_status = Status::DISCONNECTED;exit(CREATE_ERROR);}struct sockaddr_in server;bzero(&server, sizeof(server));server.sin_family = AF_INET;server.sin_port = htons(_serverport);// int inet_pton(int af, const char *src, void *dst);inet_pton(AF_INET, _serverip.c_str(), &server.sin_addr.s_addr); // 也可以&server.sin_addr,因为sin_addr只有s_addrint n = ::connect(_sockfd, CONV(&server), sizeof(server));if (n < 0){DisConnect(); //_status = Status::DISCONNECTED;return;}std::cout << "connect success" << std::endl;_status = Status::CONNECTED; // 已连接}void Process(){while (true){// 发送数据std::string message;std::cout << "Please Enter# ";std::getline(std::cin, message);int n = send(_sockfd, message.c_str(), message.size(), 0);if (n > 0){// 接收数据char buff[1024];int m = recv(_sockfd, buff, sizeof(buff) - 0, 0);if (m > 0){buff[m] = 0;std::cout << "Server Echo$ " << buff << std::endl;}else{_status = Status::DISCONNECTED; // 接收不成功就重连std::cerr << "recv error" << std::endl;break;}}else{_status = Status::CLOSE; // 发送不成功就退出std::cerr << "send error" << std::endl;break;}}}void ReConnect(){_status = Status::CONNECTING;int cnt = 1;while (true){Connect();if (_status == Status::CONNECTED){break;}std::cout << "正在重连,重连次数 : " << cnt++ << std::endl;if (cnt > _retry_max_times){_status = Status::CLOSE; // 重连失败std::cout << "重连失败,请检查网络.." << std::endl;break;}sleep(_retry_interval);}}void DisConnect(){if (_sockfd > defaultsockfd){close(_sockfd);_sockfd = defaultsockfd;}}private:int _sockfd;std::string _serverip;uint16_t _serverport;Status _status;int _retry_interval;int _retry_max_times; };class TcpClient { public:TcpClient(std::string serverip, uint16_t serverport) : _connect(serverip, serverport){}void Execute(){while (true){switch (_connect.ConnectStatus()){case Status::NEW:_connect.Connect();break;case Status::CONNECTED:_connect.Process();break;case Status::DISCONNECTED:_connect.ReConnect();break;case Status::CLOSE:_connect.DisConnect();return; // 断开连接了,重连不管用了default:break;}}}private:Connection _connect; };void Usage() {std::cout << "Please use format : ./tcp_client serverip serverport" << std::endl; }int main(int argc, char *argv[]) {if (argc != 3){Usage();exit(USAGE_ERROR);}std::string serverip = argv[1];uint16_t serverport = std::stoi(argv[2]);TcpClient tcpclient(serverip, serverport);tcpclient.Execute();return 0; }
和UDP服务器响应程序一样,客户端发什么就回什么,只不过多了建立连接的步骤。
Comm.hpp
文件#pragma once #include "InetAddr.hpp"enum errorcode {CREATE_ERROR = 1,BIND_ERROR,LISTEN_ERROR,SEND_ERROR,RECV_ERROR,CONNECT_ERROR,FORK_ERROR,USAGE_ERROR };#define CONV(ADDR) ((struct sockaddr *)ADDR)std::string CombineIpAndPort(InetAddr addr) {return "[" + addr.Ip() + ":" + std::to_string(addr.Port()) + "] "; }
InetAddr.hpp
文件#pragma once#include <iostream> #include <sys/types.h> /* See NOTES */ #include <sys/socket.h> #include <arpa/inet.h> #include <netinet/in.h> #include <string>class InetAddr {void GetAddress(std::string *ip, uint16_t *port){// char *inet_ntoa(struct in_addr in);*ip = inet_ntoa(_addr.sin_addr);*port = ntohs(_addr.sin_port);}public:InetAddr(const struct sockaddr_in &addr) : _addr(addr){GetAddress(&_ip, &_port);}std::string Ip(){return _ip;}uint16_t Port(){return _port;}bool operator==(InetAddr &addr){return _ip == addr.Ip() && _port == addr.Port();}const struct sockaddr_in& GetAddr(){return _addr;}~InetAddr() {}private:struct sockaddr_in _addr;std::string _ip;uint16_t _port; };
LockGuard.hpp
文件# pragma once#include <pthread.h>class LockGuard { public:LockGuard(pthread_mutex_t *mutex) : _mutex(mutex){pthread_mutex_lock(_mutex); // 构造加锁}~LockGuard(){pthread_mutex_unlock(_mutex); // 析构解锁}private:pthread_mutex_t *_mutex; };
Log.hpp
文件#pragma once#include <string> #include <iostream> #include <fstream> #include <unistd.h> #include <stdarg.h> #include <sys/types.h> #include "LockGuard.hpp"using namespace std;bool isSave = false; // 默认向显示器打印 pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; #define FILEPATH "./log.txt"enum level {DEBUG = 0,INFO,WARNING,ERROR,FATAL };void SaveToFile(const string &message) {ofstream out(FILEPATH, ios_base::app);if (!out.is_open())return;out << message;out.close(); }std::string LevelToString(int level) {switch (level){case DEBUG:return "Debug";case INFO:return "Info";case WARNING:return "Warning";case ERROR:return "Error";case FATAL:return "Fatal";default:return "Unknow";} }std::string GetTimeString() {time_t curr_time = time(nullptr);struct tm *format_time = localtime(&curr_time);if (format_time == nullptr)return "None";char buff[1024];snprintf(buff, sizeof(buff), "%d-%d-%d %d:%d:%d",format_time->tm_year + 1900,format_time->tm_mon + 1,format_time->tm_mday,format_time->tm_hour,format_time->tm_min,format_time->tm_sec);return buff; }void LogMessage(const std::string filename, int line, bool issave, int level, const char *format, ...) {std::string levelstr = LevelToString(level);std::string timestr = GetTimeString();pid_t pid = getpid();char buff[1024];va_list arg;// int vsnprintf(char *str, size_t size, const char *format, va_list ap); // 使用可变参数va_start(arg, format);vsnprintf(buff, sizeof(buff), format, arg);va_end(arg);LockGuard lock(&mutex);std::string message = "[" + timestr + "]" + "[" + levelstr + "]" + "[pid:" + std::to_string(pid) + "]" + "[" + filename + "]" + "[" + std::to_string(line) + "] " + buff + '\n';if (issave == false)std::cout << message;elseSaveToFile(message); }// 固定文件名和行数 #define LOG(level, format, ...) \do \{ \LogMessage(__FILE__, __LINE__, isSave, level, format, ##__VA_ARGS__); \} while (0)#define EnableScreen() \do \{ \isSave = false; \} while (0)#define EnableFile() \do \{ \isSave = true; \} while (0)void Test(int num, ...) {va_list arg;va_start(arg, num);while (num--){int data = va_arg(arg, int);std::cout << data << " ";}std::cout << std::endl;va_end(arg); }
Main.cc
文件#include <iostream> #include <memory> #include "TcpServer.hpp"void Usage() {// printf("./udp_server serverip serverport\n");printf("Usage : ./udp_server serverport\n"); // ip 已经设置为0 }int main(int argc, char *argv[]) {// if (argc != 3)if (argc != 2){Usage();exit(USAGE_ERROR);}uint16_t serverport = std::stoi(argv[1]);std::unique_ptr<TcpServer> tsvr = std::make_unique<TcpServer>(serverport);tsvr->InitServer();tsvr->Start();return 0; }
Makefile
文件.PHONY:all all:tcp_client tcp_servertcp_client:TcpClient.ccg++ -o $@ $^ -std=c++14 -lpthread tcp_server:Main.ccg++ -o $@ $^ -std=c++14 -lpthread.PHONY:clean clean:rm -f tcp_server tcp_client
TcpServer.hpp
文件#pragma once#include <sys/types.h> /* See NOTES */ #include <sys/wait.h> #include <sys/socket.h> #include <arpa/inet.h> #include <netinet/in.h>#include <error.h> #include <string.h> #include <pthread.h> #include <functional>#include "Log.hpp" #include "InetAddr.hpp" #include "Comm.hpp" #include "Threadpool.hpp"const int defaultsockfd = -1; int gbacklog = 16; // 暂时先用using task_t = std::function<void()>;// 声明 class TcpServer;class ThreadData { public:ThreadData(int sockfd, InetAddr addr, TcpServer *self): _sockfd(sockfd), _addr(addr), _self(self) {}~ThreadData() = default;public:int _sockfd;InetAddr _addr;TcpServer *_self; };class TcpServer { public:TcpServer(uint16_t port) : _port(port), _listensock(defaultsockfd), _isrunning(false){}void InitServer(){// 创建_listensock = socket(AF_INET, SOCK_STREAM, 0); // 这个就是文件描述符if (_listensock < 0){LOG(FATAL, "create sockfd error, error code : %d, error string : %s", errno, strerror(errno));exit(CREATE_ERROR);}LOG(INFO, "create sockfd success");struct sockaddr_in local;bzero(&local, sizeof(local));local.sin_family = AF_INET;local.sin_port = htons(_port);local.sin_addr.s_addr = INADDR_ANY;// 绑定int n = ::bind(_listensock, CONV(&local), sizeof(local));if (n < 0){LOG(FATAL, "bind sockfd error, error code : %d, error string : %s", errno, strerror(errno));exit(BIND_ERROR);}LOG(INFO, "bind sockfd success");}void Service(int sockfd, InetAddr client){while (true){// TCP是字节流(可以使用write和read接口),UDP是数据报char buff[1024];// 接收消息int n = ::read(sockfd, buff, sizeof(buff)); // bug,接收数据可能收到的不完整,比如1+100,可能先收到1+1,再收到00 -- 按序到达std::string clientAddr = CombineIpAndPort(client);if (n > 0){buff[n] = 0;std::string message = clientAddr + buff;LOG(INFO, "get message : \n %s", message.c_str());// 发送消息int m = ::write(sockfd, buff, strlen(buff)); if (m < 0){LOG(FATAL, "send message error ,error code : %d , error string : %s", errno, strerror(errno));exit(SEND_ERROR);}}else if (n == 0){// 发送端不发送数据了LOG(INFO, "%s quit", clientAddr.c_str());break;}else{LOG(FATAL, "recv message error ,error code : %d , error string : %s", errno, strerror(errno));exit(RECV_ERROR);}}::close(sockfd); // 服务结束,关闭文件描述符,避免文件描述符泄漏}static void *HandlerService(void *args){pthread_detach(pthread_self()); // 分离线程ThreadData *td = static_cast<ThreadData *>(args);td->_self->Service(td->_sockfd, td->_addr);delete td;return nullptr;}void Start(){_isrunning = true;while (_isrunning){// 监听int ret = ::listen(_listensock, gbacklog);if (ret < 0){LOG(FATAL, "listen error, error code : %d , error string : %s", errno, strerror(errno));exit(LISTEN_ERROR);}LOG(INFO, "listen success!");struct sockaddr_in peer;socklen_t len = sizeof(peer);// 获取新连接int sockfd = accept(_listensock, CONV(&peer), &len); // 建立连接成功,创建新文件描述符进行通信if (sockfd < 0){LOG(WARNING, "accept error, error code : %d , error string : %s", errno, strerror(errno));continue;}LOG(INFO, "accept success! new sockfd : %d", sockfd);InetAddr addr(peer); // 给后面提供传入的ip、port// 服务 -- 发送和接收数据// V0 -- 单进程// Service(sockfd, addr); // 这里是while死循环,没有运行完就一直运行,下一个请求来的时候得这个while退出才能执行// v1 -- 多进程// int id = fork();// if (id == 0)// {// // 子进程// ::close(_listensock); // 子进程对监听文件描述符不关心// if (fork() > 0)// exit(0); // 子进程创建进程后退出,孙子进程被系统领养,不用等待// Service(sockfd, addr); // 孙子进程执行任务// exit(0);// }// else if (id > 0)// {// ::close(sockfd); // 这里每次关闭的文件描述符都是4,使得每次accept创建的文件描述符都是4,这个4是留个各个子进程(子进程再给孙子进程)的(互不影响)// // 父进程// pid_t rid = waitpid(id, nullptr, 0); // 虽然是阻塞等待,但是子进程是刚创建就退出来了,让孙子进程(孤儿进程)执行任务,接下来继续监听和建立连接// if (rid == id)// {// LOG(INFO, "wait child process success");// }// }// else// {// // 异常// LOG(FATAL, "fork error ,error code : %d , error string : %s", errno, strerror(errno));// exit(FORK_ERROR);// }// v2 -- 多线程// pthread_t tid;// ThreadData *td = new ThreadData(sockfd, addr, this); // 传指针// pthread_create(&tid, nullptr, HandlerService, td); // 这里创建线程后,线程去做执行任务,主线程继续向下执行 , 并且线程不能关闭sockf,线程和进程共享文件描述符表// v3 -- 线程池task_t t = std::bind(&TcpServer::Service, this, sockfd, addr);Threadpool<task_t>::GetInstance()->Enqueue(t);// v4 -- 进程池 -- 不推荐,需要传递文件描述符!}_isrunning = false;}~TcpServer(){if (_listensock > defaultsockfd)::close(_listensock); // 不用了关闭监听}private:uint16_t _port;int _listensock;bool _isrunning; };
Thread.hpp
文件#ifndef __THREAD_HPP__ #define __THREAD_HPP__#include <iostream> #include <string> #include <unistd.h> #include <functional> #include <pthread.h>using namespace std;// 封装Linux线程 namespace ThreadModule {using func_t = function<void(string &)>;class Thread{public:// /* ThreadData* */Thread(func_t<T> func, T data, const string& name = "default name") : _func(func), _data(data), _threadname(name), _stop(true) {}Thread(func_t func, const string &name = "default name") : _func(func), _threadname(name), _stop(true) {}void Execute(){_func(_threadname);// _func(_data);}// 隐含thisstatic void *threadroutine(void *arg){Thread *self = static_cast<Thread *>(arg);self->Execute(); // static 访问不了成员变量return nullptr;}bool Start(){int n = pthread_create(&_tid, nullptr, threadroutine, this);if (!n){_stop = false;return true;}else{return false;}}void Detach(){if (!_stop){pthread_detach(_tid);}}void Join(){if (!_stop){pthread_join(_tid, nullptr);}}string name(){return _threadname;}void Stop(){_stop = true;}// ~Thread() {}private:pthread_t _tid;string _threadname;func_t _func;bool _stop;};} // namespace ThreadModule#endif
Threadpool.hpp
文件#pragma once#include <vector> #include <queue> #include <queue> #include "Thread.hpp" #include <pthread.h> #include "LockGuard.hpp"using namespace ThreadModule;const int NUM = 3;template <typename T> class Threadpool {void LockQueue(pthread_mutex_t &mutex){pthread_mutex_lock(&mutex);}void UnLockQueue(pthread_mutex_t &mutex){pthread_mutex_unlock(&mutex);}void SleepThread(pthread_cond_t &cond, pthread_mutex_t &mutex){pthread_cond_wait(&cond, &mutex);}void WakeUpThread(pthread_cond_t &cond){pthread_cond_signal(&cond);}void WakeUpAll(pthread_cond_t &cond){pthread_cond_broadcast(&_cond);}Threadpool(const int threadnum = NUM) : _threadnum(threadnum), _waitnum(0), _isrunning(false){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_cond, nullptr);LOG(INFO, "Threadpool Constructor successful ! ");}void TaskHandler(string &name){// sleep(1);// cout << name << " : hh " << endl;// sleep(1);LOG(DEBUG, "%s is running", name.c_str());while (true){LockQueue(_mutex);while (_task_queue.empty() && _isrunning){// 等待++_waitnum;SleepThread(_cond, _mutex);--_waitnum;}// 此时一定大于一个线程没有休眠if (_task_queue.empty() && !_isrunning){// 此时任务队列已经没有内容,且此时线程池已经停止UnLockQueue(_mutex);cout << name << " quit ... " << endl;break;}LOG(DEBUG, "%s get task sucessful !", name.c_str());// 其他情况就得处理任务T t = _task_queue.front();_task_queue.pop();UnLockQueue(_mutex);// 处理任务t();// cout << name << " : " << t.stringResult() << endl;// LOG(DEBUG, "%s handler task sucessful ! Result is %s", name.c_str(), t.stringResult().c_str());sleep(1);}}void InitThreadPool(){for (int i = 0; i < _threadnum; ++i){string name = "Thread - " + to_string(i + 1);_threads.emplace_back(bind(&Threadpool::TaskHandler, this, placeholders::_1), name);}_isrunning = true;LOG(INFO, "Init Threadpool successful !");}public:static Threadpool<T> *GetInstance(int threadnum = NUM){if (_instance == nullptr){LockGuard lockguard(&_lock);if (_instance == nullptr){// pthread_mutex_lock(&_lock);// 第一次创建线程池_instance = new Threadpool<T>(threadnum);_instance->InitThreadPool();_instance->Start();LOG(DEBUG, "第一次创建线程池");// pthread_mutex_unlock(&_lock);return _instance;}}LOG(DEBUG, "获取线程池");return _instance;}bool Enqueue(const T &in){bool ret = false;LockQueue(_mutex);if (_isrunning){_task_queue.push(in);if (_waitnum > 0)WakeUpThread(_cond);LOG(DEBUG, "enqueue sucessful...");ret = true;}UnLockQueue(_mutex);return ret;}void Stop(){LockQueue(_mutex);_isrunning = false;if (_waitnum > 0)WakeUpAll(_cond);UnLockQueue(_mutex);}void Start(){for (auto &thread : _threads){thread.Start();LOG(INFO, "%s is start sucessful...", thread.name().c_str());}}void Wait(){for (auto &thread : _threads){thread.Join();LOG(INFO, "%s is quit...", thread.name().c_str());}}~Threadpool(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_cond);LOG(INFO, "delete mutex sucessful !");}private:vector<Thread> _threads;queue<T> _task_queue;int _threadnum;int _waitnum;pthread_mutex_t _mutex; // 互斥访问任务队列pthread_cond_t _cond;bool _isrunning;// 懒汉模式static Threadpool<T> *_instance;static pthread_mutex_t _lock; };template <typename T> Threadpool<T> *Threadpool<T>::_instance = nullptr; template <typename T> pthread_mutex_t Threadpool<T>::_lock = PTHREAD_MUTEX_INITIALIZER;
运行结果:
OKOK,TCP客户端connect短线重连就到这里,如果你对Linux和C++也感兴趣的话,可以看看我的主页哦。下面是我的github主页,里面记录了我的学习代码和leetcode的一些题的题解,有兴趣的可以看看。
Xpccccc的github主页