fastdds:传输层SHM和DATA-SHARING的区别
下图是fastdds官方的图,清晰地展示了dds支持的传输层:
根据通信双方的相对位置(跨机器、同机器跨进程、同进程)的不同选择合适的传输层,是通信中间件必须要考虑的事情。
跨机器:udp、tcp
跨机器通信,只能通过网络, fastdds支持UDP和TCP。
同机器,跨进程:SHM、DATA-SHARING
在同一个机器中,使用UDP和TCP进行通信,当然也是可以的。但是,从性能的角度来考虑,更推荐使用SHM,SHM和UDP/TCP相比有2点优势:
1、减少系统调用次数
共享内存创建完成之后,后边使用的时候直接是内存操作,而UCP/TCP每次发送或者接收的的时候,都要通过系统调用。
2、支持超长消息
UDP/TCP受协议栈的影响,可能会对消息进行分片和组装。
SHM也不完全是优点,也有自己的缺点,比如实现复杂:
1、使用UDP/TCP来收发包,功能成熟,拿来就用。使用SHM进行通信的话,很多功能需要自己实现,比如进程间的同步(通过信号量来实现),buffer的管理(segment)等。
同进程:INTRA-PROCESS
如果writer和reader在同一个进程,那么可以使用这种传输方式,writer线程会直接调用reader的回调函数,在同一个线程中。
如下图所示,使用如下命令gdb --args ./delivery_mechanisms pubsub -m INTRA-PROCESS,并对函数PubSubApp::on_data_available设置断点,可以看到调用栈,使用INTRA-PROCESS的时候,subscriber的接收回调函数直接被发送线程调用,在同一个线程中。这是一种效率最高的方式,去掉了中间缓存的环节。
在默认情况下,在同进程的通信使用intra-process,同机器跨进程使用SHM,跨机器使用UDP。
从上图可以看出,同机器的不同进程间通信可以使用SHM,也可以使用DATA-SHARING,那么两者有什么区别呢?
本文使用fastdds中的例子delivery_mechanisms,这个例子可以测试不同的传输层。
本文先介绍SHM和DATA-SHARING传输层的数据收发流程,最后总结两者之间的异同。
1SHM收发流程
简单来说,如果要看SHM的发送流程,那么可以以函数SharedMemTransport::send为中心进行梳理,通过gdb对这个函数设置断点,可以查看这个函数的调用栈,阅读这个函数的代码,可以看这个函数的发送操作;如果要看SHM的接收过程,可以通过函数SharedMemChannelResource::perform_listen_operation来梳理,对于TCP,UDP,SHM来说,三种传输层都实现了自己的函数perform_listen_operation,这个函数监听接收数据。
上图中左边是TranportDescriptor,右边是Transport。用户在创建传输层的时候,并不需要直接创建一个Transport,而是构造一个传输层的描述符即可,fastdds内部根据描述符来创建传输层。这是典型的设计模式中的建造者模式,当构造一个对象时,如果需要传递较多的参数,并且参数之间还有一些依赖关系,那么构造函数实现起来就会比较复杂,这个时候就可以使用建造者模式,在建造者中对参数进行检查,如果没有问题则直接构造对象,让构造函数只专注于对象的构造,至于参数的检查工作,放到构造器中来完成。
上图中左图的SenderResource负责发送数据,右图的ChannelResource负责接收数据。
UDP、TCP和SHM的类有平行的关系,他们也有自己的TransportDescriptor、Tranport、SenderResource、ChannelResource类。 通过一个基类或者接口派生出3个传输层,体现了c++中多态的使用。
Locator用来描述一个端点,其中kind包括LOCATOR_KIND_TCPv4、LOCATOR_KIND_UDPv4、 LOCATOR_KIND_TCPv6、LOCATOR_KIND_UDPv6、LOCATOR_KIND_SHM;port就是通信的端口,比如7400、7410这些;address是地址,在UDP、TCP中就是ip地址。只要知道要发送对象的Locator信息,那么数据就可以发送出去。
Locator {int32_t kind;uint32_t port;std::string address;
}
1.1发送
下图是对SharedMemTransport::send设置断点,看到的调用栈。
发送流程:
①用户调用DataWriter::write进行发送
②在DataWriterHistory中,将要发送的数据封装到一个CacheChange_t中
③在发送侧,dds和rtps之间的接口类是dds侧的DataWriterHistory,rtps侧的WriterHistory
④rtps中的BaseWriter进行发送
⑤RtpsParticipantImpl::sendSync进行发送,SenderResource属于Participant中的资源,在Participant中调用SenderResource的send函数,最终调用到SharedMemTransport的send
然后我们再来看看SharedMemTransport::send函数内部的实现:
bool SharedMemTransport::send(const std::vector<NetworkBuffer>& buffers,uint32_t total_bytes,fastdds::rtps::LocatorsIterator* destination_locators_begin,fastdds::rtps::LocatorsIterator* destination_locators_end,const std::chrono::steady_clock::time_point& max_blocking_time_point)
{...fastdds::rtps::LocatorsIterator& it = *destination_locators_begin;bool ret = true;std::shared_ptr<SharedMemManager::Buffer> shared_buffer;try{while (it != *destination_locators_end){if (IsLocatorSupported(*it)){if (shared_buffer == nullptr){shared_buffer = copy_to_shared_buffer(buffers, total_bytes, max_blocking_time_point);}ret &= send(shared_buffer, *it);...}++it;}}...return ret;}
①对于第一次循环,需要通过函数copy_to_shared_buffer将数据保存到共享内存中
在函数copy_to_shared_buffer中打印shared_mem_segment_的信息,可以看到segment name是fastdds_aec0cadd732cebe2,这个就是发送数据要拷贝的共享内存。所以说,使用共享内存发送数据时,会进行一次数据拷贝。
将数据拷贝到共享内存之后,就是通知接收者,通知接收者需要两个步骤来实现:
①构造一个BufferDescrptor(简称BD),并将BD发送到对应的端口
②环形监听者
构造一个BufferDescriptor:
唤醒监听者:
1.2接收
对Subscriber的接收回调设置断点,调用栈如下:
2DATA-SHARING收发流程
DATA-SHARING不是一个标准的传输层,没有TransportDescriptor、Transport、SenderResource、ChannelResource这些资源。
2.1发送
delivery mechanisms这个例子,publish的时候,并不是构造了一个临时的变量发送出去的,而是通过load_sample首先从底层申请了一块内存,而这块内存就是从data sharing的writer pool中申请的,这样数据直接就是保存在这里的,payload pool就是从这里申请的,申请的是data sharing的writer pool,而writer pool就是基于共享内存创建的。所以说,在发送时,没有拷贝。
std::shared_ptr<IPayloadPool> DataWriterImpl::get_payload_pool()
{
if (!payload_pool_)
{
// Avoid calling the serialization size functors on PREALLOCATED mode
fixed_payload_size_ =
pool_config_.memory_policy == PREALLOCATED_MEMORY_MODE ? pool_config_.payload_initial_size : 0u;
// Get payload pool reference and allocate space for our history
if (is_data_sharing_compatible_)
{
payload_pool_ = DataSharingPayloadPool::get_writer_pool(pool_config_);
}
else
{
payload_pool_ = TopicPayloadPoolRegistry::get(topic_->get_name(), pool_config_);
if (!std::static_pointer_cast<ITopicPayloadPool>(payload_pool_)->reserve_history(pool_config_, false))
{
payload_pool_.reset();
}
}
// Prepare loans collection for plain types only
if (type_->is_plain(data_representation_))
{
loans_.reset(new LoanCollection(pool_config_));
}
}
return payload_pool_;
}
在这个函数里把数据发送到对方
DeliveryRetCode StatefulWriter::deliver_sample_nts(
CacheChange_t* cache_change,
RTPSMessageGroup& group,
LocatorSelectorSender& locator_selector, // Object locked by FlowControllerImpl
const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time)
{
DeliveryRetCode ret_code = DeliveryRetCode::DELIVERED;
if (there_are_local_readers_)
{
deliver_sample_to_intraprocesses(cache_change);
}
// Process datasharing then
if (there_are_datasharing_readers_)
{
deliver_sample_to_datasharing(cache_change);
}
if (there_are_remote_readers_)
{
ret_code = deliver_sample_to_network(cache_change, group, locator_selector, max_blocking_time);
}
check_acked_status();
return ret_code;
}
发送侧通知接收侧
(gdb) bt
#0 eprosima::fastdds::rtps::DataSharingNotifier::notify (this=0x7fffe00029b0) at /root/Fast-DDS/Fast-DDS/src/cpp/rtps/DataSharing/DataSharingNotifier.hpp:78
#1 0x00007ffff7788956 in eprosima::fastdds::rtps::ReaderLocator::datasharing_notify (this=0x7fffe0002568) at /root/Fast-DDS/Fast-DDS/src/cpp/rtps/writer/ReaderLocator.cpp:237
#2 0x00007ffff77a3cf8 in eprosima::fastdds::rtps::ReaderProxy::datasharing_notify (this=0x7fffe0002560) at /root/Fast-DDS/Fast-DDS/src/cpp/rtps/writer/ReaderProxy.hpp:388
#3 0x00007ffff779761a in eprosima::fastdds::rtps::StatefulWriter::deliver_sample_to_datasharing (this=0x555555789150, change=0x555555786f70)
at /root/Fast-DDS/Fast-DDS/src/cpp/rtps/writer/StatefulWriter.cpp:612
#4 0x00007ffff779eab1 in eprosima::fastdds::rtps::StatefulWriter::deliver_sample_nts (this=0x555555789150, cache_change=0x555555786f70, group=..., locator_selector=..., max_blocking_time=...)
at /root/Fast-DDS/Fast-DDS/src/cpp/rtps/writer/StatefulWriter.cpp:2130
#5 0x00007ffff762d4af in eprosima::fastdds::rtps::FlowControllerImpl<eprosima::fastdds::rtps::FlowControllerSyncPublishMode, eprosima::fastdds::rtps::FlowControllerFifoSchedule>::add_new_sample_impl<eprosima::fastdds::rtps::FlowControllerSyncPublishMode> (this=0x55555567f9d0, writer=0x555555789150, change=0x555555786f70, max_blocking_time=...)
at /root/Fast-DDS/Fast-DDS/src/cpp/rtps/flowcontrol/FlowControllerImpl.hpp:1191
#6 0x00007ffff762a44c in eprosima::fastdds::rtps::FlowControllerImpl<eprosima::fastdds::rtps::FlowControllerSyncPublishMode, eprosima::fastdds::rtps::FlowControllerFifoSchedule>::add_new_sample (
this=0x55555567f9d0, writer=0x555555789150, change=0x555555786f70, max_blocking_time=...) at /root/Fast-DDS/Fast-DDS/src/cpp/rtps/flowcontrol/FlowControllerImpl.hpp:1012
#7 0x00007ffff7796415 in eprosima::fastdds::rtps::StatefulWriter::unsent_change_added_to_history (this=0x555555789150, change=0x555555786f70, max_blocking_time=...)
at /root/Fast-DDS/Fast-DDS/src/cpp/rtps/writer/StatefulWriter.cpp:386
#8 0x00007ffff7644784 in eprosima::fastdds::rtps::WriterHistory::notify_writer (this=0x555555788ca0, a_change=0x555555786f70, max_blocking_time=...)
at /root/Fast-DDS/Fast-DDS/src/cpp/rtps/history/WriterHistory.cpp:201
#9 0x00007ffff71efd05 in eprosima::fastdds::rtps::WriterHistory::add_change_with_commit_hook<eprosima::fastdds::dds::DataWriterImpl::perform_create_new_change(eprosima::fastdds::rtps::ChangeKind_t, void const*, eprosima::fastdds::rtps::WriteParams&, const InstanceHandle_t&)::<lambda(eprosima::fastdds::dds::DataWriterImpl::CacheChange_t&)> >(eprosima::fastdds::rtps::CacheChange_t *, eprosima::fastdds::rtps::WriteParams &, struct {...}, std::chrono::time_point<std::chrono::_V2::steady_clock, std::chrono::duration<long, std::ratio<1, 1000000000> > >) (this=0x555555788ca0, a_change=0x555555786f70,
wparams=..., pre_commit=..., max_blocking_time=...) at /root/Fast-DDS/Fast-DDS/include/fastdds/rtps/history/WriterHistory.hpp:299
#10 0x00007ffff71ef6a0 in eprosima::fastdds::dds::DataWriterHistory::add_pub_change_with_commit_hook<eprosima::fastdds::dds::DataWriterImpl::perform_create_new_change(eprosima::fastdds::rtps::ChangeKind_t, void const*, eprosima::fastdds::rtps::WriteParams&, const InstanceHandle_t&)::<lambda(eprosima::fastdds::dds::DataWriterImpl::CacheChange_t&)> >(eprosima::fastdds::rtps::CacheChange_t *, eprosima::fastdds::rtps::WriteParams &, struct {...}, std::unique_lock<std::recursive_timed_mutex> &, const std::chrono::time_point<std::chrono::_V2::steady_clock, std::chrono::duration<long, std::ratio<1, 1000000000> > > &) (this=0x555555788ca0, change=0x555555786f70, wparams=..., pre_commit=..., lock=..., max_blocking_time=...) at /root/Fast-DDS/Fast-DDS/src/cpp/fastdds/publisher/DataWriterHistory.hpp:159
#11 0x00007ffff71e830d in eprosima::fastdds::dds::DataWriterImpl::perform_create_new_change (this=0x55555577d430, change_kind=eprosima::fastdds::rtps::ALIVE, data=0x7ffff6233ef4, wparams=..., handle=...)
at /root/Fast-DDS/Fast-DDS/src/cpp/fastdds/publisher/DataWriterImpl.cpp:1057
#12 0x00007ffff71e88e0 in eprosima::fastdds::dds::DataWriterImpl::create_new_change_with_params (this=0x55555577d430, changeKind=eprosima::fastdds::rtps::ALIVE, data=0x7ffff6233ef4, wparams=...)
at /root/Fast-DDS/Fast-DDS/src/cpp/fastdds/publisher/DataWriterImpl.cpp:1131
#13 0x00007ffff71e7ba9 in eprosima::fastdds::dds::DataWriterImpl::create_new_change (this=0x55555577d430, changeKind=eprosima::fastdds::rtps::ALIVE, data=0x7ffff6233ef4)
at /root/Fast-DDS/Fast-DDS/src/cpp/fastdds/publisher/DataWriterImpl.cpp:976
#14 0x00007ffff71e6312 in eprosima::fastdds::dds::DataWriterImpl::write (this=0x55555577d430, data=0x7ffff6233ef4) at /root/Fast-DDS/Fast-DDS/src/cpp/fastdds/publisher/DataWriterImpl.cpp:655
#15 0x00007ffff71d9d8f in eprosima::fastdds::dds::DataWriter::write (this=0x55555577db90, data=0x7ffff6233ef4) at /root/Fast-DDS/Fast-DDS/src/cpp/fastdds/publisher/DataWriter.cpp:84
#16 0x0000555555601386 in eprosima::fastdds::examples::delivery_mechanisms::PublisherApp::publish (this=0x555555657c60) at /root/Fast-DDS/Fast-DDS/examples/cpp/delivery_mechanisms/PublisherApp.cpp:295
#17 0x00005555556010b3 in eprosima::fastdds::examples::delivery_mechanisms::PublisherApp::run (this=0x555555657c60) at /root/Fast-DDS/Fast-DDS/examples/cpp/delivery_mechanisms/PublisherApp.cpp:266
#18 0x0000555555609105 in std::__invoke_impl<void, void (eprosima::fastdds::examples::delivery_mechanisms::Application::*)(), std::shared_ptr<eprosima::fastdds::examples::delivery_mechanisms::Application>>(std::__invoke_memfun_deref, void (eprosima::fastdds::examples::delivery_mechanisms::Application::*&&)(), std::shared_ptr<eprosima::fastdds::examples::delivery_mechanisms::Application>&&) (
__f=@0x55555565e578: &virtual table offset 16, __t=...) at /usr/include/c++/11/bits/invoke.h:74
#19 0x000055555560903d in std::__invoke<void (eprosima::fastdds::examples::delivery_mechanisms::Application::*)(), std::shared_ptr<eprosima::fastdds::examples::delivery_mechanisms::Application> > (
__fn=@0x55555565e578: &virtual table offset 16) at /usr/include/c++/11/bits/invoke.h:96
#20 0x0000555555608f9d in std::thread::_Invoker<std::tuple<void (eprosima::fastdds::examples::delivery_mechanisms::Application::*)(), std::shared_ptr<eprosima::fastdds::examples::delivery_mechanisms::Application> > >::_M_invoke<0ul, 1ul> (this=0x55555565e568) at /usr/include/c++/11/bits/std_thread.h:259
#21 0x0000555555608f52 in std::thread::_Invoker<std::tuple<void (eprosima::fastdds::examples::delivery_mechanisms::Application::*)(), std::shared_ptr<eprosima::fastdds::examples::delivery_mechanisms::Application> > >::operator() (this=0x55555565e568) at /usr/include/c++/11/bits/std_thread.h:266
#22 0x0000555555608f32 in std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (eprosima::fastdds::examples::delivery_mechanisms::Application::*)(), std::shared_ptr<eprosima::fastdds::examples::delivery_mechanisms::Application> > > >::_M_run (this=0x55555565e560) at /usr/include/c++/11/bits/std_thread.h:211
#23 0x00007ffff60dc253 in ?? () from /lib/x86_64-linux-gnu/libstdc++.so.6
#24 0x00007ffff5c94ac3 in start_thread (arg=<optimized out>) at ./nptl/pthread_create.c:442
#25 0x00007ffff5d26850 in clone3 () at ../sysdeps/unix/sysv/linux/x86_64/clone3.S:81
(gdb)
DATA-SHARING初始化writer pool
(gdb) bt
#0 eprosima::fastdds::rtps::WriterPool::init_shared_segment<eprosima::fastdds::rtps::SharedSegment<boost::interprocess::basic_managed_shared_memory<char, boost::interprocess::rbtree_best_fit<boost::interprocess::mutex_family, boost::interprocess::offset_ptr<void, unsigned int, unsigned long, 0ul>, 0ul>, boost::interprocess::iset_index>, boost::interprocess::shared_memory_object> > (this=0x5555556633c0,
writer=0x555555789150, shared_dir="") at /root/Fast-DDS/Fast-DDS/src/cpp/rtps/DataSharing/./WriterPool.hpp:132
#1 0x00007ffff760f455 in eprosima::fastdds::rtps::WriterPool::init_shared_memory (this=0x5555556633c0, writer=0x555555789150, shared_dir="")
at /root/Fast-DDS/Fast-DDS/src/cpp/rtps/DataSharing/./WriterPool.hpp:248
#2 0x00007ffff777f315 in eprosima::fastdds::rtps::BaseWriter::init (this=0x555555789150, att=...) at /root/Fast-DDS/Fast-DDS/src/cpp/rtps/writer/BaseWriter.cpp:388
#3 0x00007ffff777e14f in eprosima::fastdds::rtps::BaseWriter::BaseWriter (this=0x555555789150, impl=0x555555663600, guid=..., att=..., flow_controller=0x55555567f9d0, hist=0x555555788ca0,
listen=0x55555577d930) at /root/Fast-DDS/Fast-DDS/src/cpp/rtps/writer/BaseWriter.cpp:82
#4 0x00007ffff7794db9 in eprosima::fastdds::rtps::StatefulWriter::StatefulWriter (this=0x555555789150, pimpl=0x555555663600, guid=..., att=..., flow_controller=0x55555567f9d0, history=0x555555788ca0,
listener=0x55555577d930) at /root/Fast-DDS/Fast-DDS/src/cpp/rtps/writer/StatefulWriter.cpp:203
#5 0x00007ffff7685df0 in operator() (__closure=0x7fffffffbc50, guid=..., watt=..., flow_controller=0x55555567f9d0, persistence=0x0, is_reliable=true)
at /root/Fast-DDS/Fast-DDS/src/cpp/rtps/participant/RTPSParticipantImpl.cpp:1246
#6 0x00007ffff768f132 in eprosima::fastdds::rtps::RTPSParticipantImpl::create_writer<eprosima::fastdds::rtps::RTPSParticipantImpl::create_writer(eprosima::fastdds::rtps::RTPSWriter**, eprosima::fastdds::rtps::WriterAttributes&, eprosima::fastdds::rtps::WriterHistory*, eprosima::fastdds::rtps::WriterListener*, const eprosima::fastdds::rtps::EntityId_t&, bool)::<lambda(const GUID_t&, eprosima::fastdds::rtps::WriterAttributes&, eprosima::fastdds::rtps::FlowController*, eprosima::fastdds::rtps::IPersistenceService*, bool)> >(eprosima::fastdds::rtps::RTPSWriter **, eprosima::fastdds::rtps::WriterAttributes &, const eprosima::fastdds::rtps::EntityId_t &, bool, const struct {...} &) (this=0x555555663600, writer_out=0x7fffffffbe88, param=..., entity_id=..., is_builtin=false, callback=...)
at /root/Fast-DDS/Fast-DDS/src/cpp/rtps/participant/RTPSParticipantImpl.cpp:955
#7 0x00007ffff7686266 in eprosima::fastdds::rtps::RTPSParticipantImpl::create_writer (this=0x555555663600, WriterOut=0x7fffffffbe88, watt=..., hist=0x555555788ca0, listen=0x55555577d930, entityId=...,
isBuiltin=false) at /root/Fast-DDS/Fast-DDS/src/cpp/rtps/participant/RTPSParticipantImpl.cpp:1264
#8 0x00007ffff76d5f74 in eprosima::fastdds::rtps::RTPSDomainImpl::create_rtps_writer (p=0x5555556631f0, entity_id=..., watt=..., hist=0x555555788ca0, listen=0x55555577d930)
at /root/Fast-DDS/Fast-DDS/src/cpp/rtps/RTPSDomain.cpp:384
#9 0x00007ffff71e4834 in eprosima::fastdds::dds::DataWriterImpl::enable (this=0x55555577d430) at /root/Fast-DDS/Fast-DDS/src/cpp/fastdds/publisher/DataWriterImpl.cpp:375
#10 0x00007ffff783570a in eprosima::fastdds::statistics::dds::DataWriterImpl::enable (this=0x55555577d430) at /root/Fast-DDS/Fast-DDS/src/cpp/statistics/fastdds/publisher/DataWriterImpl.hpp:77
#11 0x00007ffff71d9ceb in eprosima::fastdds::dds::DataWriter::enable (this=0x55555577db90) at /root/Fast-DDS/Fast-DDS/src/cpp/fastdds/publisher/DataWriter.cpp:63
#12 0x00007ffff7204a76 in eprosima::fastdds::dds::PublisherImpl::create_datawriter (this=0x55555577c8a0, topic=0x555555668dd0, impl=0x55555577d430, mask=...)
at /root/Fast-DDS/Fast-DDS/src/cpp/fastdds/publisher/PublisherImpl.cpp:277
#13 0x00007ffff720480c in eprosima::fastdds::dds::PublisherImpl::create_datawriter (this=0x55555577c8a0, topic=0x555555668dd0, qos=..., listener=0x555555657c68, mask=...,
payload_pool=std::shared_ptr<eprosima::fastdds::rtps::IPayloadPool> (empty) = {...}) at /root/Fast-DDS/Fast-DDS/src/cpp/fastdds/publisher/PublisherImpl.cpp:257
#14 0x00007ffff7202b0c in eprosima::fastdds::dds::Publisher::create_datawriter (this=0x55555577ced0, topic=0x555555668dd0, qos=..., listener=0x555555657c68, mask=...,
payload_pool=std::shared_ptr<eprosima::fastdds::rtps::IPayloadPool> (empty) = {...}) at /root/Fast-DDS/Fast-DDS/src/cpp/fastdds/publisher/Publisher.cpp:119
#15 0x0000555555600ae1 in eprosima::fastdds::examples::delivery_mechanisms::PublisherApp::PublisherApp (this=0x555555657c60, config=..., topic_name="delivery_mechanisms_topic")
at /root/Fast-DDS/Fast-DDS/examples/cpp/delivery_mechanisms/PublisherApp.cpp:221
#16 0x00005555555daf5f in __gnu_cxx::new_allocator<eprosima::fastdds::examples::delivery_mechanisms::PublisherApp>::construct<eprosima::fastdds::examples::delivery_mechanisms::PublisherApp, eprosima::fastdds::examples::delivery_mechanisms::CLIParser::delivery_mechanisms_config const&, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&> (this=0x7fffffffdddf,
__p=0x555555657c60) at /usr/include/c++/11/ext/new_allocator.h:162
#17 0x00005555555daafc in std::allocator_traits<std::allocator<eprosima::fastdds::examples::delivery_mechanisms::PublisherApp> >::construct<eprosima::fastdds::examples::delivery_mechanisms::PublisherApp, eprosima::fastdds::examples::delivery_mechanisms::CLIParser::delivery_mechanisms_config const&, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&> (__a=...,
__p=0x555555657c60) at /usr/include/c++/11/bits/alloc_traits.h:516
#18 0x00005555555da3d7 in std::_Sp_counted_ptr_inplace<eprosima::fastdds::examples::delivery_mechanisms::PublisherApp, std::allocator<eprosima::fastdds::examples::delivery_mechanisms::PublisherApp>, (__gnu_cxx::_Lock_policy)2>::_Sp_counted_ptr_inplace<eprosima::fastdds::examples::delivery_mechanisms::CLIParser::delivery_mechanisms_config const&, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&> (this=0x555555657c50, __a=...) at /usr/include/c++/11/bits/shared_ptr_base.h:519
#19 0x00005555555d9d73 in std::__shared_count<(__gnu_cxx::_Lock_policy)2>::__shared_count<eprosima::fastdds::examples::delivery_mechanisms::PublisherApp, std::allocator<eprosima::fastdds::examples::delivery_mechanisms::PublisherApp>, eprosima::fastdds::examples::delivery_mechanisms::CLIParser::delivery_mechanisms_config const&, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&> (this=0x7fffffffdf88, __p=@0x7fffffffdf80: 0x0, __a=...) at /usr/include/c++/11/bits/shared_ptr_base.h:650
#20 0x00005555555d9ade in std::__shared_ptr<eprosima::fastdds::examples::delivery_mechanisms::PublisherApp, (__gnu_cxx::_Lock_policy)2>::__shared_ptr<std::allocator<eprosima::fastdds::examples::delivery_mechanisms::PublisherApp>, eprosima::fastdds::examples::delivery_mechanisms::CLIParser::delivery_mechanisms_config const&, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&> (this=0x7fffffffdf80, __tag=...) at /usr/include/c++/11/bits/shared_ptr_base.h:1342
#21 0x00005555555d97e1 in std::shared_ptr<eprosima::fastdds::examples::delivery_mechanisms::PublisherApp>::shared_ptr<std::allocator<eprosima::fastdds::examples::delivery_mechanisms::PublisherApp>, eprosima::fastdds::examples::delivery_mechanisms::CLIParser::delivery_mechanisms_config const&, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&> (this=0x7fffffffdf80,
__tag=...) at /usr/include/c++/11/bits/shared_ptr.h:409
#22 0x00005555555d948f in std::allocate_shared<eprosima::fastdds::examples::delivery_mechanisms::PublisherApp, std::allocator<eprosima::fastdds::examples::delivery_mechanisms::PublisherApp>, eprosima::fastdds::examples::delivery_mechanisms::CLIParser::delivery_mechanisms_config const&, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&> (__a=...)
at /usr/include/c++/11/bits/shared_ptr.h:863
#23 0x00005555555d916d in std::make_shared<eprosima::fastdds::examples::delivery_mechanisms::PublisherApp, eprosima::fastdds::examples::delivery_mechanisms::CLIParser::delivery_mechanisms_config const&, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&> () at /usr/include/c++/11/bits/shared_ptr.h:879
#24 0x00005555555d885a in eprosima::fastdds::examples::delivery_mechanisms::Application::make_app (config=..., topic_name="delivery_mechanisms_topic")
at /root/Fast-DDS/Fast-DDS/examples/cpp/delivery_mechanisms/Application.cpp:41
--Type <RET> for more, q to quit, c to continue without paging--
#25 0x0000555555603cb9 in main (argc=4, argv=0x7fffffffe398) at /root/Fast-DDS/Fast-DDS/examples/cpp/delivery_mechanisms/main.cpp:54
(gdb) p shared_dir
$1 = ""
(gdb) s
136 segment_id_ = writer->getGuid();
(gdb) n
137 segment_name_ = generate_segment_name(shared_dir, segment_id_);
(gdb) n
138 std::unique_ptr<T> local_segment;
(gdb) p segment_id_
$2 = {guidPrefix = {static size = 12, value = "\001\017\325\227\065\253k[\000\000\000"}, entityId = {static size = 4, value = "\000\000\001\003"}}
(gdb) p segment_name_
$3 = "fast_datasharing_01.0f.d5.97.35.ab.6b.5b.00.00.00.00_0.0.1.3"
(gdb)
2.2接收
DataSharingListener::run
3相同点:均通过共享内存进行通信
3.1SHM
在linux下,共享内存默认使用/dev/shm临时文件系统。如下是使用SHM的时候,启动了一个publisher和subscriber,在/dev/shm中创建的文件。
(1)SHM使用了端口的概念
fastdds中对SHM的使用,借鉴了UDP的通信方式,也使用了端口这个概念。在通信中,端口的表示这个通信通道被哪个进程使用。
上图中7400、7410、7411、7412、7413端口与下文中UDP的端口,作用是相同的。
fastdds:传输层端口号计算规则_fastdds mechanism-CSDN博客
下图中显示了SHM传输层的通信时序图。可以看到port是用来传输descriptor的,真正的数据是通过fastdds_56f311faa23f4389这样的文件进行传输的。descriptor在网络通信中是经常见到的一个概念,作为数据的描述符描述了数据的基本信息,比如数据的起始地址,数据的长度等。在网卡驱动中,经常说的BD,全称是buffer descriptor,也是网络收发包的描述符,与这里的descriptor是类似的。
(2)文件的作用
①锁,进程间同步
上图中以sem开头,以mutex结尾的文件用于进程间同步。以sem.xxx.port7413_mutex为例,如果一个进程监听7413这个端口的数据,那么就会wait这个锁;如果有进程要向7413发数据,那么写好数据之后,便会notiy这个锁,从而接收方就会被唤醒,进而接收并处理数据。
②fastdds_port7413
这个是端口文件,用于传输descriptor,如果有进程要向7413发送数据,那么便会将descriptor写到这个文件中。
③fastdds_56f311faa23f4389
数据文件,进程要发送的数据都会写到这个文件中。
也就是说,通过共享内存发送数据的时候,首先要将数据写到这个文件中,然后再填充一个desctiptor,将descriptor文件放到port文件中,最后调用notify,环形对应port的进程。
当我们想要了解这些文件是什么用的时候,除了从文件的名字、代码、文档中去查找确认之外,还可以通过在gdb中对shm_open设置断点,从而查看调用栈的方式去学习。/dev/shm中的文件通过shm_open打开。
3.2DATA-SHARING
如下是使用DATA-SHARING,启动了一个publisher和subscriber的情况下,在/dev/shm下创建的文件。从下图中可以看出,使用DATA-SHARING 的时候,/dev/shm下的文件,除了使用SHM时的哪些文件之外,还多了一个文件,比如fast_datasharing_01.0f.d5.97.90.46.db.3a.00.00.00.00_0.0.1.3。
那么文件fast_datasharing_01.0f.d5.97.90.46.db.3a.00.00.00.00_0.0.1.3是做什么用的呢?
首先我们可以在gdb中对shm_open设断点,查看这个函数的调用栈。调用栈如下图所示:
从中我们可以看到这个函数,可以大胆猜测在使用DATA-SHARING的时候,这个文件用于数据的传输。而原来与SHM传输层相同的文件,用于服务发现阶段。
#8 0x00007ffff7610de5 in eprosima::fastdds::rtps::WriterPool::init_shared_segment<eprosima::fastdds::rtps::SharedSegment<boost::interprocess::basic_managed_shared_memory<char, boost::interprocess::rbtree_best_fit<boost::interprocess::mutex_family, boost::interprocess::offset_ptr<void, unsigned int, unsigned long, 0ul>, 0ul>, boost::interprocess::iset_index>, boost::interprocess::shared_memory_object> > (
this=0x5555556633c0, writer=0x555555789150, shared_dir="") at /root/Fast-DDS/Fast-DDS/src/cpp/rtps/DataSharing/./WriterPool.hpp:183
4不同点
(1)SHM是标准的传输层port based的传输层,DATA-SHARING不是
(2)相对于SHM,DATA-SHARING在发送侧少一次拷贝
(3)SHM数据和Descriptor保存在了不同的文件中,数据保存在fastdds_683379ac5b6c3ba5中,descriptor发送到fastdds_port7413中;而DATA-SHARING的数据和描述符保存在了同一块内存中,下边的函数,node就相当于SHM中的descriptor,可以看到node和数据所在的内存是爱着的。
void add_to_shared_history(
const CacheChange_t* cache_change)
{
assert(cache_change);
assert(cache_change->serializedPayload.data);
assert(cache_change->serializedPayload.payload_owner == this);
assert(free_history_size_ > 0);
// Fill the payload metadata with the change info
PayloadNode* node = PayloadNode::get_from_data(cache_change->serializedPayload.data);
node->status(ALIVE);
node->data_length(cache_change->serializedPayload.length);
node->source_timestamp(cache_change->sourceTimestamp);
node->writer_GUID(cache_change->writerGUID);
node->instance_handle(cache_change->instanceHandle);
if (cache_change->write_params.related_sample_identity() != SampleIdentity::unknown())
{
node->related_sample_identity(cache_change->write_params.related_sample_identity());
}
// Set the sequence number last, it signals the data is ready
node->sequence_number(cache_change->sequenceNumber);
// Add it to the history
history_[static_cast<uint32_t>(descriptor_->notified_end)] = segment_->get_offset_from_address(node);
EPROSIMA_LOG_INFO(DATASHARING_PAYLOADPOOL, "Change added to shared history"
<< " with SN " << cache_change->sequenceNumber);
advance(descriptor_->notified_end);
--
(4)SHM发送侧和接收侧的notify和wait是以sem.fastdds_portxxxx_mutex为桥梁;DATA-SHARING的notify和wait就是以数据传输文件为桥梁。