当前位置: 首页 > news >正文

做软件跟做网站哪个难网站建设及网页设计

做软件跟做网站哪个难,网站建设及网页设计,dw建网站怎么做,wordpress防注册项目扩展二#xff1a;消息拉取功能的实现 一、回顾一下消息推送功能是如何实现的二、设计消息拉取功能1.服务器如何处理2.定义Request和Response1.定义Request2.proto文件 三、服务器实现消息拉取1.业务模块的实现#xff1a;信道模块2.消费者管理模块实现O(1)获取消费者1.目… 项目扩展二消息拉取功能的实现 一、回顾一下消息推送功能是如何实现的二、设计消息拉取功能1.服务器如何处理2.定义Request和Response1.定义Request2.proto文件 三、服务器实现消息拉取1.业务模块的实现信道模块2.消费者管理模块实现O(1)获取消费者1.目前的情形2.加一个哈希表3.如何做4.代码 3.信道模块实现4.broker服务器注册响应业务函数 四、客户端修改五、修改消息推送逻辑--设计1.考量2.要不要给BOTH消息推送机制呢?3.设计 六、实现1.修改消息proto文件2.修改队列消息管理模块1.成员的修改2.recovery恢复历史消息的修改3.发布消息的修改3.front的修改4.clear的修改 3.总体消息管理模块修改1.PublishMessage加一个参数2.获取链表队头消息修改 4.虚拟机模块的修改1.发布消息增加一个参数1.虚拟机模块2.虚拟机管理模块 2.推送和拉取消息的修改1.虚拟机模块2.虚拟机管理模块 5.proto网络通信协议修改1.BasicPublishRequest 6.信道模块修改1.发布消息的修改2.publishCallback的修改1.坑点--连锁BUG2.解决方案3.实现接口4.publishCallback的修改 3.拉取消息的修改 7.客户端修改 七、验证1.消息拉取功能与恢复功能联合测试1.测试11.生产者2.消费者3.演示 2.测试2 -- 演示 2.PULL测试2.BOTH测试 一、回顾一下消息推送功能是如何实现的 这其中一共有三个角色 二、设计消息拉取功能 给客户端多提供一个服务消息拉取服务 其实就是从该消费者订阅的队列当中取出一个消息推送给该消费者 只不过这个消息拉取是由消费者主动向我们服务器发起请求的 因此我们要考虑两点 服务器如何主动向消费者发送消息网络通信协议中Request和Response的定义 1.服务器如何处理 要完成主动向消费者发送消息这一任务需要两个模块消费者管理模块和虚拟机模块 从消费者管理模块当中取出消费者从虚拟机模块当中取出消息 然后调用该消费者的消费处理回调函数向客户端发送BasicConsumeResponse给客户端它需要的参数告诉客户端“你可以消费了” 总体而言并不难因为我们早已把具体功能模块化然后需要使用哪些功能找对应的负责人【模块句柄】即可 这就是面向对象的模块化编程的独特魅力 2.定义Request和Response Response的话依然只需要BasicCommonResponse和BasicConsumeResponse即可 BasicCommonResponse是基础响应对客户端的每条Request都有ACK 因此我们只需要完成定义Request的任务即可 1.定义Request 首先req_id和channel_id必然需要。下面我们根据消息拉取这一功能考虑所需参数 首先消费者拉取消息因此我们需要找到该消费者所以需要vhost_namequeue_name和consumer_tag 有了这三个字段我们就能从消费者管理模块拿取该消费者对应信息 然后我们就可以推送消息了无需消费者再提供任何信息 2.proto文件 因此我们往proto文件当中新增一个message消息体 //8. 消息的拉取 message BasicPullRequest {string req_id 1;string channel_id 2;string vhost_name 3;string queue_name 4;string consumer_tag 5; }然后编译 protoc --cpp_out. mq_proto.proto 三、服务器实现消息拉取 1.业务模块的实现信道模块 我们的信道是实现具体业务提供具体服务模块他内部整合了虚拟机模块和消费者管理模块。 因此我们只需要加一个函数它的任务就是 根据consumer_tag这三个字段 从消费者管理模块当中取出消费者从虚拟机模块当中取出消息调用该消费者的消费处理回调函数如果该消费者有自动确认标志则进行自动确认 其中第3解耦且耗时第4步我们想要它在第三步结束之后才开始 我们想要解放信道服务线程因此我们把第4步跟第3步放到一起交给异步工作线程 2.消费者管理模块实现O(1)获取消费者 我们的消费者管理模块还没有实现获取消费者这一接口而这一接口在增加了消息拉取功能之后又很常用 且基于之前的数据结构来实现效率为O(N)所以我们需要改进一下提高效率 1.目前的情形 std::vectorConsumer::ptr _consumer_vec; size_t _seq;之前为了实现RR轮转的负载均衡我们通过vector和一个轮询序号实现了队列消费者管理模块 新增删除都是O(N)【因为新增和删除消费者的需求频率并不高所以没什么大碍】主要是负载均衡select是O(1)所以设计总体来说还可以 但是目前我们想要增加的是根据consumer_tag来O(1)获取消费者而vector只能O(N)所以不满足需求 2.加一个哈希表 这里的哈希表的value_type不能是vector的迭代器因为vector的扩容和删除都有迭代器失效问题 删除导致的迭代器失效问题还好解决但是扩容导致的迭代器失效问题不好解决因为vector的扩容被它通过封装屏蔽掉了倒是也能检测【通过不断检测capcity()】不过非常不优雅 所以哈希表的value_type搞成Consumer::ptr的话虽然可以快速查找但是这样的话哈希表的用途就不大的 因为哈希表只能让查询操作变为O(1)但是删除还是O(N)因为vector还是要遍历删除的 3.如何做 哈希表不能跟vector打好配合所以我们将vector改为list将RR轮转的负载均衡变为LRU式的负载均衡 每次select时从队头取最近最少使用的消费者访问之后放到队尾 4.代码 _consumer_list的队头是最近最少使用的队尾是最近访问的 因此 select获取消费者之后将该消费者挪到队尾get获取消费者之后将该消费者挪到队尾新增消费者将该消费者放到队头因为该消费者的负载必为0 class QueueConsumerManager { public:using ptr std::shared_ptrQueueConsumerManager;QueueConsumerManager(const std::string vhost_name, const std::string qname): _vhost_name(vhost_name), _qname(qname) {}// 1. 新增消费者[只有想要订阅当前队列消息的消费者才会调用该函数]Consumer::ptr createConsumer(const std::string consumer_tag, const ConsumerCallback callback, bool auto_ack){// 1. 加锁并查找是否有该消费者std::unique_lockstd::mutex ulock(_mutex);auto iter_map _consumer_map.find(consumer_tag);if (iter_map ! _consumer_map.end()){iter_type iter_list iter_map-second; // iter_list是链表的迭代器return *iter_list;}// 2. 从队头插入该消费者Consumer::ptr cp std::make_sharedConsumer(consumer_tag, callback, _vhost_name, _qname, auto_ack);_consumer_list.push_front(cp);_consumer_map.insert(std::make_pair(consumer_tag, _consumer_list.begin()));return cp;}void removeConsumer(const std::string consumer_tag){// 加锁并删除该消费者std::unique_lockstd::mutex ulock(_mutex);auto iter_map _consumer_map.find(consumer_tag);if (iter_map _consumer_map.end())return;iter_type iter_list iter_map-second; // iter_list是链表的迭代器// 删除_consumer_list.erase(iter_list);_consumer_map.erase(iter_map);}Consumer::ptr selectConsumer(){// 0. 加锁并判断是否为空std::unique_lockstd::mutex ulock(_mutex);if (_consumer_list.empty()){default_warning(获取消费者失败因为该队列没有消费者虚拟机名称%s 队列名,_vhost_name.c_str(),_qname.c_str());return Consumer::ptr();}// 1. 拿到队头消费者Consumer::ptr cp _consumer_list.front();// 2. 将队头消费者移到队尾_consumer_list.splice(_consumer_list.end(), _consumer_list, _consumer_list.begin());// 因为splice是转移节点不会导致迭代器失效所以无需更新哈希表return cp;}bool exist(const std::string consumer_tag){std::unique_lockstd::mutex ulock(_mutex);return _consumer_map.count(consumer_tag) 0;}bool empty(){std::unique_lockstd::mutex ulock(_mutex);return _consumer_map.empty();}void clear(){std::unique_lockstd::mutex ulock(_mutex);_consumer_map.clear();}// 支持通过消费者tag来获取消费者这里用哈希表来提高查询效率// 这里的哈希表的value_type不能是vector的迭代器因为vector的扩容和删除都有迭代器失效问题// 删除导致的迭代器失效问题还好解决但是扩容导致的迭代器失效问题不好解决因为vector的扩容被它通过封装屏蔽掉了倒是也能检测【通过不断检测capcity()】不过非常不优雅// 所以哈希表的value_type搞成Consumer::ptr的话虽然可以快速查找但是这样的话哈希表的用途就不大的// 因为哈希表只能让查询操作变为O(1)但是删除还是O(N)因为vector还是要遍历的// 那能否就单纯只有一个vector呢不能消费者删除需求并不高所以曾经我们用的vector但是有了消息拉取功能之后获取消费者的需求就很高了// 所以不能只有一个vector必须要有一个哈希表// 而哈希表不能跟vector打好配合所以我们将vector改为list将RR轮转的负载均衡变为LRU式的负载均衡// 每次select时从队头取最近最少使用的消费者访问之后放到队尾Consumer::ptr getConsumer(const std::string consumer_tag){std::unique_lockstd::mutex ulock(_mutex);auto iter_map _consumer_map.find(consumer_tag);if (iter_map ! _consumer_map.end()){iter_type iter_list iter_map-second; // iter_list是链表的迭代器// 将iter_list移动到队尾_consumer_list.splice(_consumer_list.end(), _consumer_list, iter_list);return *iter_list;}return Consumer::ptr();}private:using iter_type std::listConsumer::ptr::iterator;std::string _vhost_name;std::string _qname;std::mutex _mutex;std::listConsumer::ptr _consumer_list;std::unordered_mapstd::string, iter_type _consumer_map; };然后在总体消费者管理模块当中添加 Consumer::ptr getConsumer(const std::string vhost_name, const std::string qname, const std::string consumer_tag) {std::ostringstream oss;oss 获取消费者失败因为未能找到该队列消费者管理模块qname qname \n;QueueConsumerManager::ptr qcmp getQueueConsumerManager(vhost_name, qname, oss);if (qcmp.get() nullptr){return Consumer::ptr();}return qcmp-getConsumer(consumer_tag); }3.信道模块实现 拿到消费者和消息封装异步任务抛入线程池 异步任务 调用该消费者的消费处理回调函数auto_ack的问题 返回基础响应 不要忘了服务器所描述的消费者的消费处理回调函数的功能仅仅是 向消费者发送基础消费处理响应BasicConsumeResponse void basicPull(const BasicPullRequestPtr req) {// 1. 拿到该消费者Consumer::ptr cp _consumer_manager_ptr-getConsumer(req-vhost_name(), req-queue_name(), req-consumer_tag());if(cp.get()nullptr){default_error(拉取消息失败因为消费者不存在消费者tag%s,req-consumer_tag().c_str());basicResponse(req-req_id(), req-channel_id(), false);return;}// 2. 拿到消息MessagePtr mp _vhost_manager_ptr-basicConsume(req-vhost_name(), req-queue_name());if(mp.get()nullptr){default_error(拉取消息失败因为该队列没有待推送消息队列名%s,req-queue_name().c_str());basicResponse(req-req_id(), req-channel_id(), false);return;}// 3. 封装异步任务,抛入线程池auto func [cp, mp, req, this](){// 3. 调用该消费者的消费处理回调函数cp-_callback(cp-_consumer_tag, mp-mutable_valid()-mutable_properities(), mp-valid().body());// 4. auto_ack的问题if (cp-_auto_ack){this-_vhost_manager_ptr-basicAck(req-vhost_name(), req-queue_name(), mp-valid().properities().msg_id());}};_pool_ptr-put(func);// 4. 基础相应basicResponse(req-req_id(),req-channel_id(),true); }4.broker服务器注册响应业务函数 _dispatcher.registerMessageCallbackBasicPullRequest(std::bind(Server::OnBasicPull,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));// 8. 消息拉取 void OnBasicPull(const muduo::net::TcpConnectionPtr conn, const BasicPullRequestPtr req, muduo::Timestamp) {// 1. 先看有无该连接Connection::ptr myconn _connection_manager_ptr-getConnecion(conn);if (myconn.get() nullptr){default_info(确认消息时,没有找到连接对应的Connection对象);return;}// 2. 获取信道Channel::ptr mychannel myconn-getChannel(req-channel_id());if (mychannel.get() nullptr){default_info(确认消息失败因为获取信道失败);return;}mychannel-basicPull(req); }四、客户端修改 客户端只需要修改Channel即可 // 消息拉取 bool BasicPull() {if (_consumer.get() nullptr){default_error(消息拉取失败该信道没有关联消费者);return false;}BasicPullRequest req;std::string rid UUIDHelper::uuid();req.set_req_id(rid);req.set_channel_id(_channel_id);req.set_vhost_name(_consumer-_vhost_name);req.set_consumer_tag(_consumer-_consumer_tag);req.set_queue_name(_consumer-_queue_name);// 发送请求_codec-send(_conn, req);std::ostringstream oss;BasicCommonResponsePtr resp waitResponse(rid);if (resp-ok()){default_info(消息拉取成功: %s,_consumer-_consumer_tag.c_str());}else{default_info(消息拉取失败: %s,_consumer-_consumer_tag.c_str());}return resp-ok(); }五、修改消息推送逻辑–设计 1.考量 我们之前的消息推送逻辑是当生产者发布消息之后我们会立刻找消费者去推送该消息如果没有消费者那么就会丢弃该消息 这个逻辑在之前只有消息推送功能时是正确的因为不丢弃消息就是浪费资源 而现在我们实现了消息拉取功能此时这种情况就可以不丢弃消息而将其存放到队列当中等待消费者进行拉取 但是推送的消息要不要跟拉取的消息分割一下呢 让生产者发布消息时选择是推送的还是拉取的还是都有 因为一些消息具有实时性更希望快速被处理就可以放到待推送当中。而那些对实时性没这么高要求的就可以放到待确认 2.要不要给BOTH消息推送机制呢? BOTH的意思是这个消息既放到待推送链表当中又放到待拉取链表当中 因为我们的消息推送是放到异步线程池当中去跑的所以存在拉取快于推送的情况 哪个快听谁的如果服务器先推送那么这个消息就按推送走 如果服务器先拉取那么这个消息就按拉取走 因为我们拉取之后会将该消息从推送当中删除 推送之后会将该消息从拉取当中删除而且操作都因为加了互斥锁而成为了原子操作 所以不会存在同一消息被消费2次的情形 因为消息被消费只有3种情况 推送推送时会将消息从拉取当中删除所以不怕该消息同时被拉取推送进行之前被拉取此时推送就找不到消息了没事推送失败之后被拉取推送时会从拉取当中删除推送失败之后会将消息放到拉取当中所以没事 因此BOTH是可以的所以我们给上BOTH 3.设计 如何修改呢 消息推送如果失败也是将其放到待拉取消息链表当中等待消费者主动拉取 持久化消息恢复之后将其放到待拉取消息链表当中等待消费者主动拉取 现在有个问题 既然持久化的消息在恢复之后是直接放到待拉取消息链表当中的那么有必要将消息的推送机制一并持久化吗 没必要 六、实现 1.修改消息proto文件 // 3. 消息的推送机制推送/拉取/推拉 enum PublishMechanism {UNKNOWNMECHANISM 0;PUSH 1;PULL 2;BOTH 3; }// 4. 消息的基本属性 message BasicProperities {string msg_id 1;DeliveryMode mode 2;string routing_key 3; }message Message {message ValidLoad{string body 1;BasicProperities properities 2;string valid 3;// 因为bool的true/false在protobuf当中持久化后的长度不同,因此我们不用bool,而是用0代表无效,1代表有效}ValidLoad valid 1;uint64 offset 2;uint64 len 3; PublishMechanism mechanism 4; }protoc --cpp_out. mq_msg.proto2.修改队列消息管理模块 1.成员的修改 内部类 struct iter_node {using iter_type std::listMessagePtr::iterator;iter_type push_iter;iter_type pull_iter; };std::listMessagePtr _waitpush_list; std::listMessagePtr _waitpull_list; std::unordered_mapMessagePtr,iter_node _waitpublish_map;2.recovery恢复历史消息的修改 把别忘了gc恢复的待拉取消息链表存入_waitpublish_map当中!! void recovery() {std::unique_lockstd::mutex ulock(_mutex);// 1. 恢复历史消息将消息放入待拉取消息链表当中_waitpull_list _mapper.gc();// 2. 遍历待拉取消息链表将其中的消息都放入_waitpublish_map当中for (auto iter_list _waitpull_list.begin(); iter_list ! _waitpull_list.end(); iter_list){MessagePtr mp *iter_list;_waitpublish_map[mp].pull_iter iter_list;_waitpublish_map[mp].push_iter _waitpush_list.end();}// 3. 将gc后的消息放到持久化哈希表中for (auto mp : _waitpull_list){_durable_map[mp-valid().properities().msg_id()] mp;}// 2. 更新持久化消息总数和有效消息总数_total_count _valid_count _durable_map.size(); }3.发布消息的修改 bool publishMessage(const BasicProperities *bp, const std::string body, DeliveryMode mode, PublishMechanism mechanism) {// 消息能否持久化取决于队列是否持久化,因此消息的持久化与否不仅要看bp当中的mode,还要看DeliveryMode mode// 只有当DeliveryMode mode是持久化时,才看bp当中的mode,否则一律不持久化// 1. 构建消息智能指针MessagePtr mp std::make_sharedMessage();mp-mutable_valid()-set_body(body);mp-mutable_valid()-set_valid(1);mp-mutable_valid()-mutable_properities()-set_msg_id(bp-msg_id());mp-mutable_valid()-mutable_properities()-set_routing_key(bp-routing_key());DeliveryMode final_mode (mode DURABLE bp-mode() DURABLE) ? DURABLE : UNDURABLE;mp-mutable_valid()-mutable_properities()-set_mode(final_mode);mp-set_mechanism(mechanism);// 加锁std::unique_lockstd::mutex ulock(_mutex);// 2. 看是否需要持久化if (final_mode DURABLE){if (!_mapper.insert(mp)){default_error(发布消息失败, 因为消息持久化失败, 消息ID: %s,bp-msg_id().c_str());return false;}// 放到持久化哈希表中_durable_map[bp-msg_id()] mp;_total_count;_valid_count;}// 3. 根据消息发布机制来进行消息存储// 1. 放到待推送链表if (mechanism PUSH){_waitpush_list.push_back(mp);_waitpublish_map[mp].push_iter std::prev(_waitpush_list.end());_waitpublish_map[mp].pull_iter _waitpull_list.end();}else if (mechanism PULL){_waitpull_list.push_back(mp);_waitpublish_map[mp].push_iter _waitpush_list.end();_waitpublish_map[mp].pull_iter std::prev(_waitpull_list.end());}else if (mechanism BOTH){_waitpush_list.push_back(mp);_waitpull_list.push_back(mp);_waitpublish_map[mp].push_iter std::prev(_waitpush_list.end());_waitpublish_map[mp].pull_iter std::prev(_waitpull_list.end());}else{default_error(发布消息失败因为消息的发布机制未知, 消息ID: %s,bp-msg_id().c_str());return false;}return true; }3.front的修改 抽离出一个共用函数 MessagePtr front(std::listMessagePtr main_list, std::listMessagePtr sub_list, bool ispush) {std::unique_lockstd::mutex ulock(_mutex);// 0.加锁并判空if (main_list.empty()){return MessagePtr();}// 1.从链表取消息设置待确认状态MessagePtr mp main_list.front();main_list.pop_front();_waitack_map[mp-valid().properities().msg_id()] mp;// 2.在另一个链表当中进行删除auto iter_hash _waitpublish_map.find(mp);// 假设它是push,那么另一个链表就是pulliter_node::iter_type iter_list iter_hash-second.pull_iter;if (!ispush){iter_list iter_hash-second.push_iter;}if (iter_list ! sub_list.end()){sub_list.erase(iter_list);}// 3.在哈希表当中删除_waitpublish_map.erase(iter_hash);return mp; }从待推送链表取消息 MessagePtr push_list_front() {return front(_waitpush_list,_waitpull_list,true); }从待拉取链表取消息 MessagePtr pull_list_front() {return front(_waitpull_list,_waitpush_list,false); }4.clear的修改 // 需要提供销毁该队列所有信息的方法(删除队列时要用) void clear() {std::unique_lockstd::mutex ulock(_mutex);_mapper.removeFile();_waitpush_list.clear();_waitpull_list.clear();_waitpublish_map.clear();_waitack_map.clear();_durable_map.clear();_valid_count _total_count 0; }3.总体消息管理模块修改 1.PublishMessage加一个参数 bool publishMessage(const std::string qname, const BasicProperities *bp, const std::string body, DeliveryMode mode, PublishMechanism mechanism) {QueueMessageManager::ptr qmmp;{std::unique_lockstd::mutex ulock(_mutex);auto iter _qmsg_map.find(qname);if (iter _qmsg_map.end()){default_error(发布消息失败,因为该队列的消息管理模块句柄尚未初始化);return false;}qmmp iter-second;}return qmmp-publishMessage(bp, body, mode, mechanism); }2.获取链表队头消息修改 MessagePtr push_list_front(const std::string qname) {QueueMessageManager::ptr qmmp;{std::unique_lockstd::mutex ulock(_mutex);auto iter _qmsg_map.find(qname);if (iter _qmsg_map.end()){default_error(获取待推送消息失败,因为该队列的消息管理模块句柄尚未初始化);return MessagePtr();}qmmp iter-second;}return qmmp-push_list_front(); }MessagePtr pull_list_front(const std::string qname) {QueueMessageManager::ptr qmmp;{std::unique_lockstd::mutex ulock(_mutex);auto iter _qmsg_map.find(qname);if (iter _qmsg_map.end()){default_error(获取待拉取消息失败,因为该队列的消息管理模块句柄尚未初始化);return MessagePtr();}qmmp iter-second;}return qmmp-pull_list_front(); }至此,消息模块就搞完了,下面顺着这个层状结构往上找,去修改虚拟机 4.虚拟机模块的修改 1.发布消息增加一个参数 1.虚拟机模块 bool basicPublish(const std::string qname, const BasicProperities *bp, const std::string body,PublishMechanism mechanism) {// 在这里能够知道队列的持久化方式,因此就能够传递durable了// 1. 查找该队列的ptr,看是否存在,拿到durable// 2. 发布消息MsgQueue::ptr mqp _mqmp-getMsgQueue(qname);if (mqp.get() nullptr){default_error(发布消息失败,因为该队列不存在, 队列名: %s,qname.c_str());return false;}return _mmp-publishMessage(qname, bp, body, (mqp-durable) ? DURABLE : UNDURABLE,mechanism); }2.虚拟机管理模块 bool basicPublish(const std::string vname, const std::string qname, const BasicProperities *bp, const std::string body, PublishMechanism mechanism) {std::ostringstream oss;oss 发布消息失败,因为虚拟机不存在, 队列名称: qname , 虚拟机名称: vname \n;VirtualHost::ptr vhp getVirtualHost(vname, oss);if (vhp.get() nullptr){return false;}return vhp-basicPublish(qname, bp, body, mechanism); }2.推送和拉取消息的修改 1.虚拟机模块 // 推送[消费]消息 MessagePtr basicConsume(const std::string qname) {return _mmp-push_list_front(qname); }// 拉取消息 MessagePtr basicPull(const std::string qname) {return _mmp-pull_list_front(qname); }2.虚拟机管理模块 // 推送[消费]消息 MessagePtr basicConsume(const std::string vname, const std::string qname) {std::ostringstream oss;oss 推送消息失败,因为虚拟机不存在, 队列名称: qname , 虚拟机名称: vname \n;VirtualHost::ptr vhp getVirtualHost(vname, oss);if (vhp.get() nullptr){return MessagePtr();}return vhp-basicConsume(qname); }// 拉取消息 MessagePtr basicPull(const std::string vname, const std::string qname) {std::ostringstream oss;oss 拉取消息失败,因为虚拟机不存在, 队列名称: qname , 虚拟机名称: vname \n;VirtualHost::ptr vhp getVirtualHost(vname, oss);if (vhp.get() nullptr){return MessagePtr();}return vhp-basicPull(qname); }然后虚拟机模块就搞定了,再往上走,修改信道 5.proto网络通信协议修改 因为信道是网络服务模块且生产者要提供的参数多了一个所以我们需要修改一下proto网络通信协议 1.BasicPublishRequest 其实就是给BasicPublishRequest加一个PublishMechanism 字段而已 //7. 消息的发布与确认 message BasicPublishRequest {string req_id 1;string channel_id 2;string vhost_name 3;string exchange_name 4; //需要用户指定消息发布到哪个交换机上然后我们给他进行路由匹配放到对应队列当中string body 5;BasicProperities properities 6;PublishMechanism mechanism 7;// 消息的发布方式 }protoc --cpp_out. mq_proto.proto6.信道模块修改 1.发布消息的修改 给basicPublish多传一个参数req-mechanism()只有当该消息是PUSH的发布机制时才需要将publishCallback封装为异步任务抛入线程池 void basicPublish(const BasicPublishRequestPtr req) {// 1. 先找到该交换机的交换机类型Exchange::ptr ep _vhost_manager_ptr-getExchange(req-vhost_name(), req-exchange_name());if (ep.get() nullptr){default_error(发布消息失败因为交换机不存在\n交换机名称%s,req-exchange_name().c_str());basicResponse(req-req_id(), req-channel_id(), false);return;}// 2. 先找到消息发布的交换机 绑定的所有队列MsgQueueBindingMap qmap _vhost_manager_ptr-getAllBindingsByExchange(req-vhost_name(), req-exchange_name());// 3. 遍历所有队列,进行路由匹配与消息投递for (auto kv : qmap){Binding::ptr bp kv.second;BasicProperities *properities nullptr;::std::string routing_key;if (req-has_properities()){properities req-mutable_properities();routing_key properities-routing_key();}if (Router::route(routing_key, bp-binding_key, ep-type)){// 把消息投递到指定队列_vhost_manager_ptr-basicPublish(req-vhost_name(), bp-queue_name, properities, req-body(), req-mechanism());// 判断该消息是否需要推送if (req-mechanism() PUSH || req-mechanism() BOTH){// 5. 向线程池添加一个消息消费任务,消费任务交给线程池中的线程去做,解放Channel线程去做更重要的任务auto func ::std::bind(Channel::publishCallback, this, req-vhost_name(), bp-queue_name);_pool_ptr-put(func);}}}// 返回响应即可basicResponse(req-req_id(), req-channel_id(), true); }2.publishCallback的修改 注意注意 当publishCallback推送消息找不到消费者时要将该消息放到待拉取消息链表当中 放过去的时候将消息推送机制改为PULL 1.坑点–连锁BUG 这里有一个连锁BUG问题 我们复用basicPublish的时候会复用到消息管理模块当中的新增消息 如果我们的消息是持久化的那么就会重复持久化持久化时又会修改消息结构体当中的offset字段因此ACK的时候就只能删除持久化消息副本而无法删除原件 此时就BUG了那么我们想当然地就会这么想 那我把DeliveryMode改成UNDURABLE不就行了 mp-mutable_valid()-mutable_properities()-set_mode(UNDURABLE);不行的因为我们ACK的时候是否需要删除持久化消息是看该消息的DeliveryMode 因此这样的话ACK时就无法删除该消息了也是BUG 怎么办呢 2.解决方案 从需求解决问题 我们想要的其实就是把一个MessagePtr放到待拉取消息链表当中因此让消息管理模块提供这么一个接口不就行了吗 从复用方面解决问题 我们依然选择持久化但是在调用basicPublish之前先调用一下basicAck将原件先ACK了 从ACK方面方面解决问题 修改ACK是否删除持久化消息不依据DeliveryMode而是依赖于该消息是否在持久化哈希表当中 下面我们进行选择 效率第2种由于需要复用ACK也就需要进行IO操作效率低所以淘汰第2种业务/代码优雅角度第3种必须依赖于持久化哈希表而不能依赖于DeliveryMode不好而且修改DeliveryMode不利于排查BUG时的调试不好 因此我们选择第1种 3.实现接口 QueueMessageManager // 提供向待拉取消息链表当中插入数据 void insert_pull(const MessagePtr mp) {std::unique_lockstd::mutex ulock(_mutex);_waitpull_list.push_back(mp);_waitpublish_map[mp].pull_iterstd::prev(_waitpull_list.end());_waitpublish_map[mp].push_iter_waitpush_list.end(); }MessageManager // 提供向待拉取消息链表当中插入数据 bool insert_pull(const std::string qname, const MessagePtr mp) {QueueMessageManager::ptr qmmp;{std::unique_lockstd::mutex ulock(_mutex);auto iter _qmsg_map.find(qname);if (iter _qmsg_map.end()){default_error(发布消息失败,因为该队列的消息管理模块句柄尚未初始化);return false;}qmmp iter-second;}qmmp-insert_pull(mp);return true; }VirtualHost // 将消息放入待拉取消息链表当中 bool insert_pull(const std::string qname,const MessagePtr mp) {return _mmp-insert_pull(qname,mp); }VirtualHostManager // 将消息放入待拉取消息链表当中 bool insert_pull(const std::string vname, const std::string qname, const MessagePtr mp) {std::ostringstream oss;oss 将消息放入待拉取消息链表失败,因为虚拟机不存在, 队列名称: qname , 虚拟机名称: vname \n;VirtualHost::ptr vhp getVirtualHost(vname, oss);if (vhp.get() nullptr){return false;}return vhp-insert_pull(qname, mp); }4.publishCallback的修改 // 推送消息(取出消息,取出消费者,调用对应消费者的消费处理回调函数) // 不能先取消费者因为那样会导致 在无消费者的情况下待推送消息在链表当中堆积的情况 // 而通过先取消息再取消费者将消息放到待拉取消息链表当中等待有消费者拉取 void publishCallback(const ::std::string vname, const ::std::string qname) {// 1.取出消息MessagePtr mp _vhost_manager_ptr-basicConsume(vname, qname);if (mp.get() nullptr){default_info(消息的消费失败, 因为消息队列为空,没有消息: %s,qname.c_str());return;}// 2.取出消费者Consumer::ptr cp _consumer_manager_ptr-selectConsumer(vname, qname);if (cp.get() nullptr){default_info(该队列中暂无消费者,将该消息放入该队列的待拉取消息链表当中 %s,qname.c_str());if (mp-mechanism() PUSH || mp-mechanism() BOTH){// 这里要将该消息重新添加到待拉取消息链表当中_vhost_manager_ptr-insert_pull(vname, qname, mp);}return;}// 3.调用消费者的消费处理回调函数cp-_callback(cp-_consumer_tag, mp-mutable_valid()-mutable_properities(), mp-valid().body());default_info(调用消费者的消费处理回调函数成功 %s,qname.c_str());// 4.如果消费者有自动确认标志,则进行自动确认if (cp-_auto_ack true){_vhost_manager_ptr-basicAck(vname, qname, mp-valid().properities().msg_id());} }3.拉取消息的修改 void basicPull(const BasicPullRequestPtr req) {// 1. 拿到该消费者Consumer::ptr cp _consumer_manager_ptr-getConsumer(req-vhost_name(), req-queue_name(), req-consumer_tag());if(cp.get()nullptr){default_error(拉取消息失败因为消费者不存在消费者tag%s,req-consumer_tag().c_str());basicResponse(req-req_id(), req-channel_id(), false);return;}// 2. 拿到消息MessagePtr mp _vhost_manager_ptr-basicPull(req-vhost_name(), req-queue_name());if(mp.get()nullptr){default_error(拉取消息失败因为该队列没有待推送消息队列名%s,req-queue_name().c_str());basicResponse(req-req_id(), req-channel_id(), false);return;}// 3. 封装异步任务,抛入线程池auto func [cp, mp, req, this](){// 3. 调用该消费者的消费处理回调函数cp-_callback(cp-_consumer_tag, mp-mutable_valid()-mutable_properities(), mp-valid().body());// 4. auto_ack的问题if (cp-_auto_ack){this-_vhost_manager_ptr-basicAck(req-vhost_name(), req-queue_name(), mp-valid().properities().msg_id());}};_pool_ptr-put(func);// 4. 基础相应basicResponse(req-req_id(),req-channel_id(),true); }7.客户端修改 就是给Channel多加一个参数而已 bool BasicPublish(const std::string vhost_name, const std::string exchange_name, const BasicProperities *bp, const std::string body,PublishMechanism mechanism) {BasicPublishRequest req;std::string rid UUIDHelper::uuid();req.set_req_id(rid);req.set_channel_id(_channel_id);req.set_vhost_name(vhost_name);req.set_exchange_name(exchange_name);req.set_body(body);req.set_mechanism(mechanism);if (bp ! nullptr){req.mutable_properities()-set_msg_id(bp-msg_id());req.mutable_properities()-set_mode(bp-mode());req.mutable_properities()-set_routing_key(bp-routing_key());}// 发送请求_codec-send(_conn, req);BasicCommonResponsePtr resp waitResponse(rid);if (resp-ok()){default_info(发布消息成功 %s,body.c_str());}else{default_info(发布消息失败 %s,body.c_str());}return resp-ok(); }七、验证 1.消息拉取功能与恢复功能联合测试 我们的验证方式是 先让生产者跑然后再让消费者跑消费者能够拉取消息则成功让生产者跑制造持久化未确认消息然后服务器重启恢复历史消息然后消费者跑消费者能够拉取消息则成功 1.测试1 1.生产者 消息的发布机制就给PUSH了 #include connection.hpp using namespace ns_mq; #include thread #include vector using namespace std;// host1 void publisher1(const Connection::ptr conn, const std::string thread_name) {// 1. 创建信道Channel::ptr cp conn-getChannel();// 2. 创建虚拟机,交换机,队列,并进行绑定cp-declareVirtualHost(host1, ./host1/resource.db, ./host1/message);cp-declareExchange(host1, exchange1, TOPIC, true, false, {});cp-declareMsgQueue(host1, queue1, true, false, false, {});cp-declareMsgQueue(host1, queue2, true, false, false, {});cp-bind(host1, exchange1, queue1, news.sport.#);cp-bind(host1, exchange1, queue2, news.*.zhangsan);// 3. 发送10条消息BasicProperities bp;bp.set_mode(DURABLE);bp.set_routing_key(news.sport.basketball);for (int i 0; i 10; i){bp.set_msg_id(UUIDHelper::uuid());cp-BasicPublish(host1, exchange1, bp, Hello - std::to_string(i), PUSH);}// 4. 关闭信道conn-returnChannel(cp); }// host2 void publisher2(const Connection::ptr conn, const std::string thread_name) {// 1. 创建信道Channel::ptr cp conn-getChannel();// 2. 创建虚拟机,交换机,队列,并进行绑定cp-declareVirtualHost(host2, ./host2/resource.db, ./host2/message);cp-declareExchange(host2, exchange1, TOPIC, true, false, {});cp-declareMsgQueue(host2, queue1, true, false, false, {});cp-declareMsgQueue(host2, queue2, true, false, false, {});cp-bind(host2, exchange1, queue1, news.sport.#);cp-bind(host2, exchange1, queue2, news.*.zhangsan);// 3. 发送10条消息BasicProperities bp;bp.set_mode(DURABLE);bp.set_routing_key(news.sport.basketball);for (int i 0; i 10; i){bp.set_msg_id(UUIDHelper::uuid());cp-BasicPublish(host2, exchange1, bp, Hello - std::to_string(i), PUSH);}// 4. 关闭信道conn-returnChannel(cp); }int main() {AsyncWorker::ptr worker std::make_sharedAsyncWorker();Connection::ptr myconn std::make_sharedConnection(127.0.0.1, 8888, worker);vectorthread thread_v;thread_v.push_back(thread(publisher1, myconn, thread1));thread_v.push_back(thread(publisher2, myconn, thread2));for (auto t : thread_v)t.join();return 0; }2.消费者 订阅完队列之后每隔1s拉取一次消息 #include connection.hpp using namespace ns_mq; #include thread #include vector #include thread using namespace std;// 因为要拿到信道才能进行确认,所以这里需要把Channel::ptr bind过来 void Callback(const Channel::ptr cp, const std::string consumer_tag, const BasicProperities *bp, const std::string body) {// 1. 消费消息std::string id;if (bp ! nullptr){id bp-msg_id();}std::cout consumer_tag 消费了消息: body , 消息ID: id \n;// 2. 确认消息if (bp ! nullptr)std::cout cp-BasicAck(id) \n; }void consumer1(const Connection::ptr conn, const std::string thread_name) {Channel::ptr cp conn-getChannel();default_debug(consumer1: 信道ID,cp-cid().c_str());// 2. 创建虚拟机,交换机,队列,并进行绑定cp-declareVirtualHost(host1, ./host1/resource.db, ./host1/message);cp-declareExchange(host1, exchange1, TOPIC, true, false, {});cp-declareMsgQueue(host1, queue1, true, false, false, {});cp-declareMsgQueue(host1, queue2, true, false, false, {});cp-bind(host1, exchange1, queue1, news.sport.#);cp-bind(host1, exchange1, queue2, news.*.zhangsan);// 3. 创建消费者cp-BasicConsume(host1, consumer1, queue1,std::bind(Callback, cp, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3), false);// 4. 等待消息while (true){cp-BasicPull();std::this_thread::sleep_for(std::chrono::seconds(1));}// 5. 关闭信道conn-returnChannel(cp); }void consumer2(const Connection::ptr conn, const std::string thread_name) {Channel::ptr cp conn-getChannel();default_debug(consumer2: 信道ID,cp-cid().c_str());// 2. 创建虚拟机,交换机,队列,并进行绑定cp-declareVirtualHost(host2, ./host2/resource.db, ./host2/message);cp-declareExchange(host2, exchange1, TOPIC, true, false, {});cp-declareMsgQueue(host2, queue1, true, false, false, {});cp-declareMsgQueue(host2, queue2, true, false, false, {});cp-bind(host2, exchange1, queue1, news.sport.#);cp-bind(host2, exchange1, queue2, news.*.zhangsan);// 3. 创建消费者cp-BasicConsume(host2, consumer2, queue1,std::bind(Callback, cp, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3), false);// 4. 等待消息while (true){cp-BasicPull();std::this_thread::sleep_for(std::chrono::seconds(1));}// 5. 关闭信道conn-returnChannel(cp); }int main() {AsyncWorker::ptr worker std::make_sharedAsyncWorker();// 1. 创建连接和信道Connection::ptr conn std::make_sharedConnection(127.0.0.1, 8888, worker);vectorthread thread_v;thread_v.push_back(thread(consumer1, conn, thread1));thread_v.push_back(thread(consumer2, conn, thread2));for (auto t : thread_v)t.join();return 0; }3.演示 先让生产者跑然后再让消费者跑消费者能够拉取消息则成功 2.测试2 – 演示 让生产者跑制造持久化未确认消息然后服务器重启恢复历史消息然后消费者跑消费者能够拉取消息则成功 2.PULL测试 我们的验证方式是客户端纯拉取,所有消息必须是由拉取进行消费的 因为客户端拉取消息是每1s拉取一次,所以拉取消息会持续10s,如果是推送的话,那么一瞬间就会搞定 先让消费者跑再让生产者跑 只需要把生产者发布消息时的发布机制改一下即可 cp-BasicPublish(host2, exchange1, bp, Hello - std::to_string(i), PULL);演示 2.BOTH测试 我们在publishCallback当中故意让工作线程等上5s这样就能让拉取快于推送了 因此 void publishCallback(const ::std::string vname, const ::std::string qname) {std::this_thread::sleep_for(std::chrono::seconds(5)); //模拟5s后异步线程才开始执行该函数,测试BOTH时使用,用来让拉取快于推送为了保证生产者主线程退出之前异步工作线程能够执行完这些publishCallback 因此我们让生产者结束之后陷入死循环 // 4. 测试BOTH时:等待异步线程执行完publishCallback while (true) {std::this_thread::sleep_for(std::chrono::seconds(1000)); }// 5. 关闭信道 conn-returnChannel(cp);演示: 验证成功 本篇博客分了两大点来进行扩展是为了让大家更有一步步的代入感不至于一上来就这么突兀 所以代码篇幅较大希望大家理解 动图比较卡顿是因为帧数太少因为CSDN不支持上传5MB以上的图片所以只能减帧压缩体积抱歉 以上就是项目扩展二消息拉取功能的实现的全部内容
http://www.w-s-a.com/news/802229/

相关文章:

  • 织梦网站模板后台密码找回企业vi设计公司性价比高
  • php 爬取网站所有链接传奇手游发布网站
  • 免费软文网站wordpress中文名注册
  • 企业网站建设研究目的意义怎样设计一个公司网站
  • 怎么架构网站便民信息发布平台
  • 网站 建设 现状网站推广合同需要缴纳印花税吗
  • 熊猫头表情包制作网站wordpress 缺省目录
  • 网站浏览图片怎么做的群晖wordpress升级5.0
  • 25个优秀个人网站设计模板网站建设定位分析论文
  • 在线网站备案站长seo综合查询工具
  • 网站根 html网站建设行业数据
  • 网站公司做的网站有最字设计说明室内设计
  • 在线网站代码生成我想做个百度网站怎么做
  • 网站的建设费用分为长治市建设厅官方网站
  • 做网站都有哪些费用建设免费手机网站
  • 网站 组成代码做网站图片怎么插
  • 2020中国企业500强榜单南宁seo标准
  • 北美购物网站排名烟台专业的网站建站公司
  • 门户网站设计特点营销策划咨询机构
  • 天津做网站就到徽信xiala5中国营销型网站
  • 外汇网站建设制作深圳三站合一网站建设
  • 深圳坂田网站设计公司有哪些学校网站建设管理办法
  • 太原建设银行网站中山营销型网站设计
  • 广东省建设厅官方网站多少钱江苏省江建集团有限公司建设网站
  • 网站开发主流服装网站开发课程设计
  • 在iis里面创建网站wordpress响应式视频
  • 学设计哪个网站好网页设计音乐网站
  • 可以自己做斗图的网站上海模板建站多少钱
  • 山东川畅信息技术有限公司网站建设网站开发任务书
  • 网站排版设计欣赏搭建公司介绍网站