目录
一. 服务端模块实现
二. 处理聊天消息模块实现
三. 调用服务端模块实现
四. 客户端模块实现
五. 效果展示
本文介绍了如何用UDP创建一个简单的聊天室。
一. 服务端模块实现
服务端仍然沿用我们前面的思想(高内聚低耦合),因此我们用一下上一篇UDP英译汉网络词典的服务端实现(点此查看)。
#pragma once
#include <iostream>
#include <string>
#include <cerrno>
#include <cstring>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <strings.h>
#include <stdlib.h>
#include<functional>
#include "Log.hpp"
#include"InetAddr.hpp"
#include"Dict.hpp"using namespace std;enum
{SOCKET_ERROR = 1,BIND_ERROR,USAGE_ERROR
};const static int defaultfd = -1;
using func_t=function<string(const string&,bool& ok)>;class UdpServer
{
public:UdpServer(uint16_t port,func_t func): _sockfd(defaultfd), _port(port), _func(func),_isrunning(false){}void InitServer(){// 1.创建udp socket 套接字...必须要做的_sockfd = socket(AF_INET, SOCK_DGRAM, 0);if (_sockfd < 0){LOG(FATAL, "socket error,%s,%d\n", strerror(errno), errno);exit(SOCKET_ERROR);}LOG(INFO, "socket create success,sockfd: %d\n", _sockfd);// 2.1 填充sockaddr_in结构struct sockaddr_in local; // struct sockaddr_in 系统提供的数据类型,local是变量,用户栈上开辟空间bzero(&local, sizeof(local)); // 清空local.sin_family = AF_INET;local.sin_port = htons(_port); // port要经过网络传输给对面,即port先到网络,所以要将_port,从主机序列转化为网络序列local.sin_addr.s_addr=INADDR_ANY;//htonl(INADDR_ANY)// 2.2 bind sockfd和网络信息(IP(?)+Port)int n = bind(_sockfd,(struct sockaddr*)&local,sizeof(local));if(n<0){LOG(FATAL, "bind error,%s,%d\n", strerror(errno), errno);exit(BIND_ERROR);}LOG(INFO, "socket bind success\n");}void Start()//所有的服务器,本质解决的是输入输出的问题!不想让网络通信模块和业务模块进行强耦合{}~UdpServer(){}private:int _sockfd;uint16_t _port; // 服务器所用的端口号bool _isrunning;//给服务器设定回调,用来让上层进行注册业务的处理方法func_t _func;
};
首先明确的是,初始化函数InitServer是不变的,我们再来看Start函数,也是大差不差,只需改动一捏捏,我们也可以仿照以前的思路让上层去实现这个聊天的功能,那么我们就知道了,这次的服务端也需要一个回调函数,让上层进行业务处理。我们稍作修改。
using handler_message_t=......
我们先定义出来处理业务的函数类型,参数部分留待下面解析。
那么我们的TcpServer类的类成员就变成了:
class UdpServer
{
private:int _sockfd;//string _ip;//不是必须的uint16_t _port; // 服务器所用的端口号bool _isrunning;//给服务器设定回调,用来让上层进行注册业务的处理方法handler_message_t _handler_message;
};
由此来编写构造函数,以及Start函数就显得水到渠成了。
const static int defaultfd = -1;
using handler_message_t=......class UdpServer
{
public:UdpServer(uint16_t port,handler_message_t handler_message): _sockfd(defaultfd), _port(port), _handler_message(handler_message),_isrunning(false){}void Start()//所有的服务器,本质解决的是输入输出的问题!不想让网络通信模块和业务模块进行强耦合{//一直运行,直到管理者不想运行了,服务器都是死循环_isrunning=true;while(true){char message[1024];struct sockaddr_in peer;socklen_t len=sizeof(peer);//1.我们要让server先收数据ssize_t n=recvfrom(_sockfd,message,sizeof(message)-1,0,(struct sockaddr*)&peer,&len);if(n>0){message[n]=0;InetAddr addr(peer);LOG(DEBUG,"get message from [%s:%d]: %s\n",addr.Ip().c_str(),addr.Port(),message);_handler_message(_sockfd,message,addr);}}_isrunning=false;}~UdpServer(){}private:int _sockfd;//string _ip;//不是必须的uint16_t _port; // 服务器所用的端口号bool _isrunning;//给服务器设定回调,用来让上层进行注册业务的处理方法handler_message_t _handler_message;
};
那好我们下面就具体来看看该如何处理业务,以补充服务端的处理方法。
二. 处理聊天消息模块实现
大家不用猜也知道该怎么办了吧。没错,仍然封装成一个类。
来看看基本框架如何写。
class MessageRoute
{
public:MessageRoute(){pthread_mutex_init(&_mutex,nullptr);}~MessageRoute(){pthread_mutex_destroy(&_mutex);}
private:vector<InetAddr> _online_user;pthread_mutex_t _mutex;
};
我们的成员有两位,首先我们想想平时我的微信、QQ,聊天的话肯定不止一个人聊天,我不聊天但是别人的消息仍然能显示到我的屏幕。所以定义一个vector结构的数组用来装聊天成员。再定义一个锁来保护临界资源,更加安全。
第一次看的朋友,可能不知道vector里面装的InetAddr是什么,其实是我们封装的一个类。
class InetAddr
{
private:void GetAddress(string* ip,uint16_t* port){*port=ntohs(_addr.sin_port);*ip=inet_ntoa(_addr.sin_addr);}
public:InetAddr(const struct sockaddr_in &addr):_addr(addr){GetAddress(&_ip,&_port);}string Ip(){return _ip;}bool operator==(const InetAddr& addr){//if(_ip==addr._ip) 任何时刻只允许一个用户if(_ip==addr._ip && _port==addr._port)//方便测试{return true;}return false;}struct sockaddr_in Addr(){return _addr;}uint16_t Port(){return _port;}~InetAddr(){}
private:struct sockaddr_in _addr;string _ip;uint16_t _port;
};
这样封装更便于我们的操作。
当有新用户进入聊天室进行聊天的时候,我们应该将其插入到用户数组中,而当由用户退出的时候,我们同样应该及时的将其从数组中删除。
bool IsExists(const InetAddr& addr)
{for(auto a:_online_user){if(a==addr) return true;}return false;
}void AddUser(const InetAddr& addr)
{LockGuard lockguard(&_mutex);if(IsExists(addr)) return;_online_user.push_back(addr);
}void DelUser(const InetAddr& addr)
{LockGuard lockguard(&_mutex);for(auto iter=_online_user.begin();iter!=_online_user.end();iter++){if(*iter==addr){_online_user.erase(iter);break;}}
}
这里出现了一个新东西----LockGuard,这是我们按照RAII(点此查看)的思路封装的锁。
#ifndef __lock_GUARD_HPP__
#define __lock_GUARD_HPP__#include<iostream>
#include<pthread.h>class LockGuard
{
public:LockGuard(pthread_mutex_t* mutex):_mutex(mutex){pthread_mutex_lock(_mutex);//构造加锁}~LockGuard(){pthread_mutex_unlock(_mutex);}
private:pthread_mutex_t* _mutex;
};#endif
那么正式来说该如何处理消息呢?
void RouteHelper(int sockfd,string message,InetAddr who)
{LockGuard lockguard(&_mutex);//2.进行消息转发for(auto user:_online_user){string send_message="\n["+who.Ip()+":"+to_string(who.Port())+"]#"+message+"\n";struct sockaddr_in clientaddr=user.Addr();::sendto(sockfd,send_message.c_str(),send_message.size(),0,(struct sockaddr*)&clientaddr,sizeof(clientaddr));}
}void Route(int sockfd,string message,InetAddr who)
{//1.1 我们任务:用户首次发消息,还要将自己,插入到在线用户中AddUser(who);//1.2 如果客户端要退出if(message=="Q" || message=="QUIT") {DelUser(who);}//2.构建任务对象,入队列,让线程池进行转发task_t t=bind(&MessageRoute::RouteHelper,this,sockfd,message,who);ThreadPool<task_t>::GetInstance()->Enqueue(t);
}
我们来说说逻辑,处理方法就是将发来的消息通过线程池进行转发。
#pragma once//单例模式的线程池
#include<iostream>
#include<vector>
#include<queue>
#include<pthread.h>
#include"Thread.hpp"
#include"Log.hpp"
#include"LockGuard.hpp"using namespace std;
using namespace ThreadModule;const static int gdefaultthreadnum=3;template<typename T>
class ThreadPool
{
private:void LockQueue(){pthread_mutex_lock(&_mutex);}void UnLockQueue(){pthread_mutex_unlock(&_mutex);}void ThreadSleep(){pthread_cond_wait(&_cond,&_mutex);}void ThreadWakeup(){pthread_cond_signal(&_cond);}void ThreadWakeAll(){pthread_cond_broadcast(&_cond);}//私有的ThreadPool(int threadnum=gdefaultthreadnum):_threadnum(threadnum),_waitnum(0),_isrunning(false){pthread_mutex_init(&_mutex,nullptr);pthread_cond_init(&_cond,nullptr);LOG(INFO,"ThreadPool Construct()");}void Start(){for(auto& thread:_threads){thread.Start();}}void HandlerTask(string name)//类的成员方法,也可以成为另一个类的回调方法,方便我们继续类级别的互相调用{LOG(INFO,"%s is running\n",name.c_str());while(true){//1.保证队列安全LockQueue();//2.队列中不一定有数据while(_task_queue.empty() && _isrunning){_waitnum++;ThreadSleep();_waitnum--;}//2.1 如果线程池已经退出了 && 任务队列是空的if(_task_queue.empty() && !_isrunning){UnLockQueue();break;}//2.2 如果线程池不退出 && 任务队列不是空的//2.3 如果线程池已经退出 && 任务队列不是空的 --- 处理完所有的任务,然后再退出//3.一定有任务,处理任务T t=_task_queue.front();_task_queue.pop();UnLockQueue();LOG(DEBUG,"%s get a task",name.c_str());//4.处理任务,这个任务属于线程独占的任务,所以不能放在加锁和解锁之间t();//LOG(DEBUG,"%s handler a task,result is: %s",name.c_str(),t.ResultToString());}}void InitThreadPool(){//指向构建出所有的线程,并不自动for(int num=0;num<_threadnum;num++){string name="thread-"+to_string(num+1);_threads.emplace_back(bind(&ThreadPool::HandlerTask,this,placeholders::_1),name);LOG(INFO,"init thread %s done\n",name.c_str());}_isrunning=true;}//复制拷贝禁用ThreadPool<T> &operator=(const ThreadPool<T>&)=delete;ThreadPool(const ThreadPool<T> &)=delete;
public:static ThreadPool<T> *GetInstance(){//如果是多线程获取线程池对象,下面的代码就有问题,所以要加锁//双判断的方式,可以有效减少获取单例的加锁成本,而且保证线程安全if(_instance==nullptr)//只有第一次会创建对象,后续都是获取,这样就不用每次都申请锁{//保证第二次之后,所有线程,不用再加锁,直接返回_instance单例对象LockGuard lockguard(&_lock);if (_instance == nullptr){_instance = new ThreadPool<T>();_instance->InitThreadPool();_instance->Start();LOG(DEBUG, "创建线程池单例\n");return _instance;}}LOG(DEBUG, "获取线程池单例\n");return _instance;}bool Enqueue(const T& t){bool ret=false;LockQueue();if(_isrunning){ _task_queue.push(t);if(_waitnum>0){ThreadWakeup();}LOG(DEBUG,"enqueue task success\n");ret=true;}UnLockQueue();return ret;}void Stop(){LockQueue();_isrunning=false;ThreadWakeAll();UnLockQueue();}void Wait(){for(auto& thread:_threads){thread.Join();LOG(INFO,"%s is quit",thread.name().c_str());}}~ThreadPool(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_cond);}
private:int _threadnum;vector<Thread> _threads;queue<T> _task_queue;pthread_mutex_t _mutex;pthread_cond_t _cond;int _waitnum;bool _isrunning;//添加单例模式--懒汉static ThreadPool<T> *_instance;static pthread_mutex_t _lock;//保护单例的锁
};template<typename T>
ThreadPool<T> *ThreadPool<T>::_instance=nullptr;template<typename T>
pthread_mutex_t ThreadPool<T>::_lock=PTHREAD_MUTEX_INITIALIZER;
我们可以知道,Route函数就是我们之前在服务器说的上层处理函数。那么handler_message_t类型的上层处理函数的参数就很明确了。
using handler_message_t=function<void(int sockfd,const string message,const InetAddr who)>;
那么调用服务端的主函数如何写就很明确了。
此处我们还封装了原生线程库,命名文件为Thread.hpp。
//封装原生线程库#ifndef __THREAD_HPP__
#define __THREAD_HPP__#include<iostream>
#include<string>
#include<functional>
#include<unistd.h>
#include<pthread.h>using namespace std;namespace ThreadModule
{using func_t=function<void(string&)>;class Thread{public:void Excute(){_func(_threadname);}public:Thread(func_t func,const string& name="none-name"):_func(func),_threadname(name),_stop(true){}static void* threadroutine(void* args)//类成员函数,形参是有this指针的!{Thread *self=static_cast<Thread*>(args);self->Excute();return nullptr;}bool Start(){int n=pthread_create(&_tid,nullptr,threadroutine,this);if(!n){_stop=false;return true;}else {return false;}}void Detach(){if(!_stop){pthread_detach(_tid);}}void Join(){if(!_stop){pthread_join(_tid,nullptr);}}string name(){return _threadname;}void Stop(){_stop=true;}~Thread(){}private:pthread_t _tid;string _threadname;func_t _func;bool _stop;};
}#endif
三. 调用服务端模块实现
我们只需将服务端中处理业务函数初始化为处理业务模块中的Route函数,然后依次调用InitServer函数、Start函数即可。
#include<iostream>
#include<memory>
#include"UdpServer.hpp"
#include"Log.hpp"
#include"MessageRoute.hpp"
using namespace std;void Usage(string proc)
{cout<<"Usage:\n\t"<<proc<<" local_port\n"<<endl;
}// ./udpserver ip
int main(int argc,char *argv[])
{if(argc!=2){Usage(argv[0]);exit(USAGE_ERROR);}EnableScreen();//string ip=argv[1];//定义消息转发模块MessageRoute route;//网络模块uint16_t port=stoi(argv[1]);unique_ptr<UdpServer> usvr=make_unique<UdpServer>(port,\bind(&MessageRoute::Route,&route,placeholders::_1,\placeholders::_2,placeholders::_3));//C++14usvr->InitServer();usvr->Start();return 0;
}
MessageRoute.hpp文件即我们的处理聊天消息模块。
四. 客户端模块实现
此处虽说大体还是发送消息,并接收服务器发送回来的消息。
但是与众不同的是:此处发送消息和接收服务器发送回来的消息应该是两个不同的线程。因为要做到不发消息的时候还是能接收到消息。
#include<iostream>
#include<string>
#include<cstdio>
#include<cstring>
#include<unistd.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include"Thread.hpp"
#include"Comm.hpp"using namespace std;
using namespace ThreadModule;void recvmessage(int sockfd,string name)
{//version 1int fd=OpenDev("/dev/pts/0",O_WRONLY);while(true){struct sockaddr_in peer;socklen_t len=sizeof(peer);char buffer[1024];ssize_t n=recvfrom(sockfd,buffer,sizeof(buffer)-1,0,(struct sockaddr*)&peer,&len);if(n>0){buffer[n]=0;write(fd,buffer,strlen(buffer));}}//version 2// while(true)// {// struct sockaddr_in peer;// socklen_t len=sizeof(peer);// char buffer[1024];// ssize_t n=recvfrom(sockfd,buffer,sizeof(buffer)-1,0,(struct sockaddr*)&peer,&len);// if(n>0)// {// buffer[n]=0;// fprintf(stderr,"%s | %s\n",name.c_str(),buffer);// //此时运行指令变为./udpclient + ip + port + 2>/dev/pts/2// }// }
}void sendmessage(int sockfd,struct sockaddr_in& server,string name)
{string message;while(true){printf("%s | Enter# ",name.c_str());fflush(stdout);getline(cin,message);sendto(sockfd,message.c_str(),message.size(),0,(struct sockaddr*)&server,sizeof(server));}
}void Usage(string proc)
{cout<<"Usage:\n\t"<<proc<<" serverip serverport\n"<<endl;
}int InitClient(string& serverip,uint16_t serverport,struct sockaddr_in *server)
{//1.创建socketint sockfd=socket(AF_INET,SOCK_DGRAM,0);if(sockfd<0){cerr<<"socket error"<<endl;return -1;}//2.client一定要bind,client也有自己的ip和port,但是不建议显示(和server一样用bind函数)bind//a.那如何bind呢?当udp client首次发送数据的时候,os会自动随机的给client进行bind--为什么?要bind,必然要和port关联!防止client port冲突//b.什么时候bind?首次发送数据的时候//构建目标主机的socket信息memset(server,0,sizeof(struct sockaddr_in));server->sin_family=AF_INET;server->sin_port=htons(serverport);server->sin_addr.s_addr=inet_addr(serverip.c_str());return sockfd;
}// ./udpclient serverip serverport
int main(int argc,char *argv[])
{if(argc!=3){Usage(argv[0]);exit(1);}string serverip=argv[1];uint16_t serverport=stoi(argv[2]);struct sockaddr_in serveraddr;int sockfd=InitClient(serverip,serverport,&serveraddr);if(sockfd==-1) return 1;func_t r=bind(&recvmessage,sockfd,placeholders::_1);func_t s=bind(&sendmessage,sockfd,serveraddr,placeholders::_1);//创建两个线程,分别用来接收消息和发消息,使其两个互不受影响Thread Recver(r,"recver");//recver在前面,还是sender在前面,都行Thread Sender(s,"sender");Sender.Start();Recver.Start();Recver.Join();Sender.Join();return 0;
}
同样用的是自己封装的线程。
值得注意的是这里接收消息模块有两个版本。此处的终端文件(/dev/pts)可以根据自己实际情况修改。
五. 效果展示
分别来看看两个版本都是怎么样的吧。
version 1:
version 2:
总结:
好了,到这里今天的知识就讲完了,大家有错误一点要在评论指出,我怕我一人搁这瞎bb,没人告诉我错误就寄了。
祝大家越来越好,不用关注我(疯狂暗示)