公司做网站建设,android应用开发基础,现在什么语言做网站最好,2018年政务公开与网站建设总结CyberRT共享内存类图
共享内存消息发布 数据用共享内存发布时#xff0c;首先会创建ShmTransmitter对象#xff0c;包含两个主要成员segment和notifier#xff0c;Segment用于创建共享内存#xff08;上面绿色部分#xff09;#xff0c;Notifer 最终构建ReadableInfo通…CyberRT共享内存类图
共享内存消息发布 数据用共享内存发布时首先会创建ShmTransmitter对象包含两个主要成员segment和notifierSegment用于创建共享内存上面绿色部分Notifer 最终构建ReadableInfo通知给其他进程。 使用哪个ConditionNotifier- notify或MulticastNotifier-notify是在创建时根据配置文件决定的。 ConditionNotifier 在构建时会创建Indicator对象保存到共享内存中。 调ConditionNotifier- notify实际时将ReadableInfo保存到Indicator对象。
ConditionNotifier 共享内存数据接收 在接收数据时也会创建同样的共享内存。如果共享内存存在则直接打开。 在接收端也有同样的共享内存操作ConditionNotifier 。 ShmDispatcher会持有多个通道segment用std::unordered_mapchannelid, segment表示。 同时启动一个后台线程ThreadFunc 线程轮询处理消息回调。
void ShmDispatcher::ThreadFunc() {ReadableInfo readable_info;// 轮询处理while (!is_shutdown_.load()) {// 100ms, Listen会转换100000 ms对比seq如果不等处理消息。每次轮询会等待递减50ms。if (!notifier_-Listen(100, readable_info)) {ADEBUG listen failed.;continue;}if (readable_info.host_id() ! host_id_) {ADEBUG shm readable info from other host.;continue;}//从共享内存Indicator中读出的数据uint64_t channel_id readable_info.channel_id();uint32_t block_index readable_info.block_index();{ReadLockGuardAtomicRWLock lock(segments_lock_);if (segments_.count(channel_id) 0) {continue;}// check block index// std::unordered_mapuint64_t, uint32_t previous_indexes_; // 保存key: channelID, value: block_indexif (previous_indexes_.count(channel_id) 0) {previous_indexes_[channel_id] UINT32_MAX;}uint32_t previous_index previous_indexes_[channel_id];if (block_index ! 0 previous_index ! UINT32_MAX) {if (block_index previous_index) {ADEBUG Receive SAME index block_index of channel channel_id;} else if (block_index previous_index) {ADEBUG Receive PREVIOUS message. last: previous_index , now: block_index;} else if (block_index - previous_index 1) {ADEBUG Receive JUMP message. last: previous_index , now: block_index;}}previous_index block_index;ReadMessage(channel_id, block_index);}}
}MulticastNotifier共享内存数据接收
MulticastNotifier时采用多播socket实现的默认
std::string mcast_ip(239.255.0.100);
uint16_t mcast_port 8888;创建两个socket notify_fd_ 用于发生消息listen_addr用于接收消息。 在发送端调用Notify时时调的MulticastNotifier::Nofify(const ReadableInfo info)
bool MulticastNotifier::Notify(const ReadableInfo info) {if (is_shutdown_.load()) {return false;}std::string info_str;info.SerializeTo(info_str);ssize_t nbytes sendto(notify_fd_, info_str.c_str(), info_str.size(), 0,(struct sockaddr*)notify_addr_, sizeof(notify_addr_));return nbytes 0;
}接收端用同样的方式轮询
bool MulticastNotifier::Listen(int timeout_ms, ReadableInfo* info) {if (is_shutdown_.load()) {return false;}if (info nullptr) {AERROR info nullptr.;return false;}struct pollfd fds;fds.fd listen_fd_;fds.events POLLIN;int ready_num poll(fds, 1, timeout_ms);if (ready_num 0) {char buf[32] {0}; // larger than ReadableInfo::kSizessize_t nbytes recvfrom(listen_fd_, buf, 32, 0, nullptr, nullptr);if (nbytes -1) {AERROR fail to recvfrom, strerror(errno);return false;}return info-DeserializeFrom(buf, nbytes);} else if (ready_num 0) {ADEBUG timeout, no readableinfo.;} else {if (errno EINTR) {AINFO poll was interrupted.;} else {AERROR fail to poll, strerror(errno);}}return false;
}bool Block::TryLockForWrite() {int32_t rw_lock_free kRWLockFree;//lock_num_ rw_lock_free, kWriteExclusive赋值给lock_num_返回true//lock_num_ ! rw_lock_free, lock_num_赋值给rw_lock_free,返回falseif (!lock_num_.compare_exchange_weak(rw_lock_free, kWriteExclusive,std::memory_order_acq_rel,std::memory_order_relaxed)) {ADEBUG lock num: lock_num_.load();return false;}return true;
}总结 1、CyberRT的共享内存读写都时需要加锁的。 2、每次写数据可以是不连续的block 3、每次当Block.lock_num_ 0空闲0有读操作 -1 : 写操作。 效率不是高。