服务端代码:
#include "stdafx.h"
#include <iostream>
#include "ace/Message_Queue.h"
#include "ace/Asynch_IO.h"
#include "ace/OS.h"
#include "ace/Proactor.h"
#include "ace/Asynch_Acceptor.h"
using namespace std;
class HA_Proactive_Service :public ACE_Service_Handler
{
public:
~HA_Proactive_Service()
{
if (this->handle() != ACE_INVALID_HANDLE)
{
//ACE_OS:closesocket();
}
}
virtual void open(ACE_HANDLE h, ACE_Message_Block&)
{
this->handle(h);
if (this->reader_.open(*this) != 0 || this->writer_.open(*this) != 0)
{
delete this;
return;
}
ACE_Message_Block* mb;
ACE_NEW_NORETURN(mb, ACE_Message_Block(1024));
if (this->reader_.read(*mb, mb->space()) != 0)
{
printf("read fail");
mb->release();
delete this;
return;
}
}
virtual void handle_read_stream(const ACE_Asynch_Read_Stream::Result & result)
{
ACE_Message_Block& mb = result.message_block();
if (!result.success() || result.bytes_transferred() == 0)
{
mb.release();
delete this;
}
else
{
mb.copy(""); //为字符串添加结束标记'/0'
ACE_OS::printf("rev:%s\n", mb.rd_ptr());
if ((this->writer_.write(mb, mb.length())) != 0)
{
printf("write fail");
mb.release();
}
else
{
printf("Before ACE_NEW_NORETURN\n");
ACE_Message_Block* new_mb;
ACE_NEW_NORETURN(new_mb, ACE_Message_Block(1024));
this->reader_.read(*new_mb, new_mb->space());
}
}
return;
}
virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result & result)
{
printf("Enter handle_write_stream\n");
result.message_block().release();
return;
}
private:
ACE_Asynch_Read_Stream reader_;
ACE_Asynch_Write_Stream writer_;
};
// Server entry point: listens on TCP port 3000 and dispatches Proactor
// completion events until the event loop reports an error.
// Returns -1 if the acceptor cannot be opened or the event loop fails.
int main(int argc,char *argv[])
{
    printf("Enter main\n");
    int port = 3000;
    ACE_Asynch_Acceptor<HA_Proactive_Service> acceptor;
    if (acceptor.open(ACE_INET_Addr(port)) == -1)
        return -1;

    // Stop instead of spinning forever once handle_events() reports failure.
    while (ACE_Proactor::instance()->handle_events() != -1)
    {
    }
    return -1;
}
客户端代码:
#include "stdafx.h"
#include <iostream>
#include "ace/Message_Queue.h"
#include "ace/Asynch_IO.h"
#include "ace/OS.h"
#include "ace/Proactor.h"
#include "ace/Asynch_Acceptor.h"
#include "ace/Asynch_Connector.h"
using namespace std;
// Client-side handler for the Proactor-based connection.
//
// Once connected, open() sends the current time as text ten times, one
// second apart, using asynchronous stream writes. Each message block is
// released in handle_write_stream() when its write completes. The
// destructor closes the socket.
class HA_Proactive_Service :public ACE_Service_Handler
{
public:
    // Closes the connection's socket if a handle was attached in open().
    ~HA_Proactive_Service()
    {
        if (this->handle() != ACE_INVALID_HANDLE)
        {
            ACE_OS::closesocket(this->handle());
        }
    }

    // Called with the handle of the established connection.
    // Binds the write stream to this handler and queues ten timestamp
    // messages. The handler destroys itself if the stream cannot be
    // opened or a write cannot be started.
    virtual void open(ACE_HANDLE h, ACE_Message_Block&)
    {
        this->handle(h);
        if (this->writer_.open(*this) != 0)
        {
            delete this;
            return;
        }
        printf("connected\n");
        for (int i = 0; i < 10; i++)
        {
            printf("i is %d\n", i);
            // This sleeps inside the callback, so open() takes about ten
            // seconds to return.
            ACE_OS::sleep(1);
            time_t now = ACE_OS::gettimeofday().sec();
            const char* stamp = ctime(&now);
            if (stamp == 0)
            {
                // ctime() could not format the time; skip this round.
                continue;
            }
            ACE_Message_Block* mb = new ACE_Message_Block(100);
            mb->copy(stamp);
            if (this->writer_.write(*mb, mb->length()) != 0)
            {
                printf("Begin write fail\n");
                // The write never started, so no completion will release
                // this block for us.
                mb->release();
                delete this;
                return;
            }
        }
    }

    // Completion callback for writes started through writer_.
    // Releases the message block that carried the sent data. The name and
    // parameter type must match the stream-write hook of the base class,
    // otherwise the callback is never invoked and the blocks leak.
    virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result& result)
    {
        printf("Enter handle_write_stream\n");
        result.message_block().release();
    }

private:
    ACE_Asynch_Write_Stream writer_;
    // Not opened or used in this block.
    ACE_Asynch_Read_Stream reader_;
};
// Client entry point: connects asynchronously to 192.168.31.235:3000 and
// dispatches Proactor completion events until the event loop reports an
// error. The connector creates the HA_Proactive_Service handler itself,
// so none is allocated here.
// Returns -1 if the connector cannot be opened, the connect cannot be
// started, or the event loop fails.
int main(int argc, char* argv[])
{
    ACE_INET_Addr addr(3000, "192.168.31.235");
    ACE_Asynch_Connector<HA_Proactive_Service> connector;
    if (connector.open() == -1)
    {
        return -1;
    }
    if (connector.connect(addr) == -1)
    {
        return -1;
    }

    // Stop instead of spinning forever once handle_events() reports failure.
    while (ACE_Proactor::instance()->handle_events() != -1)
    {
    }
    return -1;
}
服务端打印:
客户端打印:
1)ACE_Service_Handler 是框架提供的接口(基类),而 open、handle_read_stream、handle_write_stream 是该接口中供派生类重写的虚函数(回调函数),由框架在连接建立以及异步读、写操作完成时调用。
2)流的读操作和对应的完成处理函数使用的是同一个类:读操作由 ACE_Asynch_Read_Stream 发起,处理函数的参数类型是 ACE_Asynch_Read_Stream::Result;对于 UDP 数据报,只要改用 ACE_Asynch_Read_Dgram 类即可。