山西省建设监理官方网站,深圳市国外网站建设服务机构,中国兰州网招聘,如何注册网页网址提示#xff1a;文章写完后#xff0c;目录可以自动生成#xff0c;如何生成可参考右边的帮助文档 文章目录 前言信道管理的字段申明/删除交换机申明/删除队列绑定/解绑消息的发布消息确认订阅队列取消订阅信道内存管理类打开信道关闭信道/获取指定信道 总结 前言
信道是在… 提示文章写完后目录可以自动生成如何生成可参考右边的帮助文档 文章目录 前言信道管理的字段申明/删除交换机申明/删除队列绑定/解绑消息的发布消息确认订阅队列取消订阅信道内存管理类打开信道关闭信道/获取指定信道 总结 前言
信道是在通信连接上更细粒度的一个划分也就是一个通信连接上可以由多个信道这些信道都是复用的同一条连接为了充分利用资源。 在用户眼中没有了网络通信的概念了相当于信道他屏蔽了底层的网络通信细节。 用户只需要使用信道提供的服务不需要关心底层的网络通信细节。 在用户眼中信道和信道就是完全独立的。 信道管理的字段
需要有一个信道唯一标识。 信道关联的消费者当信道关闭的时候需要从消费者管理中销毁这个消费者。 信道关联的连接在信道提供的操作中有一个订阅指定队列和给回复响应中。我们需要用到这个连接。 protobufCodec协议处理句柄和连接一样的用处。我们给客户端回复响应就是通过这个句柄中提供的send操作他会为我们添加协议报头。 消费者管理句柄信道提供的操作中需要使用到。 虚拟机管理局并信道提供的操作中需要使用到。 线程池管理句柄在收到消息后需要给客户端推送把推送打包成一个任务放入到线程池 。
class Channel{private:std::string _cid; // 信道唯一标识Consumer::ptr _consumer; // 信道关联的消费者 muduo::net::TcpConnectionPtr _conn; // 信道关联的连接ProtobufCodecPtr _codec; // protobuf协议处理句柄ConsumerManager::ptr _cmp; // 消费者管理句柄VirtualHost::ptr _host; // 虚拟机管理句柄ThreadPool::ptr _pool; // 线程池管理句柄}信道提供了10个操作供用户使用。分别是申明/删除交换机申明/删除队列 绑定/取消绑定消息发布消息确认订阅队列和取消订阅。
申明/删除交换机
信道这里都是收到的一个一个的请求我们从请求中提取所需字段然后通过虚拟机句柄消费者句柄来进行一个操作。 // 声明/销毁交换机void declareExchange(const declareExchangeRequestPtr req){bool ret _host-declareExchange(req-exchange_name(), req-exchange_type(), req-durable(), req-auto_delete(), req-args());return basicResponse(ret, req-rid(), req-cid());}void deleteExchange(const deleteExchangeRequestPtr req){_host-deleteExchange(req-exchange_name());return basicResponse(true, req-rid(), req-cid());}可以看到信道会调用一个basicResponse接口来进行一个响应. 这个响应是通过protobufCodeC来进行的通过他提供的一个send接口来进行发送需要传入信道的连接和响应结构对象。 // 给客户端回复响应
void basicResponse(bool ok, const std::string rid, const std::string cid)
{basicCommonResponse resp;resp.set_rid(rid);resp.set_cid(cid);resp.set_ok(ok);_codec-send(_conn, resp);
}申明/删除队列
申明队列的时候也需要初始化队列消费者管理队列信息管理已经在虚拟机管理中初始化了。 删除队列的时候也需要删除消费者管理对象。
// 创建/删除队列void declareQueue(const declareQueueRequestPtr req){bool ret _host-declareQueue(req-queue_name(), req-durable(), req-exclusive(), req-auto_delete(), req-args());if (ret false){return basicResponse(false, req-rid(), req-cid());}// 初始化队列的消费者管理句柄_cmp-initQueueConsumer(req-queue_name());return basicResponse(true, req-rid(), req-cid());}void deleteQueue(const deleteQueueRequestPtr req){//删除队列的同时也要删除队列的消费者_cmp-destroyQueueConsumer(req-queue_name());_host-deleteQueue(req-queue_name());return basicResponse(true, req-rid(), req-cid());}绑定/解绑
// 绑定队列信息/解除绑定队列信息
void queueBind(const queueBindRequestPtr req)
{_host-bind(req-exchange_name(), req-queue_name(), req-binding_key());return basicResponse(true, req-rid(), req-cid());
}
void queueUnBind(const queueUnBindRequestPtr req)
{_host-unbind(req-exchange_name(), req-queue_name());return basicResponse(true, req-rid(), req-cid());
}消息的发布
在服务端的信道收到了消息发布的请求后需要获取到请求中的交换机字段获取到交换机绑定的所有队列信息。然后通过路由匹配模块来进行匹配匹配成功的队列就会通过虚拟机句柄进行消息发布操作把消息插入到队列消息中的带推送链表中。 // 消息的发布
void basicPublish(const basicPublishRequestPtr req)
{// 获取要发布到的交换机Exchange::ptr ep _host-selectExchange(req-exchange_name());if (ep nullptr){return basicResponse(false, req-rid(), req-cid());}// 进行路由交换判断消息可以发布到交换机绑定的哪个队列MsgQueueBindingMap mqbm _host-getExchangeBindings(req-exchange_name());BasicProperties *properties nullptr;std::string routing_key;if (req-has_properties()){properties req-mutable_properties();routing_key properties-routing_key();}for (auto binding : mqbm){if (Router::route(ep-type, routing_key, binding.second-binding_key)){// DLOG(%d,routing_key:%s binding_key:%s,ep-type,routing_key.c_str(),binding.second-binding_key.c_str());// 将消息添加到队列消息中_host-basicPublish(binding.first, properties, req-body());// 向线程池中提添加一个消息消费任务(向指定队列订阅者推送消息)auto task std::bind(Channel::consume, this, binding.first);_pool-push(task);}}return basicResponse(true, req-rid(), req-cid());
}然后向线程池中推送一个任务。这个任务就是向该队列的队列消费者进行消息的推送。首先从队列消息类中取出一条消息然后从队列消费者中取出一个消费者。通过调用这个消费者中回调函数来进行一个时间推送。这个回调函数就是在订阅队列是服务器绑定的。最后如果消费者的确认应答标志位为1的话会进行消息确认。 // 向指定队列的某个订阅者推送消息
void consume(const std::string qname)
{// 1. 从队列中取出一条消息mq::MessagePtr mp _host-basicConsume(qname);if (mp.get() nullptr){DLOG(执行消费任务失败%s 队列没有消息, qname.c_str());return;}// 2. 从队列订阅者中取出一个订阅者mq::Consumer::ptr cp _cmp-choose(qname);if (cp.get() nullptr){DLOG(执行消费任务失败%s 队列没有消费者, qname.c_str());return;}// 3. 调用订阅者对应的消息处理函数实现消息的推送cp-_cb(cp-_ctag, mp-mutable_payload()-mutable_properties(), mp-payload().body());// 4. 判断如果订阅者是自动确认---不需要等待确认直接删除消息否则需要外部收到消息确认后再删除if (cp-_auto_ack)_host-basicAck(qname, mp-payload().properties().id());
}消息确认
就是通过虚拟机管理句柄调用队列消息提供的操作 // 消息的确认void basicAck(const basicAckRequestPtr req){_host-basicAck(req-queue_name(), req-message_id());return basicResponse(true, req-rid(), req-cid());}订阅队列
订阅客户端可以通过信道提供的basicConsume服务来订阅一个队列。
// 订阅队列
void basicConsume(const basicConsumeRequestPtr req)
{// 判断队列是否存在bool ret _host-existsQueue(req-queue_name());if (ret false){return basicResponse(false, req-rid(), req-cid());}// 创建队列的消费者
auto cb std::bind(Channel::callback, this, std::placeholders::_1,std::placeholders::_2, std::placeholders::_3);
_consumer _cmp-create(req-queue_name(), req-consumer_tag(), req-auto_ack(), cb);
return basicResponse(true, req-rid(), req-cid());
}在服务端就需要创建一个消费者。而消费者中有一个回调函数这个回调函数就是服务器绑定的。这个回调函数就是往客户端推送消息。 // 当有订阅队列请求来时消费者的回调函数
void callback(const std::string tag, const BasicProperties *bp, const std::string body)
{basicConsumerResponse resp;resp.set_cid(_cid);resp.set_consumer_tag(tag);resp.set_body(body);if (bp){resp.mutable_properties()-set_id(bp-id());resp.mutable_properties()-set_delivery_mode(bp-delivery_mode());resp.mutable_properties()-set_routing_key(bp-routing_key());}_codec-send(_conn, resp);
}取消订阅
取消订阅就是删除队列消费者中的指定消费者。 // 取消订阅
void basicCancel(const basicCancelRequestPtr req){_cmp-remove(req-consumer_tag(), req-queue_name());return basicResponse(true, req-rid(), req-cid());}信道内存管理类
需要一个哈希表信道唯一标识和信道对象的映射。 他提供打开信道关闭信道和获取指定信道三个操作。
class ChannelManager
{
private:std::mutex _mutex;std::unordered_mapstd::string, Channel::ptr _channels;
}打开信道
这些参数都是连接管理传递进来的。创建一个信道管理对象。 bool openChannel(const std::string id, const VirtualHost::ptr host, const ConsumerManager::ptr cmp, const ProtobufCodecPtr codec, const muduo::net::TcpConnectionPtr conn,const ThreadPool::ptr pool) {std::unique_lockstd::mutex lock(_mutex);auto it _channels.find(id);if (it ! _channels.end()) {DLOG(信道%s 已经存在!, id.c_str());return false;}auto channel std::make_sharedChannel(id, host, cmp, codec, conn, pool);_channels.insert(std::make_pair(id, channel));return true;
}关闭信道/获取指定信道
void closeChannel(const std::string id){std::unique_lockstd::mutex lock(_mutex);_channels.erase(id);}Channel::ptr getChannel(const std::string id) {std::unique_lockstd::mutex lock(_mutex);auto it _channels.find(id);if (it _channels.end()) {return Channel::ptr();}return it-second;}总结
这个信道管理是服务端的信道管理而在客户端也会有一个信道客户端的信道和服务端的信道是一一对应的。但他们的操作却不一样客户端的信道是为用户提供服务他屏蔽饿底层的网络细节用户只需要调用信道提供的操作不需要关心网络通信。而服务器的信道就是在真正进行业务处理的操作的。