网络上建个网站买东西多少钱,宁波网站建设定制开发,太原网站制作好吗,东莞营销网站建设服务1.实现目标
仿muduo库One Thread One Loop式主从Reactor模型实现高并发服务器#xff1a; 通过实现高并发服务器的组件#xff0c;可以快速实现一个高并发服务器的搭建#xff0c;并且#xff0c;通过组内不同应用层协议的支持#xff0c;可以快速完成高性能服务器的搭建…1.实现目标
仿muduo库One Thread One Loop式主从Reactor模型实现高并发服务器 通过实现高并发服务器的组件可以快速实现一个高并发服务器的搭建并且通过组内不同应用层协议的支持可以快速完成高性能服务器的搭建由于要实现的是一个服务器所以并不涉及实际的业务代码。 Http服务器 Http协议是位于应用层的一个协议全称为一个超文本传输协议是一个简单的基于请求响应的协议运行在Tcp之上的协议它是不安全的协议。 Reactor模式 Reactor 模式是指通过一个或多个输入同时传递给服务器进行请求处理时的事件驱动处理模式。 服务端程序处理传入多路请求并将它们同步分派给请求对应的处理线程Reactor 模式也叫Dispatcher 模式也可以叫做发布者模式。 分类
单Reacto模式 单I/O多路复用业务处理优点为所有的操作都在一个线程内执行不存在线程安全问题没有死锁问题代码容易编写缺点为无法利用cup多核有效资源资源浪费严重容易达到性能瓶颈。 单Reactor多线程 单I/O多路转接线程池业务处理优点为可以充分发挥cpu多核资源缺点为多个线程之间共享数据比较麻烦单个Reactor承担了所有的事件的监听和响应在单线程之下高并发成为了性能瓶颈。 多Reactor多线程模式 多个I/O多路转接线程池业务处理充分利用CPU多核资源主从Reactor各司其职在主Reactor中处理新连接请求事件有新连接到来则分发到子Reactor中监控在子Reactor中进行客户端通信监控有事件触发则接收数据分发给Worker线程池。 可以充分利用CPU资源将相应交给子Reactor来实现。 目标定位 One Thread One Loop主从Reactor模型高并发服务器 咱们实现的是主从Reactor模型服务器也就是主Reactor线程仅仅监控监听描述符获取新建连接保证获取新连接的高效性提高服务器的并发性能主Reactor获取到新链接之后发布到子Reactor进行通信事件监控子Reactor进行对行的监控各自的文件描述符的增删查改以及上层的业务处理。 One Thread One Loop的思想就是把所有的操作都放到一个线程中进行一个线程对应一个事件处理的循环。 当前实现中因为并不确定组件使用者的使用意向因此并不提供业务层工作线程池的实现只实现主从Reactor而Worker工作线程池可由组件库的使用者的需要自行决定是否使用和实现。 功能模块划分 基于以上的理解我们要实现的是一个带有协议支持的Reactor模型高性能服务器因此将整个项目的实现划分为两个大的模块 • SERVER模块实现Reactor模型的TCP服务器 • 协议模块对当前的Reactor模型服务器提供应用层协议支持。 SERVER模块 SERVER模块SERVER模块就是将所有的连接以及线程进行管理在合适的时候做合适的事情完成服务器的实现。具体的管理分为三个模块 监听连接管理对监听链接进行通信管理。 通信连接管理对正在通信的链接进行管理。 超时连接管理对已经超时的链接进行管理。 基于以上思想又可以划分为多个模块 Buffer 模块 Boffer模块是一个缓冲区模块用于通信过程中接收和发送信息的管理。 Socket 模块 Socket模块主要对创建Socket做一个封装实现网络通信的各项功能。 Channel模块 Channel模块主要对文件描述符需要进行的I/O事件管理模块实现对描述符可读可写错误...事件的管理操作以及Poller模块对描述符进行IO事件监控就绪后根据不同的事件回调不同的处理函数功能。 Connection 模块 Connection模块是对Buffer模块Socket模块Channel模块的一个整体封装实现了对一个通信套接字的整体的管理每一个进行数据通信的套接字也就是accept获取到的新连接都会使用Connection进行管理。 Connection模块内部包含有三个由组件使用者传入的回调函数连接建立完成回调事件回调新数据回调关闭回调。 Connection模块内部包含有两个组件使用者提供的接口数据发送接口连接关闭接口Connection模块内部包含有两个用户态缓冲区用户态接收缓冲区用户态发送缓冲区Connection模块内部包含有一个Socket对象完成描述符面向系统的IO操作Connection模块内部包含有一个Channel对象完成描述符IO事件就绪的处理 具体处理流程如下 1. 实现向Channel提供可读可写错误等不同事件的IO事件回调函数然后将Channel和对应的描述符添加到Poller事件监控中。 2. 当描述符在Poller模块中就绪了IO可读事件则调用描述符对应Channel中保存的读事件处理函数进行数据读取将socket接收缓冲区全部读取到Connection管理的用户态接收缓冲区中。然后调用由组件使用者传入的新数据到来回调函数进行处理。 3. 组件使用者进行数据的业务处理完毕后通过Connection向使用者提供的数据发送接口将数据写入Connection的发送缓冲区中。 4. 启动描述符在Poll模块中的IO写事件监控就绪后调用Channel中保存的写事件处理函数将发送缓冲区中的数据通过Socket进行面向系统的实际数据发送。 Accept 模块 Acceptor模块是对Socket模块Channel模块的一个整体封装实现了对一个监听套接字的整体的管理。 Acceptor模块内部包含有一个Socket对象实现监听套接字的操作。 Acceptor模块内部包含有一个Channel对象实现监听套接字IO事件就绪的处理。 具体流程 1.实现向Channel提供可读事件的IO事件处理回调函数函数的功能其实也就是获取新连接 2. 为新连接构建一个Connection对象出来。 TimerQueue模块 TimerQueue模块是实现固定时间定时任务的模块可以理解就是要给定时任务管理器向定时任务管理器中添加一个任务任务将在固定时间后被执行同时也可以通过刷新定时任务来延迟任务的执行。 这个模块主要是对Connection对象的生命周期管理对非活跃连接进行超时后的释放功能。 TimerQueue模块内部包含有一个timerfdlinux系统提供的定时器。 TimerQueue模块内部包含有一个Channel对象实现对timerfd的IO时间就绪回调处理。 Poller模块 Poller模块是对epoll进行封装的一个模块主要实现epoll的IO事件添加修改移除获取活跃连接功能。 EverlLoop 模块 EventLoop模块可以理解就是我们上边所说的Reactor模块它是对Poller模块TimerQueue模块Socket模块的一个整体封装进行所有描述符的事件监控。 EventLoop模块必然是一个对象对应一个线程的模块线程内部的目的就是运行EventLoop的启动函数。 EventLoop模块为了保证整个服务器的线程安全问题因此要求使用者对于Connection的所有操作一定要在其对应的EventLoop线程内完成不能在其他线程中进行比如组件使用者使用Connection发送数据以及关闭连接这种操作。 EventLoop模块保证自己内部所监控的所有描述符都要是活跃连接非活跃连接就要及时释放避免资源浪费。EventLoop模块内部包含有一个eventfdeventfd其实就是linux内核提供的一个事件fd专门用于事件通知。EventLoop模块内部包含有一个Poller对象用于进行描述符的IO事件监控。EventLoop模块内部包含有一个TimerQueue对象用于进行定时任务的管理。EventLoop模块内部包含有一个PendingTask队列组件使用者将对Connection进行的所有操作都加入到任务队列中由EventLoop模块进行管理并在EventLoop对应的线程中进行执行。每一个Connection对象都会绑定到一个EventLoop上这样能保证对这个连接的所有操作都是在一个线程中完成的。 具体流程 通过Poller模块对当前模块管理内的所有描述符进行IO事件监控有描述符事件就绪后通过描述符对应的Channel进行事件处理。所有就绪的描述符IO事件处理完毕后对任务队列中的所有操作顺序进行执行。由于epoll的事件监控有可能会因为没有事件到来而持续阻塞导致任务队列中的任务不能及时得到执行因此创建了eventfd添加到Poller的事件监控中用于实现每次向任务队列添加任务的时候通过向eventfd写入数据来唤醒epoll的阻塞。 TcpServer模块 这个模块是一个整体Tcp服务器模块的封装内部封装了Acceptor模块EventLoopThrea-dpool模块。TcpServer中包含有一个EventLoop对象以备在超轻量使用场景中不需要EventLoop线程池只需要在主线程中完成所有操作的情况。TcpServer模块内部包含有一个EventLoopThreadPool对象其实就是EventLoop线程池也就是子Reactor线程池。TcpServer模块内部包含有一个Acceptor对象一个TcpServer服务器必然对应有一个监听套接字能够完成获取客户端新连接并处理的任务。TcpServer模块内部包含有一个std::shared_ptrConnection的hash表保存了所有的新建连接对应的Connection注意所有的Connection使用shared_ptr进行管理这样能够保证在hash表中删除了Connection信息后在shared_ptr计数器为0的情况下完成对Connection资源的释放操作。 具体流程 1. 在实例化TcpServer对象过程中完成BaseLoop的设置Acceptor对象的实例化以及EventLoop线程池的实例化以及std::shared_ptrConnection的hash表的实例化。 2. 为Acceptor对象设置回调函数获取到新连接后为新连接构建Connection对象设置 Connection的各项回调并使用shared_ptr进行管理并添加到hash表中进行管理并为 Connection选择一个EventLoop线程为Connection添加一个定时销毁任务为Connection添加事件监控 3. 启动BaseLoop。 SERVER模式图 Http协议模式 Http协议模式HTTP协议模块用于对高并发服务器模块进行协议支持基于提供的协议支持能够更方便的完成指定协议服务器的搭建。而HTTP协议支持模块的实现可以细分为以下几个模块。 Util模块 这个模块是一个工具模块主要提供HTTP协议模块所用到的一些工具函数比如url编解码文件读写....等。 HttpRequest模块 这个模块是HTTP请求数据模块用于保存HTTP请求数据被解析后的各项请求元素信息。 HttpResponse模块 这个模块是HTTP响应数据模块用于业务处理后设置并保存HTTP响应数据的的各项元素信息最终会被按照HTTP协议响应格式组织成为响应信息发送给客户端。 HttpContext模块 这个模块是一个HTTP请求接收的上下文模块主要是为了防止在一次接收的数据中不是一个完整的HTTP请求则解析过程并未完成无法进行完整的请求处理需要在下次接收到新数据后继续根据上下文进行解析最终得到一个HttpRequest请求信息对象因此在请求数据的接收以及解析部分需要一个上下文来进行控制接收和处理节奏。 HttpServer模块 这个模块是最终给组件使用者提供的HTTP服务器模块了用于以简单的接口实现HTTP服务器的搭建。 HttpServer模块内部包含有一个TcpServer对象TcpServer对象实现服务器的搭建 HttpServer模块内部包含有两个提供给TcpServer对象的接口连接建立成功设置上下文接口数据处理接口。 HttpServer模块内部包含有一个hash-map表存储请求与处理函数的映射表组件使用者向 HttpServer设置哪些请求应该使用哪些函数进行处理等TcpServer收到对应的请求就会使用对应的函数进行处理。 2.代码实现
2.1 SERVER服务器模块实现
1.Buffer 模块实现 总缓冲区大小是一定的开始的时候readbuff等于writebuff,为0当写入数据的时候写大小为数剧最后一个位置的后一个位置开始对依然从数据开头开始读读完之后我们不用删除当再次写入的时候写入到上一个数据的最后一个位置的下一个位置读的大小等于写的大小。写继续往后走。如果总体空间不够直接扩容,需要实现的函数接口比较多。
具体代码实现
#define BUFFER_DEFAULT_SIZE 1024
class Buffer
{
private:std::vectorchar _buffer;//总的缓冲区uint64_t _readbuff;//读缓冲区大小uint64_t _writerbuff;//写缓冲区大小
public:Buffer() : _readbuff(0), _writerbuff(0), _buffer(BUFFER_DEFAULT_SIZE) {}//初始化// 起始地址char* Begin(){return (*_buffer.begin());}// 写的起始地址char* GetWritePos(){return Begin() _writerbuff;}// 读的起始地址char* GetReadPos(){return Begin() _readbuff;}// 获取缓冲区末尾空闲空间大小--写偏移之后的空闲空间, 总体空间大小减去写偏移uint64_t TailIndexSize(){return _buffer.size() - _writerbuff;}// 获取缓冲区起始空闲空间大小--读偏移之前的空闲空间uint64_t HeadIndexSize(){return _readbuff;}// 获取可读数据大小 写偏移 - 读偏移uint64_t ReadIndexSize(){return _writerbuff - _readbuff;}// 将读偏移向后移动size大小void MoveReadOff(uint64_t size){if (size 0) return;// 向后移动的大小必须小于可读数据大小assert(size ReadIndexSize());_readbuff size;}// 将写偏移向后移动void MoveWriterOff(uint64_t size){if (size 0) return;assert(size TailIndexSize());_writerbuff size;}//确保有足够的空间进行写void EnsureWriteSpace(uint64_t size){// 末尾空间足够直接返回if (TailIndexSize() size) return;if (TailIndexSize() HeadIndexSize() size){uint64_t rsz ReadIndexSize();//写空间的大小std::copy(GetReadPos(), GetReadPos() rsz, Begin());//直接复制_readbuff 0;// 将读偏移归0_writerbuff rsz; // 将写位置置为可读数据大小 因为当前的可读数据大小就是写偏移量}else{// 总体空间不够则需要扩容不移动数据直接给写偏移之后扩容足够空间即可DBG_LOG(RESIZE %ld, _writerbuff size);//扩容_buffer.resize(_writerbuff size);}}// 写数据void Write(const void* data, uint64_t size){if (size 0) return;EnsureWriteSpace(size);const char* wd (const char* )data;std::copy(wd, wd size, GetWritePos());}// 尾插void WritePush(const void* data, uint64_t size){Write(data, size);MoveWriterOff(size);}//写stringvoid WriteString(const std::string data){return Write(data.c_str(), data.size());}//写数据并插入void WriteStringAndPush(const std::string data){WriteString(data);MoveWriterOff(data.size());}void WriteBuffer(Buffer data){return Write(data.GetReadPos(), data.ReadIndexSize());}void WriteBufferAndPush(Buffer data){WriteBuffer(data);MoveWriterOff(data.ReadIndexSize());}// 读取数据void Read(void *buf, uint64_t size){// 要求要获取的数据大小必须小于可读数据大小assert(size ReadIndexSize());std::copy(GetReadPos(), GetReadPos() size, (char *)buf);}//读并删除void ReadAndPop(void *buf, uint64_t size){Read(buf, size);MoveReadOff(size);}//读std::string ReadAsString(uint64_t size){// 要求要获取的数据大小必须小于可读数据大小assert(size ReadIndexSize());std::string str;str.resize(size);Read(str[0], size);return str;}//读std::string ReadAsStringAndPop(uint64_t size){assert(size ReadIndexSize());std::string str ReadAsString(size);MoveReadOff(size);return str;}//查找CRLFchar* FindCRLF(){char* res (char *)memchr(GetReadPos(), \n, ReadIndexSize());//返回找到的地址return res;}std::string GetLine(){char *pos FindCRLF();if (pos NULL) return ;// 1是为了把换行字符也取出来。return ReadAsString(pos - GetReadPos() 1);}std::string GetLineAndPop(){std::string str GetLine();MoveReadOff(str.size());return str;}// 清空缓冲区void Clear(){// 只需要将偏移量归0即可_readbuff 0;_writerbuff 0;}
};
2.Socket实现
Socket套接字的实现比较简单就是将网络函数接口进行封装Socket的主要功能创建套接字绑定地址和端口号监听是否有链接服务端获取新链接客户端发起连接请求接收数据发送数据关闭套接字端口号是否重用设为非阻塞等。
代码接口
#define MAX_LISTEN 1024
class Socket
{
private:int _socket;
public:Socket() : _socket(-1){}Socket(int socket) : _socket(socket){}~Socket() { Close(); }int Fd() {return _socket;}// 创建socketbool Create(){_socket socket(AF_INET, SOCK_STREAM, 0);if (_socket 0){ERR_LOG(CREATE SOCKET FAILED!!);return false;}return true;}// 绑定地址信息bool Bind(const std::string ip, uint16_t port){struct sockaddr_in local;memset(local, 0, sizeof local);local.sin_family AF_INET;local.sin_port htons(port);local.sin_addr.s_addr inet_addr(ip.c_str());socklen_t len sizeof(local);if (bind(_socket, (struct sockaddr *)local, len) 0){ERR_LOG(BIND ADDRESS FAILED!);return false;}return true;}// 监听地址bool Listen(int backlog MAX_LISTEN){if (listen(_socket, backlog) 0){ERR_LOG(SOCKET LISTEN FAILED!);return false;}else return true;}// 请求连接bool Connect(const std::string ip,const uint16_t port){struct sockaddr_in local;memset(local, 0, sizeof local);local.sin_family AF_INET;local.sin_port htons(port);local.sin_addr.s_addr inet_addr(ip.c_str());socklen_t len sizeof(local);if (connect(_socket, (struct sockaddr *)local, len) 0) // 错误{ERR_LOG(BIND ADDRESS FAILED!);return false;}return true;}// 获取新链接int Accept(){int ret accept(_socket, NULL, NULL);if (ret 0){ERR_LOG(BIND ADDRESS FAILED!);return false;}return ret;}// 读取数据bool Recv(char *buf, ssize_t size, int flag 0){int n recv(_socket, buf, size, flag);if (n 0){if (errno EAGAIN || n EINTR) return 0; // 这次没有数据return false;}return true;}// 非阻塞读取bool NoneBlockRecv(char *buf, ssize_t size){if (size 0) return 0;Recv(buf, size, MSG_DONTWAIT); // MSG_DONTWAIT表示当前为非阻塞}// 发送bool Send(const char* buf, ssize_t size, int flag 0){int n send(_socket, buf, size, flag);if (n 0){if (errno EAGAIN || n EINTR) return 0; // 这次没有数据return false;}return true;}// 非阻塞发送bool NoneBlockSend(char *buf, size_t size){if (size 0) return 0;Send(buf, size, MSG_DONTWAIT);}// 关闭void Close(){if (_socket 0){close(_socket);_socket -1;}}// 创建服务段端bool CreateServer(uint16_t port, const std::string ip 0.0.0.0, bool block_flag false){if (Create() false) return false;if (block_flag) NoneBlock(); // 设为非阻塞状态if (Bind(ip, port) false) return false;if (Listen() false) return false;ReuseAddress();return true;}// 创建客户端bool CreateClient(const uint16_t port, const std::string ip 0.0.0.0){// 1. 创建套接字2.指向连接服务器if (Create() false) return false;if (Connect(ip, port) false) return false;return true;}// 开启地址端口重用void ReuseAddress(){int val 1;setsockopt(_socket, SOL_SOCKET, SO_REUSEADDR, (void *)val, sizeof(int));val 1;setsockopt(_socket, SOL_SOCKET, SO_REUSEPORT, (void *)val, sizeof(int));}// 非阻塞void NoneBlock(){int flag fcntl(_socket, F_GETFL, 0);fcntl(_socket, F_SETFL, flag | O_NONBLOCK);}
}; 3.Channel模块
Chennal 模块是对一个文件描述符的封装现对描述符可读可写错误…事件的管理操作以及Poller模块对描述符进行IO事件监控就绪后根据不同的事件回调不同的处理函数功能。
功能对一个描述符进行监控时间管理意义对于描述符的状态更容易维护对之后的操作流程更加方便功能设计描述符是否可读是否可写监控可读监控可写解除可读解除可写解除所有文件描述符。
代码框架
class Channel {private:int _fd;uint32_t events; // 当前需要监控的事件uint32_t revents; // 当前连接触发的事件using eventCallback std::functionvoid();eventCallback _read_callback; // 可读事件被触发的回调函数eventCallback _error_callback; // 可写事件被触发的回调函数eventCallback _close_callback; // 连接关闭事件被触发的回调函数eventCallback _event_callback; // 任意事件被触发的回调函数eventCallback _write_callback; // 可写事件被触发的回调函数public:Channel(int fd) : fd(_fd) {}int Fd() {return _fd ;}void setReadCallback(const eventCallback cb);//设置读监控void setWriteCallback(const eventCallback cb);//写监控void setErrorCallback(const eventCallback cb);//错误监控void setCloseCallback(const eventCallback cb);//关闭监控void setEventCallback(const eventCallback cb);//任意时间监控bool readAble(); // 当前是否可读bool writeAble(); // 当前是否可写void enableRead(); // 启动读事件监控void enableWrite(); // 启动写事件监控void disableRead(); // 关闭读事件监控void disableWrite(); // 关闭写事件监控void disableAll(); // 关闭所有事件监控void Remove(); //移除监控void handleEvent(); // 事件处理一旦触发了某个事件就调用这个函数};4.Poller模块
描述符事件监控模块对任意描述符进行时间监控就是对epoll函数得封装时期变得更有意义。
功能设计添加事件修改事件移除事件取消定时任务。
封装思想 1. 必须拥有一个epoll的操作句柄 2. 拥有一个struct epoll_event 结构数组监控保存所有的活跃事件 3. 使用hash表管理描述符与描述符对应的事件管理Channnel对象 逻辑流程 1. 对描述符进行监控通过Channnel才能知道描述符监控什么事件 2. 当描述符就绪了通过描述符在hash表中找到对应的Channel得到了Channel才知 道什么事件如何处理当描述符就绪了返回就绪描述符对应的Channel。
代码框架
框架
class Poller {
private:int _epfd;struct epoll_event_evs[xxx];std::unordered_mapint,Channel* mp;
private:// 1. 判断要更新事件的描述符是否存在// 2. 针对epoll直接操作添加修改移除
public:// 1. 添加或者更新描述符所监控的事件void Update(Channel* channel);// 2. 移除描述符所监控的事件void Remove(Channel* )// 3. 开始监控获取就绪Channel
};
*/
/*5.EventLoop模块
这个模块和线程是一一对应的 监听了一个链接如果这个连接一旦就绪就要进行事件处理 但是如果这个描述符在多个线程中都触发了了事件进行处理就会存在线程安全问题 因此我们需要将一个链接的事件监控 以及连接事件处理以及其他操作都放在同一个线程中 如何保证一个连接的所有操作都在eventloop对应的线程中 给eventLOOP模块中都添加一个任务队列 对连接的所有操作都进行一次封装将对连接的操作当作任务都添加到任务队列中
功能进行事件监控的模块一个模块对应一个线程。意义所有的线程都在这个模块中完成功能设计将一个任务添加到任务队列中定时任务的添加取消刷新。
事件功能
事件监控使用Poller模块有事件就绪则进行事件处理执行任务队列中的任务注意点因为有可能因为等待描述符IO事件就绪执行流流程阻塞这个时候任务对立中的任务得不到执行因此得有一个事件通知的东西能够唤醒事件监控的阻塞当事件就绪需要处理的时候处理过程中如果对连接要进行某些操作这些操作必须要在Eventloop对应的线程中进行保证对连接的各项操作都是线程安全的。如果执行的操作就在本线程中不需要将操作压入队列了可以直接执行如果执行的操作不在线程中才需要加入任务池等到事件处理完了之后就行执行任务
设计框架 class Eventloop {
private:std::thread::id _thread_id; // 线程IDint _event_fd // eventfd 唤醒IO事件监控有可能的阻塞Poller _poller; // 进行所有描述符的事件监控using Functor std::functionvoid();std::vectorFunctor _task; // 任务池std::mutex _mutex; // 实现任务池操作的线程安全
public:void runAllTask();
public:Eventloop();void runInLoop(const Functorcb); // 判断将要执行的任务是否处于当前线程中如果是则执行不是则压入队列。void queueInLoop(const Functorcb); // 将操作压入任务池bool isInLoop(); //永远判断当前线程是否是EventLoop所对应的线程void updateEvent(Channel* channel); // 添加/修改描述符的事件监控void removeEvent(Channel* channel); // 移除描述符的监控void Start(); // 任务监控完毕进行处理任务 三步走事件监控-》就绪事件处理-》执行任务};6.Connection模块
Connection模块是对Buffer模块Socket模块Channel模块的⼀个整体封装实现了对一个通信套接字的整体的管理每一个进行数据通信的套接字也就是accept获取到的新连接都会使用 Connection进行管理。 • Connection模块内部包含有三个由组件使用者传入的回调函数连接建立完成回调事件回调 新数据回调关闭回调。 • Connection模块内部包含有两个组件使用者提供的接口数据发送接口连接关闭接口 • Connection模块内部包含有两个用户态缓冲区用户态接收缓冲区用户态发送缓冲区 • Connection模块内部包含有⼀个Socket对象完成描述符面向系统的IO操作 • Connection模块内部包含有⼀个Channel对象完成描述符IO事件就绪的处理
这是对通信套接字进行通信管理的一个模块对一个连接的操作都是通过这个模块来实现的。这各模块本省并不是一个单独的功能模块是一个连接管理的模块。
Connection模块一个连接有任何的事件怎么处理都是有这个模块来进行处理的因为组件的设计也不知道使用者要如何处理事件因此只能是提供一些事件回调函数由使用者设置。
设计框架
DISCONECTED -- 连接关闭状态 CONNECTING -- 连接建立成功-待处理状态
//CONNECTED -- 连接建立完成各种设置已完成可以通信的状态 DISCONNECTING -- 待关闭状态
type enum { // 连接关闭// 连接建立成功 —— 待处理状态// 连接设立完成可以通信// 待关闭状态DISCONECTED,CONNECTING,CONNECTED,DISCONECTING} ConnStatu;
using PreConnection std::shared_ptrConnection;
class Connection {private:uint64_t _conn_id; //连接的唯一ID便于连接的管理和查找bool _enable_inactive_release; // 连接是否启动非活跃销毁的判断标志默认为falseint _sockfd; // 连接关联的文件描述符ConnStatu _statu; // Socket _socket; // 套接字操作管理Channel _channel; // 连接二点事件管理Buffer _in_buffer; // 输入缓冲区 —— 存放从socket中读到的数据buffer _out_buffer; // 输出缓冲区 —— 发送给对端的是数据等到描述符事件可写再发Any _context; // 请求的接受处理上下文/*这四个回调函数是让服务器模块来设置的其实服务器模块的处理回调也是组件使用者设置的*//*换句话说这几个回调都是组件使用者使用的*/using ConnectCallback std::functionvoid(const PreConnection);using MessageCallback std::functionvoid(const PtrConnection, Buffer *);using ClosedCallback std::functionvoid(const PtrConnection);using AnyEventCallback std::functionvoid(const PtrConnection);ConnectedCallback _connected_callback;MessageCallback _message_callback;ClosedCallback _closed_callback;AnyEventCallback _event_callback;/*组件内的连接关闭回调--组件内设置的因为服务器组件内会把所有的连接管理起来一旦某个连接要关闭*//*就应该从管理的地方移除掉自己的信息*/ClosedCallback _server_closed_callback;private:// /*五个channel的事件回调函数*///描述符可读事件触发后调用的函数接收socket数据放到接收缓冲区中然后调用_message_callbackvoid HandleRead() {}void HandleRead() {}void HandleClose() {}void HandleError() {}//描述符触发任意事件: 1. 刷新连接的活跃度--延迟定时销毁任务 2. 调用组件使用者的任意事件回调void HandleEvent() { }//连接获取之后所处的状态下要进行各种设置启动读监控,调用回调函数void EstablishedInLoop() { }//这个接口才是实际的释放接口void ReleaseInLoop() {}//这个接口并不是实际的发送接口而只是把数据放到了发送缓冲区启动了可写事件监控void SendInLoop(Buffer buf) {}//这个关闭操作并非实际的连接释放操作需要判断还有没有数据待处理待发送void ShutdownInLoop() {}//启动非活跃连接超时释放规则void EnableInactiveReleaseInLoop(int sec) {}void CancelInactiveReleaseInLoop() {}void UpgradeInLoop(const Any context, const ConnectedCallback conn, const MessageCallback msg, const ClosedCallback closed, const AnyEventCallback event) {_context context;_connected_callback conn;_message_callback msg;_closed_callback closed;_event_callback event;}public:Connection(EventLoop* loop,uint64_t _conn_id,int sockfd) : _sockfd(sockfd),_enable_inactive_release(false), _loop(loop), _statu(CONNECTING), _socket(_sockfd), _channel(loop, _sockfd) {_channel.SetCloseCallback(std::bind(Connection::HandleClose, this));_channel.SetEventCallback(std::bind(Connection::HandleEvent, this));_channel.SetReadCallback(std::bind(Connection::HandleRead, this));_channel.SetWriteCallback(std::bind(Connection::HandleWrite, this));_channel.SetErrorCallback(std::bind(Connection::HandleError, this));}~Connection() { DBG_LOG(RELEASE CONNECTION:%p, this); }//获取管理的文件描述符int Fd() {return _sockfd; }// 获取连接IDint Id() {return _conn_id; }// 是否处于CONNECTED状态bool Connected() { return (_statu CONNECTED); }//设置上下文--连接建立完成时进行调用void SetContext(const Any context) { _context context; }//获取上下文返回的是指针Any *GetContext() { return _context; }void SetConnectedCallback(const ConnectedCallbackcb) { _connected_callback cb; }void SetMessageCallback(const MessageCallbackcb) { _message_callback cb; }void SetClosedCallback(const ClosedCallbackcb) { _closed_callback cb; }void SetAnyEventCallback(const AnyEventCallbackcb) { _event_callback cb; }void SetSrvClosedCallback(const ClosedCallbackcb) { _server_closed_callback cb; }//连接建立就绪后进行channel回调设置启动读监控调用_connected_callbackvoid Established() {}_loop-RunInLoop(std::bind(Connection::EstablishedInLoop, this));// 发送数据将数据放到发送缓冲区启动写事件监控void Send(const char *data, size_t len) {}//提供给组件使用者的关闭接口--并不实际关闭需要判断有没有数据待处理void Shutdown() {}void Release() {}//启动非活跃销毁并定义多长时间无通信就是非活跃添加定时任务void EnableInactiveRelease(int sec) { }//取消非活跃销毁void CancelInactiveRelease() {}void Upgrade(const Any context, const ConnectedCallback conn, const MessageCallback msg, const ClosedCallback closed, const AnyEventCallback event) {_loop-AssertInLoop();_loop-RunInLoop(std::bind(Connection::UpgradeInLoop, this, context, conn, msg, closed, event));}};7.Acception模块
对通信连接做整体管理的一个模块对一个通信连接的模块都是通过这个模块来进行的。实现了对套接字整体的管理。
意义当获取了一个新建连接的描述符后需要为这个通信连接封装一个connection对象设置不同回调。 注意因为Acceptor模块本身并不知道一个链接产生了某个事件该如何处理因此获取一个通信连接后Connection的封装以及事件回调的设置都应该由服务器模块来进行
设计框架
/*
Acceptor 模块对监听套接字进行管理1. 创建一个监听套接字2. 启动读事件监控3. 事件触发后获取新连接4. 调用新连接获取成功后的回调函数4. 为新连接创建Connection进行管理这一步不是Acceptor模块操作应该是服务器模块因为Acceptor模块只进行监听连接的管理因此获取到新连接的描述符后对于新连接描述符如何处理并不关心!对于新连接如何处理应该是服务器模块关心管理服务器模块实现了一个对于新连接描述符处理的函数将这个函数设置给Acceptor模块的回调函数
*/8.LoopThread模块
目标将eventloop模块和线程整合起来 eventloop 和 线程是一一对应的 eventloop 模块实例化的对象在构造的时候就会初始化 _thread_id; 而后面当运行一个操作的时候判断是否运行在eventloop所对应的线程中就是将线程ID与EventLoop模块中的thread_id 进行一个比较相同就表示在同一个线程不同就表示当前运行线程并不是eventloop线程 eventloop 模块在实例化对象的时候必须在线程内部 eventloop 实例化对象会设置自己的 thread_id; 如果我们先创建了多个 eventloop 对象然后创建了多个线程将各自的线程id重新给 eventloop 进行设置 存在问题在构造 eventloop对象到设置新的 thread_id 期间将是不可控的 因此必须先创建线程然后在线程的入口函数中去实例化 eventloop 对象 构造一个新的模块LoopThread。
class LoopThread {private:/*用于实现_loop获取的同步关系避免线程创建了但是_loop还没有实例化之前去获取_loop*/std::mutex _mutex; // 互斥锁std::condition_variable _cond; // 条件变量EventLoop *_loop; // EventLoop指针变量这个对象需要在线程内实例化std::thread _thread; // EventLoop对应的线程private:/*实例化 EventLoop 对象唤醒_cond上有可能阻塞的线程并且开始运行EventLoop模块的功能*/void ThreadEntry() {EventLoop loop;{std::unique_lockstd::mutex lock(_mutex);//加锁_loop loop;_cond.notify_all();}loop.Start();}public:/*创建线程设定线程入口函数*/LoopThread():_loop(NULL), _thread(std::thread(LoopThread::ThreadEntry, this)) {}/*返回当前线程关联的EventLoop对象指针*/EventLoop *GetLoop() {EventLoop *loop NULL;{std::unique_lockstd::mutex lock(_mutex);//加锁_cond.wait(lock, [](){ return _loop ! NULL; });//loop为NULL就一直阻塞loop _loop;}return loop;}
};
9.LoopThreadPool模块
线程数量可配置调节线程数量对线程数量进行管理。提供线程分配的功能。
1.线程数量可配置0或多个 注意事项在服务器中主从Reactor模型是 主线程只负责新连接获取丛书线程负责新连接的事件监控以及处理!因此当前的线程池有可能从属线程会数量为0也就是实现单 Reactor服务器一个线程及负责获取连接以及连接的处理 2. 对所有的线程进行管理其实也就是管理0个或多个LoopThread对象 3. 提供线程分配的功能 4.当主线程获取了一个链接需要将新的线程挂到从属线程上进行事件监控以及管理 5.假设0个从属线程则直接分配给主线程的EventLoop进行处理 6.假设有多个丛书线程则采用RR轮转将对应线程的EventLoop获取到设置给对应的Connection)。
设计框架
class LoopThreadPool {private:int _thread_count;int _next_idx;EventLoop *_baseloop;std::vectorLoopThread* _threads;std::vectorEventLoop * _loops;public:LoopThreadPool(EventLoop *baseloop):_thread_count(0), _next_idx(0), _baseloop(baseloop) {}void SetThreadCount(int count) { _thread_count count; }void Create() {if (_thread_count 0) {_threads.resize(_thread_count);_loops.resize(_thread_count);for (int i 0; i _thread_count; i) {_threads[i] new LoopThread();_loops[i] _threads[i]-GetLoop();}}return ;}EventLoop *NextLoop() {if (_thread_count 0) {return _baseloop;}_next_idx (_next_idx 1) % _thread_count;return _loops[_next_idx];}
};10.TcpServer模块
对之前所有的模块进行整合完成一个服务器的搭建。
实现思想
1.管理
Acceptor对象创建一个监听套接字EventLoop 对象baseloop对象实现对监听套接字的事件监控std::vector conns,实现对新建连接的管理EventLoopPool 对象创建loop线程池对新建连接进行事件监控和处理
2.流程
在TcpServer中实例一个Acceptor对象以及一个EventLoop 对象baseloop)将Acceptor 挂在baseloop 进行事件监控一旦Acceptor 对象就绪了可读事件则执行时间回调函数获取新建连接对新连接创造一个 Connection 进行管理对新连接对应的 Connection 设置功能回调 连接完成回调消息回调关闭回调任意事件监控启动Connettion 的非活跃链接的超时销毁功能.将新连接对应的Connection 挂到 LoopThreadPool 中的丛书线程对应的Eventloop 中进行事件监控一旦Connection对应的链接就绪了可读事件则这个时候执行读事件回调函数读取数据读取完毕后调用TcpServer设置的消息回调
三功能设计
设置从属线程池数量启动服务器设置各种回调函数连接建立完成消息关闭任意 用户设置给TcpServer TcpServer设置获取的新连接是否启动非活跃连接超时销毁功能添加任务
设计框架
class TcpServer {private:uint64_t _next_id; //这是一个自动增长的连接IDint _port;int _timeout; //这是非活跃连接的统计时间---多长时间无通信就是非活跃连接bool _enable_inactive_release;//是否启动了非活跃连接超时销毁的判断标志EventLoop _baseloop; //这是主线程的EventLoop对象负责监听事件的处理Acceptor _acceptor; //这是监听套接字的管理对象LoopThreadPool _pool; //这是从属EventLoop线程池std::unordered_mapuint64_t, PtrConnection _conns;//保存管理所有连接对应的shared_ptr对象using ConnectedCallback std::functionvoid(const PtrConnection);using MessageCallback std::functionvoid(const PtrConnection, Buffer *);using ClosedCallback std::functionvoid(const PtrConnection);using AnyEventCallback std::functionvoid(const PtrConnection);using Functor std::functionvoid();ConnectedCallback _connected_callback;MessageCallback _message_callback;ClosedCallback _closed_callback;AnyEventCallback _event_callback;private:void RunAfterInLoop(const Functor task, int delay) {}//为新连接构造一个Connection进行管理void NewConnection(int fd) {}void RemoveConnectionInLoop(const PtrConnection conn) {}//从管理Connection的_conns中移除连接信息void RemoveConnection(const PtrConnection conn) {}public:TcpServer(int port):_port(port), _next_id(0), _enable_inactive_release(false), _acceptor(_baseloop, port),_pool(_baseloop) {_acceptor.SetAcceptCallback(std::bind(TcpServer::NewConnection, this, std::placeholders::_1));}void SetThreadCount(int count) { }void SetConnectedCallback(const ConnectedCallbackcb) { }void SetMessageCallback(const MessageCallbackcb) { }void SetClosedCallback(const ClosedCallbackcb) {}void SetAnyEventCallback(const AnyEventCallbackcb) { }void EnableInactiveRelease(int timeout) { }//用于添加一个定时任务void RunAfter(const Functor task, int delay) {}void Start() { }
};2.2 Http模块实现
Http模块是处于应用层的一个简单的协议包含五部分每部分有不同的内容。
1.Util的处理
目的目的:实现一些工具接口读取文件内容向文件写入内容URL编码URL解码通过HTTP状态码获取描述信息通过文件后缀名获取mime判断一个文件是不是目录判断一个文是否是一个普通文件HTTP资源路径有效性判断
框架设计
class Util {public:// 字符串分割函数size_t Spilt();// 读取文件内容static bool ReadFile() {}// 向文件写入内容static bool WriteFile();// URL编码static bool UrlEncode();// URL解码static bool UrlDecode();// 通过HTTP状态码获取描述信息static std::string StatusDesc();// 根据文件后缀名获取文件MINEstatic std::string ExtMine();// 判断一个文件是不是目录static bool IsDirectory();//判断一个文件是否是一个普通文件static bool IsRegular();//HTTP资源路径有效性判断static bool VaildPath();
};2.HttpRequest
目的存储Http请求信息的接收到一个数据按照HTTP请求格式进行解析得到各个关键要素放到Request中。
功能
HttpRequest模块存储HTTP请求信息接收到一个数据按照HTTP请求格式进行解析得到各个关键要素放到Request中HttpResponse模块存储HTTP响应信息进行业务处理的同时让使用者向Response中填充响应要素完毕后将其组织成HTTP响应格式的数据发给客户端。
class HttpRequest {public:std::string _method; //请求方法std::string _path; //资源路径std::string _version; //协议版本std::string _body; //请求正文std::smatch _matches; //资源路径的正则提取数据std::unordered_mapstd::string, std::string _headers; //头部字段std::unordered_mapstd::string, std::string _params; //查询字符串public:HttpRequest():_version(HTTP/1.1) {}void ReSet() {_method.clear();_path.clear();_version HTTP/1.1;_body.clear();std::smatch match;_matches.swap(match);_headers.clear();_params.clear();}//插入头部字段void SetHeader(const std::string key, const std::string val) {_headers.insert(std::make_pair(key, val));}//判断是否存在指定头部字段bool HasHeader(const std::string key) const {auto it _headers.find(key);if (it _headers.end()) {return false;}return true;}//获取指定头部字段的值std::string GetHeader(const std::string key) const {auto it _headers.find(key);if (it _headers.end()) {return ;}return it-second;}//插入查询字符串void SetParam(const std::string key, const std::string val) {_params.insert(std::make_pair(key, val));}//判断是否有某个指定的查询字符串bool HasParam(const std::string key) const {auto it _params.find(key);if (it _params.end()) {return false;}return true;}//获取指定的查询字符串std::string GetParam(const std::string key) const {auto it _params.find(key);if (it _params.end()) {return ;}return it-second;}//获取正文长度size_t ContentLength() const {// Content-Length: 1234\r\nbool ret HasHeader(Content-Length);if (ret false) {return 0;}std::string clen GetHeader(Content-Length);return std::stol(clen);}//判断是否是短链接bool Close() const {// 没有Connection字段或者有Connection但是值是close则都是短链接否则就是长连接if (HasHeader(Connection) true GetHeader(Connection) keep-alive) {return false;}return true;}
};3.Http