目录
- 1、编写一个示例
- 1.1、IDL接口定义
- 1.2、MPC文件介绍
- 1.3、生成解决方案
- 2、通讯测试
- 2.1、使用repo server 通讯
- 2.2、使用repo ip+port方式
- 2.3、对等发现face
1、编写一个示例
1.1、IDL接口定义
假设我们现在有以下结构:
// Plain C++ view of the message payload that the tutorial converts to IDL.
// Field names match the IDL struct DDSData::MessagerOne shown further down
// (where subject_id is the @key field).
struct MessagerOne
{
    int subject_id;
    string time;
    string text;
    int index;
    double startCounter;  // was "start Counter", which is not a valid identifier
    double frequency;
};
// Second payload type of the example: two integer fields.
// In the IDL version of this struct, use_id is the @key field.
struct Heartbeat
{
    int use_id;
    int index;
};
那么我们将其编写为idl格式,文件名为DDSData.idl,文件内容为:
// DDSData.idl - topic types used by the example.
module DDSData {

  // Topic type; instances are keyed by subject_id.
  @topic
  struct MessagerOne {
    @key long subject_id;
    string time;
    string text;
    long index;
    double startCounter;
    double frequency;
  };

  // Topic type; instances are keyed by use_id.
  @topic
  struct Heartbeat {
    @key long use_id;
    long index;
  };
};
然后生成DDSData_Export.h文件(命名规则为“<名称>_Export.h”,此处即DDSData_Export.h):
perl D:\OpenDDS\ACE_wrappers\bin\generate_export_file.pl DDSData > DDSData_Export.h
生成的文件内容:
1.2、MPC文件介绍
dcps项目用来生成动态库,dcps_java项目用于生成jar包,另外两个项目是一个简易的订阅/发布例子。
这里缺少 Publisher.cpp、 Subscriber.cpp、DataReaderListenerImpl.cpp、DataReaderListenerImpl.h四个文件,后面会贴出代码。
// Builds the type-support code generated from DDSData.idl.
project(*idl): dcps {
  requires += no_opendds_safety_profile

  TypeSupport_Files {
    DDSData.idl
  }

  custom_only = 1
}

// Builds the Java bindings; the jar is named DDSData.
// NOTE(review): the flag names -Wb,export_export_include / -Wb,export_export_macro
// are kept exactly as written in the original listing; confirm them against the
// opendds_idl option list (it may expect export_include / export_macro).
project(*java): dcps_java {
  idlflags      += -Wb,stub_export_include=DDSData_Export.h \
                   -Wb,stub_export_macro=DDSData_Export
  dcps_ts_flags += -Wb,export_export_include=DDSData_Export.h \
                   -Wb,export_export_macro=DDSData_Export
  idl2jniflags  += -Wb,stub_export_include=DDSData_Export.h \
                   -Wb,stub_export_macro=DDSData_Export
  dynamicflags  += DDSDATA_BUILD_DLL

  TypeSupport_Files {
    DDSData.idl
  }

  specific {
    jarname = DDSData
  }

  Source_Files {
  }
}

// Publisher executable; built after the *idl project.
project(*publisher) : dcpsexe, dcps_tcp, dcps_rtps_udp {
  requires += no_opendds_safety_profile
  exename   = publisher
  after    += *idl

  TypeSupport_Files {
    DDSData.idl
  }

  Source_Files {
    Publisher.cpp
  }
}

// Subscriber executable; built after the *publisher project.
project(*subscriber) : dcpsexe, dcps_tcp, dcps_rtps_udp {
  requires += no_opendds_safety_profile
  exename   = subscriber
  after    += *publisher

  TypeSupport_Files {
    DDSData.idl
  }

  Source_Files {
    DataReaderListenerImpl.cpp
    Subscriber.cpp
  }
}
Publisher.cpp
#include "DDSData_TS.hpp"
#include "ace/Log_Msg.h"
#include "ace/OS_NS_unistd.h"

#ifdef ACE_AS_STATIC_LIBS
# include "dds/DCPS/RTPS/RtpsDiscovery.h"
# include "dds/DCPS/transport/rtps_udp/RtpsUdp.h"
#endif

// FUZZ: disable check_for_improper_main_declaration
/**
 * Publisher side of the example.
 *
 * Initializes the FACE TS interface from "face_config.ini", opens the "pub"
 * connection, sends a single DDSData::MessagerOne sample, waits 15 seconds
 * and then destroys the connection.
 *
 * @return EXIT_SUCCESS when every TS call reports FACE::RC_NO_ERROR,
 *         otherwise the failing FACE return code converted to int.
 */
int main(int, char*[])
{
  // Initialize the TS interface
  FACE::RETURN_CODE_TYPE status;
  FACE::TS::Initialize("face_config.ini", status);
  if (status != FACE::RC_NO_ERROR) {
    return static_cast<int>(status);
  }

  // Create the pub connection
  FACE::CONNECTION_ID_TYPE connId;
  FACE::CONNECTION_DIRECTION_TYPE dir;
  FACE::MESSAGE_SIZE_TYPE max_msg_size;
  FACE::TS::Create_Connection("pub", FACE::PUB_SUB, connId, dir,
                              max_msg_size, FACE::INF_TIME_VALUE, status);
  if (status != FACE::RC_NO_ERROR) {
    return static_cast<int>(status);
  }

  // Message to send. The type and field names follow DDSData.idl
  // (the listing previously used Messenger::Message / count, which the
  // DDSData module does not define).
  DDSData::MessagerOne msg;
  msg.text = "Hello, World!";
  msg.subject_id = 14;
  msg.index = 1;
  // Give the remaining numeric fields defined values before sending.
  msg.startCounter = 0.0;
  msg.frequency = 0.0;

  // Send message
  FACE::TRANSACTION_ID_TYPE txn;
  ACE_DEBUG((LM_INFO, "Publisher: about to Send_Message()\n"));
  FACE::TS::Send_Message(connId, FACE::INF_TIME_VALUE, txn, msg,
                         max_msg_size, status);
  if (status != FACE::RC_NO_ERROR) {
    return static_cast<int>(status);
  }

  // Give message time to be processed before exiting
  ACE_OS::sleep(15);

  // Destroy the pub connection
  FACE::TS::Destroy_Connection(connId, status);
  if (status != FACE::RC_NO_ERROR) {
    return static_cast<int>(status);
  }

  return EXIT_SUCCESS;
}
// FUZZ: enable check_for_improper_main_declaration
Subscriber.cpp
#include "DDSData_TS.hpp"
#include "ace/Log_Msg.h"

#ifdef ACE_AS_STATIC_LIBS
# include "dds/DCPS/RTPS/RtpsDiscovery.h"
# include "dds/DCPS/transport/rtps_udp/RtpsUdp.h"
#endif

// FUZZ: disable check_for_improper_main_declaration
/**
 * Subscriber side of the example.
 *
 * Initializes the FACE TS interface from "face_config.ini", opens the "sub"
 * connection, waits (without timeout) for one DDSData::MessagerOne sample,
 * logs its text and index, and then destroys the connection.
 *
 * @return EXIT_SUCCESS when every TS call reports FACE::RC_NO_ERROR,
 *         otherwise the failing FACE return code converted to int.
 */
int main(int, char*[])
{
  // Initialize the TS interface
  FACE::RETURN_CODE_TYPE status;
  FACE::TS::Initialize("face_config.ini", status);
  if (status != FACE::RC_NO_ERROR) {
    return static_cast<int>(status);
  }

  // Create the sub connection
  FACE::CONNECTION_ID_TYPE connId;
  FACE::CONNECTION_DIRECTION_TYPE dir;
  FACE::MESSAGE_SIZE_TYPE max_msg_size;
  FACE::TS::Create_Connection("sub", FACE::PUB_SUB, connId, dir,
                              max_msg_size, FACE::INF_TIME_VALUE, status);
  if (status != FACE::RC_NO_ERROR) {
    return static_cast<int>(status);
  }

  // Receive a message. The type and field names follow DDSData.idl
  // (the listing previously used Messenger::Message / count, which the
  // DDSData module does not define).
  const FACE::TIMEOUT_TYPE timeout = FACE::INF_TIME_VALUE;
  FACE::TRANSACTION_ID_TYPE txn;
  DDSData::MessagerOne msg;
  ACE_DEBUG((LM_INFO, "Subscriber: about to Receive_Message()\n"));
  FACE::TS::Receive_Message(connId, timeout, txn, msg, max_msg_size, status);
  if (status != FACE::RC_NO_ERROR) {
    return static_cast<int>(status);
  }

  // Output the message
  ACE_DEBUG((LM_INFO, "%C\t%d\n", msg.text.in(), msg.index));

  // Destroy the sub connection
  FACE::TS::Destroy_Connection(connId, status);
  if (status != FACE::RC_NO_ERROR) {
    return static_cast<int>(status);
  }

  return EXIT_SUCCESS;
}
// FUZZ: enable check_for_improper_main_declaration
DataReaderListenerImpl.h
#ifndef DATAREADER_LISTENER_IMPL_H
#define DATAREADER_LISTENER_IMPL_H

#include <ace/Global_Macros.h>

#include <dds/DdsDcpsSubscriptionC.h>
#include <dds/DCPS/LocalObject.h>
#include <dds/DCPS/Definitions.h>

/**
 * DDS::DataReaderListener implementation used by the subscriber example.
 *
 * Declares one callback per DataReader status notification; the bodies live
 * in DataReaderListenerImpl.cpp.
 */
class DataReaderListenerImpl
  : public virtual OpenDDS::DCPS::LocalObject<DDS::DataReaderListener> {
public:
  virtual void on_requested_deadline_missed(
    DDS::DataReader_ptr reader,
    const DDS::RequestedDeadlineMissedStatus& status);

  virtual void on_requested_incompatible_qos(
    DDS::DataReader_ptr reader,
    const DDS::RequestedIncompatibleQosStatus& status);

  virtual void on_sample_rejected(
    DDS::DataReader_ptr reader,
    const DDS::SampleRejectedStatus& status);

  virtual void on_liveliness_changed(
    DDS::DataReader_ptr reader,
    const DDS::LivelinessChangedStatus& status);

  /// Invoked when new samples can be read from @a reader.
  virtual void on_data_available(
    DDS::DataReader_ptr reader);

  virtual void on_subscription_matched(
    DDS::DataReader_ptr reader,
    const DDS::SubscriptionMatchedStatus& status);

  virtual void on_sample_lost(
    DDS::DataReader_ptr reader,
    const DDS::SampleLostStatus& status);
};

#endif /* DATAREADER_LISTENER_IMPL_H */
DataReaderListenerImpl.cpp
#include <ace/Log_Msg.h>
#include <ace/OS_NS_stdlib.h>

#include "DataReaderListenerImpl.h"
// Type-support headers generated from DDSData.idl (the listing previously
// included the Messenger example's headers, which this project does not build).
#include "DDSDataTypeSupportC.h"
#include "DDSDataTypeSupportImpl.h"

#include <iostream>

// Intentionally empty: this example does not react to missed deadlines.
void
DataReaderListenerImpl::on_requested_deadline_missed(
  DDS::DataReader_ptr /*reader*/,
  const DDS::RequestedDeadlineMissedStatus& /*status*/)
{
}

// Intentionally empty: this example does not react to incompatible QoS.
void
DataReaderListenerImpl::on_requested_incompatible_qos(
  DDS::DataReader_ptr /*reader*/,
  const DDS::RequestedIncompatibleQosStatus& /*status*/)
{
}

// Intentionally empty: this example does not react to rejected samples.
void
DataReaderListenerImpl::on_sample_rejected(
  DDS::DataReader_ptr /*reader*/,
  const DDS::SampleRejectedStatus& /*status*/)
{
}

// Intentionally empty: this example does not react to liveliness changes.
void
DataReaderListenerImpl::on_liveliness_changed(
  DDS::DataReader_ptr /*reader*/,
  const DDS::LivelinessChangedStatus& /*status*/)
{
}

/**
 * Takes the next sample from @a reader and prints it to std::cout.
 *
 * Exits the process with status 1 if @a reader cannot be narrowed to the
 * DDSData::MessagerOne data reader; logs an error if take_next_sample()
 * does not return DDS::RETCODE_OK.
 */
void
DataReaderListenerImpl::on_data_available(DDS::DataReader_ptr reader)
{
  DDSData::MessagerOneDataReader_var reader_i =
    DDSData::MessagerOneDataReader::_narrow(reader);

  if (!reader_i) {
    ACE_ERROR((LM_ERROR,
               ACE_TEXT("ERROR: %N:%l: on_data_available() -")
               ACE_TEXT(" _narrow failed!\n")));
    ACE_OS::exit(1);
  }

  DDSData::MessagerOne message;
  DDS::SampleInfo info;

  const DDS::ReturnCode_t error = reader_i->take_next_sample(message, info);

  if (error == DDS::RETCODE_OK) {
    std::cout << "SampleInfo.sample_rank = " << info.sample_rank << std::endl;
    std::cout << "SampleInfo.instance_state = "
              << OpenDDS::DCPS::InstanceState::instance_state_mask_string(info.instance_state)
              << std::endl;

    if (info.valid_data) {
      // Which fields are printed depends on your IDL definition; these are
      // the members of DDSData::MessagerOne.
      std::cout << "Message: subject_id   = " << message.subject_id << std::endl
                << "         time         = " << message.time.in() << std::endl
                << "         text         = " << message.text.in() << std::endl
                << "         index        = " << message.index << std::endl
                << "         startCounter = " << message.startCounter << std::endl
                << "         frequency    = " << message.frequency << std::endl;
    }
  } else {
    ACE_ERROR((LM_ERROR,
               ACE_TEXT("ERROR: %N:%l: on_data_available() -")
               ACE_TEXT(" take_next_sample failed!\n")));
  }
}

// Intentionally empty: this example does not react to subscription matches.
void
DataReaderListenerImpl::on_subscription_matched(
  DDS::DataReader_ptr /*reader*/,
  const DDS::SubscriptionMatchedStatus& /*status*/)
{
}

// Intentionally empty: this example does not react to lost samples.
void
DataReaderListenerImpl::on_sample_lost(
  DDS::DataReader_ptr /*reader*/,
  const DDS::SampleLostStatus& /*status*/)
{
}
1.3、生成解决方案
通过mwc生成vs解决方案
perl D:\OpenDDS\ACE_wrappers\bin\mwc.pl -type vs2019 -features java=1
生成后如下图所示:(文件名称忽略掉,不是一个工程)
打开*.sln,编译Messenger_Idl工程,无报错说明C++端的已生成成功。编译Messenger_Java工程,无报错说明java端的已生成成功。
2、通讯测试
2.1、使用repo server 通讯
运行程序DCPSInfoRepo,subscriber,publisher;
先在A机器上运行DCPSInfoRepo和subscriber;然后在B机器上运行publisher。
(1)A机器上先运行 $DDS_ROOT/bin/DCPSInfoRepo -ORBDebugLevel 10 -ORBLogFile DCPSInfoRepo.log -o repo.ior;
(2)拷贝A机器上repo.ior文件到B机器上;
(3)A机器上运行 ./subscriber -ORBDebugLevel 10 -DCPSDebugLevel 10 -DCPSTransportDebugLevel 6 -ORBLogFile subscriber.log
(4)B机器上运行 ./publisher -ORBDebugLevel 10 -DCPSDebugLevel 10 -ORBLogFile publisher.log
注意事项:
(1) DCPSInfoRepo生成的repo.ior文件必须存放在subscriber和publisher工程文件目录下;
(2) 必须先运行DCPSInfoRepo,然后拷贝repo.ior文件到其他机器上;
rtps.ini文件内容(DCPSInfoRepo后面的路径为你的实际路径)
[common]
DCPSInfoRepo=file://D:\OpenDDS\DevGuideExamples\DCPS\Messenger\repo.ior

[transport/1]
transport_type=tcp
2.2、使用repo ip+port方式
运行程序DCPSInfoRepo,subscriber,publisher;
先在A机器上运行DCPSInfoRepo和subscriber;然后在B机器上运行publisher。
DCPSInfoRepo:
DCPSInfoRepo -ORBListenEndpoints iiop://192.168.175.135:12345 -ORBDebugLevel 10 -ORBLogFile DCPSInfoRepo.log
subscriber订阅:
.\subscriber.exe -DCPSConfigFile rtps.ini
publisher发布:
.\publisher.exe -DCPSConfigFile rtps.ini
rtps.ini文件内容(DCPSInfoRepo中的IP和端口必须与上面DCPSInfoRepo的-ORBListenEndpoints参数指定的地址一致,请替换为你的实际地址)
[common]
DCPSDebugLevel=0
DCPSInfoRepo=corbaloc::192.168.0.11:12345/DCPSInfoRepo
DCPSChunks=20
DCPSChunkAssociationMultiplier=10
DCPSLivelinessFactor=80
DCPSBit=0
DCPSGlobalTransportConfig=$file

[domain/42]
DiscoveryConfig=uni_rtps

[rtps_discovery/uni_rtps]
SedpMulticast=0
ResendPeriod=2

[transport/the_rtps_transport]
transport_type=rtps_udp
use_multicast=0

[topic/Message]
platform_view_guid=103
type_name=DDSData::MessagerOne
max_message_size=300
2.3、对等发现face
运行程序subscriber,publisher;
先在A机器上运行subscriber;然后在B机器上运行publisher。
rtps.ini文件内容
[common]
DCPSGlobalTransportConfig=$file
DCPSDefaultDiscovery=DEFAULT_RTPS

[domain/3]
DiscoveryConfig=uni_rtps

[rtps_discovery/uni_rtps]
SedpMulticast=0
ResendPeriod=2

[transport/the_rtps_transport]
transport_type=rtps_udp
use_multicast=0

[topic/Message]
platform_view_guid=103
type_name=DDSData::MessagerOne
max_message_size=300
关于Java端的代码具体可参考OpenDDS\java\tests\messenger下的demo。