目录
- EDP发送
- DataWrtier
- EDP接收
EDP对象的创建在FastDDS服务发现之PDP和EDP的创建中有详细介绍,PDP的收发在FastDDS服务发现之PDP和EDP的收发中有详细介绍,本文主要分析Simple EDP报文的发送和消息接收。
EDP发送
在Fast DDS中,Writer和Reader是通信的端点,他们通过topic和data type进行交互。一个writer会发布一个特定topic的数据,而reader则会订阅这个topic的数据。
当EDP开始时,每个participant都会公布其reader和writer的信息(包括topic和data type),并接收其他participant的reader和writer的信息。然后EDP会比较这些信息,如果一个writer的topic和data type与一个reader的topic和data type相匹配,那么这个writer和reader就会被匹配起来,它们就可以进行数据通信。
SEDP的发现阶段是在创建DataWriter和DataReader时进行的。
DataWrtier
DataWrtier对象完成创建之后,开始EDP的发现过程,具体调用流程如下:
PublisherImpl::create_datawriter
—— DataWriterImpl::enable()
—— RTPSParticipantImpl::registerWriter()
—— BuiltinProtocols::addLocalWriter
– EDP::newLocalWriterProxyData
《TODO: EDP发送时序图》
从EDP::newLocalWriterProxyData
函数开始代码分析
bool EDP::newLocalWriterProxyData(RTPSWriter* writer,const TopicAttributes& att,const WriterQos& wqos)
{auto init_fun = [this, writer, &att, &wqos](WriterProxyData* wpd,bool updating,const ParticipantProxyData& participant_data){const NetworkFactory& network = mp_RTPSParticipant->network_factory();const auto& watt = writer->getAttributes();wpd->guid(writer->getGuid());wpd->key() = wpd->guid();if (watt.multicastLocatorList.empty() && watt.unicastLocatorList.empty()){wpd->set_locators(participant_data.default_locators);}else{wpd->set_multicast_locators(watt.multicastLocatorList, network);wpd->set_announced_unicast_locators(watt.unicastLocatorList);fastdds::rtps::network::external_locators::add_external_locators(*wpd,watt.external_unicast_locators);}wpd->RTPSParticipantKey() = mp_RTPSParticipant->getGuid();wpd->topicName(att.getTopicName());wpd->typeName(att.getTopicDataType());wpd->topicKind(att.getTopicKind());if (att.type_id.m_type_identifier._d() != static_cast<uint8_t>(0x00)){wpd->type_id(att.type_id);}if (att.type.m_type_object._d() != static_cast<uint8_t>(0x00)){wpd->type(att.type);}if (att.type_information.assigned()){wpd->type_information(att.type_information);}wpd->typeMaxSerialized(writer->getTypeMaxSerialized());wpd->m_qos.setQos(wqos, true);wpd->userDefinedId(watt.getUserDefinedID());wpd->persistence_guid(watt.persistence_guid);
#if HAVE_SECURITYif (mp_RTPSParticipant->is_secure()){wpd->security_attributes_ = watt.security_attributes().mask();wpd->plugin_security_attributes_ = watt.security_attributes().plugin_endpoint_attributes;}else{wpd->security_attributes_ = 0UL;wpd->plugin_security_attributes_ = 0UL;}
#endif // if HAVE_SECURITYif (att.auto_fill_type_information){// TypeInformation, TypeObject and TypeIdentifierif (!att.type_information.assigned()){const types::TypeInformation* type_info =types::TypeObjectFactory::get_instance()->get_type_information(wpd->typeName().c_str());if (type_info != nullptr){wpd->type_information() = *type_info;}}}if (att.auto_fill_type_object){bool has_type_id = true;if (att.type_id.m_type_identifier._d() == static_cast<uint8_t>(0x00)){has_type_id = false;const types::TypeIdentifier* type_id =types::TypeObjectFactory::get_instance()->get_type_identifier_trying_complete(wpd->typeName().c_str());if (type_id != nullptr){has_type_id = true;wpd->type_id().m_type_identifier = *type_id;}}if (att.type.m_type_object._d() == static_cast<uint8_t>(0x00)){bool type_is_complete = has_type_id &&wpd->type_id().m_type_identifier._d() == types::EK_COMPLETE;const types::TypeObject* type_obj =types::TypeObjectFactory::get_instance()->get_type_object(wpd->typeName().c_str(), type_is_complete);if (type_obj != nullptr){wpd->type().m_type_object = *type_obj;}}}return true;};//ADD IT TO THE LIST OF READERPROXYDATAGUID_t participant_guid;WriterProxyData* writer_data = this->mp_PDP->addWriterProxyData(writer->getGuid(), participant_guid, init_fun);if (writer_data == nullptr){return false;}#ifdef FASTDDS_STATISTICS// notify monitor service about the new local entity proxyif (nullptr != this->mp_PDP->get_proxy_observer()){this->mp_PDP->get_proxy_observer()->on_local_entity_change(writer_data->guid(), true);}
#endif //FASTDDS_STATISTICS//PAIRINGif (this->mp_PDP->getRTPSParticipant()->should_match_local_endpoints()){pairing_writer_proxy_with_any_local_reader(participant_guid, writer_data);} pairingWriter(writer, participant_guid, *writer_data);//DO SOME PROCESSING DEPENDING ON THE IMPLEMENTATION (SIMPLE OR STATIC)processLocalWriterProxyData(writer, writer_data);return true;
}
这段代码分为五部分进行分析。
- 调用
PDP::addWriterProxyData
创建或找到一个WriterProxyData
对象。PDP发现的participant的信息会存放在participant_proxies_
中,所以先从participant_proxies_
中查找PDP发现的participant,EDP阶段发现的对端的RTPSWriter和RTPSReader会存放到ParticipantProxyData
的ProxyHashTable<WriterProxyData>* m_writers
和ProxyHashTable<ReaderProxyData>* m_readers
中,所以会优先查找是否已经将writers/reader存放到ProxyHashTable<WriterProxyData>* m_writers
和ProxyHashTable<ReaderProxyData>* m_readers
中。如果有则直接使用,如果没有则构造一个WriterProxyData
对象,再调用EDP::newLocalWriterProxyData
中的lambda表达式init_fun
进行初始化。这里的初始化是把当前writer的信息赋给WriterProxyData
对象,包括writer的guid,topic name,data type,qos等,这些信息都是需要通过EDP发送给其他已经通过PDP发现了的participant端用于进行EDP的匹配的。所以,保存在ParticipantProxyData
中的ProxyHashTable<ReaderProxyData>
用于将当前的endpooint和EDP监听收到对端的Endpoint进行对比匹配使用。
WriterProxyData* PDP::addWriterProxyData(const GUID_t& writer_guid,GUID_t& participant_guid,std::function<bool(WriterProxyData*, bool, const ParticipantProxyData&)> initializer_func)
{EPROSIMA_LOG_INFO(RTPS_PDP, "Adding writer proxy data " << writer_guid);WriterProxyData* ret_val = nullptr;// notify statistics modulegetRTPSParticipant()->on_entity_discovery(writer_guid, ParameterPropertyList_t());std::lock_guard<std::recursive_mutex> guardPDP(*this->mp_mutex);for (ParticipantProxyData* pit : participant_proxies_){if (pit->m_guid.guidPrefix == writer_guid.guidPrefix){// Copy participant data to be used outside.participant_guid = pit->m_guid;// Check that it is not already there:auto wpi = pit->m_writers->find(writer_guid.entityId);if (wpi != pit->m_writers->end()){ret_val = wpi->second;if (!initializer_func(ret_val, true, *pit)){return nullptr;}RTPSParticipantListener* listener = mp_RTPSParticipant->getListener();if (listener){WriterDiscoveryInfo info(*ret_val);info.status = WriterDiscoveryInfo::CHANGED_QOS_WRITER;listener->onWriterDiscovery(mp_RTPSParticipant->getUserRTPSParticipant(), std::move(info));check_and_notify_type_discovery(listener, *ret_val);}return ret_val;}// Try to take one entry from the poolif (writer_proxies_pool_.empty()){size_t max_proxies = writer_proxies_pool_.max_size();if (writer_proxies_number_ < max_proxies){// Pool is empty but limit has not been reached, so we create a new entry.++writer_proxies_number_;ret_val = new WriterProxyData(mp_RTPSParticipant->getAttributes().allocation.locators.max_unicast_locators,mp_RTPSParticipant->getAttributes().allocation.locators.max_multicast_locators,mp_RTPSParticipant->getAttributes().allocation.data_limits);}else{EPROSIMA_LOG_WARNING(RTPS_PDP, "Maximum number of writer proxies (" << max_proxies <<") reached for participant " << mp_RTPSParticipant->getGuid() << std::endl);return nullptr;}}else{// Pool is not empty, use entry from poolret_val = writer_proxies_pool_.back();writer_proxies_pool_.pop_back();}// Copy network configuration from participant to writer proxyret_val->networkConfiguration(pit->m_networkConfiguration);// Add to ParticipantProxyData(*pit->m_writers)[writer_guid.entityId] = ret_val;if (!initializer_func(ret_val, false, *pit)){return nullptr;}RTPSParticipantListener* listener = mp_RTPSParticipant->getListener();if (listener){WriterDiscoveryInfo info(*ret_val);info.status = WriterDiscoveryInfo::DISCOVERED_WRITER;listener->onWriterDiscovery(mp_RTPSParticipant->getUserRTPSParticipant(), std::move(info));check_and_notify_type_discovery(listener, *ret_val);}return ret_val;}}return nullptr;
}
- 查找当前participant中是否有匹配的reader。这一步在
EDP::pairing_writer_proxy_with_any_local_reader
中实现
bool EDP::pairing_writer_proxy_with_any_local_reader(const GUID_t& participant_guid,WriterProxyData* wdata)
{(void)participant_guid;EPROSIMA_LOG_INFO(RTPS_EDP, wdata->guid() << " in topic: \"" << wdata->topicName() << "\"");mp_RTPSParticipant->forEachUserReader([&, wdata](RTPSReader& r) -> bool{auto temp_reader_proxy_data = get_temporary_reader_proxies_pool().get();GUID_t readerGUID = r.getGuid();if (mp_PDP->lookupReaderProxyData(readerGUID, *temp_reader_proxy_data)){MatchingFailureMask no_match_reason;fastdds::dds::PolicyMask incompatible_qos;bool valid = valid_matching(temp_reader_proxy_data.get(), wdata, no_match_reason, incompatible_qos);const GUID_t& writer_guid = wdata->guid();temp_reader_proxy_data.reset();if (valid){
#if HAVE_SECURITYif (!mp_RTPSParticipant->security_manager().discovered_writer(readerGUID, participant_guid,*wdata, r.getAttributes().security_attributes())){EPROSIMA_LOG_ERROR(RTPS_EDP, "Security manager returns an error for reader " << readerGUID);}
#elseif (r.matched_writer_add(*wdata)){EPROSIMA_LOG_INFO(RTPS_EDP_MATCH,"WP:" << wdata->guid() << " match R:" << r.getGuid() << ". WLoc:" <<wdata->remote_locators());//MATCHED AND ADDED CORRECTLY:if (r.getListener() != nullptr){MatchingInfo info;info.status = MATCHED_MATCHING;info.remoteEndpointGuid = writer_guid;r.getListener()->onReaderMatched(&r, info);const SubscriptionMatchedStatus& sub_info =update_subscription_matched_status(readerGUID, writer_guid, 1);r.getListener()->onReaderMatched(&r, sub_info);}}
#endif // if HAVE_SECURITY}else{if (no_match_reason.test(MatchingFailureMask::incompatible_qos) && r.getListener() != nullptr){r.getListener()->on_requested_incompatible_qos(&r, incompatible_qos);}if (r.matched_writer_is_matched(writer_guid)&& r.matched_writer_remove(writer_guid)){
#if HAVE_SECURITYmp_RTPSParticipant->security_manager().remove_writer(readerGUID, participant_guid,writer_guid);
#endif // if HAVE_SECURITY//MATCHED AND ADDED CORRECTLY:if (r.getListener() != nullptr){MatchingInfo info;info.status = REMOVED_MATCHING;info.remoteEndpointGuid = writer_guid;r.getListener()->onReaderMatched(&r, info);const SubscriptionMatchedStatus& sub_info =update_subscription_matched_status(readerGUID, writer_guid, -1);r.getListener()->onReaderMatched(&r, sub_info);}}}}// keep lookingreturn true;});return true;
}
- 查找ParticipantProxyData中是否有匹配的reader,在EDP::pairingWriter中实现
bool EDP::pairingWriter(RTPSWriter* W,const GUID_t& participant_guid,const WriterProxyData& wdata)
{(void)participant_guid;EPROSIMA_LOG_INFO(RTPS_EDP, W->getGuid() << " in topic: \"" << wdata.topicName() << "\"");std::lock_guard<std::recursive_mutex> pguard(*mp_PDP->getMutex());ResourceLimitedVector<ParticipantProxyData*>::const_iterator pit = mp_PDP->ParticipantProxiesBegin();if (!this->mp_PDP->getRTPSParticipant()->should_match_local_endpoints()){pit++;}for (; pit != mp_PDP->ParticipantProxiesEnd(); ++pit){for (auto& pair : *(*pit)->m_readers){ReaderProxyData* rdatait = pair.second;const GUID_t& reader_guid = rdatait->guid();if (reader_guid == c_Guid_Unknown){continue;}MatchingFailureMask no_match_reason;fastdds::dds::PolicyMask incompatible_qos;bool valid = valid_matching(&wdata, rdatait, no_match_reason, incompatible_qos);if (valid){
#if HAVE_SECURITYif (!mp_RTPSParticipant->security_manager().discovered_reader(W->getGuid(), (*pit)->m_guid,*rdatait, W->getAttributes().security_attributes())){EPROSIMA_LOG_ERROR(RTPS_EDP, "Security manager returns an error for writer " << W->getGuid());}
#elseif (W->matched_reader_add(*rdatait)){EPROSIMA_LOG_INFO(RTPS_EDP_MATCH,"RP:" << rdatait->guid() << " match W:" << W->getGuid() << ". WLoc:" <<rdatait->remote_locators());//MATCHED AND ADDED CORRECTLY:if (W->getListener() != nullptr){MatchingInfo info;info.status = MATCHED_MATCHING;info.remoteEndpointGuid = reader_guid;W->getListener()->onWriterMatched(W, info);const GUID_t& writer_guid = W->getGuid();const PublicationMatchedStatus& pub_info =update_publication_matched_status(reader_guid, writer_guid, 1);W->getListener()->onWriterMatched(W, pub_info);}}
#endif // if HAVE_SECURITY}else{if (no_match_reason.test(MatchingFailureMask::incompatible_qos) && W->getListener() != nullptr){W->getListener()->on_offered_incompatible_qos(W, incompatible_qos);}//EPROSIMA_LOG_INFO(RTPS_EDP,RTPS_CYAN<<"Valid Matching to writerProxy: "<<wdatait->m_guid<<RTPS_DEF<<endl);if (W->matched_reader_is_matched(reader_guid) && W->matched_reader_remove(reader_guid)){
#if HAVE_SECURITYmp_RTPSParticipant->security_manager().remove_reader(W->getGuid(), participant_guid, reader_guid);
#endif // if HAVE_SECURITY//MATCHED AND ADDED CORRECTLY:if (W->getListener() != nullptr){MatchingInfo info;info.status = REMOVED_MATCHING;info.remoteEndpointGuid = reader_guid;W->getListener()->onWriterMatched(W, info);const GUID_t& writer_guid = W->getGuid();const PublicationMatchedStatus& pub_info =update_publication_matched_status(reader_guid, writer_guid, -1);W->getListener()->onWriterMatched(W, pub_info);}}}}}return true;
}
- 组装EDP报文并发送
bool EDPSimple::processLocalWriterProxyData(RTPSWriter* local_writer,WriterProxyData* wdata)
{EPROSIMA_LOG_INFO(RTPS_EDP, wdata->guid().entityId);(void)local_writer;auto* writer = &publications_writer_;#if HAVE_SECURITYif (local_writer->getAttributes().security_attributes().is_discovery_protected){writer = &publications_secure_writer_;}
#endif // if HAVE_SECURITYCacheChange_t* change = nullptr;bool ret_val = serialize_writer_proxy_data(*wdata, *writer, true, &change);if (change != nullptr){writer->second->add_change(change);}return ret_val;
}
EDP接收
EDP对象的创建阶段,会创建两个EDP的listener对象:EDPSimplePUBListener和EDPSimpleSUBListener,用于监听EDP endpoints,所以收到EDP报文后,会回调到EDPSimplePUBListener::onNewCacheChangeAdded中:
void EDPSimpleSUBListener::onNewCacheChangeAdded(RTPSReader* reader,const CacheChange_t* const change_in)
{CacheChange_t* change = (CacheChange_t*)change_in;//std::lock_guard<std::recursive_mutex> guard(*this->sedp_->subscriptions_reader_.first->getMutex());EPROSIMA_LOG_INFO(RTPS_EDP, "");if (!computeKey(change)){EPROSIMA_LOG_WARNING(RTPS_EDP, "Received change with no Key");}ReaderHistory* reader_history =
#if HAVE_SECURITYreader == sedp_->subscriptions_secure_reader_.first ?sedp_->subscriptions_secure_reader_.second :
#endif // if HAVE_SECURITYsedp_->subscriptions_reader_.second;if (change->kind == ALIVE){PREVENT_PDP_DEADLOCK(reader, change, sedp_->mp_PDP);// Note: change is removed from history inside this method.add_reader_from_change(reader, reader_history, change, sedp_);}else{//REMOVE WRITER FROM OUR READERS:EPROSIMA_LOG_INFO(RTPS_EDP, "Disposed Remote Reader, removing...");GUID_t reader_guid = iHandle2GUID(change->instanceHandle);//Removing change from historyreader_history->remove_change(change);reader->getMutex().unlock();this->sedp_->mp_PDP->removeReaderProxyData(reader_guid);reader->getMutex().lock();}
}
这个函数中在判断change->kind == ALIVE之后核心实现为调用EDPBaseSUBListener::add_reader_from_change:
void EDPBaseSUBListener::add_reader_from_change(RTPSReader* reader,ReaderHistory* reader_history,CacheChange_t* change,EDP* edp,bool release_change /*=true*/)
{//LOAD INFORMATION IN TEMPORAL WRITER PROXY DATAconst NetworkFactory& network = edp->mp_RTPSParticipant->network_factory();CDRMessage_t tempMsg(change->serializedPayload);auto temp_reader_data = edp->get_temporary_reader_proxies_pool().get();if (temp_reader_data->readFromCDRMessage(&tempMsg, network,edp->mp_RTPSParticipant->has_shm_transport(), true, change->vendor_id)){if (temp_reader_data->guid().guidPrefix == edp->mp_RTPSParticipant->getGuid().guidPrefix){EPROSIMA_LOG_INFO(RTPS_EDP, "From own RTPSParticipant, ignoring");return;}auto copy_data_fun = [&temp_reader_data, &network](ReaderProxyData* data,bool updating,const ParticipantProxyData& participant_data){if (!temp_reader_data->has_locators()){temp_reader_data->set_remote_locators(participant_data.default_locators, network, true);}if (updating && !data->is_update_allowed(*temp_reader_data)){EPROSIMA_LOG_WARNING(RTPS_EDP,"Received incompatible update for ReaderQos. reader_guid = " << data->guid());}*data = *temp_reader_data;return true;};//LOOK IF IS AN UPDATED INFORMATIONGUID_t participant_guid;ReaderProxyData* reader_data =edp->mp_PDP->addReaderProxyData(temp_reader_data->guid(), participant_guid, copy_data_fun);// Release the temporary proxytemp_reader_data.reset();// Remove change from history.reader_history->remove_change(reader_history->find_change(change), release_change);// At this point we can release reader lock, cause change is not usedreader->getMutex().unlock();if (reader_data != nullptr) //ADDED NEW DATA{edp->pairing_reader_proxy_with_any_local_writer(participant_guid, reader_data);}else{EPROSIMA_LOG_WARNING(RTPS_EDP, "From UNKNOWN RTPSParticipant, removing");}// Take again the reader lock.reader->getMutex().lock();}
}