该代码利用socket套接字建立Tcp连接,包含服务器和客户端。当服务器和客户端启动时需要把端口号或ip地址以命令行参数的形式传入。服务器启动如果接受到客户端发来的请求连接,accept函数会返回一个打开的socket文件描述符,区别于监听连接的listensock,它用来为客户端提供服务的。因为有线程池的存在,可以立即使用已经创建好的线程来为客户端提供服务。线程池中存在一个数据结构专门用来存放客户端IP与端口信息,如果没有新的客户端连接服务器,那么该数据结构内容为空,那么多余的线程就会因为没有用户连接而阻塞,直到新用户的到来。以上就是对代码的大概介绍了。
下面是关于代码的六点细节解释:
1.查看网络连接
netstat -nltp
2.可以用read函数读取TCP套接字的数据,而UDP不行。因为UDP是面向数据报,而TCP是面向数据流。所以代码使用了read与write函数进行收发消息,这也印证了网络并没有多么高大上,socket也是一个文件描述符。
3.客户端不需要手动bind,listen,accept,但是客户端需要自己connect服务器,connect会做两件事,bind和connect。
4.客户端的端口号要操作系统随机分配,防止客户端出现启动冲突。想想如果多个应用程序都想占用一个端口号进行网络通信的场景。
5.inet_aton函数
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
int inet_aton(const char *cp, struct in_addr *inp);//cp必须是点分十进制字符串
把“192.168.1.1”这样的点分十进制字符串转换成struct sockaddr_in结构体里面的in_addr结构体(网络序列)。已知in_addr结构体里面只有一个成员,一个32位无符号整数。
成功返回0,失败返回非0;
使用方法:
6.使用signal(SIGCHLD, SIG_IGN)处理僵尸进程
通过signal(SIGCHLD, SIG_IGN)通知内核对子进程的结束不关心,由内核回收。如果不想让父进程挂起,可以在父进程中加入一条语句:signal(SIGCHLD,SIG_IGN)。表示父进程忽略SIGCHLD信号,该信号是子进程退出的时候向父进程发送的。
代码:
tcp_server.cc
#include "tcp_server.hpp"
#include<iostream>
#include<cstdlib>
#include <memory>
using namespace ns_server;static void Usage(string proc)
{cout << "Usage:\n\t" << proc << " port\n"<< endl;
}static string echo(string message)
{return message;
}// .tcp_server serverport
int main(int argc,char* argv[])
{if(argc!=2){Usage(argv[0]);exit(USAGE_ERR);}uint16_t serverport=atoi(argv[1]);unique_ptr<TcpServer> tsvr(new TcpServer(serverport,echo));tsvr->InitServer();tsvr->Start();return 0;
}
tcp_server.hpp
#pragma once
#include <iostream>
using namespace std;
#include <functional>
#include "err.hpp"
#include <sys/types.h>
#include <sys/socket.h>
#include <cstdlib>
#include <cstring>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include<errno.h>
#include<pthread.h>
#include"ThreadPool.hpp"
#include "Task.hpp"
#include "Thread.hpp"
#include "lockGuard.hpp"
#include "log.hpp"namespace ns_server
{class TcpServer;class ThreadData{public:ThreadData(TcpServer* current,int sock,string client_ip,uint16_t client_port):_current(current),_sock(sock),_client_ip(client_ip),_client_port(client_port){}~ThreadData(){}TcpServer* _current;int _sock;string _client_ip;uint16_t _client_port;};static const uint32_t backlog = 32;static const uint16_t defaultport = 8888;using func_t = function<string(const string&)>;class TcpServer{public:TcpServer(uint16_t port ,func_t func): _port(port), _func(func), _quit(true){}void InitServer(){// 1.创建监听套接字_listensock = socket(AF_INET, SOCK_STREAM, 0);if (_listensock < 0){//cerr << "Socket create error!" << endl;logMessage(Fatal, "create socket error, code: %d, error string: %s",errno,strerror(errno));exit(SOCKET_ERR);}logMessage(Info, "create socket success, code: %d, error string: %s", errno, strerror(errno));// 2.绑定本地端口与IPstruct sockaddr_in local;memset(&local, 0, sizeof(local));local.sin_family = AF_INET;local.sin_addr.s_addr = htons(INADDR_ANY); // 32位全零,不用转成网络序列,因为主机序列与网络序列一样;local.sin_port = htons(_port);int n1 = bind(_listensock, (const struct sockaddr *)&local, sizeof(local));if (n1 < 0){//cerr << "Bind socket error!" << endl;logMessage(Fatal, "bind socket error, code: %d, error string: %s",errno,strerror(errno));exit(BIND_ERR);}logMessage(Info, "bind socket success, code: %d, error string: %s",errno,strerror(errno));// 3.监听int n2 = listen(_listensock, backlog);if (n2 < 0){//cerr << "Listen socket error!" << endl;logMessage(Fatal, "listen socket error, code: %d, error string: %s", errno, strerror(errno));exit(LISTEN_ERR);}logMessage(Info, "listen socket success, code: %d, error string: %s", errno, strerror(errno));}void Start(){// signal(SIGCHLD,SIG_IGN);//不关心子进程的退出,由内核回收;_quit = false;while (!_quit){struct sockaddr_in client;socklen_t len = sizeof(client);// 4.不断获取新的客户端的连接,没有就阻塞;int sock = accept(_listensock, (struct sockaddr *)&client, &len);if (sock < 0){//cerr << "ACCEPT error!" << endl;logMessage(Warning, "accept error, code: %d, error string: %s", errno, strerror(errno));continue;} // 提取client信息---debug;string client_ip = inet_ntoa(client.sin_addr);//从网络序列转成主机序列,并将点分十进制字符串uint16_t client_port = ntohs(client.sin_port);// 网络序列转为主机序列//cout << client_ip << "-" << client_port<<"连接成功"<< endl; logMessage(Info, "accept success,%d from %d,name:%s-%d",sock,_listensock,client_ip.c_str(),client_port);//线程池(主线程只负责接受客户端信息,并为其创建套接字进行沟通)//1.创建线程池//2.利用次线程处理沟通//线程池一定是有限个线程个数,一定是处理短任务Task t(sock,client_ip,client_port,bind(&TcpServer::service,this,placeholders::_1,placeholders::_2,placeholders::_3));ThreadPool<Task>::getinstance()->pushtask(t);// //多线程// pthread_t tid;// ThreadData* td=new ThreadData(this,sock,client_ip,client_port);// //因为threadRoutine函数只能有一个参数,想让线程执行service函数就必须把this指针,还有其它参数传过去,这时候可以利用一个结构体;// pthread_create(&tid,nullptr,threadRoutine,td);// //多进程(父进程负责连接,子进程负责业务)// pid_t id=fork();// if(id<0)// {// //创建子进程失败;// close(sock);// cerr<<strerror(errno)<<endl;// continue;// }// else if(id==0)// {// //子进程// close(_listensock);// // if(fork>0) exit(0);//创建一个孙子进程,儿子进程直接退出。利用孤儿进程处理业务,系统自动回收资源;// service(sock,client_ip,client_port);// exit(0);//子进程执行完服务直接退出;// }// //父进程// close(sock);//子进程已经继承到sock文件描述符,关闭父进程的sock;// // //回收子进程// // int ret=waitpid(id,nullptr,0);//不获取退出码// // if(ret==id) cout<<"回收子进程"<<id<<"成功!"<<endl;}}// static void *threadRoutine(void* args)// {// //直接分离不用回收// pthread_detach(pthread_self());// ThreadData* td=static_cast<ThreadData*>(args);// td->_current->service(td->_sock,td->_client_ip,td->_client_port); // delete td;// return nullptr;// }void service(int sock,string client_ip,uint16_t client_port){string name;name+=client_ip;name+="-";name+=to_string(client_port);char buffer[1024];while (true)//为某个客户端不间断服务{int n = read(sock, buffer, sizeof(buffer) - 1);if (n > 0){buffer[n]=0;string res=_func(buffer);logMessage(Debug, "%s# %s", name.c_str(), res.c_str());write(sock,res.c_str(),res.size());}else if (n == 0){//说明对方断开连接了close(sock);//关闭文件描述符logMessage(Info, "%s quit,me too",name.c_str());break;}else{//cout << "read error:" <<strerror(errno)<< endl;logMessage(Error, "read error, %d:%s", errno, strerror(errno));break;}}}~TcpServer(){}private:uint16_t _port;int _listensock;bool _quit;func_t _func;};}
tcp_client.cc
#include <iostream>
using namespace std;
#include <sys/types.h>
#include <sys/socket.h>
#include <cstdlib>
#include <cerrno>
#include <cstring>
#include<cstdio>
#include<unistd.h>
//sockaddr_in结构体的头文件,当然也包含一些主机转网络序列的函数比如htons
#include <netinet/in.h>
#include <arpa/inet.h>#include "err.hpp"// static void* rfo(void *args)
// {
// int sock=*(static_cast<int*>(args));
// while(true)
// {
// //收
// char buffer[4096];
// struct sockaddr_in tmp;//输入型参数;
// socklen_t len=sizeof(tmp);//要初始化,不然没法修改;// //阻塞式接收
// int n=recvfrom(sock,buffer,sizeof(buffer)-1,0,(struct sockaddr*)&tmp,&len);// if(n>0)//接收服务器数据成功
// {
// buffer[n]=0;
// cout<<buffer<<endl;
// }
// }// }//当传入程序参数个数不对时,调用这个Usage函数告诉他什么是他妈的惊喜!
static void Usage(string proc)
{cout<<"Usage:\n\t"<<proc<<" serverip "<<" serverport\n"<<endl;
}// ./tcp_client serverip serverport
int main(int argc,char* argv[])
{if(argc!=3){Usage(argv[0]);exit(USAGE_ERR);}//保留输入的服务器的IP地址与端口号string serverip=argv[1];uint16_t serverport=atoi(argv[2]);//1.客户端创建套接字int sock = socket(AF_INET, SOCK_STREAM, 0);if (sock < 0){cerr << " create socket error " << strerror(errno) << endl;exit(SOCKET_ERR);}//明确server是谁struct sockaddr_in server;memset((void*)&server,0,sizeof(server));server.sin_family=AF_INET;server.sin_port=htons(serverport);//主机序列转网络序列//把字符串转成sockaddr_in结构体里的结构体inet_aton(serverip.c_str(),&(server.sin_addr));socklen_t len =sizeof(server);int cnt=5;cout<<"cnt=5"<<endl;//2.连接服务器while(connect(sock,(struct sockaddr*)&server,len)!=0){cout<<"正在重新连接中,还有"<<cnt<<"次重新连接机会"<<endl;if(cnt--<=0) break;}if(cnt<=0){cout<<"连接失败"<<endl;exit(CONNECT_ERR);}else{cout<<"连接成功"<<endl;}char buffer[1024];while(true){string message;cout<<"Enter>>>";getline(cin,message);write(sock,message.c_str(),message.size());//给服务器发数据ssize_t n=read(sock,buffer,sizeof(buffer)-1);//如果服务器没有发送数据,这里会阻塞;if(n>0){buffer[n]=0;cout<<"server echo>>>"<<buffer<<endl;//打印服务器传输来的数据;}else if(n==0){cout<<"与服务器断开了"<<endl;break;}else{cout<<"read error"<<strerror(errno)<<endl;break;}}//关闭文件描述符(虽然进程退出自动关闭)close(sock);return 0;
}
log.hpp
#pragma once#include <iostream>
#include <stdarg.h>
#include <sys/types.h>
#include <unistd.h>
#include <string>
#include<cstdio>
#include<time.h>
enum
{Debug = 0,Info,Warning,Error,Fatal,Uknown
};static std::string toLevelString(int level)
{switch (level){case Debug:return "Debug";case Info:return "Info";case Warning:return "Warning";case Error:return "Error";case Fatal:return "Fatal";default:return "Uknown";}
}static std::string gettime()
{time_t curr = time(nullptr);struct tm *tmp = localtime(&curr);char buffer[128];snprintf(buffer, sizeof(buffer), "%d-%d-%d %d:%d:%d", tmp->tm_year + 1900, tmp->tm_mon + 1, tmp->tm_mday,tmp->tm_hour, tmp->tm_min, tmp->tm_sec);return buffer;
}// 日志格式: 日志等级 时间 pid 消息体void logMessage(int level, const char *format, ...) // format是%d、%s这样的形式,也就是printf("%d %s",a,b);
{char logLeft[1024];std::string level_string = toLevelString(level);std::string curr_time = gettime();snprintf(logLeft, sizeof(logLeft), "[%s] [%s] [%d]", level_string.c_str(), curr_time.c_str(), getpid());char logRight[1024];// 可变参数va_list p; // 指向可变参数的开始处,//va_arg(p, int); // 根据类型提取参数,凭借%d这样的格式判定类型与大小。va_start(p, format); // p=const char*& formatvsnprintf(logRight, sizeof(logRight), format, p); // 向logRight缓冲区里面打印所有参数。va_end(p); // p=nullptrprintf("%s%s\n", logLeft, logRight);}
ThreadPool.hpp
#pragma once
#include <iostream>
#include <string>
#include <vector>
#include <queue>
#include <pthread.h>
#include <unistd.h>using namespace std;#include "Task.hpp"
#include "Thread.hpp"
#include "lockGuard.hpp"
#include "log.hpp"
const int N = 6;template <class T>
class ThreadPool
{public:// 单例模式static ThreadPool<T> *getinstance(){if (_instance == nullptr) // 当一个对象已经被创建以后,就不进入申请锁并且判断的环节了;{lockGuard lock(&_mutex_instance);if (_instance == nullptr){_instance = new ThreadPool<T>();logMessage(Debug, "线程池单例形成");_instance->init();_instance->start();}}return _instance;}bool isEmpty(){return _tasks.empty();}void init(){// 创建线程for (int i = 1; i <= _num; ++i)// pthread_create(&_threads[i],nullptr,ThreadRoutine,this);{_threads.push_back(Thread(i, ThreadRoutine, this));logMessage(Debug, "%d thread running", i);}}void start(){// 线程启动for (auto &e : _threads){e.run();}}void check(){for (auto &e : _threads){std::cout << "线程ID" << e.threadid() << " , " << e.threadname() << "is running··· " << std::endl;}}// 放入任务void pushtask(const T &task){lockGuard lock(&_mutex);_tasks.push(task);threadwakeup(); // 有新任务进来,唤醒线程去处理}// 拿出任务T poptask(){T t = _tasks.front();_tasks.pop();return t;}private:// 重点!!!static void *ThreadRoutine(void *args){pthread_detach(pthread_self());// 指针强转成线程池对象类型;ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args);while (true){T t;{lockGuard lock(tp->getlock());// 1.判断是否有任务// 有->处理// 无->等待// 如果任务队列为空,则等待while (tp->isEmpty()){tp->threadwait();}t = tp->poptask(); // 从共有区域拿到线程独立栈上;}t(); // 调用task类里面的仿函数处理任务}}private:ThreadPool(int num = N): _num(num){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_cond, nullptr);}ThreadPool(const ThreadPool<T> &tp) = delete; // 删除默认拷贝构造void operator=(const ThreadPool<T> &tp) = delete; // 删除赋值函数pthread_mutex_t *getlock(){return &_mutex;}void threadwait(){// 挂起一个线程pthread_cond_wait(&_cond, &_mutex);}void threadwakeup(){// 唤醒一个线程pthread_cond_signal(&_cond);}~ThreadPool(){for (auto &e : _threads){e.join();}pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_cond);}private:std::vector<Thread> _threads;int _num; // 线程池里有几个线程;std::queue<T> _tasks; // 使用STL的自动扩容的特性pthread_mutex_t _mutex;pthread_cond_t _cond;static ThreadPool<T> *_instance; // 懒汉方式实现单例模式static pthread_mutex_t _mutex_instance; // 单例对象有自己的锁
};// 静态变量初始化
template <class T>
ThreadPool<T> *ThreadPool<T>::_instance = nullptr;template <class T>
pthread_mutex_t ThreadPool<T>::_mutex_instance = PTHREAD_MUTEX_INITIALIZER;
Thread.hpp
#pragma once#include <iostream>
#include <pthread.h>
#include <string>class Thread
{
public:typedef void* ( *func_t) (void*);typedef enum{NEW=0,RUNNING,EXITED}ThreadStatus;public://状态:new,running,exitedint status(){return _status;}//线程名std::string threadname(){return _name;}//线程ID(共享库中的进程地址空间的虚拟地址)pthread_t threadid(){if(_status==RUNNING)//线程已经被创建,线程id已经输入到成员变量_tid中;return _tid;else { return 0;}}public://构造函数;Thread(int num,func_t func,void* args):_tid(0),_status(NEW),_func(func),_args(args){char name[128];snprintf(name,sizeof(name),"thread-%d",num);_name=name;}//析构函数~Thread(){}//静态成员函数不能访问类内所有成员,因为没有this指针;static void* runHelper(void *args){Thread* td=(Thread*)args;(*td)();//调用仿函数执行线程的回调函数;return nullptr; }void operator()()//仿函数{//如果函数指针不为空,则执行该函数指针指向的回调函数;if(_func!=nullptr) _func(_args);}//创建线程void run(){//因为runHelper函数必须只能有一个void*参数,所以runHelper函数在类内必须定义为static,这样才没有this指针;int n=pthread_create(&_tid,nullptr,runHelper,this);if(n!=0) return exit(1);//线程创建失败,那么直接退出进程;_status=RUNNING;}//等待线程结束void join(){int n=pthread_join(_tid,nullptr);if(n!=0) {std::cerr<<"main thread join thread "<<_name<<" error "<<std::endl;return;}_status=EXITED;//线程退出;}private:pthread_t _tid;//线程ID(原生线程库中为该线程所创建的TCB起始虚拟地址)std::string _name;//线程名func_t _func;//线程要执行的回调void* _args;//线程回调函数参数ThreadStatus _status;//枚举类型:状态
};
lockGuard.hpp
#pragma once#include <pthread.h>
#include <iostream>class Mutex//成员:加锁函数和解锁函数
{
public:Mutex(pthread_mutex_t* pmutex):_pmutex(pmutex) {}void lock(){pthread_mutex_lock(_pmutex);}void unlock(){pthread_mutex_unlock(_pmutex);}~Mutex(){}private:pthread_mutex_t* _pmutex;//需要传入一个互斥量(锁)的指针;
};//对Mutex进行二次封装;
//创建该对象时自动加锁,析构时自动解锁;
class lockGuard
{
public:lockGuard(pthread_mutex_t* pmutex):_mutex(pmutex)//利用锁的指针构建Mutex对象{_mutex.lock();}~lockGuard(){_mutex.unlock();}private:Mutex _mutex;//类内创建对象
};
Task.hpp
#pragma once
#include<string>
#include<iostream>
#include<functional>
using namespace std;using cb_t=function<void(int,string,uint16_t)>;class Task
{
public:Task(){}Task(int sock,string clientip,uint16_t clientport,cb_t func):_sock(sock),_clientip(clientip),_clientport(clientport),_func(func){}int operator()(){//开始为客户端---处理任务cout<<"开始为客户端"<<_clientip<<"-"<<_clientport<<"服务"<<endl;_func(_sock,_clientip,_clientport);//实际上是一个已经绑了一个参数的TcpServer::service函数;}~Task() {}private:int _sock;string _clientip;uint16_t _clientport;cb_t _func;
};
err.hpp
#pragma onceenum{ USAGE_ERR=1,SOCKET_ERR,BIND_ERR,LISTEN_ERR,CONNECT_ERR,SETSID_ERR};