网站1g租用价格,玖久建筑网,网络规划与设计第二版,公司网站建设找谁io_uring
1、概述
io_uring是Linux#xff08;内核版本在5.1以后#xff09;在2019年加入到内核中的一种新型的异步I/O模型#xff1b;
io_uring使用共享内存#xff0c;解决高IOPS场景中的用户态和内核态的切换过程#xff0c;减少系统调用#xff1b;用户可以直接向…io_uring
1、概述
io_uring是Linux内核版本在5.1以后在2019年加入到内核中的一种新型的异步I/O模型
io_uring使用共享内存解决高IOPS场景中的用户态和内核态的切换过程减少系统调用用户可以直接向共享内存提交要发起的I/O操作内核线程可以直接获取共享内存中的I/O操作并进行相应的读写操作io_uring是一种proactor模式的网络架构 Reactor 是非阻塞同步网络模式感知的是就绪可读写事件。在每次感知到有事件发生比如可读就绪事件后就需要应用进程主动调用 read 方法来完成数据的读取也就是要应用进程主动将 socket 接收缓存中的数据读到应用进程内存中这个过程是同步的读取完数据后应用进程才能处理数据。 Proactor 是异步网络模式 感知的是已完成的读写事件。在发起异步读写请求时需要传入数据缓冲区的地址用来存放结果数据等信息这样系统内核才可以自动帮我们把数据的读写工作完成这里的读写工作全程由操作系统来做并不需要像 Reactor 那样还需要应用进程主动发起 read/write 来读写数据操作系统完成读写工作后就会通知应用进程直接处理数据。
优点 避免了提交I/O事件和完成事件中存在的内存拷贝使用共享内存 减少的了I/O任务提交和完成事件任务是的系统调用过程 采取无锁队列减少了锁资源的竞争
主要内存结构
提交队列Submission QueueSQ连续的内存空间环形队列存放将要执行的I/O操作数据完成队列Completion Queue CQ连续的内存空间环形队列存放执行完成I/O操作后的返回结果提交队列项数组提Submission Queue EntrySQE方便通过环形缓冲区提交内存请求
2、主要接口
io_uring提供三个用户态的系统调用接口
io_uring_setup初始化一个新的io_uring对象一个SQ和一个CQ通过使用共享内存进行数据操作io_uring_register注册用于异步I/O的文件或用户缓冲区buffersio_uring_enter提交I/O任务等待I/O完成 SQ和CQ保存的都是SQEs数据的索引不是真正的请求真实是请求保存在SQE数组中在提交请求时可以批量提交一组SQE数值上不连续的请求
SQ、CQ、SQE中的内存区域都是有内核进行分配的用户初始化会返回对应的fd通过fd进行mmap和内核共享内存空间
3、第三方库
liburing通过对io_uring进行分装提供了一个简单的API通过一下命令可以安装该动态库
git clone https://github.com/axboe/liburing.git
cd liburing
./configure
make
sudo make install
sudo ldconfig #更新动态库连接缓存4、主要使用流程
1. io_uring初始化
io_uring通过io_uring_setup函数初始化在liburing库中通过io_uring_queue_init_params函数进行初始化创建sumbmit队列和complete队列以及SQE内存数组
//io_uring实现异步的方式
struct io_uring_params pragma;
memset(pragma, 0, sizeof(pragma));
struct io_uring ring;
// 初始化io_uring 创建submit队列和complite队列
io_uring_queue_init_params(1024, ring, pragma);2. io_uring 提交注册到SQ环形队列
io_uring通过io_uring_register函数提交注册到用于异步I/O的缓冲区中在liburing中通过io_uring_prep_accept函数对io_uring_refister进行封装使用
// 获取ringbuffer的头
struct io_uring_sqe *sqe io_uring_get_sqe(ring);
connect_info_t accept_info {sockfd, EVENT_ACCEPT};
// 注册一个I/O事件
io_uring_prep_accept(sqe, sockfd, (struct sockaddr*)clientaddr, addrlen, flags);
memcpy(sqe-user_data, accept_info, sizeof(connect_info_t));3. io_uring_enter 提交I/O
io_uring中通过io_uring_enter函数来提交I/O并等待事件的完成在liburing中通过io_uring_submit来提交SQE的读写请求io_uring_wait_cqe来等待I/O的处理结果io_uring_peek_batch_cqe来获取CQ中的处理结果 // 提交worker中执行
io_uring_submit(ring);
struct io_uring_cqe *cqe;
//等待complete队列中的结果
io_uring_wait_cqe(ring, cqe);
struct io_uring_cqe *cqes[128];
// 获取CQ环形队列中的处理结果
int count io_uring_peek_batch_cqe(ring, cqes, 128);5、实现
io_uring_server.c
#include liburing.h
#include stdio.h
#include stdlib.h
#include unistd.h
#include fcntl.h
#include string.h
#include netinet/in.henum event_type {EVENT_ACCEPT,EVENT_READ,EVENT_WRITE
};typedef struct connect_info{int conn_fd;int event;
}connect_info_t;struct conn_info {int fd;int event;
};int init_server(unsigned short port)
{ int sockfd socket(AF_INET, SOCK_STREAM, 0);if (sockfd 0) {perror(socket);return -1;}struct sockaddr_in serveraddr;;serveraddr.sin_family AF_INET;serveraddr.sin_port htons(port);serveraddr.sin_addr.s_addr htonl(INADDR_ANY);if (bind(sockfd, (struct sockaddr *)serveraddr, sizeof(serveraddr)) 0) {perror(bind error);return -1;}int opt 1;if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, opt, sizeof(opt)) 0) {perror(setsockopt);return -1;}listen(sockfd, 10);return sockfd;
}int set_event_recv(struct io_uring *ring, int sockfd, void *buf, int len, int flags)
{struct io_uring_sqe *sqe io_uring_get_sqe(ring);connect_info_t accept_info {sockfd, EVENT_READ};io_uring_prep_recv(sqe, sockfd, buf, len, flags);memcpy(sqe-user_data, accept_info, sizeof(connect_info_t));printf(set event recv----\n);return 0;
}int set_event_send(struct io_uring *ring, int sockfd, const void *buf, int len, int flags)
{struct io_uring_sqe *sqe io_uring_get_sqe(ring);connect_info_t accept_info {sockfd, EVENT_WRITE};io_uring_prep_send(sqe, sockfd, buf, len, flags);memcpy(sqe-user_data, accept_info, sizeof(connect_info_t));printf(set event send----\n);return 0;
}int set_event_accept(struct io_uring *ring, int sockfd, struct sockaddr *clientaddr,socklen_t *addrlen, int flags) {// 获取sqestruct io_uring_sqe *sqe io_uring_get_sqe(ring);// 初始化accept_infoconnect_info_t accept_info {sockfd, EVENT_ACCEPT};// 准备accept操作io_uring_prep_accept(sqe, sockfd, (struct sockaddr*)clientaddr, addrlen, flags);// 设置用户数据memcpy(sqe-user_data, accept_info, sizeof(connect_info_t));printf(set event accept\n);return 0;
}int main(int argc, char *argv[])
{// 初始化服务器unsigned short port 9999;// 初始化服务器int socketfd init_server(port);if (socketfd 0)return -1;//io_uring实现异步的方式struct io_uring_params pragma;// 初始化io_uring 创建submit队列和complite队列memset(pragma, 0, sizeof(pragma));struct io_uring ring;io_uring_queue_init_params(1024, ring, pragma);struct sockaddr_in clientaddr;socklen_t addrlen sizeof(struct sockaddr);// 提交到submit队列中set_event_accept(ring, socketfd, (struct sockaddr*)clientaddr, addrlen, 0);char buffer[1024] {0};while (1){// 提交worker中执行io_uring_submit(ring);printf(complete\n);struct io_uring_cqe *cqe;//等待complete队列中的结果io_uring_wait_cqe(ring, cqe);printf(complete end\n);struct io_uring_cqe *cqes[128];int count io_uring_peek_batch_cqe(ring, cqes, 128);for (int i 0; i count; i){struct io_uring_cqe *entries cqes[i];connect_info_t result;//struct conn_info result;memcpy(result, entries-user_data, sizeof(connect_info_t));if (result.event EVENT_ACCEPT) {// 设置读事件set_event_accept(ring, socketfd, (struct sockaddr*)clientaddr, addrlen, 0);printf(accept success\n);int conn_fd entries-res;printf(conn_fd %d res %d\n, conn_fd, entries-res);// 设置读事件set_event_recv(ring, conn_fd, buffer, 1024,0);}else if (result.event EVENT_READ){int ret entries-res;printf(set_event_recv ret: %d, %s\n, ret, buffer);if (ret 0){close(result.conn_fd);continue;}else if (ret 0){// 设置写事件set_event_send(ring, result.conn_fd, buffer, ret,0);}printf(read success\n);}else if (result.event EVENT_WRITE){int ret entries-res;set_event_recv(ring, result.conn_fd, buffer, 1024,0);printf(write success\n);}}io_uring_cq_advance(ring, count);}return 0;
}io_uring_test.c
#include stdio.h
#include stdlib.h
#include string.h
#include unistd.h
#include pthread.h
#include sys/time.h#include sys/socket.h
#include arpa/inet.h#define TIMESUB_MS(tv1, tv2) (((tv2).tv_sec - (tv1).tv_sec) * 1000 ((tv2).tv_usec - (tv1).tv_usec) / 1000)
#define TEST_MESSAGE ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890abcdefghijklmnopqrstuvwxyz\r\n
#define RBUFFER_LENGTH 2048
#define WBUFFER_LENGTH 2048typedef struct test_conttext
{char server_ip[16];int server_port;int thread_num;int connection_num;int request_num;int fail_num;
} test_conttext_t;int send_recv_tcp(int sockfd)
{char wbuffer[WBUFFER_LENGTH];char rbuffer[RBUFFER_LENGTH];memset(wbuffer, 0, sizeof(wbuffer));memset(rbuffer, 0, sizeof(rbuffer));for (int i 0; i 8; i){strcpy(wbuffer i * strlen(TEST_MESSAGE), TEST_MESSAGE);}int res send(sockfd, wbuffer, strlen(wbuffer), 0);if (res 0){return -1;}res recv(sockfd, rbuffer, sizeof(rbuffer), 0);if (res 0){return -1;}if (strcmp(rbuffer, wbuffer) ! 0){printf(failed: %s ! %s\n, rbuffer, wbuffer);return -1;}return 0;
}int connect_tcpserver(char *ip, int port)
{int sockfd socket(AF_INET, SOCK_STREAM, 0);if (sockfd 0){perror(socket);return -1;}struct sockaddr_in server_addr;server_addr.sin_family AF_INET;server_addr.sin_port htons(port);server_addr.sin_addr.s_addr inet_addr(ip);if (connect(sockfd, (struct sockaddr *)server_addr, sizeof(server_addr)) 0){perror(connect);close(sockfd);return -1;}return sockfd;
}static void *test_qps(void *arg)
{test_conttext_t *ctx (test_conttext_t *)arg;int sockfd connect_tcpserver(ctx-server_ip, ctx-server_port);if (sockfd 0){printf(connect server failed\n);return NULL;}int conut ctx-request_num / ctx-connection_num;int indx 0;int res;while (indx conut){res send_recv_tcp(sockfd);if (res 0){printf(send_recv_tcp failed\n);ctx-fail_num;continue;}}return NULL;
}int main(int argc, char *argv[])
{int i;printf(----%d\n, argc);// for (i 1; i argc; i)// printf(%s\n, argv[i]);test_conttext_t ctx {0};int opt;while ((opt getopt(argc, argv, s:p:t:c:n:)) ! -1){switch (opt){case s:strcpy(ctx.server_ip, optarg);printf(-s: %s\n, optarg);break;case p:ctx.server_port atoi(optarg);printf(-p: %s\n, optarg);break;case t:ctx.thread_num atoi(optarg);printf(-t: %s\n, optarg);break;case c:ctx.connection_num atoi(optarg);printf(-c: %s\n, optarg);break;case n:ctx.request_num atoi(optarg);printf(-n: %s\n, optarg);break;default:return EXIT_FAILURE;}}pthread_t *threads (pthread_t *)malloc(sizeof(pthread_t) * ctx.thread_num);struct timeval start, end;gettimeofday(start, NULL);for (i 0; i ctx.thread_num; i){printf(thread %d pthread_create\n, i);pthread_create(threads[i], NULL, test_qps, ctx);}for (i 0; i ctx.thread_num; i){pthread_join(threads[i], NULL);printf(thread %d finished\n, i);}gettimeofday(end, NULL);int time_used TIMESUB_MS(start, end);printf(success :%d, failed:%d, time used: %d , qps %d\n, ctx.request_num-ctx.fail_num, ctx.fail_num, time_used, ctx.request_num * 1000 / time_used);free(threads);return EXIT_SUCCESS;
}