ACE Acceptor-Connector框架
该框架实现 Acceptor-Connector 模式,该模式解除了“网络化应用中的协作对端服务的连接和初始化”与“连接和初始化之后它们所执行的处理”的耦合。Acceptor-Connector 框架允许成用独立于它们所提供的服务来配置其连接布局的关键属性。我们将阐释怎样将该框架与ACE Reactor和Task框架结合在一起,并应用于增强我们的网络化日志服务的可复用性、可扩展性、安全性以及可伸缩性。ACE Acceptor-Connector 框架的主要类如下所示:
ACE Acceptor-Connector框架主要扮演了以下角色:
事件基础设施类 执行一般的、不依赖于应用的事件分派策略。
连接管理层类 执行不依赖于应用的连接和初始化服务。这些类包括ACE_Svc_Handler、ACE_Acceptor,以及ACE_Connector.
应用层类 通过继承和/或模板实例化来定制由其他两层执行的:般策略,从而创建对象来建立连接、交换数据以及执行服务特有的处理。在ACE Acceptor-Connector 框架中,应用层类是上而概述的连接管理层类的后代和模板实例化。
ACE Acceptor-Connector框架主要提供了以下好处:
可复用性和可扩展性 面向连接的网络化心用常常含有相当数量的用于建立连接和初始化服务的低级代码。这些代码在很大程度上独立于已连接的服务处理器车它们所交换的数据执行的处理。ACE Acceptor-Connector框架将这些低级代码重构为可复用、不依赖于应用的接受器和连接器类,这些类知道怎样建立连接、并在连接建立后初始化与其相关联的服务处理器。这样,服务处理器可以完全专注于它们的应用所定义的处理。这样的事务分离使得开发者能更轻松地增加新的应用服务类型、服务实现、认证协议以及通信协议,而不会影响己有的连接建了和服务初始化配置代码。
可移植性 框架中的类模板都是通过建立连接和传输数据所需的IPC机制的类型来参数化的。在开发必须跨越多种 OS平台、高效而可移植地运行的用时,这样的基于模板的可扩展性所提供的灵活性十分有月。Acceptor-Connector 模板的IPC类参数可以是任何遵循模板所期望的接口网络编程类。例如,取决于底层OS平台的某些特定属性(比如它是UNIX的BSD 变种还是System V 变种),服务器可以实例化框架类,让它们使用 ACE Scoket wrapper facade 或 ACE TLI wrapper facade。对于非TCP/IP 应用,可以使用 ACE 的许多IPC wrapper facade 或是任何输出(Expon)了必需接口的、自行开发的类来对模板进行实例化。
健壮性 通过彻底地解除服务处理器与其接受器的耦合,该框架可以确保被动模式的传输端点不会在无意中被用于读写数据。这一额外的类型安全性消除了在通过弱类型化的网络编程接口(比如用C定义的Socket或TLIAPI)进行编程时,可能出现的一系列微妙而有害的错误。
高效性 在长延迟的广域网上,Acceptor-Connector 模式可以主动、异步、高效地与大量对端建立连接。在这些情况下,并步性十分重要,因为大型网络化用可能会自数百或数干对端要连接。
ACE_Svc_Handler类
ACE_Svc_Handler类是ACE 的同步和反应式数据传输及服务处理机制的基础。这个类提供了以下能力:
上述图片展示了ACE_Svc_Handler类的主要接口,其初始化主要通过以下类来进行参数化的:
PEER_STREAM traits类,这个类能够在已连接的对端服务处理器之间传输数据。它还定义了相关联的PEER_STREAM::PEER_ADDR trait,用于表示向服务标识对端的地址类PEER_STREAM 参数常常通过各个ACEIPCwrapperfacade中的一个来进行参数化,比如说ACE_SOCK_Stream
SYNCH_STRATEGY traits类,这个类应用了 Strategized Locking模式来参数化ACE_Task父类中的 ACE_Message_Queue 的同步 traits。这个参数常常通过 ACE_NULL_SYNCH 或ACE_MT_SYNCH traits类来进行实例化。
因为ACE_Svc_Handler继承于ACE_Event_Handler类,它的实例可以定义各种类型事件而登记到ACE Reactor框架。例如,它可以登记处理READ和 WRITE 事件。随后,在其数据模式的 socker句柄准备好接收或发送数据时,它的 handle_input()和handle_output()挂钩方法将分别被反应器自动分配。主要将其方法分为以下三个范畴:
1、服务的创建和激活方法 ACE Acceptor-Connector框架可以在编译或运行时修改服务处理器创建和初始化各个方面。在缺省的情况下,ACE_Svc_Handler子类对象是由接收器或连接工厂动态分配的,方法主要如下:
也可以将指向 ACE_Thread_Manager、ACE_Message_Queue和 ACE_Rcactor 对象的指针传给ACE_Svc_Handler 构造器,以替换其缺省所用的对象。open()挂钩方法可以进行初始化服务处理器的活动,比如:
派生一个线程(或一池线程),通过svc()挂钩方法执行服务处理。
向反应器登记一个或多个事件源,比如输入事件或超时。
打开日志文件并初始化使用统计
初始化锁或其他资源
template <typename PEER_STREAM, typename SYNCH_TRAITS> int//ACE_Svc_Handler对象的默认行为是向ACE_Reactor注册(从而确保单线程)。
//传入的参数是 创建服务处理器的接收器或连接器工厂 的指针
ACE_Svc_Handler<PEER_STREAM, SYNCH_TRAITS>::open (void * factory)
{//激活自身,成为主动对象 并发处理到来的事务if (this->reactor ()&& this->reactor ()->register_handler //注册对象(this,ACE_Event_Handler::READ_MASK) == -1)ACELIB_ERROR_RETURN ((LM_ERROR,ACE_TEXT ("%p\n"),ACE_TEXT ("unable to register client handler")),-1);return 0;
}
2、服务处理方法 服务处理器可以通过若干方式来执行其处理。例如,它可以用反应器来反应式地处理事件,或是通过一个或多个进程来并发地处理它们。下列继承自ACE_Svc Handler 的祖先的方法可被其子类取新定义,并用于执行服务处理器处理:
3、服务关闭方法 .可以通过许多方式来使用服务处理器。例如,它可以由反应器分派、运行在它自己的线程或进程中,或是构成线程池的部分。因此,ACE_Svc_Handler 类提供了以下方法来关闭服务处理器:
服务处理器常常足依照应用定义的协议被关闭的,比如在对端服务处理器关闭连接,或是发生严亚的通信错误时。在上表中列出的关闭方法可被划分进以下3个范畴中:
直接关闭 用可以直接调用destroy()来关闭服务处理器。该方法执行以下步骤:
1、从反应器那里移除处埋器。
2、取消任何与此处理器相关联的定时器。
3、关闭对端流对象,以避免发生付柄泄漏。
4、如果对象是被动态分配的,将其删除,以避免发生内存泄漏。
反应式关闭 如果某个ACE_Svc_Handler向ACE Reactor 框架作了登记,它常常会检测到对端应用心经关闭了连接,并在本地发起关闭活动。当被指示从其内部表中移除某个处理器时(通常服务处理器的 handle_input()方法在对端关闭连接之后返问-1时),反应器会调川该处理器的handle_close()。反应式处理器成该将关闭活动合片进 handle_close()方法中。如下图所示,缺省的 handle_close()方法会调用 destroy()方法,如果 handle_close()的缺省行为不合需要的话,可以在子类中重新定义它。
线程关闭 服务处理器的close()挂钩方法会在任务的svc()方法返同时在任务的各个线程中被调用。反应式服务使用了反应器关闭机制来发起关闭活动,而处理对端连接的主动线程可以在对端关闭连接时简单地返问。因为由单个线程执行服务是种常见的使用情况,缺省的 ACE_Svc_Handler::close()挂钩方法实现会调用上面描述的 handlc_close()方法。如果该方法的缺省行为不合需要的话,可以在子类中重新定义它,以执行应用特有的清理代码。
下面将给出一个实例阐释怎样使用 ACE_Svc_Handler类来实现一个基于tread-per-connection并发模型的日志服务器。示例代码:TPC_Logging_Server.cpp 和TPC_Logging_Server.h文件中。这个头文件声明示例类;它是从包括必需的头文件开始的:
#include "ace/Acceptor.h"
#include "ace/INET_Addr.h"
#include "ace/Reactor.h"
#include "ace/Svc_Handler.h"
#include "ace/FILE_IO.h"
#include "Logging_Handler.h"
下面所示的TPC_Logging_Handler继承于ACE_Svc_Handler,TPC_Logging_Handler的各个实例是由TPC_Logging_Acceptor在连接请求从对端连接器到达时动态分配的。TPC_Logging_Handler重新定义了ACE_Svc_Handler::open()挂钩方法来对处理器进行初始化。如下所示:
class TPC_Logging_Handler//使用ACE_SOCK_STREAM 数据传输类 及 ACE_NULL_SYNCH traint类来参数化模块: public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH>
{
protected:ACE_FILE_IO log_file_; // File of log records.// Connection to peer service handler.Logging_Handler logging_handler_;public:TPC_Logging_Handler () : logging_handler_ (log_file_) {}virtual int open (void *)
{//根据连接的主机名设置日志名字static const ACE_TCHAR LOGFILE_SUFFIX[] = ACE_TEXT (".log");ACE_TCHAR filename[MAXHOSTNAMELEN + sizeof (LOGFILE_SUFFIX)];ACE_INET_Addr logging_peer_addr;peer ().get_remote_addr (logging_peer_addr);logging_peer_addr.get_host_name (filename, MAXHOSTNAMELEN);ACE_OS::strcat (filename, LOGFILE_SUFFIX);//创建或打开客户端日志文件ACE_FILE_Connector connector;connector.connect (log_file_,ACE_FILE_Addr (filename),0, // No timeout.ACE_Addr::sap_any, // Ignored.0, // Don't try to reuse the addr.O_RDWR | O_CREAT | O_APPEND,ACE_DEFAULT_FILE_PERMS);//借用服务处理器的socket句柄 将其赋值给logging_handler_//后者用于接收和处理客户日志服务logging_handler_.peer ().set_handle (peer ().get_handle ());//将TPC_Logging_Handler转换为主动对象,新派生的DETACHED线程运行下面的svc方法return activate (THR_NEW_LWP | THR_DETACHED);
}//专注于读取和处理客户日志记录virtual int svc () {for (;;)switch (logging_handler_.log_record ()) {case -1: return -1; // Error.case 0: return 0; // Client closed connection.default: continue; // Default case.}/* NOTREACHED */return 0;}
};
TPC_Logging_Handler::svc()对于服务器因为某种原因被要求在对端关闭的情况,没有提供线程处理的方法,用于关闭已阻塞线程的一些常用技术如下:
ACE_Acceptor类
许多面向连接的服务器应用将它们的连接建立与服务初始化代码紧密地耦合在一起,其耦合力式致使开发者难以复用现有的代码。例如,如果你检查Logging_Acceptor、Logging_Acceptor_Ex、Logging_Accepor_WFMO、CLD_Acceptor,以及TP_Logging_Acceptor类,你会看到即使各个日志处理器的handle_input()方法的结构和行为几乎是一样的,它仍然在被一次次地重写。ACE Acceptor-Connector架定义了ACE_Acceptor类,以使应用开发者无需再反复地重写这些代码。
ACE_Acceptor是一个工厂,它实现了Acceptor-Connector模式中的Accetpor角色,其功能主要如下:
ACE_Acceptor类有着灵活的接口,方法主要分为以下两个范畴:
1、接收器初始化、析构以及访问器方法 下表方法展示了初始化和销毁ACE_Acceptor对象:
下面给出ACE_Acceptor::open的代码:
template <typename SVC_HANDLER, typename PEER_ACCEPTOR>
ACE_Acceptor<SVC_HANDLER, PEER_ACCEPTOR>::open(const typename PEER_ACCEPTOR::PEER_ADDR &local_addr, //IPC寻址类的类型ACE_Reactor *reactor,int flags,int use_select,int reuse_addr)
{ACE_TRACE ("ACE_Acceptor<SVC_HANDLER, PEER_ACCEPTOR>::open");this->flags_ = flags;this->use_select_ = use_select;this->reuse_addr_ = reuse_addr;this->peer_acceptor_addr_ = local_addr;// Must supply a valid Reactor to Acceptor::open()...if (reactor == 0){errno = EINVAL;return -1;}// Open the underlying PEER_ACCEPTOR.if (this->peer_acceptor_.open (local_addr, reuse_addr) == -1)return -1;// Set the peer acceptor's handle into non-blocking mode. This is a// safe-guard against the race condition that can otherwise occur// between the time when <select> indicates that a passive-mode// socket handle is "ready" and when we call <accept>. During this// interval, the client can shutdown the connection, in which case,// the <accept> call can hang!(void) this->peer_acceptor_.enable (ACE_NONBLOCK);int const result = reactor->register_handler (this,ACE_Event_Handler::ACCEPT_MASK);if (result != -1)this->reactor (reactor);elsethis->peer_acceptor_.close ();return result;
}
连接建立和服务处理器初始化方法 下列 ACE_Acceptor 方法可被用于被动地建连接,以及初始化与其相关联的服务处理器:
上图显示了ACE_Acceptor在其handle_input()模块方法中所执行的缺省步骤:
1、它调用 make_svc_handler()工厂方法来动态地创建服务处理器。
2、它调用 accept_svc_handler()挂钩方法来接受连接,并将其存储在服务处理器中。
3、它调用 activate svc_handler()挂钩方法,让服务处理器完成对自己的初始化。
ACE_Acceptor::handler_input()使用了模板模式来应用上面的步骤行为,主要的变化点如下:
1、服务处理器创建 ACE_Acceptor::handler_input()模板方法调用make_svc_handler工厂来创建新的服务处理器。实现如下所示:
//使用SVC_HANDLER PEER_ACCEPTOR来初始化模块
template <typename SVC_HANDLER, typename PEER_ACCEPTOR> int
ACE_Acceptor<SVC_HANDLER, PEER_ACCEPTOR>::make_svc_handler (SVC_HANDLER *&sh)
{ACE_TRACE ("ACE_Acceptor<SVC_HANDLER, PEER_ACCEPTOR>::make_svc_handler");if (sh == 0)//动态创建SVC_HANDLER的一个实例,在ACE_Acceptor::handler_input()中检测到错误//SVC_HANDLER必须关闭ACE_NEW_RETURN (sh,SVC_HANDLER,-1);// 将新创建的服务处理器的反应器设置为与接受器相关联的反应器sh->reactor (this->reactor ());return 0;
}
子类也可以重新定义make_svc_handle(),以它们所喜欢的所喜欢的任何方式创建服务处理器:
基于某些条件创建服务处理器,比如可用的CPU的数目、所存储的配置参数、计算出的历史负载均量,或是当前的主机工作负我。
总是返回一个单体服务处理器,或是
通过使用之前描述的ACE_Service_Confg或ACE_DLL类,从 DLL中动态链接处埋器
2、连接建立 ACE_Acceptor::handler_input()模板方法调用accpet_svc_handler挂钩方法来被动接收来自对端的新连接,缺省方法是将其交给PEER_ACCEPTOE::accept()方法。实现如下所示:
template <typename SVC_HANDLER, typename PEER_ACCEPTOR> int
ACE_Acceptor<SVC_HANDLER, PEER_ACCEPTOR>::accept_svc_handler(SVC_HANDLER *svc_handler)
{ACE_TRACE ("ACE_Acceptor<SVC_HANDLER, PEER_ACCEPTOR>::accept_svc_handler");//试着找出我们正在使用的reactor的实现是否需要我们为新创建的句柄重置事件关联。//这是因为新创建的句柄将继承侦听句柄的属性,包括其事件关联。ACE_Reactor *reactor = this->reactor ();bool reset_new_handle;if (reactor){reset_new_handle = reactor->uses_event_associations ();}else{// Acceptor is closed, so reject this callerrno = EINVAL;return -1;}if (this->acceptor ().accept (svc_handler->peer (), // stream0, // remote address0, // timeouttrue, // restartreset_new_handle // reset new handler) == -1){// 确保保留errno,以防svc_handler close()方法重置它ACE_Errno_Guard error(errno);// 关闭处理程序以避免内存泄漏.svc_handler->close (CLOSE_DURING_NEW_CONNECTION);return -1;}elsereturn 0;
}
子类可以重新定义 accept_svc_handler(),以在连接被接受之前或之后,但是是在其能被使用之前,增加所需的额外处理。例如,该方法可以在激活服务之前对新连接进行认证。如果activate_svc_handler()是通过反应器回调被调用的,应用的整个事件分派循环可能会不可接受地长时间阻塞。
3、服务处理器激活 ACE_Acceptor::handle_input()模板方法调用 activate_svc_handler()桂钩才法在新服务被创建以及它所代表的新连接被接受之后将服务激活。该方法的缺省行为如下所示:
template <typename SVC_HANDLER, typename PEER_ACCEPTOR> int
ACE_Acceptor<SVC_HANDLER, PEER_ACCEPTOR>::activate_svc_handler(SVC_HANDLER *svc_handler)
{ACE_TRACE ("ACE_Acceptor<SVC_HANDLER, PEER_ACCEPTOR>::activate_svc_handler");int result = 0;// 查看是否应在<svc_handler>的peer上启用非阻塞I/O// 如果为 非阻塞IO 则启用非阻塞ID 则为阻塞if (ACE_BIT_ENABLED (this->flags_,ACE_NONBLOCK)){if (svc_handler->peer ().enable (ACE_NONBLOCK) == -1)result = -1;}// 否则,请确保默认情况下已禁用else if (svc_handler->peer ().disable (ACE_NONBLOCK) == -1)result = -1;//调用open()挂钩方法来激活处理器 激活失败则调用closeif (result == 0 && svc_handler->open ((void *) this) == -1)result = -1;if (result == -1)// 已建立连接;所以这个关闭是一个“正常”的关闭操作svc_handler->close (NORMAL_CLOSE_OPERATION);return result;
}
下面将介绍一个具体的实例,使用了通过ACE_SOCK_Acceptor实例化的ACE_Acceptor在被动模式的 TCP socket: 句柄上进行侦听。
如图7.6所示,主线程使用了反应器等待来自客户的新连接请求。当连接到达时,接受器使用OpenSSL认证协议确保客户日志daemon 有权连接到服务器,如果客户是合法的,接受器动态地创建TCP_Logging_Handler来处理连接,TPC_Logging_Handler::open()方法派生一个线程来处理客户在连接上所发送的日志记录。
因为大部分代码都复用自ACE Acceptor-Connector框架和 OpenSSL库,这个示例主要是护展实例化和使用了现有的各种能力。为了定义其认证协议的服务器端,我们对ACE_Acceptor 进行子类化,并重新定义了它的 open()方法和 acccpt_svc_handler()挂钩方法。TPC_Logging_Acceptor 类和它的proxtected 数据成员被声明如下:
class TPC_Logging_Acceptor: public ACE_Acceptor<TPC_Logging_Handler, ACE_SOCK_ACCEPTOR> {
protected:// The SSL ``context'' data structure.SSL_CTX *ssl_ctx_;// 与经过身份验证的SSL连接相对应的SSL数据结构.SSL *ssl_;public:typedef ACE_Acceptor<TPC_Logging_Handler, ACE_SOCK_ACCEPTOR>PARENT;typedef ACE_SOCK_Acceptor::PEER_ADDR PEER_ADDR;// Constructor.// 发出OPenSSLAPI调用后,将ssl_ctx_、ssl_作为成员传递TPC_Logging_Acceptor (ACE_Reactor *r): PARENT (r), ssl_ctx_ (0), ssl_ (0) {}// Destructor frees the SSL resources.virtual ~TPC_Logging_Acceptor () {SSL_free (this->ssl_);SSL_CTX_free (this->ssl_ctx_);}// Initialize the acceptor instance.// 初始化acceptor 实例virtual int open(const ACE_SOCK_Acceptor::PEER_ADDR &local_addr,ACE_Reactor *reactor = ACE_Reactor::instance (),int flags = 0, int use_select = 1, int reuse_addr = 1);virtual int handle_close(ACE_HANDLE = ACE_INVALID_HANDLE,ACE_Reactor_Mask = ACE_Event_Handler::ALL_EVENTS_MASK);// 连接建立和身份验证挂钩方法virtual int accept_svc_handler (TPC_Logging_Handler *sh);
}
为了防止对付潜在的危险,使用OPenSSL认证和加密协议进行保护。如下所示, TPC_Logging_Acceptor::open使用基类来初始化自身,并建立了服务器的身份:
#include "ace/OS_NS_string.h"
#include "ace/FILE_Addr.h"
#include "ace/FILE_Connector.h"
#include "ace/os_include/os_netdb.h"
#include "Reactor_Logging_Server_Adapter.h"
#include "TPC_Logging_Server.h"
#include "TPCLS_export.h"//因为没有用户界面 假定它的服务器证书和附随的密钥位于一组缺省的文件中
//应用可以通过替换缺省的宏定义来替换缺省的文件名
// TPC_CERTIFICATE_FILENAME 认证证书文件名 TPC_KEY_FILENAME 密钥文件名
#if !defined (TPC_CERTIFICATE_FILENAME)
# define TPC_CERTIFICATE_FILENAME "tpc-cert.pem"
#endif /* !TPC_CERTIFICATE_FILENAME */
#if !defined (TPC_KEY_FILENAME)
# define TPC_KEY_FILENAME "tpc-key.pem"
#endif /* !TPC_KEY_FILENAME */int TPC_Logging_Acceptor::open(const ACE_SOCK_Acceptor::PEER_ADDR &local_addr,ACE_Reactor *reactor,int flags, int use_select, int reuse_addr) {//调用缺省的open来初始化ACE_Acceptif (PARENT::open (local_addr, reactor, flags,use_select, reuse_addr) != 0)return -1;//初始化OpenSSL库OpenSSL_add_ssl_algorithms ();//设置SLL版本3连接,并创建于要认证的连接相应的SSL结构ssl_ctx_ = SSL_CTX_new (SSLv23_server_method ());if (ssl_ctx_ == 0) return -1;//设置用于在建立连接标识服务器的证书和附随的私钥,并随机认证与正确的证书是匹配的//这些代码和证书是以pem格式编码放在指定文件中的 if (SSL_CTX_use_certificate_file (ssl_ctx_,TPC_CERTIFICATE_FILENAME,SSL_FILETYPE_PEM) <= 0|| SSL_CTX_use_PrivateKey_file (ssl_ctx_,TPC_KEY_FILENAME,SSL_FILETYPE_PEM) <= 0|| !SSL_CTX_check_private_key (ssl_ctx_))return -1;//设置一个新的SSL结构。在通过OpenSSL API建立的SSL连接时,该数据结构//被用于TPC_Logging_Acceptor::accept_svc_handler的挂钩方法ssl_ = SSL_new (ssl_ctx_);return ssl_ == 0 ? -1 : 0;
}
上述代码最后两行设置一个新的SSL结构。在通过OpenSSL API建立的SSL连接时,该数据结构被用于TPC_Logging_Acceptor::accept_svc_handler()的挂钩方法中,如下所示:
int TPC_Logging_Acceptor::accept_svc_handler(TPC_Logging_Handler *sh) {//使用缺省的accept_svc_handler 实现TCP连接if (PARENT::accept_svc_handler (sh) == -1) return -1;//复位SSL数据结构 以用于新的SSL连接 SSL_clear (ssl_); // Reset for new SSL connection.SSL_set_fd (ssl_, sh->get_handle ());//配置SSL结构,以在接收SSL连接时强制实施客户认证 SSL_set_verify(ssl_,SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT,0);//进行实际的SSL连接,如果连接失败 SSL_accept 也失败if (SSL_accept (ssl_) == -1|| SSL_shutdown (ssl_) == -1) return -1;//认证成功则关闭SSL连接 当前还没进行加密return 0;
}
通过重新定义 open()和 accept_svc_handler()挂钩方法、给服务器日志 daemon增加了认证,而没有影响其实现的任何其他部分。这样的可扩展性说明了ACE_Acceptor 类设计中所使用的模板模式的威力。
当实例通过ACE Service Configurator框架被关闭时,Reactor_Logging_Server_Adapter::fini()最后将调用下面的handle_close。
int TPC_Logging_Acceptor::handle_close (ACE_HANDLE h,ACE_Reactor_Mask mask) {PARENT::handle_close (h, mask);delete this;return 0;
}
该方法调用 ACE_Acceptor::handle_close()来关闭监听接受器 socket,并解除其在反应器框架上的登记。为了避免内存泄漏,该方法随即删除this对象,该对象是在服务被初始化时动态分配的。 最后创建TPC_Logging_Acceptor类型定义:
typedef Reactor_Logging_Server_Adapter<TPC_Logging_Acceptor>TPC_Logging_Server;ACE_FACTORY_DEFINE (TPCLS, TPC_Logging_Server)
使用ACE_FACTORY_DEFINE 宏来自动生成_make_TPC_Logging_Server工厂函数,该函数位于下列的svc.conf文件中
dynamic TPC_Logging_Server Service_Object *
TPCLS:_make_TPC_Logging_Server() "$TPC_LOGGING_SERVER_PORT"
这个文件指示 ACE Service Configurator框架通过以下步骤来配置thread-per-connection日志服务器:
1、它动态地将 TPCLS DLL链接进进程的地址空间中。
2、它使用 ACE_DLL类来从 TPCLS DLL符号衣中提取_make_TPC_Logging_Server()工厂函数3、调用该函数,动态一个TPC_Logging_Server,并返回指问它的指针。
4、ACE Service Configurator 框架随即通过该指针调用TPC_Logging_Server::init(),将展开的TPC_LOGGING_SERVER_PORT 环境变量作为其 argc/argv参数传递,这个环境变量指定的足日志服务器用于侦听客户连接请求的端口号。最终端口号被向下传给Reactor_Logging_Server 构造器。
5、如果 init()成功,TPC_Logging_Server指针被存储在 ACE_Service_Repository 中,在“TPC_Logging_Server”名下。
我们已不再需要为先前的日志服务器示例手工编写的各种*Logging_Acceptor*类。它们的目的被继承目 ACE_Acceptor的TPC_Logging_Acceptor所涵括。传给 ACE_Acceptor 基类的第一个模板参数是承自ACE_Svc_Handler的TPC_Logging_Handler。
使用ACE_Acceptor能够减免大部分代码,但也需要写如下步骤:
1、侦听并接受连接。
2、创建并激活新的服务处理器
3、我们的 tread-per-connection日志服务器还复用了一些来自早先的解决方案的类,这些类提供了以下能力: Reactor_Logging_Server_Adapter 模板,初始化网络侦听者地址。
4、动态地配置日志服务器,并在main()函数中运行ACE Reactor的事件循环。
ACE_Connect类
ACE_Acceptor聚焦于怎样解除“服务处理器的功能”与“被动地连接和初始化它们所需的各步骤”的耦合。解除“服务处理器的功能”与“主功地连接和初始化它们所需的各步骤”的耦合也同样有限。ACE Acceptor-Connector 框架定义了ACE_Connector 类.
ACE_Connector是一个工厂类,它实现了Acceptor-Connector 框架中的Connector角色,这个类提供了以下能力:
其功能则主要分为以下两个范畴:
1、连接器初始化、析构方法以及访问器方法 下列方法被用于初始化、销毁和访问ACE_Connector类
我们可以传给 ACE_Connector的构造器和 open()方法一个标志,指示应该将连接器所初始化的服务处理器的IPC端点设置为阻塞模式(缺省)还是非阻塞模式(ACENONBLOCK)。还可以将与连接器相关联的反应器传给这些方法。全缺省情况下,它们使用的是单体ACE Reactor,就像ACE_Svc_Handler和ACE_Acceptor -样。
ACE_Connector 对象会在被销毁时或是其close()力法被显式调用时闭。ACE_Connector 不会为同步连接分配资源,所以如果它只被同步地使用的话,就没有什么要清理的。但是,如果是异步连接,这些方法就会释放连接器所分配的资源,这些资源被用于跟踪在连接器被关闭时还未完成的待处理连接。通过调用其close()挂钩方法,各个仍处在未连接状态的服务处理器也会被关闭。
2、建立连接和服务器初始化方法 下面的ACE_Connector方法可用于主动连接,并对与其相关的服务处理器进行初始化:
网络化应用使用connect()模板方法来主动地发起连接尝试,而不管其完成会被同步处理,还足被异步处理。该方法使用了以下步骤来连接和初始化新的服务处理器:
1、它获取一个服务处理器,或是通过使用调用者传入的处理器,或是通过调make_svc_handler)上厂方法。
2、它随即调用 connect_svc_handler()方法来发起连接。被动连接建立通常是即时的,而主动连接建立对能需要更长时间,特别是在广域网上。因此,可以指示ACE_Connector使用 ACE Reactor框架来在连接完成时(无论成功与否)异步地对其进行处理。
3、对于同步连接,connect()调用 activate_svc_handler()来让服务处理器完成对其自身的初始化。对于异步连接,反应器调用 ACE_Connector::handle_output()在连接完成之后完成服务处理器初始化。上图说明了异步情况下的各个步骤。
ACE_Connector::connect ()采用了模板模式,因此上述的几个步骤都是可以自定义的。
virtual int connect (SVC_HANDLER *&svc_handler,const typename PEER_CONNECTOR::PEER_ADDR &remote_addr,const ACE_Synch_Options &synch_options = ACE_Synch_Options::defaults,const typename PEER_CONNECTOR::PEER_ADDR &local_addr= reinterpret_cast<const peer_addr_type &>(peer_addr_type::sap_any),int reuse_addr = 0,int flags = O_RDWR,int perms = 0);
传给 ACE_Conneetor::connect()方法的是以下参数:
对指向 SVC_HANDLER的指针的引用。如果指针为NULL,make_svc_handler()工厂方法就会被调用,以获取“个服务处理器。make_svc_handler()的缺省版本会动态地分配一个个服务处理器。
地址参数,其型构通过C++ traits来匹配对端连接器和流类型。该地址指定对端用于进行连接的端点。例如,对于使用ACE_SOCK Stream的服务处理器,该地址可以是一个ACE INET Addr,其中含自用于服务处理器的端口号和IP地址。对于其他的IP机制,它可以是用于“通过名字服务来定位某项服务”的名称或是端口映射服务器的主机名,这可出应用设计者来决定。
对 ACE_Sync_Options 对象的引用,它合并了用于确定 ACE_Connector 的行为的各选项的值。如果成功了ACE_Sync_Options则会忽略,否则,则使用该参数改变完成以下处理:
1、是否使用ACE Reactor 框架来检测连接完成。
2、为连接完成进行多久的等待。
下表基于ACE_Sync_Options的值,总结了在连接请求没有立即完成的情况下,ACE_Connector的行为:
不管连接是怎样建立的,make_svc_handler()、connect_svc_handler(),以及 activate_svc_handler()方法的任何或全部行为都可由子类重新定义。这一可扩展的Template Method 模式设计允许开发者进行广泛的行为变更和定制来支持许多使用情况。接下来描述ACE_Connector::connect()中的3 个主要变化点。
1、获取服务处理器 因为接受器常常由出自反应器的upcall驱动,它的 make_svc_handler()工厂方法通常会创建新的服务处理器。与此相反,接受器可以选择以下面两种方式中的一种来连接和初始化服务处理器:
由调用者创建 在这种方式下调用者将指向已存在的服务处理器的指针传给连接器的connect()方法。
由连接器创建 在这种方式下调用各将NULL服务处理器指针传给连接器的connect()方法,从而指示 make_svc_handler()工厂方法获取指问服务处理器的指针(甚或是将此活动定全地延缓至connect_svc_handler()方法启用连接缓存之时)。
ACE Connector::make_svc_handler()的缺省实现处理了这两种情况,如下所示:
template <typename SVC_HANDLER, typename PEER_CONNECTOR> int
ACE_Connector<SVC_HANDLER, PEER_CONNECTOR>::make_svc_handler (SVC_HANDLER *&sh)
{ACE_TRACE ("ACE_Connector<SVC_HANDLER, PEER_CONNECTOR>::make_svc_handler");//仅当为NULL的时候才分配新的服务处理器if (sh == 0)ACE_NEW_RETURN (sh,SVC_HANDLER,-1);// 将新创建的<SVC_HANDLER>的反应器设置为该<Connector>正在使用的反应器。sh->reactor (this->reactor ());return 0;
}
2、建立连接 ACE_Connector:connect()模板方法调用其connect_svc_handler()挂钩方法来发起与对端接受器的新连接。该方法的缺省实现只是转移到PEER_CONNECTOR::connect()方法来发起连接,如下所示:
template <typename SVC_HANDLER, typename PEER_CONNECTOR> int
ACE_Connector<SVC_HANDLER, PEER_CONNECTOR>::connect_svc_handler
(SVC_HANDLER *&svc_handler,const typename PEER_CONNECTOR::PEER_ADDR &remote_addr,ACE_Time_Value *timeout,const typename PEER_CONNECTOR::PEER_ADDR &local_addr,int reuse_addr,int flags,int perms)
{ACE_TRACE ("ACE_Connector<SVC_HANDLER, PEER_CONNECTOR>::connect_svc_handler");//connector_为PEER_CONNECTOR对象return this->connector_.connect (svc_handler->peer (),remote_addr,timeout,local_addr,reuse_addr,flags,perms);
}
与先前的客户日志 daemon 类似,我们的新版本也使用了两个线程,它们使用各种 ACE 框架类执行以下任务:
输入处理 主线程使用单体ACE_Reactor、ACE_Acceptor,以及ACE_Svc_Handler 被动对象。通过网络loopback设备从连接到客户应用的socket那读取日志记录。各个日志记录在作为主动对象运行的第二个ACE_Svc_Handler中进行排队。
输出处理 主动对象ACE_Svc_Handler运行在自己的线程中。它从其消息队列中取出消息,在chunk 中缓冲消息,并在TCP连接上将这些chunk转发给服务器日志daemon。我们使用了ACE_Connector的一个子类来建立(并在需要时重新建立)和认证与日志服务器的连接。
下面介绍了基于ACE Acceptor-Connector框架的新客户日志daemon。
下图所示为这些类的实际交互。当服务在运作时,存在着两个线程。 第一个线程是初始化的程序线程,反应器事件循环由它运行。这个线程执行以下处理:
它通过 AC_CLD_Acceptor 接受新的日志客户连接。
它通过 AC_Input_Handler 接收日志记录,并确定断掉的日志客户连接:日志记录被放入AC_Output_Handler 消息队列中。
它在 AC_Output_Handler 中检测断掉的口志服务器连接,并通过 AC_CLD_Connector 更新连接服务器。
转发者线程(第二个线程)是在与日志服务器的初始连接建立时启动的,并且持续地运行到服务终止。AC_Output_Handler 主动对象服务线程由这个线程运行。我们从包括必要的 ACE 头文件开始我们的实现:
#include "ace/OS_NS_string.h"
#include "ace/OS_NS_unistd.h"
#include "ace/OS_NS_sys_socket.h"
#include "ace/OS_NS_sys_time.h"
#include "ace/INET_Addr.h"
#include "ace/SOCK_Acceptor.h"
#include "ace/SOCK_Connector.h"
#include "ace/SOCK_Stream.h"
#include "ace/Acceptor.h"
#include "ace/Connector.h"
#include "ace/Get_Opt.h"
#include "ace/Handle_Set.h"
#include "ace/Log_Record.h"
#include "ace/Truncate.h"
#include "ace/Message_Block.h"
#include "ace/Reactor.h"
#include "ace/Service_Object.h"
#include "ace/Signal.h"
#include "ace/Svc_Handler.h"
#include "ace/Thread_Manager.h"
#include "ace/os_include/os_netdb.h"
#include "Logging_Handler.h"
#include "AC_CLD_export.h"#include "AC_Client_Logging_Daemon.h"#include <openssl/ssl.h>
上图中的各个类是在AC_Client_Logging_Daemon.cpp中实现的,下面对其进行描述:
AC_Input_Handler类主要提供了以下能力:
1、它接收来自客户的日志记录。
2、它将每个日志记录存储在一个ACE_Message_Block中。
3、它将消息块传给AC_Output_Handler 进行处理。
class AC_Input_Handler: public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH> {
public:AC_Input_Handler (AC_Output_Handler *handler = 0): output_handler_ (handler) {}virtual int open (void *); // Initialization hook method.virtual int close (u_long = 0); // Shutdown hook method.protected:// Reactor hook methods.virtual int handle_input (ACE_HANDLE handle);virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE,ACE_Reactor_Mask = 0);// Pointer to the output handler.AC_Output_Handler *output_handler_;// Keep track of connected client handles.// 集合,可以追踪到所有已连接的客户socketACE_Handle_Set connected_clients_;
};
AC_Input_Handler继承于ACE_Svc_Handler,它可以使用单体 ACE_Reactor 来等待日志记录通过loopback TCP socket,从任意的与客户日志daemon 相连的客户应用到达。因此,它没有使用其消息队列,同时还通过 ACENULLSYNCH策略实例化ACE_Svc_Handler,以使其同步体使用降至最少。当日志记录到达客户日志daemon时,单体ACE Reactor 分派下自的AC_Input_Handler::handle input()挂钩方法:
int AC_Input_Handler::handle_input (ACE_HANDLE handle) {ACE_Message_Block *mblk = 0;//使用Logging_Handler读出数据 Logging_Handler logging_handler (handle);//将数据存储在ACE_Message_Block 中 并将此消息传给AC_Output_Handlerif (logging_handler.recv_log_record (mblk) != -1){if (output_handler_->put (mblk->cont ()) != -1){mblk->cont (0);mblk->release ();return 0; // Success return.}else{mblk->release ();}}return -1; // Error return.
}
这个方法使用了 Logging_Handler类来通过socket句柄读出日志信息,将其存储在ACE_Message_Block中,并将此消息传给AC_Output_Handler类,由它在单独的线程中排队消息,并进行服务。我们放入队列中的只是日志记录数据(通过mbik->cont()引用),而不是主机名(通过mnblk引用)。
如果客户应用断开了连接,或是发生错误,handle_input 将返回-1。这个值触发反应器调用以下的handle_close:
//返回-1时调用 无需删除自身 由AC_CLD_Accetor进行统一管理
int AC_Input_Handler::handle_close (ACE_HANDLE handle,ACE_Reactor_Mask) {connected_clients_.clr_bit (handle);return ACE_OS::closesocket (handle);
}
handle_close无需删除自用 由AC_CLD_Accetor进行统一管理。
当连接请求由客户应用到达客户日志daemon时,AC_CLD_Accetor::handler_input()将分派给下面的AC_Input_Handler::open方法:
int AC_Input_Handler::open (void *) {//复用AC_Input_Handler对象//当AC_CLD_Accept::accept_svc_handler()被调用接收新的连接时,它会自动复用AC_Input_Handler//的ACE_SOCK_Steam中的句柄。ACE_HANDLE handle = peer ().get_handle ();//使用ACE_Reactor::register_handler()方法简单为READ事件登记这一单一对象的指针//当日志记录到达时,则调用AC_Input_Handler::handler_input()函数 if (reactor ()->register_handler(handle, this, ACE_Event_Handler::READ_MASK) == -1)return -1;//记录句柄 使得能够正确地关闭句柄connected_clients_.set_bit (handle);return 0;
}
单体服务处理器和多线程化事件分派可能会引入竞争状态。如果AC_Input_Handler 类与多线程化反应器事件循环一起使用的话,我们可能会失去对某些连接的追踪。因为 AC_Input_Handler 对象的 ACE_SOCK_Stream 成员可被多个线程改变。涉入其中的各个事件分派线程会:
1、调用 AC_CLD_Acceptor::accept_svc_handler()万法,它接受新的 socket,并将其句柄存储在AC_Input_Handler 中.
2、调用AC_CLD_Acceptor::activate_svc_handler()方法,它随即调用上面所示的AC_Input_Handler::open()方法。
AC_CLD_Acceptor::handle_close()或 AC_Clicnt_Logging_Daemon::fini()调用该方法来关闭客户日志 daemon:
int AC_Input_Handler::close (u_long) {//将类型为MB_STOP 0大小消息块插入消息队列中//当运行AC_Input_Handler::svc()的转发者线程会取出这个shutdown_message//然后将剩余的日志刷出flush,给其他的日志服务器消息队列,并退出线程 ACE_Message_Block *shutdown_message = 0;ACE_NEW_RETURN(shutdown_message,ACE_Message_Block (0, ACE_Message_Block::MB_STOP), -1);output_handler_->put (shutdown_message);//移除connected_clients_中的所有句柄 每个句柄将调用handler_close()函数reactor ()->remove_handler(connected_clients_, ACE_Event_Handler::READ_MASK);//在close返回之前,使用output_handler_->wait ()来组设至其svc()挂钩方法退出return output_handler_->wait ();
}
AC_Output_Handler。这个类主要提供了以下能力:
它将 AC_Input _Handler 传给它的消息块插入其同步化消息队列中。
它在自己的线程中作为主动对象运行,从其同步化消息队列中取出消息块,将它们缓冲进chunk,并把这些 chunk 转发给日志服务器。
它向单体 ACE Reactor 登记,以处理日志服务器造成的连接断开,并重新建立连接
class AC_Output_Handler: public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_MT_SYNCH> {
public:enum { QUEUE_MAX = sizeof (ACE_Log_Record) * ACE_IOV_MAX };virtual int open (void *); // Initialization hook method.// Entry point into the <AC_Output_Handler>.virtual int put (ACE_Message_Block *, ACE_Time_Value * = 0);protected:AC_CLD_Connector *connector_;// 处理与日志服务器的断开连接。virtual int handle_input (ACE_HANDLE handle);// Hook method forwards log records to server logging daemon.// Hook方法将日志记录转发到服务器日志守护进程virtual int svc ();// 使用收集写入操作发送缓冲的日志记录。virtual int send (ACE_Message_Block *chunk[], size_t &count);
};
因为 AC_Output_Handler 派生自 ACE_Svc_Handler,通过 ACE_MT SYNCH 来对其同步 traits 进行实例化,它继承了ACE_SOCK_Stream、ACE_Thread_Manager和同步化 ACE_Message_Queue,以及激活自身,为丰动对象的能力,我们使用了 Half-Sync/Half-Async 模式的变种来构造这个客户日志 daemon 的并发架构,AC_Input_Handler::handle_input()扮演了其中的反应器角色,它通过下面的 put()法将日志记录传给 AC_Output_Handler:
//将消息块放入AC_Output_Handler的同步化消息队列中
int AC_Output_Handler::put (ACE_Message_Block *mb,ACE_Time_Value *timeout) {int retval;//如果putq()调用阻塞,而消息队列被pushed的话,它就会再执行putq()来重试,while ((retval = putq (mb, timeout)) == -1) {if (msg_queue ()->state () != ACE_Message_Queue_Base::PULSED)break;}return retval;
}
这个方法只是将消息块放入AC_Output_Handler的同步化消息队列中。如果putq()调用阻塞,而消息队列被pushed的话,它就会再执行putq()来重试,下面两个方法解释了这个类是怎么使用队列pushing的。
int AC_Output_Handler::open (void *connector) {//保存指向AC_CLD_Connector工厂的指针 如果需要重连 将使用同一个工厂connector_ =static_cast<AC_CLD_Connector *> (connector);//将以连接的socket的发送缓冲区增加大最大int bufsiz = ACE_DEFAULT_MAX_SOCKET_BUFSIZ;peer ().set_option (SOL_SOCKET, SO_SNDBUF,&bufsiz, sizeof bufsiz);//向单体反应器登记this对象,如果日志服务器断开,this对象的hand_input()立即被通知if (reactor ()->register_handler(this, ACE_Event_Handler::READ_MASK) == -1)return -1;//每次与日志服务器的新连接被建立时,都会调用该方法//消息队列一开始处于ACTIVETED状态,因此设置高水位//随后派生一个系统范围的线程执行AC_Input_Handler::svc//AC_Input_Handler::close会等在这个线程退出,所以不用传THR_DETACHEDif (msg_queue ()->activate ()== ACE_Message_Queue_Base::ACTIVATED) {msg_queue ()->high_water_mark (QUEUE_MAX);return activate (THR_SCOPE_SYSTEM);} else return 0;
}
但是,如果消息队列不在 ACTIVATED状态中,我们就知道日志服务器连接是被重新建立的。在这种情况下,消息队列的高水位标已经被设置,而服务线程池也已经在执行。
现在我们描述AC_Output.Handler:svc()挂钩方法,它运行在自己的线程中,将日志记录转发给服务器日志 daemon:如下所示,通过缓冲日志记录,直至达到最大限度的数目或是最大限度的时间己流逝,这种方法使网络吞吐量得以最优化。
int AC_Output_Handler::svc () {//定义消息体ACE_Message_Block *chunk[ACE_IOV_MAX];size_t message_index = 0;//记录消息数量//设置最大的过期时间ACE_Time_Value time_of_last_send (ACE_OS::gettimeofday ());ACE_Time_Value timeout;//屏蔽SIGPIPEACE_Sig_Action no_sigpipe ((ACE_SignalHandler) SIG_IGN);ACE_Sig_Action original_action;no_sigpipe.register_action (SIGPIPE, &original_action);//循环执行 直至达到最大限度的数目或是最大限度的时间己流逝for (;;) {if (message_index == 0) {timeout = ACE_OS::gettimeofday ();timeout += FLUSH_TIMEOUT;}ACE_Message_Block *mblk = 0;if (getq (mblk, &timeout) == -1) {if (errno == ESHUTDOWN) {if (connector_->reconnect () == -1) break;continue;} else if (errno != EWOULDBLOCK) break;else if (message_index == 0) continue;} else {if (mblk->size () == 0&& mblk->msg_type () == ACE_Message_Block::MB_STOP){ mblk->release (); break; }chunk[message_index] = mblk;++message_index;}if (message_index >= ACE_IOV_MAX ||(ACE_OS::gettimeofday () - time_of_last_send>= ACE_Time_Value(FLUSH_TIMEOUT))) {if (this->send (chunk, message_index) == -1) break;time_of_last_send = ACE_OS::gettimeofday ();}}/循环完还存在 通过send统一处理if (message_index > 0)this->send (chunk, message_index);//解开SIGPIPE屏蔽no_sigpipe.restore_action (SIGPIPE, original_action);return 0;
}
我们省略了 AC_Ouput_Handler::send()实现,因为它和 CLD_Handler::send是一样的方法,发送日志记录给日志服务器,如果连接在发送过程中被关闭,它就重新建立与服务器的连接。事实上,AC_Output_Handler::svc()与CLD_Handler::forward()类似。其主要的不同是上面的 17 到 20行会检査队列是否被 pulsed,以对日志服务器连接断开作出响应。
pulse 由下列的AC_Output_Handler::handle_input()方法执行,该方法由单体反应器在服务器关闭与客户日志 daemon 的连接时分派。当连接通过AC_Connector::reconnect()重新建立连接时,ACE_Connector类调用AC_Output_Handler::open(),由它将消息队列的状态变网 ACTIVATED.
int AC_Output_Handler::handle_input (ACE_HANDLE h) {peer ().close (); //关闭socket句柄//将socket从反应堆中移除 并将DONT_CALL传入//当再次有连接请求进来时,会调用open方法,登记新的socketreactor ()->remove_handler(h, ACE_Event_Handler::READ_MASK| ACE_Event_Handler::DONT_CALL);msg_queue ()->pulse (); //将工作交给转发者线程return 0;
}
ACE_Svc_Handler:close()方法将在运行svc()挂钩方法的线程退出时,ACE Acceptor-Connectnr框架自动调用。该方法清理所有动态分配的资源,比如同步化消息队列及其内容,从单体反应器中移除 AC_Output_Handler。
AC_CLD_Acceptor 这个类具备以下能力:
它是创建 AC_Input_Handler的单个实例的工厂
它被动地接受来自客户的连接。
它激活 AC_Input_Handler 的单个实例,后者问单体 ACE Reactor 登记所有的连接
//继承于ACE_Acceptor
class AC_CLD_Acceptor: public ACE_Acceptor<AC_Input_Handler, ACE_SOCK_ACCEPTOR> {
public:// Constructor.AC_CLD_Acceptor (AC_Output_Handler *handler = 0): output_handler_ (handler), input_handler_ (handler) {}protected:typedef ACE_Acceptor<AC_Input_Handler, ACE_SOCK_ACCEPTOR>PARENT;// <ACE_Acceptor> factory method.virtual int make_svc_handler (AC_Input_Handler *&sh);// <ACE_Reactor> close hook method.virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE,ACE_Reactor_Mask = 0);// Pointer to the output handler.AC_Output_Handler *output_handler_;// Single input handler.AC_Input_Handler input_handler_;
};
AC_CLD_Acceptor是ACE_Acceptor的子类,所以它继承了上述描述的所有能力。因为我们只需要 AC_Input_Handler 的一个实例,我们ACE_Acceptor::make_svc_handler()方法重新定义如下:
//确保不管所连接的客户数目是多少,都只存在 AC_Input_Handler 的一个实例
int AC_CLD_Acceptor::make_svc_handler (AC_Input_Handler *&sh)
{ sh = &input_handler_; return 0; }
这个方法将服务处理器设置到 input_handler_数据成员的地址,从而确保不管所连接的客户数目是多少,都只存在 AC_Input_Handler 的一个实例。
如果在接受连接或向反应器登记创柄和事件处理器的过程中发生错误,反应器会自功调用下面的AC_CLD_Acceptor::handle_close()方法:
int AC_CLD_Acceptor::handle_close (ACE_HANDLE,ACE_Reactor_Mask) {PARENT::handle_close (); //调用父类的handle_close 来关闭接收器//调用input_handler_ close 关闭AC_Input_Handler的资源//并关闭AC_Output_Handler 的消息队列和svc线程input_handler_.close (); return 0;
}
AC_CLD_Connect类 这个类具备以下功能:
它主动地建立(并在需要时重新建立)并认证与日志服务器的连接。
它激活 AC_Output _Handler 的单个实例,这个实例相对于 AC_Input_Handler 所进行的日志记录接收,并发地将日志记录转发给日志服务器。
class AC_CLD_Connector: public ACE_Connector<AC_Output_Handler, ACE_SOCK_CONNECTOR> {
public:typedef ACE_Connector<AC_Output_Handler, ACE_SOCK_CONNECTOR>PARENT;// Constructor.// 发出OPenSSLAPI调用后,将ssl_ctx_、ssl_作为成员传递AC_CLD_Connector (AC_Output_Handler *handler = 0): handler_ (handler), ssl_ctx_ (0), ssl_ (0) {}// Destructor frees the SSL resources.virtual ~AC_CLD_Connector () {SSL_free (ssl_);SSL_CTX_free (ssl_ctx_);}//FUZZ: disable check_for_lack_ACE_OS// Initialize the Connector.virtual int open (ACE_Reactor *r = ACE_Reactor::instance (),int flags = 0);//FUZZ: enable check_for_lack_ACE_OS// Re-establish a connection to the logging server.int reconnect ();protected:// Connection establishment and authentication hook method.virtual int connect_svc_handler(AC_Output_Handler *&svc_handler,const ACE_SOCK_Connector::PEER_ADDR &remote_addr,ACE_Time_Value *timeout,const ACE_SOCK_Connector::PEER_ADDR &local_addr,int reuse_addr, int flags, int perms);virtual int connect_svc_handler(AC_Output_Handler *&svc_handler,AC_Output_Handler *&sh_copy,const ACE_SOCK_Connector::PEER_ADDR &remote_addr,ACE_Time_Value *timeout,const ACE_SOCK_Connector::PEER_ADDR &local_addr,int reuse_addr, int flags, int perms);// Pointer to <AC_Output_Handler> we're connecting.AC_Output_Handler *handler_;// Address at which logging server listens for connections.ACE_INET_Addr remote_addr_;// The SSL "context" data structure.SSL_CTX *ssl_ctx_;// The SSL data structure corresponding to authenticated SSL// connections.// 与经过身份验证的SSL连接相对应的SSL数据结构.SSL *ssl_;
};
CLD_Connector::open()为法实现执行常规的 ACE_Connector初始化,并且使用了OpenSSL来建立客户的身份。
//因为没有用户界面 假定它的服务器证书和附随的密钥位于一组缺省的文件中
//应用可以通过替换缺省的宏定义来替换缺省的文件名
// CLD_CERTIFICATE_FILENAME 认证证书文件名 CLD_KEY_FILENAME密钥文件名
#if !defined (CLD_CERTIFICATE_FILENAME)
# define CLD_CERTIFICATE_FILENAME "cld-cert.pem"
#endif /* !CLD_CERTIFICATE_FILENAME */
#if !defined (CLD_KEY_FILENAME)
# define CLD_KEY_FILENAME "cld-key.pem"
#endif /* !CLD_KEY_FILENAME */int AC_CLD_Connector::open (ACE_Reactor *r, int flags) {//调用缺省的open来初始化ACE_Acceptif (PARENT::open (r, flags) != 0) return -1;//初始化OpenSSL库OpenSSL_add_ssl_algorithms ();//设置SLL版本3连接,并创建于要认证的连接相应的SSL结构ssl_ctx_ = SSL_CTX_new (SSLv23_client_method ());if (ssl_ctx_ == 0) return -1;//设置用于在建立连接标识服务器的证书和附随的私钥,并随机认证与正确的证书是匹配的//这些代码和证书是以pem格式编码放在指定文件中的 if (SSL_CTX_use_certificate_file (ssl_ctx_,CLD_CERTIFICATE_FILENAME,SSL_FILETYPE_PEM) <= 0|| SSL_CTX_use_PrivateKey_file (ssl_ctx_,CLD_KEY_FILENAME,SSL_FILETYPE_PEM) <= 0|| !SSL_CTX_check_private_key (ssl_ctx_))return -1;//设置一个新的SSL结构。在通过OpenSSL API建立的SSL连接时,该数据结构//被用于AC_CLD_Connector::connnect_svc_handler的挂钩方法ssl_ = SSL_new (ssl_ctx_);if (ssl_ == 0) return -1;return 0;
}
这些代码本质上使用了与 TPC_Logging_Acceptor::open()的实现相同的逻辑来初始化和确认 OpensSL 数据结构。
与CLD_Connector 不同,AC_CLD_Connector 无需实现 connect()方法。反,它复用了ACE_Connector:connect()模板方法。
当连接通过 ACEConnector 被完成时,框架会调用下面的AC_CLD_Connector::connect_svc_handler()挂钩方法:该方法使用了OpenSSL来实现一种认证协议,确认客户日志 daemon有权与服务器日志 daemon连接。另外还校验了服务器的身份。这个协议的服务器日志 daemon 端出现在 TPC_Logging_Acceptor:accept_svc_handler()中,而客户|志daemon端如下所示:
int AC_CLD_Connector::connect_svc_handler(AC_Output_Handler *&svc_handler,const ACE_SOCK_Connector::PEER_ADDR &remote_addr,ACE_Time_Value *timeout,const ACE_SOCK_Connector::PEER_ADDR &local_addr,int reuse_addr, int flags, int perms) {//为缺省的connect_svc_handler建立tcp连接if (PARENT::connect_svc_handler(svc_handler, remote_addr, timeout,local_addr, reuse_addr, flags, perms) == -1) return -1;//复位SSL连接 以用于新的连接SSL_clear (ssl_);SSL_set_fd (ssl_, svc_handler->get_handle ());//配置SSL数据结构,以在建立SSL连接时强制进行对服务器的认证SSL_set_verify (ssl_, SSL_VERIFY_PEER, 0);//执行实际的SSL连接 如果认证失败 连接也会失败if (SSL_connect (ssl_) == -1|| SSL_shutdown (ssl_) == -1) return -1; //如果认证成功关闭连接//保存相连的日志服务器地址 以用于重连AC_CLD_Connector::reconnect()remote_addr_ = remote_addr;return 0;
}
通过重新定义 open()和 connect_svc_handler()挂钩方法,我们可以给我们的客户日志 daemon 增加认证,而又不影响其实现的任何其他的部分。这样的可扩展性展示了在ACE_Connector 类设计中所使用的 Template Method 模式的威力。
下面的 AC_CLD_Connector:reconnect()方法使用了与CLD_Connector:reconnect()所用的相同的指数后退算法来避免发生日志服务器被连接请求“淹没”的情况:
int AC_CLD_Connector::reconnect () {// Maximum number of times to retry connect. 最大重连次数const size_t MAX_RETRIES = 5;ACE_Time_Value timeout (1);size_t i;//进行重连操作for (i = 0; i < MAX_RETRIES; ++i) {ACE_Synch_Options options (ACE_Synch_Options::USE_TIMEOUT,timeout);if (i > 0) ACE_OS::sleep (timeout);if (this->connect (handler_, remote_addr_, options) == 0)break;timeout *= 2; // Exponential backoff. 每次重连时间翻倍}return i == MAX_RETRIES ? -1 : 0;
}
AC_Client_Logging_Daemon。这个类是一个个facade、它将上面描述的其他类集成在一起来实现新的客户日志 daemon。其定义如下所示:
class AC_Client_Logging_Daemon : public ACE_Service_Object {
protected:// Factory that passively connects the <AC_Input_Handler>.AC_CLD_Acceptor acceptor_;// Factory that actively connects the <AC_Output_Handler>.AC_CLD_Connector connector_;// The <AC_Output_Handler> connected by <AC_CLD_Connector>.AC_Output_Handler output_handler_;public:// Constructor.AC_Client_Logging_Daemon (): acceptor_ (&output_handler_),connector_ (&output_handler_) {}// Service Configurator hook methods.virtual int init (int argc, ACE_TCHAR *argv[]);virtual int fini ();//virtual int info (ACE_TCHAR **bufferp, size_t length = 0) const;//virtual int suspend ();//virtual int resume ();
};
AC_Client_Logging_Daemon 继承自 ACE_Service_Object。因此,可以通过 svc.conf 文件来对其进行动态配置,svc.conf文件由之前描述的描述的 ACE Service Confgurator 框架处理。”AC_Cient_Logging_Daemon 的实例被动态链接时,ACE Service Confgurator 框架调用如下所示的AC_Client_Logging Daemon::init()挂钩方法:
int AC_Client_Logging_Daemon::init(int argc, ACE_TCHAR *argv[]) {//指定缺省客户端daemon侦听端口cld_port 缺省服务器端口sld_port 主机名sld_hostu_short cld_port = ACE_DEFAULT_SERVICE_PORT;u_short sld_port = ACE_DEFAULT_LOGGING_SERVER_PORT;ACE_TCHAR sld_host[MAXHOSTNAMELEN];ACE_OS::strcpy (sld_host, ACE_LOCALHOST);//使用ACE_Get_Opt 解析svc.conf文件的传递的选项,-p -s -r的任意信息//是argc和argv数组的-p -s -r的任意选项 如果传入了则会进行相应的修改ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("p:r:s:"), 0);get_opt.long_option (ACE_TEXT ("client_port"), 'p',ACE_Get_Opt::ARG_REQUIRED);get_opt.long_option (ACE_TEXT ("server_port"), 'r',ACE_Get_Opt::ARG_REQUIRED);get_opt.long_option (ACE_TEXT ("server_name"), 's',ACE_Get_Opt::ARG_REQUIRED);for (int c; (c = get_opt ()) != -1;)switch (c) {case 'p': // Client logging daemon acceptor port number.cld_port = static_cast<u_short> (ACE_OS::atoi (get_opt.opt_arg ()));break;case 'r': // Server logging daemon acceptor port number.sld_port = static_cast<u_short> (ACE_OS::atoi (get_opt.opt_arg ()));break;case 's': // Server logging daemon hostname.ACE_OS::strsncpy(sld_host, get_opt.opt_arg (), MAXHOSTNAMELEN);break;}//知道服务器主机名和两个端口号 构建建立连接所需的地址ACE_INET_Addr cld_addr (cld_port);ACE_INET_Addr sld_addr (sld_port, sld_host);//初始化acceptor_和connector_if (acceptor_.open (cld_addr) == -1) return -1;AC_Output_Handler *oh = &output_handler_;if (connector_.connect (oh, sld_addr) == -1){ acceptor_.close (); return -1; }return 0;
}
当客户日志daemon 被移除时,ACE Seryice Confieurator框果会调用下面的AC_Client_Logging_Daemon::fini()挂钩方法:
int AC_Client_Logging_Daemon::fini ()
{ return acceptor_.close (); }
fni()方法调用AC_CLD_Acceptor 继承的close()方法。该方法随即调用AC_CLD_Acceptor::handle_close()来触发消息队列和转发者线程的关闭。ACE Service Configurator 框架在 fini()返回之后删除 AC_Client_Logging_Daemon 实例。图 7.11 描述了这个关闭序列。
现在我们已经实现了客户日志daemon的所有类,可以增加ACE FACTORY_DEFINE宏了
ACE_FACTORY_DEFINE (AC_CLD, AC_Client_Logging_Daemon)
这个宏自动定义_make_AC_Client_Logging_Daemmon()工厂函数,该雨数被用在下面的 svc.conf文件中:
dynamic AC_Client_Logging_Daemon Service_Object *
AC_CLD:_make_AC_Client_Logging_Daemon ()
"-p SEDTENT LOGGING_SERVER_PORT"
这个文件指 ACE Scrvice Configurator 椎架通过以下步骤配置客户日志daemon:
1、它将AC_CLD DLL动态地链接进进的地址空间中
2、它使用副栏中描述的 ACE_DLL 类从 ACE_CLD DLL 符号表中提取_make_AC_Client_Logging_Daemon()工厂函数。
3、调用该函数,以获取一个指问功态分配的AC_Client_Logging_Dacmon 的指针。
4、框架随即通过该指针调用 AC_Client_Logging_Daemon:init(),片将字符串“-p”,以及跟在厅面的,展开了的 CLIENT_LOGGING_DAEMON_PORT 环境变量,作为其 argc/argv 参数传入:这个环境变量指定客户日点daemon用于侦听客户应用连接消求的端口号。
5、如果 init()成功的话,AC_CHient_Logging_Daemon指针被存储在ACE_Service_Repository中,在"AC_Client_Logging_Daemon”名下。
现在我们已经可以显示在 SR_Confgurable_Logging_Server.cpp 中的 main()函数。它与用于其他服务的 Contigurable_Logging_Server.cpp程序类似,但是需要采用一些略为不同的战术。到讨论了在事件处理器不控制自己的生命周期的情况下,可能会出现的一些问题: AC_Client_Logging_Daemon服务是出 ACE Conigurator 框架在服务被激活时动态分配的。但是,其接受器和服务处理器是服务中的成员对象,因此没有对自己的生命周期进行控制。尽管我们的设计小心地管理了处理器的所有生命周期活动,因为ACE_WFMO_Reactor的延级消理语义,在Windows 上仍有可能发生问题。因此,我们将ACE_Reactor 单体显式地没置为ACE_Select_Reactor。
#include "ace/OS_main.h"
#include "ace/OS_Memory.h"
#include "ace/Reactor.h"
#include "ace/Select_Reactor.h"
#include "ace/Service_Config.h"int ACE_TMAIN (int argc, ACE_TCHAR *argv[]) {//ACE_Select_Reactor对象是动态分配的 第三个值是分配失败返回的值 也就是main返回的值ACE_Select_Reactor *select_reactor;ACE_NEW_RETURN (select_reactor, ACE_Select_Reactor, 1);//ACE_Reactor对象也是动态分配的 ACE_Select_Reactor作为要使用的实现传递 //第二个参数是告诉新的ACE_Reactor实例,在反应器关闭时要删除对象ACE_Reactor *reactor;ACE_NEW_RETURN (reactor, ACE_Reactor (select_reactor, 1), 1);//关闭已有的任何ACE_Reactor单体 并使用新的基于ACE_Select_Reactor替换它ACE_Reactor::close_singleton (); ACE_Reactor::instance (reactor, 1);//配置服务ACE_Service_Config::open (argc, argv);//运行反应器事件循环ACE_Reactor::instance ()->run_reactor_event_loop ();return 0;
}
本章中的各个示例阐释了许多强大的ACE Acceptor-Connector技术和策略:
使用单个ACE_Svc_Handler 来处理许多网络连接。
将非动态的服务处理器用于反应器
pulsing服务处理器的消息队列,以与处理器的服务线程进行通作
使用指数后退策略重新连接TCP socket。
用 ACE 会自动删除的反应器替换单体 ACE_Reactor,