当前位置: 首页 > news >正文

当阳建设中学网站linux视频播放网站

当阳建设中学网站,linux视频播放网站,长沙微推广平台,温岭做网站十二、day12 之前的粘包处理是基于消息头包含的消息体长度进行对应的切包操作#xff0c;但并不完整。一般来说#xff0c;消息头仅包含数据域的长度#xff0c;但是如果要进行逻辑处理#xff0c;就需要传递一个id字段表示要处理的消息id#xff0c;当然可以不在包头传i…十二、day12 之前的粘包处理是基于消息头包含的消息体长度进行对应的切包操作但并不完整。一般来说消息头仅包含数据域的长度但是如果要进行逻辑处理就需要传递一个id字段表示要处理的消息id当然可以不在包头传id字段将id序列化到消息体也是可以的但是我们为了便于处理也便于回调逻辑层对应的函数最好是将id写入包头。 之前我们设计的消息结构是这样的 而本节需要加上id字段 在此之前先完整的复习一下基于boost::asio实现的服务器逻辑层结构 1. 服务器架构设计 1asio底层通信 前面的asio底层通信过程如下图所示 Asio底层的通信过程 1首先在应用层调用async_read时相当于在io_context中注册了一个读事件表示程序希望在指定socket上进行异步读取操作并提供一个读回调函数以在读取完成后做相应的处理 2io_context用于管理所有异步操作和相应的回调函数且当async_read被调用时asio会将该socket、相应的读事件和回调函数注册到系统内部的模型中数据结构当io_context启动时即io_context.run时asio根据系统使用对应的模型管理这些事件windows是iocplinux是epoll 3模型进入一个死循环会监听所有注册的socket并监测其状态可读可写如果socket的状态发生变化事件被触发模型将该事件放入就绪事件队列中 4io_context::run 在轮询就绪事件队列时会依次调用每个就绪事件的回调函数已经放在就绪事件队列中每个回调函数都包含了处理读操作的逻辑比如读取数据、处理错误等。 2逻辑层结构 而服务器架构除了上面的内容之外一般还有一个逻辑层。 一般在解析完对端发送的数据之后还要对该请求做更进一步地处理比如根据不同的消息id执行不同的逻辑层函数或不同的操作比如读数据库、写数据库还比如游戏中可能需要给玩家叠加不同的buff、增加积分等等这些都需要交给逻辑层处理而不仅仅是把消息发给对端。 服务器架构 上图所示的是一个完成的服务器架构一般需要将逻辑层独立出来因为如果在解析完对端数据后需要执行一些复杂的操作比如玩家需要叠加各自buff或者技能此时可能会耗时1s甚至更多如果没有独立的逻辑层进行操作那么系统会一直停留在执行回调函数那一步造成阻塞直至操作结束。 而逻辑层是独立的回调函数只需将数据投递给逻辑队列回调函数将数据放入队列中之后系统会运行下一步便不会被阻塞逻辑系统会自动从队列中取数据并做相应操作如果需要在执行完操作之后做相应回复那么逻辑系统会调用写事件并注册写回调给asio网络层网络层就是asio底层通信的网络层步骤。 以上操作是基于单线程如果是在多线程的情况下阻塞的情况会不会好一些 asio的多线程有两种模式。 1第一种模式是启动n个线程每个线程负责一个io_context每一个io_context负责一部分的socket。比如现在有两个io_context一个是负责socket的id为奇数的io_context一个是负责socket的id为偶数的io_context但同样会造成阻塞的情况。因为不管是多线程还是单线程只要在线程中有一个io_context中运行那么它负责的那部分回调函数的处理操作如果比较复杂时仍会造成阻塞的情况。 2第二种模式是一个io_context跑在多个线程中即多个线程共享一个io_context。这种模式下不会造成之前的那种阻塞情况因为在就绪事件队列中的事件不是一个线程处理了而是不同的线程共享一个就绪事件队列不同线程会触发执行不同的回调函数即使某个回调处理的比较慢但由于其他事件被不同线程处理了系统并不需要阻塞等待该回调处理完成之后在执行处理其他回调。 虽然模式二办法会解决系统阻塞、超时的问题但在现实中需要有一个逻辑层独立于网络层和应用层这样可以极大地提高网络线程的收发能力用多线程的方式管理网络层。 2. 完善粘包处理操作 之前的消息结构并不完善缺少一个消息id本节进行代码的相应改进。 首先之前的消息节点被收发共用只不过收数据用的是第一种构造函数发数据用的是第二种构造函数。为了减少耦合和歧义需要重新设计消息节点。 1) 消息节点 重新构建一个MsgNode类并派生出RecvNode 和SendNode MsgNode 表示消息节点的基类头部的消息用该结构存储 RecvNode 表示接收消息的节点 SendNode 表示发送消息的节点 #pragma once #include iostream #include string #include boost/asio.hppusing std::cout; using std::cin; using std::endl;class MsgNode { public:short _cur_len;short _total_len;char* _msg;MsgNode(short max_len) :_total_len(max_len), _cur_len(0) {_msg new char[_total_len 1](); // 加()会将分配内存的每个元素初始化为0不加不会初始化_msg[_total_len] \0;}~MsgNode() {std::cout destruct MsgNode endl;delete[] _msg;}void Clear() {::memset(_msg, 0, _total_len);_cur_len 0;} }; // 构造收节点 class RecvNode :public MsgNode { private:short _msg_id; public:RecvNode(short max_len, short msg_id); }; // 构造发节点 class SendNode :public MsgNode { private:short _msg_id; public:SendNode(const char* msg, short max_len, short msg_id); }; 具体实现为 #include MsgNode.h #include Const.hRecvNode::RecvNode(short max_len, short msg_id) :MsgNode(max_len), _msg_id(msg_id) {}// 发送的数据首地址、数据长度、消息id发送节点总长度为消息体长度头节点长度 SendNode::SendNode(const char* msg, short max_len, short msg_id) : MsgNode(max_len HEAD_TOTAL_LEN) , _msg_id(msg_id) {// 将消息id转换为网络序并存储至至发送节点内short msg_id_host boost::asio::detail::socket_ops::host_to_network_short(msg_id);memcpy(_msg, msg_id_host, HEAD_ID_LEN);// 将消息体长度转换为网络序并存储至至发送节点内short max_len_host boost::asio::detail::socket_ops::host_to_network_short(max_len);memcpy(_msg HEAD_ID_LEN, max_len_host, HEAD_DATA_LEN);// 将消息内容存储至发送节点内memcpy(_msg HEAD_ID_LEN HEAD_DATA_LEN, msg, max_len); } Const.h 定义为 #pragma once const size_t MAX_LENGTH 1024 * 2; const short MAX_RECVQUE 10000; const short MAX_SENDQUE 1000; const size_t HEAD_TOTAL_LEN 4; const size_t HEAD_ID_LEN 2; const size_t HEAD_DATA_LEN 2; 构建SendNode节点时需要将消息id和消息长度转换为网络序然后写入数据域_msg 前2字节存储idid后为消息长度偏移4字节后为消息体内容。 2Session类 Session类和前面差不多不过需要把收发的逻辑做相应的修改 首先队列_send_que、消息头结构、消息体结构需要重新声明分别使用SendNodeRecvNodeMsgNode作为元素类型。 std::queuestd::shared_ptrSendNode _send_que;std::mutex _send_lock;std::shared_ptrRecvNode _recv_msg_node; // 收到的消息结构bool _b_head_parse; // 表示是否处理完头部信息std::shared_ptrMsgNode _recv_head_node; // 收到的头部结构 Session的构造函数也需要做相应变化消息头结构的大小更改为4字节包括id和消息体长度 CSession(boost::asio::io_context ioc, CServer* server) : _socket(ioc), _server(server), _b_close(false),_b_head_parse(false) {// random_generator是函数对象加()就是函数再加一个()就是调用该函数boost::uuids::uuid a_uuid boost::uuids::random_generator()();_uuid boost::uuids::to_string(a_uuid);_recv_head_node std::make_sharedMsgNode(HEAD_TOTAL_LEN);} 重新定义Send函数两个Send的重载都需要重新定义 参数列表增加msgid构造发送节点时需输入三个参数msg, max_length, msgid发送内容内容长度消息id void CSession::Send(char* msg, int max_length, short msgid) {bool pending false; // 发送标志true时有未完成的发送操作false为空// 使用lock_guard锁住_send_lock确保_send_lock发送队列访问的线程安全的// 锁的存在确保了多个线程不会同时修改发送队列std::lock_guardstd::mutex lock(_send_lock);int send_que_size _send_que.size();if (send_que_size MAX_SENDQUE) {cout session: _uuid send que fulled, size is MAX_SENDQUE endl;return;}// 判断队列是否有未完成的发送操作if (_send_que.size() 0) {pending true;}_send_que.push(std::make_sharedSendNode(msg, max_length, msgid)); // 将发送消息存储至队列if (pending) { // 如果有未完成的发送直接返回return;}// 异步发送auto msgnode _send_que.front();boost::asio::async_write(_socket, boost::asio::buffer(msgnode-_msg, msgnode-_total_len),std::bind(CSession::haddle_write, this, std::placeholders::_1, shared_from_this())); } // 当}结束后_send_lock解锁发送队列解锁void CSession::Send(std::string msg, short msgid) {bool pending false; // 发送标志true时有未完成的发送操作false为空// 使用lock_guard锁住_send_lock确保_send_lock发送队列访问的线程安全的// 锁的存在确保了多个线程不会同时修改发送队列std::lock_guardstd::mutex lock(_send_lock);int send_que_size _send_que.size();if (send_que_size MAX_SENDQUE) {cout session: _uuid send que fulled, size is MAX_SENDQUE endl;return;}// 判断队列是否有未完成的发送操作if (_send_que.size() 0) {pending true;}_send_que.push(std::make_sharedSendNode(msg.c_str(), msg.length(),msgid)); // 将发送消息存储至队列if (pending) { // 如果有未完成的发送直接返回return;}// 异步发送auto msgnode _send_que.front();boost::asio::async_write(_socket, boost::asio::buffer(msgnode-_msg, msgnode-_total_len),std::bind(CSession::haddle_write, this, std::placeholders::_1, shared_from_this())); } // 当}结束后_send_lock解锁发送队列解锁 读回调也需更改在文章10中haddle_write函数的基础上做修改可参考该文章 https://zhuanlan.zhihu.com/p/722233898 void CSession::HandleRead(const boost::system::error_code error, size_t bytes_transferred,std::shared_ptrCSession _self_shared) {if (!error) {// 打印缓存区的数据并将该线程暂停2s//PrintRecvData(_data, bytes_transferred);//std::chrono::milliseconds dura(2000);//std::this_thread::sleep_for(dura);// 每触发一次handale_read它会返回实际读取的字节数bytes_transferredcopy_len表示已处理的长度每处理一字节copy_len便加一int copy_len 0; // 已经处理的字符数while (bytes_transferred 0) { // 只要读取到数据就对其处理if (!_b_head_parse) { // 判断消息头部是否已处理_b_head_parse默认为false// 异步读取到的字节数 已接收到的头部长度 头部总长度if (bytes_transferred _recv_head_node-_cur_len HEAD_TOTAL_LEN) { // 收到的数据长度小于头部长度说明头部还未全部读取// 如果未完全接收消息头则将接收到的数据复制到头部缓冲区// _recv_head_node-_msg更新当前头部的接收长度并继续异步读取剩余数据。memcpy(_recv_head_node-_msg _recv_head_node-_cur_len, _data copy_len, bytes_transferred);_recv_head_node-_cur_len bytes_transferred;// 缓冲区清零无需更新copy_len追踪已处理的字符数因为之前读取的数据已经全部写入头部节点下一个// 读入的消息从头开始copy_len0往头节点写::memset(_data, 0, MAX_LENGTH);// 继续读消息_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH), std::bind(CSession::headle_read, this,std::placeholders::_1, std::placeholders::_2, _self_shared));return;}// 如果接收到的数据量足够处理消息头部则计算头部剩余的未接收字节// 并将其从 _data 缓冲区复制到头部消息缓冲区 _recv_head_node-_msgint head_remain HEAD_TOTAL_LEN - _recv_head_node-_cur_len; // 头部剩余未复制的长度// 填充头部节点memcpy(_recv_head_node-_msg _recv_head_node-_cur_len, _data copy_len, head_remain);copy_len head_remain; // 更新已处理的data长度bytes_transferred - head_remain; // 更新剩余未处理的长度short msg_id 0; // 获取消息idmemcpy(msg_id, _recv_head_node-_msg, HEAD_ID_LEN);//网络字节序转化为本地字节序msg_id boost::asio::detail::socket_ops::network_to_host_short(msg_id);cout msg_id is msg_id endl;// 判断id是否合法if (msg_id MAX_LENGTH) {std::cout invaild msg_id is msg_id endl;_server-ClearSession(_uuid);return;}short msg_len 0; // 获取头部数据消息长度memcpy(msg_len, _recv_head_node-_msg HEAD_ID_LEN, HEAD_DATA_LEN);//网络字节序转化为本地字节序msg_len boost::asio::detail::socket_ops::network_to_host_short(msg_len);cout msg_len is msg_len endl;if (msg_len MAX_LENGTH) { // 判断头部长度是否非法std::cout invalid data length is msg_len endl;_server-ClearSession(_uuid);return;}_recv_msg_node std::make_sharedRecvNode(msg_len, msg_id); // 已知数据长度msg_len构建消息内容载体//消息的长度小于头部规定的长度说明数据未收全则先将部分消息放到接收节点里if (bytes_transferred msg_len) {memcpy(_recv_msg_node-_msg _recv_msg_node-_cur_len, _data copy_len, bytes_transferred);_recv_msg_node-_cur_len bytes_transferred;// copy_len不用更新缓冲区会清零下一个读入data的数据从头开始写入copy_len也会被初始化为0::memset(_data, 0, MAX_LENGTH);_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),std::bind(CSession::headle_read, this, std::placeholders::_1, std::placeholders::_2, _self_shared));_b_head_parse true; //头部处理完成return;}// 接收的长度多于消息内容长度memcpy(_recv_msg_node-_msg _recv_msg_node-_cur_len, _data copy_len, msg_len);_recv_msg_node-_cur_len msg_len;copy_len msg_len;bytes_transferred - msg_len;_recv_msg_node-_msg[_recv_msg_node-_total_len] \0;// cout receive data is _recv_msg_node-_msg endl;// protobuf序列化//MsgData msgdata;//std::string receive_data;//msgdata.ParseFromString(std::string(_recv_msg_node-_msg, _recv_msg_node-_total_len));//std::cout receive msg id is msgdata.id () msg data is msgdata.data() endl;//std::string return_str Server has received msg, msg data is msgdata.data();//MsgData msgreturn;//msgreturn.set_id(msgdata.id());//msgreturn.set_data(return_str);//msgreturn.SerializeToString(return_str);//Send(return_str);// jsoncpp序列化Json::Reader reader;Json::Value root;reader.parse(std::string(_recv_msg_node-_msg, _recv_msg_node-_total_len), root);std::cout recevie msg id is root[id].asInt() msg data is root[data].asString() endl;root[data] Server has received msg, msg data is root[data].asString();std::string return_str root.toStyledString();Send(return_str, root[id].asInt());//Send(_recv_msg_node-_msg, _recv_msg_node-_total_len); // 回传// 清理已处理的头部消息并重置准备解析下一条消息_b_head_parse false;_recv_head_node-Clear();// 如果当前数据已经全部处理完重置缓冲区 _data并继续异步读取新的数据if (bytes_transferred 0) {::memset(_data, 0, MAX_LENGTH);_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),std::bind(CSession::headle_read, this, std::placeholders::_1, std::placeholders::_2, _self_shared));return;}continue; // 异步读取的消息未处理完继续填充头节点乃至新的消息节点}//已经处理完头部处理上次未接受完的消息数据int remain_msg _recv_msg_node-_total_len - _recv_msg_node-_cur_len;if (bytes_transferred remain_msg) { //接收的数据仍不足剩余未处理的memcpy(_recv_msg_node-_msg _recv_msg_node-_cur_len, _data copy_len, bytes_transferred);_recv_msg_node-_cur_len bytes_transferred;::memset(_data, 0, MAX_LENGTH);_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),std::bind(CSession::headle_read, this, std::placeholders::_1, std::placeholders::_2, _self_shared));return;}// 接收的数据多于剩余未处理的长度memcpy(_recv_msg_node-_msg _recv_msg_node-_cur_len, _data copy_len, remain_msg);_recv_msg_node-_cur_len remain_msg;bytes_transferred - remain_msg;copy_len remain_msg;_recv_msg_node-_msg[_recv_msg_node-_total_len] \0;//cout receive data is _recv_msg_node-_msg endl;// protobuf序列化//MsgData msgdata;//std::string receive_data;//msgdata.ParseFromString(std::string(_recv_msg_node-_msg, _recv_msg_node-_total_len));//std::cout receive msg id is msgdata.id() msg data is msgdata.data() endl;//std::string return_str Server has received msg, msg data is msgdata.data();//MsgData msgreturn;//msgreturn.set_id(msgdata.id());//msgreturn.set_data(return_str);//msgreturn.SerializeToString(return_str);//Send(return_str);//jsoncpp序列化Json::Reader reader;Json::Value root;reader.parse(std::string(_recv_msg_node-_msg, _recv_msg_node-_total_len), root);std::cout recevie msg id is root[id].asInt() msg data is root[data].asString() endl;root[data] Server has received msg, msg data is root[data].asString();std::string return_str root.toStyledString();Send(return_str, root[id].asInt());//此处可以调用Send发送测试//Send(_recv_msg_node-_msg, _recv_msg_node-_total_len);//继续轮询剩余未处理数据_b_head_parse false;_recv_head_node-Clear();if (bytes_transferred 0) {::memset(_data, 0, MAX_LENGTH);_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),std::bind(CSession::headle_read, this, std::placeholders::_1, std::placeholders::_2, _self_shared));return;}continue;}}else {std::cout handle read failed, error is error.what() endl;Close();_server-ClearSession(_uuid);} } HandleRead函数中新增一段读取消息id的代码 首先当消息头节点_recv_head_node填充完毕后获取头节点中存储的消息id并转换为本地字节序并判断id的合法性然后解析消息长度并构建消息体节点_recv_msg_node读取剩下的消息体内容 short msg_id 0; // 获取消息idmemcpy(msg_id, _recv_head_node-_msg, HEAD_ID_LEN);//网络字节序转化为本地字节序msg_id boost::asio::detail::socket_ops::network_to_host_short(msg_id);cout msg_id is msg_id endl;// 判断id是否合法if (msg_id MAX_LENGTH) {std::cout invaild msg_id is msg_id endl;_server-ClearSession(_uuid);return;} 3客户端 客户端也需额外收发消息id #include boost/asio.hpp #include iostream #include json/json.h #include json/value.h #include json/reader.husing namespace boost::asio::ip; using std::cout; using std::endl; const int MAX_LENGTH 1024 * 2; // 发送和接收的长度为1024 * 2字节 const int HEAD_LENGTH 2; const int HEAD_TOTAL 4;int main() {try {boost::asio::io_context ioc; // 创建上下文服务// 127.0.0.1是本机的回路地址也就是服务器和客户端在一个机器上tcp::endpoint remote_ep(address::from_string(127.0.0.1), 10086); // 构造endpointtcp::socket sock(ioc);boost::system::error_code error boost::asio::error::host_not_found; // 错误主机未找到sock.connect(remote_ep, error);if (error) {cout connect failed, code is error.value() error msg is error.message();return 0;}Json::Value root;root[id] 1001;root[data] hello world;std::string request root.toStyledString();size_t request_length request.length();char send_data[MAX_LENGTH] { 0 };int msgid 1001;int msgid_host boost::asio::detail::socket_ops::host_to_network_short(msgid);memcpy(send_data, msgid_host, 2);//转为网络字节序int request_host_length boost::asio::detail::socket_ops::host_to_network_short(request_length);memcpy(send_data 2, request_host_length, 2);memcpy(send_data 4, request.c_str(), request_length);boost::asio::write(sock, boost::asio::buffer(send_data, request_length 4));char reply_head[HEAD_TOTAL]; // 首先读取对端发送消息的总长度size_t reply_length boost::asio::read(sock, boost::asio::buffer(reply_head, HEAD_TOTAL));msgid 0;memcpy(msgid, reply_head, HEAD_LENGTH);short msglen 0; // 消息总长度memcpy(msglen, reply_head 2, HEAD_LENGTH); // 将消息总长度赋值给msglen//转为本地字节序msglen boost::asio::detail::socket_ops::network_to_host_short(msglen);char msg[MAX_LENGTH] { 0 }; // 构建消息体不含消息总长度size_t msg_length boost::asio::read(sock, boost::asio::buffer(msg, msglen));Json::Reader reader;reader.parse(std::string(msg, msg_length), root);std::cout msg id is root[id] msg is root[data] endl;getchar();}catch (std::exception e) {std::cerr Exception: e.what() endl;}return 0; } 4测试
http://www.w-s-a.com/news/350565/

相关文章:

  • 池州哪里有做网站注册公司有哪些风险
  • 做古代风格头像的网站对网站政务建设的建议
  • 网站搜索栏怎么做设计个网站要多少钱
  • 阿里巴巴网站建设目标wamp wordpress
  • 自己做的网站怎么挂网上金蝶erp
  • 网站的页面由什么组成淘宝网网站建设的需求分析
  • 软文网站推广法dede5.7内核qq个性门户网站源码
  • 个人备案网站名称校园网站建设特色
  • vr超市门户网站建设班级网站怎么做ppt模板
  • 网站建设一般是用哪个软件刚开始做写手上什么网站
  • 用jsp做的网站源代码下载有哪些做红色旅游景点的网站
  • 网站开发的技术选型黄石市网站建设
  • 做直播网站需要证书吗专做宝宝的用品网站
  • 网站标题用什么符号网站制作交易流程
  • dede模板网站教程jsp网站搭建
  • 上海网站开发外包公司鲜花导购网页制作
  • 宿州外贸网站建设公司个人注册网站一般做什么
  • 小公司做网站用哪种服务器什么是网站代理
  • 青岛李村网站设计公司cms建站平台
  • 做saas网站可行吗许昌抖音推广公司
  • 网站建设找谁做seo基础知识培训
  • 微网站怎么做的好建设网站不会写代码
  • 广州外贸网站制作wordpress信息搜索插件
  • 福建高端网站建设个人公众号怎么制作教程
  • 企业网站有哪些举几个例子wordpress ie兼容插件
  • 高端的深圳网站页面设计福清市建设局官方网站
  • 安装网站到服务器合肥建设干部学校网站
  • 影视网站如何做销售案例网站
  • 建设网站对比方案龙岗网站开发公司
  • 网站开发标准网站建设公司兴田德润可信赖