网站建设公司的流程,展示型企业网站设计方案,网站开发学哪一个好,浙江省建设工程质量协会网站io_uring的异步IO机制 io_uring 原理io_uring接口应用性能测试 io_uring 原理
io_uring 是 Linux 内核 5.1 版本引入的全新异步 I/O 接口#xff0c;由 Jens Axboe 开发。它通过两个共享环形缓冲区(ring buffers)在内核和用户空间之间高效传递 I/O 请求和完成事件#xff0c… io_uring的异步IO机制 io_uring 原理io_uring接口应用性能测试 io_uring 原理
io_uring 是 Linux 内核 5.1 版本引入的全新异步 I/O 接口由 Jens Axboe 开发。它通过两个共享环形缓冲区(ring buffers)在内核和用户空间之间高效传递 I/O 请求和完成事件避免了传统 AIO 的各种限制。
我们之前了解过在网络高并发的场景下epoll 在这方面的性能是独树一帜通过 epoll 的工作模式实现了 reactor 这种事件触发的机制在 reactor 中异步的实现我们可以通过多线程也可以通过协程去进行实现。但是本质上最后还是调用到 read/write/recv/send 这样的接口来完成数据的收发工作。 理解read/write/recv/send本质 他们作为系统调用函数其实本质上还是一种拷贝函数如果底层缓冲区中有数据就会调用这些接口如果没有数据就会等待数据就绪或者设置为非阻塞的状态不等就直接返回一个错误码表示此时数据还未就绪。
IO 操作本质上就是 “等 拷贝” 的操作单纯的去看待 read/write/recv/send 这些函数其实他返回请求接收数据和返回数据这些操作都是一起的。 在 epoll 的处理中我们可以看见他可以一次性处理多个文件描述符将这种等的操作重叠了起来透过他的底层实现我们可以发现他其实是将 “等 拷贝” 的操作实现了一种分离被监听的 fd 与就绪的 fd 实则是两种不同的数据结构进行处理构成了一种生产者消费者模型也就是说 “等” 其实在 epoll 的接口中就已经完成了而我们调用read/write/recv/send 函数的时候就不用再等了数据已经就绪由于这种可以对多个 fd 进行处理的机制加上使用回调函数进行处理使得epoll 的功能就很强大。 如何理解io_uring 对于 io_uring 来说其实更倾向于将这种 IO 操作实现成异步的在 io_uring 的实现当中提供了两个队列结构一个是提交队列SQ另一个是完成队列(CQ)注意这两个队列都是环形队列结构。
提交队列的作用就是提交 IO 请求完成队列的作用就是内核通知完成的 I/O 操作假设当前有 100 个客户端发起读的请求在 io_uring 的工作方式中会将这 100 个 IO 请求先 push 到提交队列当中然后进行处理然后处理完成的在 push 到完成队列当中返回结果他也是由不同线程去完成的。这两个队列干的事两件不同的事情从而产生了异步的效果。 由于 io_uring 的内部使用 mmap 去进行实现这种方式是他在整个过程中也只会进行一次的数据拷贝无异于也是对效率的一个提升而且通过这种无锁的环形队列接口减少了频繁进行加锁解锁的消耗这对于高并发的场景无异于是一个巨大的提升其实这两个流程也是一个典型的生产者消费者模型。
io_uring接口应用
io_uring 在这儿主要提供了 3 个系统调用接口
int io_uring_setup(unsigned entries, struct io_uring_params *params);io_uring_setup 是 Linux 内核提供的系统调用用于初始化一个异步 I/O 上下文io_uring 实例参数如下
entries指定提交队列SQ和完成队列CQ的初始大小条目数。通常为 2 的幂次方内核可能会调整实际大小。params指向 io_uring_params 结构的指针用于传递配置参数并返回队列信息。结构定义如下
struct io_uring_params {__u32 sq_entries; // 内核实际分配的 SQ 大小__u32 cq_entries; // 内核实际分配的 CQ 大小__u32 flags; // 配置标志如 IORING_SETUP_IOPOLL__u32 sq_thread_cpu; // 绑定 SQ 线程的 CPU__u32 sq_thread_idle; // SQ 线程空闲超时毫秒__u32 features; // 内核返回的支持特性__u32 resv[4];struct io_sqring_offsets sq_off; // SQ 环的偏移信息struct io_cqring_offsets cq_off; // CQ 环的偏移信息
};
返回值
成功时返回一个文件描述符fd代表创建的 io_uring 实例失败时返回 -1 并设置 errno。
int io_uring_enter(unsigned int fd, unsigned int to_submit,unsigned int min_complete, unsigned int flags, sigset_t *sig); io_uring_enter 用于提交 I/O 操作请求或等待已完成事件参数如下
fd: 关联的 io_uring 实例的文件描述符。to_submit: 准备提交的 I/O 操作数量。min_complete: 要求内核等待至少完成的事件数若 flags 包含 IORING_ENTER_GETEVENTS。flags: 控制行为的标志位如 IORING_ENTER_GETEVENTS。sig: 等待时临时屏蔽的信号集可为 NULL。
int io_uring_register(unsigned int fd, unsigned int opcode, void *arg, unsigned int nr_args);io_uring_register 是 Linux 内核提供的系统调用syscall用于为 io_uring 实例注册资源如文件描述符、缓冲区等以优化异步 I/O 操作的性能参数如下
fd: io_uring 实例的文件描述符由 io_uring_setup 创建。opcode: 注册操作的类型如 IORING_REGISTER_BUFFERS注册缓冲区或 IORING_REGISTER_FILES注册文件描述符。arg: 指向用户空间数据的指针具体内容取决于 opcode。nr_args: arg 指向的数组中的条目数。
其中opcode 的类型有如下几种
IORING_REGISTER_BUFFERS注册固定缓冲区用于减少 read/write 操作中的内核-用户空间数据拷贝。IORING_REGISTER_FILES注册文件描述符避免每次 I/O 操作重复传递文件描述符。IORING_REGISTER_EVENTFD注册事件文件描述符eventfd用于异步通知 I/O 完成事件。IORING_REGISTER_PROBE检查内核支持的 io_uring 功能需配合 struct io_uring_probe。
io_uring 的库其实对这三个函数进行了封装然后提供给我们一个使用的库。
接下来我们实际写一段代码来看一下
#include stdio.h
#include sys/socket.h
#include liburing.h
#include sys/types.h
#include netinet/in.h
#include string.h
#include unistd.h#define ENTRIES_LENGTH 1024
#define BUFFER_LENGTH 1024#define ACCEPT_EVENT 0
#define READ_EVENT 1
#define WRITE_EVENT 2struct conn_info
{int fd;int event;
};int init_server(int port)
{// 创建socket连接int sockfd socket(AF_INET, SOCK_STREAM, 0);struct sockaddr_in serveraddr;serveraddr.sin_family AF_INET;serveraddr.sin_addr.s_addr htonl(INADDR_ANY); // 0.0.0.0serveraddr.sin_port htons(port);// 绑定套接字int ret bind(sockfd, (struct sockaddr *)serveraddr, sizeof(struct sockaddr));if (ret -1){perror(bind);return -1;}// 监听套接字listen(sockfd, 10);return sockfd;
}int set_event_accept(struct io_uring *ring, int sockfd, struct sockaddr *addr, socklen_t *len, int flags)
{// 接受队列事件结构, 调用 io_uring_get_sqe 接口提交的 IO 事件struct io_uring_sqe *sqe io_uring_get_sqe(ring);struct conn_info accept_info {.fd sockfd,.event ACCEPT_EVENT};// 用于准备一个异步接受连接accept的请求io_uring_prep_accept(sqe, sockfd, addr, len, flags);memcpy(sqe-user_data, accept_info, sizeof(struct conn_info));return 1;
}int set_event_recv(struct io_uring *ring, int sockfd, void *buf, size_t len, int flags)
{struct io_uring_sqe *sqe io_uring_get_sqe(ring);struct conn_info accept_info {.fd sockfd,.event READ_EVENT};// 准备一个接收数据的操作请求io_uring_prep_recv(sqe, sockfd, buf, len, flags);return 1;
}int main()
{unsigned short port 9999;int sockfd init_server(port);struct io_uring_params params;memset(params, 0, sizeof(params));struct io_uring ring;// 先构建两个队列出来io_uring_queue_init_params(ENTRIES_LENGTH, ring, params);
#if 0// 建立与客户端的连接struct sockaddr_in clientaddr;socklen_t len sizeof(clientaddr);accept(sockfd, (struct sockaddr*)clientaddr, len);
#else// 建立与客户端的连接struct sockaddr_in clientaddr;socklen_t len sizeof(clientaddr);set_event_accept(ring, sockfd, (struct sockaddr *)clientaddr, len, 0);#endifchar buffer[BUFFER_LENGTH] {0};while (1){// 内部实现就是 io_uring_enter用于提交 IO 请求io_uring_submit(ring);// 创建一个完成队列事件结构通过 io_uring_wait_cqe// 获取完成 IO 操作的事件struct io_uring_cqe *cqe;io_uring_wait_cqe(ring, cqe);// 批量获取完成 IO 操作的事件struct io_uring_cqe *cqes[128];int nready io_uring_peek_batch_cqe(ring, cqes, 128);int i 0;for (i 0; i nready; i){// 获取到 accept 事件的入口struct io_uring_cqe *entries cqes[i];struct conn_info result;memcpy(result, entries-user_data, sizeof(struct conn_info));if (result.event ACCEPT_EVENT){printf(set_event_accept\n);}}}return 0;
}接口介绍 其中io_uring_prep_accept 接口作用就是准备一个异步接受连接accept的请求。
void io_uring_prep_accept(struct io_uring_sqe *sqe, int sockfd, struct sockaddr *addr,socklen_t *addrlen, int flags);参数介绍
sqe: 指向 io_uring 提交队列条目Submission Queue Entry的指针。sockfd: 监听套接字的文件描述符。addr: 用于存储客户端地址信息的结构体指针可选可为 NULL。addrlen: 输入时为 addr 的缓冲区大小输出时为实际地址长度。flags: 额外的标志位如 SOCK_NONBLOCK 或 SOCK_CLOEXEC。
返回值处理
通过 io_uring_wait_cqe 等待完成事件后cqe-res 为返回的客户端文件描述符。若返回值 0表示错误如 -EINVAL 或 -EBADF
io_uring_sqe 结构体用于描述一个待提交的 I/O 操作。每个 io_uring_sqe 对应一个异步 I/O 请求如读写、网络操作等通过填充该结构并提交到提交队列Submission Queue, SQ。
struct io_uring_sqe {__u8 opcode; // 操作类型如 IORING_OP_READ, IORING_OP_WRITE__u8 flags; // 请求标志如 IOSQE_FIXED_FILE, IOSQE_IO_LINK__u16 ioprio; // I/O 优先级__s32 fd; // 文件描述符__u64 off; // 文件偏移量__u64 addr; // 用户态缓冲区地址读写操作__u32 len; // 操作长度union {__kernel_rwf_t rw_flags; // 读写标志如 RWF_NOWAIT__u32 fsync_flags;__u16 poll_events;};__u64 user_data; // 用户自定义数据用于回调识别union {__u16 buf_index; // 固定缓冲区的索引IORING_OP_READ_FIXED__u64 __pad2[3];};
};关键字段说明
opcode指定操作类型常见值包括 IORING_OP_READ/IORING_OP_WRITE文件读写。 IORING_OP_SEND/IORING_OP_RECV网络通信。 IORING_OP_POLL_ADD事件监听。flags控制请求行为例如 IOSQE_FIXED_FILE使用固定文件描述符预先注册的文件表。 IOSQE_IO_LINK链接多个请求形成依赖链。user_data用于在完成事件CQE中标识请求的唯一值。
使用方法
获取空闲 SQE通过 io_uring_get_sqe 从提交队列中获取一个空闲条目。设置操作参数填充 opcode、fd、addr、len 等字段。提交请求调用 io_uring_submit 将请求提交到内核。
运行程序我们就会发现一个有意思的现象此时一直在打印 set_event_accept 这条信息。
原因就在于在当前这个循环中我们并没有将已完成的队列中特定的条目给回收掉当循环回去以后此时又继续通知处理该条目就会一直打印此时就需要用到 io_uring_cq_advance 接口
void io_uring_cq_advance(struct io_uring *ring, unsigned nr); io_uring_cq_advance 接口用于通知内核用户空间已处理完成队列Completion Queue, CQ中的特定条目允许内核回收相关资源。
参数解析
ring: 指向 io_uring 实例的指针。nr: 需要推进的完成队列条目数量通常为已处理的条目数。
它的作用就在于
推进完成队列每次从 CQ 中取出并处理一个事件后需调用此函数更新队列头指针避免重复处理同一事件。资源管理内核会回收已标记为 “完成” 的条目释放相关资源如内存。
添加这个接口以后就正常了但是此时就存在一个问题我们将这个事件标记为完成以后后续就不会再发送 accept 请求了所以在这儿就需要我们每一次都发送一个 accept 请求所以这儿也是需要进行修改的修改后代码如下
#include stdio.h
#include sys/socket.h
#include liburing.h
#include sys/types.h
#include netinet/in.h
#include string.h
#include unistd.h#define ENTRIES_LENGTH 1024
#define BUFFER_LENGTH 1024#define ACCEPT_EVENT 0
#define READ_EVENT 1
#define WRITE_EVENT 2struct conn_info
{int fd;int event;
};int init_server(int port)
{// 创建socket连接int sockfd socket(AF_INET, SOCK_STREAM, 0);struct sockaddr_in serveraddr;serveraddr.sin_family AF_INET;serveraddr.sin_addr.s_addr htonl(INADDR_ANY); // 0.0.0.0serveraddr.sin_port htons(port);// 绑定套接字int ret bind(sockfd, (struct sockaddr *)serveraddr, sizeof(struct sockaddr));if (ret -1){perror(bind);return -1;}// 监听套接字listen(sockfd, 10);return sockfd;
}int set_event_accept(struct io_uring *ring, int sockfd, struct sockaddr *addr, socklen_t *len, int flags)
{// 接受队列事件结构, 调用 io_uring_get_sqe 接口提交的 IO 事件struct io_uring_sqe *sqe io_uring_get_sqe(ring);struct conn_info accept_info {.fd sockfd,.event ACCEPT_EVENT};// 用于准备一个异步接受连接accept的请求io_uring_prep_accept(sqe, sockfd, addr, len, flags);memcpy(sqe-user_data, accept_info, sizeof(struct conn_info));return 1;
}int main()
{unsigned short port 9999;int sockfd init_server(port);struct io_uring_params params;memset(params, 0, sizeof(params));struct io_uring ring;// 先构建两个队列出来io_uring_queue_init_params(ENTRIES_LENGTH, ring, params);
#if 0// 建立与客户端的连接struct sockaddr_in clientaddr;socklen_t len sizeof(clientaddr);accept(sockfd, (struct sockaddr*)clientaddr, len);
#else// 建立与客户端的连接struct sockaddr_in clientaddr;socklen_t len sizeof(clientaddr);set_event_accept(ring, sockfd, (struct sockaddr *)clientaddr, len, 0);#endifchar buffer[BUFFER_LENGTH] {0};while (1){// 内部实现就是 io_uring_enter用于提交 IO 请求io_uring_submit(ring);// 创建一个完成队列事件结构通过 io_uring_wait_cqe// 获取完成 IO 操作的事件struct io_uring_cqe *cqe;io_uring_wait_cqe(ring, cqe);// 批量获取完成 IO 操作的事件struct io_uring_cqe *cqes[128];int nready io_uring_peek_batch_cqe(ring, cqes, 128);int i 0;for (i 0; i nready; i){// 获取到 accept 事件的入口struct io_uring_cqe *entries cqes[i];struct conn_info result;memcpy(result, entries-user_data, sizeof(struct conn_info));if (result.event ACCEPT_EVENT){// 保证每一次都会有 accept 请求set_event_accept(ring, sockfd, (struct sockaddr *)clientaddr, len, 0);printf(set_event_accept\n);}}// 避免重复处理同一事件io_uring_cq_advance(ring, nready);}return 0;
}运行代码正常连接断开再次进行连接也不会存在问题 接下来就需要接收到这个信息我们在这儿使用的也是 io_uring 库里面提供的函数 io_uring_prep_recv io_uring_prep_recv 用于准备一个接收数据的操作请求
void io_uring_prep_recv(struct io_uring_sqe *sqe, int sockfd, void *buf, size_t len, int flags);参数如下
sqe: 指向 io_uring_sqe 结构的指针表示提交队列条目Submission Queue Entry。sockfd: 文件描述符通常是套接字。buf: 缓冲区指针用于存储接收到的数据。len: 缓冲区的长度。flags: 接收操作的标志与 recv(2) 系统调用中的 flags 参数相同。
代码改写如下
#include stdio.h
#include sys/socket.h
#include liburing.h
#include sys/types.h
#include netinet/in.h
#include string.h
#include unistd.h#define ENTRIES_LENGTH 1024
#define BUFFER_LENGTH 1024#define ACCEPT_EVENT 0
#define READ_EVENT 1
#define WRITE_EVENT 2struct conn_info
{int fd;int event;
};int init_server(int port)
{// 创建socket连接int sockfd socket(AF_INET, SOCK_STREAM, 0);struct sockaddr_in serveraddr;serveraddr.sin_family AF_INET;serveraddr.sin_addr.s_addr htonl(INADDR_ANY); // 0.0.0.0serveraddr.sin_port htons(port);// 绑定套接字int ret bind(sockfd, (struct sockaddr *)serveraddr, sizeof(struct sockaddr));if (ret -1){perror(bind);return -1;}// 监听套接字listen(sockfd, 10);return sockfd;
}int set_event_accept(struct io_uring *ring, int sockfd, struct sockaddr *addr, socklen_t *len, int flags)
{// 接受队列事件结构, 调用 io_uring_get_sqe 接口提交的 IO 事件struct io_uring_sqe *sqe io_uring_get_sqe(ring);struct conn_info accept_info {.fd sockfd,.event ACCEPT_EVENT};// 用于准备一个异步接受连接accept的请求io_uring_prep_accept(sqe, sockfd, addr, len, flags);memcpy(sqe-user_data, accept_info, sizeof(struct conn_info));return 1;
}int set_event_recv(struct io_uring *ring, int sockfd, void *buf, size_t len, int flags)
{struct io_uring_sqe *sqe io_uring_get_sqe(ring);struct conn_info accept_info {.fd sockfd,.event READ_EVENT};// 准备一个接收数据的操作请求io_uring_prep_recv(sqe, sockfd, buf, len, flags);memcpy(sqe-user_data, accept_info, sizeof(struct conn_info));return 1;
}int main()
{unsigned short port 9999;int sockfd init_server(port);struct io_uring_params params;memset(params, 0, sizeof(params));struct io_uring ring;// 先构建两个队列出来io_uring_queue_init_params(ENTRIES_LENGTH, ring, params);
#if 0// 建立与客户端的连接struct sockaddr_in clientaddr;socklen_t len sizeof(clientaddr);accept(sockfd, (struct sockaddr*)clientaddr, len);
#else// 建立与客户端的连接struct sockaddr_in clientaddr;socklen_t len sizeof(clientaddr);set_event_accept(ring, sockfd, (struct sockaddr *)clientaddr, len, 0);#endifchar buffer[BUFFER_LENGTH] {0};while (1){// 内部实现就是 io_uring_enter用于提交 IO 请求io_uring_submit(ring);// 创建一个完成队列事件结构通过 io_uring_wait_cqe// 获取完成 IO 操作的事件struct io_uring_cqe *cqe;io_uring_wait_cqe(ring, cqe);// 批量获取完成 IO 操作的事件struct io_uring_cqe *cqes[128];int nready io_uring_peek_batch_cqe(ring, cqes, 128);int i 0;for (i 0; i nready; i){// 获取到 accept 事件的入口struct io_uring_cqe *entries cqes[i];struct conn_info result;memcpy(result, entries-user_data, sizeof(struct conn_info));if (result.event ACCEPT_EVENT){// 保证每一次都会有 accept 请求set_event_accept(ring, sockfd, (struct sockaddr *)clientaddr, len, 0);// printf(set_event_accept\n);int connfd entries-res;set_event_recv(ring, connfd, buffer, BUFFER_LENGTH, 0);}else if (result.event READ_EVENT){int ret entries-res;printf(set_event_recv ret: %d, %s\n, ret, buffer);}}// 避免重复处理同一事件io_uring_cq_advance(ring, nready);}return 0;
}运行程序此时就可以看见我们的服务端是可以正常的接收到消息的但是此时只能接受一次后续客户端继续发送我们又接受不到了而且我们也不支持回发消息 我们需要解决上面的问题就需要调用回发数据的接口回发数据以后更新状态当前又需要接收到客户端所发的数据就像前面 accept 的时候一样这儿我们每一次都是需要发起一个 recv 的请求的否则就会被标记为已完成的事件。
完整代码如下
#include stdio.h
#include sys/socket.h
#include liburing.h
#include sys/types.h
#include netinet/in.h
#include string.h
#include unistd.h#define ENTRIES_LENGTH 1024
#define BUFFER_LENGTH 1024#define ACCEPT_EVENT 0
#define READ_EVENT 1
#define WRITE_EVENT 2struct conn_info
{int fd;int event;
};int init_server(unsigned short port)
{// 创建socket连接int sockfd socket(AF_INET, SOCK_STREAM, 0);struct sockaddr_in serveraddr;serveraddr.sin_family AF_INET;serveraddr.sin_addr.s_addr htonl(INADDR_ANY); // 0.0.0.0serveraddr.sin_port htons(port);// 绑定套接字int ret bind(sockfd, (struct sockaddr *)serveraddr, sizeof(struct sockaddr));if (ret -1){perror(bind);return -1;}// 监听套接字listen(sockfd, 10);return sockfd;
}int set_event_accept(struct io_uring *ring, int sockfd, struct sockaddr *addr, socklen_t *len, int flags)
{// 接受队列事件结构, 调用 io_uring_get_sqe 接口提交的 IO 事件struct io_uring_sqe *sqe io_uring_get_sqe(ring);struct conn_info accept_info {.fd sockfd,.event ACCEPT_EVENT,};// 用于准备一个异步接受连接accept的请求io_uring_prep_accept(sqe, sockfd, addr, len, flags);memcpy(sqe-user_data, accept_info, sizeof(struct conn_info));return 1;
}int set_event_recv(struct io_uring *ring, int sockfd, void *buf, size_t len, int flags)
{struct io_uring_sqe *sqe io_uring_get_sqe(ring);struct conn_info accept_info {.fd sockfd,.event READ_EVENT,};// 准备一个接收数据的操作请求io_uring_prep_recv(sqe, sockfd, buf, len, flags);memcpy(sqe-user_data, accept_info, sizeof(struct conn_info));return 1;
}int set_event_send(struct io_uring *ring, int sockfd, const void *buf, size_t len, int flags)
{struct io_uring_sqe *sqe io_uring_get_sqe(ring);struct conn_info accept_info {.fd sockfd,.event WRITE_EVENT,};io_uring_prep_send(sqe, sockfd, buf, len, flags);memcpy(sqe-user_data, accept_info, sizeof(struct conn_info));
}int main()
{unsigned short port 9999;int sockfd init_server(port);struct io_uring_params params;memset(params, 0, sizeof(params));struct io_uring ring;// 先构建两个队列出来io_uring_queue_init_params(ENTRIES_LENGTH, ring, params);
#if 0// 建立与客户端的连接struct sockaddr_in clientaddr;socklen_t len sizeof(clientaddr);accept(sockfd, (struct sockaddr*)clientaddr, len);
#else// 建立与客户端的连接struct sockaddr_in clientaddr;socklen_t len sizeof(clientaddr);set_event_accept(ring, sockfd, (struct sockaddr *)clientaddr, len, 0);#endifchar buffer[BUFFER_LENGTH] {0};while (1){// 内部实现就是 io_uring_enter用于提交 IO 请求io_uring_submit(ring);// 创建一个完成队列事件结构通过 io_uring_wait_cqe// 获取完成 IO 操作的事件// struct io_uring_cqe *cqe;// io_uring_wait_cqe(ring, cqe);// 批量获取完成 IO 操作的事件struct io_uring_cqe *cqes[128];int nready io_uring_peek_batch_cqe(ring, cqes, 128);int i 0;for (i 0; i nready; i){// 获取到已完成 IO 事件的入口struct io_uring_cqe *entries cqes[i];struct conn_info result;memcpy(result, entries-user_data, sizeof(struct conn_info));if (result.event ACCEPT_EVENT){// 保证每一次都会有 accept 请求set_event_accept(ring, sockfd, (struct sockaddr *)clientaddr, len, 0);printf(set_event_accept\n);int connfd entries-res;printf(connfd: %d\n, connfd);set_event_recv(ring, connfd, buffer, BUFFER_LENGTH, 0);}else if (result.event READ_EVENT){int ret entries-res;printf(set_event_recv ret: %d, %s\n, ret, buffer);if (ret 0){close(result.fd);}else if (ret 0){set_event_send(ring, result.fd, buffer, BUFFER_LENGTH, 0);}}else if (result.event WRITE_EVENT){int ret entries-res;printf(set_event_send ret: %d, %s\n, ret, buffer);set_event_recv(ring, result.fd, buffer, BUFFER_LENGTH, 0);}}// 避免重复处理同一事件io_uring_cq_advance(ring, nready);}return 0;
}运行代码创建 3 个客户端此时每个客户端都可以连接上对于每次发送的消息客户端也可以接收到
性能测试
接下来我们编写一段客户端代码对 io_uring 的性能进行一下测试
#include stdio.h
#include string.h
#include sys/socket.h
#include stdlib.h
#include unistd.h
#include getopt.h#include sys/time.h
#include pthread.h
#include arpa/inet.h#define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 (tv1.tv_usec - tv2.tv_usec) / 1000)
#define TEST_MESSAGE ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890abcdefghijklmnopqrstuvwxyz\r\n
#define RBUFFER_LENGTH 2048
#define WBUFFER_LENGTH 2048typedef struct test_context_s {char serverip[16];int port;int threadnum;int connection;int requestion;#if 1int failed;
#endif} test_context_t;int connect_tcpserver(const char *ip, unsigned short port) {int connfd socket(AF_INET, SOCK_STREAM, 0);struct sockaddr_in tcpserver_addr;memset(tcpserver_addr, 0, sizeof(struct sockaddr_in));tcpserver_addr.sin_family AF_INET;tcpserver_addr.sin_addr.s_addr inet_addr(ip);tcpserver_addr.sin_port htons(port);int ret connect(connfd, (struct sockaddr*)tcpserver_addr, sizeof(struct sockaddr_in));if (ret) {perror(connect);return -1;}return connfd;
}int send_recv_tcppkt(int fd) {char wbuffer[WBUFFER_LENGTH] {0};int i 0;for (i 0;i 16;i ) {strcpy(wbuffer i * strlen(TEST_MESSAGE), TEST_MESSAGE);}int res send(fd, wbuffer, strlen(wbuffer), 0);if (res 0) {exit(1);}char rbuffer[RBUFFER_LENGTH] {0};res recv(fd, rbuffer, RBUFFER_LENGTH, 0);if (res 0) {exit(1);}if (strcmp(rbuffer, wbuffer) ! 0) {printf(failed: %s ! %s\n, rbuffer, wbuffer);return -1;}return 0;
}static void *test_qps_entry(void *arg) {test_context_t *pctx (test_context_t*)arg;int connfd connect_tcpserver(pctx-serverip, pctx-port);if (connfd 0) {printf(connect_tcpserver failed\n);return NULL;}int count pctx-requestion / pctx-threadnum;int i 0;int res;while (i count) {res send_recv_tcppkt(connfd);if (res ! 0) {printf(send_recv_tcppkt failed\n);pctx-failed ; // continue;}}return NULL;
}// ./test_qps_tcpclient -s 127.0.0.1 -p 2048 -t 50 -c 100 -n 10000
int main(int argc, char *argv[]) {int ret 0;test_context_t ctx {0};int opt;while ((opt getopt(argc, argv, s:p:t:c:n:?)) ! -1) {switch (opt) {case s:printf(-s: %s\n, optarg);strcpy(ctx.serverip, optarg);break;case p:printf(-p: %s\n, optarg);ctx.port atoi(optarg);break;case t:printf(-t: %s\n, optarg);ctx.threadnum atoi(optarg);break;case c:printf(-c: %s\n, optarg);ctx.connection atoi(optarg);break;case n:printf(-n: %s\n, optarg);ctx.requestion atoi(optarg);break;default:return -1;}}pthread_t *ptid malloc(ctx.threadnum * sizeof(pthread_t));int i 0;struct timeval tv_begin;gettimeofday(tv_begin, NULL);for (i 0;i ctx.threadnum;i ) {pthread_create(ptid[i], NULL, test_qps_entry, ctx);}for (i 0;i ctx.threadnum;i ) {pthread_join(ptid[i], NULL);}struct timeval tv_end;gettimeofday(tv_end, NULL);int time_used TIME_SUB_MS(tv_end, tv_begin);printf(success: %d, failed: %d, time_used: %d, qps: %d\n, ctx.requestion-ctx.failed, ctx.failed, time_used, ctx.requestion * 1000 / time_used);free(ptid);return ret;
}
简单介绍一下客户端的代码就是将一段数据写入到缓冲区当中然后发送给服务端我们在这儿的逻辑就是通过启动多个客户端发送请求然后对应的服务端进行处理看其处理时间在这儿跟之前的 epoll 进行对比查看两者之间的性能差距。
之前 epoll 的代码如下
#include stdio.h
#include sys/types.h
#include sys/socket.h
#include sys/epoll.h
#include netinet/in.h
#include errno.h
#include string.h
#include unistd.h
#include sys/time.h#define BUFFER_LENGTH 1024
#define CONNECTION_SIZE 1024 * 1024
#define MAX_PORTS 1
#define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 (tv1.tv_usec - tv2.tv_usec) / 1000)typedef int (*CALLBACK)(int fd);int accept_cb(int fd);
int recv_cb(int fd);
int send_cb(int fd);
struct timeval begin;int epfd 3;struct conn
{// 负责IO的文件描述符int fd;// 接收缓冲区的bufferchar rbuffer[BUFFER_LENGTH];int rlength;// 发送缓冲区的bufferchar wbuffer[BUFFER_LENGTH];int wlength;// 三个对应的回调函数CALLBACK send_callback;union{CALLBACK accept_callback;CALLBACK recv_callback;} r_action;
};struct conn con_list[CONNECTION_SIZE] {0};void send_event(int fd, int event, int flag)
{if (flag){struct epoll_event ev;ev.data.fd fd;ev.events event;epoll_ctl(epfd, EPOLL_CTL_ADD, fd, ev);}else{struct epoll_event ev;ev.data.fd fd;ev.events event;epoll_ctl(epfd, EPOLL_CTL_MOD, fd, ev);}
}int event_register(int fd, int event)
{if (fd 0){return -1;}con_list[fd].fd fd;con_list[fd].r_action.recv_callback recv_cb;con_list[fd].send_callback send_cb;memset(con_list[fd].rbuffer, 0, BUFFER_LENGTH);con_list[fd].rlength 0;memset(con_list[fd].wbuffer, 0, BUFFER_LENGTH);con_list[fd].wlength 0;send_event(fd, event, 1);
}int accept_cb(int fd)
{struct sockaddr_in clientaddr;socklen_t len sizeof(clientaddr);int clientfd accept(fd, (struct sockaddr *)clientaddr, len);if (clientfd 0){printf(accept failed !!!\n);}else{// printf(accept finished: %d\n, clientfd);}event_register(clientfd, EPOLLIN);if ((clientfd % 1000) 0){struct timeval current;gettimeofday(current, NULL);int time_used TIME_SUB_MS(current, begin);memcpy(begin, current, sizeof(struct timeval));printf(accept finshed: %d, time_used: %d\n, clientfd, time_used);}return 0;
}int recv_cb(int fd)
{// memset(con_list[fd].rbuffer, 0, BUFFER_LENGTH);int count recv(fd, con_list[fd].rbuffer, BUFFER_LENGTH, 0);if (count 0){printf(client disconnect: %d\n, fd);epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);close(fd);return 0;}else if (count 0){printf(count: %d, errno: %d, %s\n, count, errno, strerror(errno));epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);close(fd);return 0;}con_list[fd].rlength count;// printf(recv succ: %s\n, con_list[fd].rbuffer);#if 1con_list[fd].wlength con_list[fd].rlength;memcpy(con_list[fd].wbuffer, con_list[fd].rbuffer, con_list[fd].rlength);
#endifsend_event(fd, EPOLLOUT, 0);return count;
}int send_cb(int fd)
{int count send(fd, con_list[fd].wbuffer, BUFFER_LENGTH, 0);send_event(fd, EPOLLIN, 0);return count;
}int init_server(unsigned short port)
{// 创建套接字int socketfd socket(AF_INET, SOCK_STREAM, 0);printf(socketfd: %d\n, socketfd);struct sockaddr_in serveraddr;serveraddr.sin_family AF_INET;serveraddr.sin_addr.s_addr htonl(INADDR_ANY); // 0.0.0.0serveraddr.sin_port htons(port); // 0 ~ 1023// 绑定套接字int ret bind(socketfd, (struct sockaddr *)serveraddr, sizeof(struct sockaddr));if (ret -1){printf(bind failed: %s\n, strerror(errno));}// 监听套接字listen(socketfd, 10);return socketfd;
}int main()
{unsigned short port 2000;int epfd epoll_create(1);printf(epfd: %d\n, epfd);int i 0;for (i 0; i MAX_PORTS; i){int sockfd init_server(port i);// printf(socket fd: %d\n, sockfd);con_list[sockfd].fd sockfd;con_list[sockfd].r_action.recv_callback accept_cb;send_event(sockfd, EPOLLIN, 1);}gettimeofday(begin, NULL);while (1){struct epoll_event events[1024] {0};// 将就绪事件放入到就绪队里当中int nready epoll_wait(epfd, events, 1024, -1);for (int i 0; i nready; i){int connfd events[i].data.fd;#if 0if((events[i].events EPOLLIN)){con_list[i].r_action.recv_callback(connfd);}else if ((events[i].events EPOLLOUT)){con_list[i].send_callback(connfd);}
#elseif ((events[i].events EPOLLIN)){con_list[connfd].r_action.recv_callback(connfd);}if ((events[i].events EPOLLOUT)){con_list[connfd].send_callback(connfd);}
#endif}}return 0;
}
测试的两者均创建 100 个线程发起一百万个请求查看其处理时间首先来看 io_uring 的测试结果花费时间为 16 ms左右
接下来再来看 epoll 的处理性能 两者的差距在 2ms 左右其实对比下来 io_uring 还是有一个提升的对于更多的连接的时候io_uring 的效率还是会优于 epoll 的。