在前面的示例libaio_test.c和native_aio_test.c中,可以看到对磁盘aio请求(本文的aio都指此类)的使用存在阻塞等待,最明显之处就是对io_getevents()函数的调用(当然,其它函数,比如io_submit(),也有一定程度的阻塞):它会等待并获取已完成的io请求,如果当前没有或少于指定数目的io请求完成,那么就会一直等待,直到达到指定数目或timeout为止。
io_getevents()函数的等待会导致整个进程的阻塞使得程序无法继续向下执行,如果程序还有其它阻塞点,那么有必要想办法把这多处等待合而为一同时进行,从而提高并行性,也就是通常所说的select/epoll等这类多路复用技术。
本文就以epoll为例,介绍一下在linux下,如何把aio结合并应用到epoll机制里。我们知道,epoll机制的最大好处就是它能够在同一时刻对多个文件描述符(通常是由众多套接字形成的描述符集合)进行监听,并将其上发生的读/写(或错误等)事件通知给应用程序,也就是做到时间上的复用。如果能够把aio也放到epoll机制里,即把aio当作epoll机制里的“一路io”,那么就能使得aio与其它可能的等待操作(比如:读/写套接字)共同工作,从而达到时间复用的目的。
作为epoll机制里的“一路io”,需要一个文件描述符来反馈对应的发生事件,而对于纯aio而言,是没有文件描述符作为代表的,因此linux系统上多出了一个eventfd()的系统调用:
#include <sys/eventfd.h> int eventfd(unsigned int initval, int flags); |
当然,这个系统调用是否就是因此原因才出现,我不得而知(也没去细查),但要把aio应用到epoll机制里,的确少不了它。从man手册http://man7.org/linux/man-pages/man2/eventfd.2.html可以看到,eventfd()函数的作用是提供一种让内核通知应用程序有事件发生的机制。根据给定参数的不同,对eventfd进行read()的语义也有所不同,看本文aio应用的场景情况:
int efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); |
对该描述符efd进行read(),如果读取成功,那么将返回8-byte的整型数据,而该数据也就是表示已经完成的aio请求个数。
充当中间桥梁的eventfd有了,并且eventfd()函数返回的描述符可以添加到epoll机制内,因此剩下需要做的就是把eventfd与aio联系起来,而目前aio当然已经有了这个支持,不过,由于native aio的相关结构体有两套封装,即一种是libaio的封装,一种是内核的直接封装(便于直接使用aio),比如iocb:
libaio的封装(来自:/usr/include/libaio.h):
struct io_iocb_common { PADDEDptr( void *buf, __pad1); PADDEDul(nbytes, __pad2); long long offset; long long __pad3; unsigned flags; unsigned resfd; }; /* result code is the amount read or -'ve errno */ struct io_iocb_vector { const struct iovec *vec; int nr; long long offset; }; /* result code is the amount read or -'ve errno */ struct iocb { PADDEDptr( void *data, __pad1); /* Return in the io completion event */ PADDED(unsigned key, __pad2); /* For use in identifying io requests */ short aio_lio_opcode; short aio_reqprio; int aio_fildes; union { struct io_iocb_common c; struct io_iocb_vector v; struct io_iocb_poll poll; struct io_iocb_sockaddr saddr; } u; }; |
内核的封装(来自:/usr/include/linux/aio_abi.h或/usr/src/linux-2.6.38.8/include/linux/aio_abi.h):
/* * we always use a 64bit off_t when communicating * with userland. its up to libraries to do the * proper padding and aio_error abstraction */ struct iocb { /* these are internal to the kernel/libc. */ __u64 aio_data; /* data to be returned in event's data */ __u32 PADDED(aio_key, aio_reserved1); /* the kernel sets aio_key to the req # */ /* common fields */ __u16 aio_lio_opcode; /* see IOCB_CMD_ above */ __s16 aio_reqprio; __u32 aio_fildes; __u64 aio_buf; __u64 aio_nbytes; __s64 aio_offset; /* extra parameters */ __u64 aio_reserved2; /* TODO: use this for a (struct sigevent *) */ /* flags for the "struct iocb" */ __u32 aio_flags; /* * if the IOCB_FLAG_RESFD flag of "aio_flags" is set, this is an * eventfd to signal AIO readiness to */ __u32 aio_resfd; }; /* 64 bytes */ |
两个结构体是等价的,只是字段名称有所不同而已,此处仅看内核封装的情况(后续将提到nginx对aio的使用实现,而nginx采用的就是syscall手动封装)。其中有一段很明显的英文注释出卖了aio对eventfd的使用支持,涉及两个字段:aio_flags与aio_resfd,详细来说就是将aio_flags打上IOCB_FLAG_RESFD标记,并且将eventfd()函数返回的描述符设置到aio_resfd即可。
废话少说,看两个示例,第一个来自:http://blog.sina.com.cn/s/blog_6b19f21d0100znza.html
#define _GNU_SOURCE #define __STDC_FORMAT_MACROS #include <stdio.h> #include <errno.h> #include <libaio.h> #include <sys/eventfd.h> #include <sys/epoll.h> #include <stdlib.h> #include <sys/types.h> #include <unistd.h> #include <stdint.h> #include <sys/stat.h> #include <fcntl.h> #include <inttypes.h> #define TEST_FILE "aio_test_file" #define TEST_FILE_SIZE (127 * 1024) #define NUM_EVENTS 128 #define ALIGN_SIZE 512 #define RD_WR_SIZE 1024 struct custom_iocb { struct iocb iocb; int nth_request; }; void aio_callback(io_context_t ctx, struct iocb *iocb, long res, long res2) { struct custom_iocb *iocbp = ( struct custom_iocb *)iocb; printf ( "nth_request: %d, request_type: %s, offset: %lld, length: %lu, res: %ld, res2: %ldn" , iocbp->nth_request, (iocb->aio_lio_opcode == IO_CMD_PREAD) ? "READ" : "WRITE" , iocb->u.c.offset, iocb->u.c.nbytes, res, res2); } int main( int argc, char *argv[]) { int efd, fd, epfd; io_context_t ctx; struct timespec tms; struct io_event events[NUM_EVENTS]; struct custom_iocb iocbs[NUM_EVENTS]; struct iocb *iocbps[NUM_EVENTS]; struct custom_iocb *iocbp; int i, j, r; void *buf; struct epoll_event epevent; efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); if (efd == -1) { perror ( "eventfd" ); return 2; } fd = open(TEST_FILE, O_RDWR | O_CREAT | O_DIRECT, 0644); if (fd == -1) { perror ( "open" ); return 3; } ftruncate(fd, TEST_FILE_SIZE); ctx = 0; if (io_setup(8192, &ctx)) { perror ( "io_setup" ); return 4; } if (posix_memalign(&buf, ALIGN_SIZE, RD_WR_SIZE)) { perror ( "posix_memalign" ); return 5; } printf ( "buf: %pn" , buf); for (i = 0, iocbp = iocbs; i < NUM_EVENTS; ++i, ++iocbp) { iocbps[i] = &iocbp->iocb; io_prep_pread(&iocbp->iocb, fd, buf, RD_WR_SIZE, i * RD_WR_SIZE); io_set_eventfd(&iocbp->iocb, efd); io_set_callback(&iocbp->iocb, aio_callback); iocbp->nth_request = i + 1; } if (io_submit(ctx, NUM_EVENTS, iocbps) != NUM_EVENTS) { perror ( "io_submit" ); return 6; } epfd = epoll_create(1); if (epfd == -1) { perror ( "epoll_create" ); return 7; } 
epevent.events = EPOLLIN | EPOLLET; epevent.data.ptr = NULL; if (epoll_ctl(epfd, EPOLL_CTL_ADD, efd, &epevent)) { perror ( "epoll_ctl" ); return 8; } i = 0; while (i < NUM_EVENTS) { uint64_t finished_aio; if (epoll_wait(epfd, &epevent, 1, -1) != 1) { perror ( "epoll_wait" ); return 9; } if (read(efd, &finished_aio, sizeof (finished_aio)) != sizeof (finished_aio)) { perror ( "read" ); return 10; } printf ( "finished io number: %" PRIu64 "n" , finished_aio); while (finished_aio > 0) { tms.tv_sec = 0; tms.tv_nsec = 0; r = io_getevents(ctx, 1, NUM_EVENTS, events, &tms); if (r > 0) { for (j = 0; j < r; ++j) { ((io_callback_t)(events[j].data))(ctx, events[j].obj, events[j].res, events[j].res2); } i += r; finished_aio -= r; } } } close(epfd); free (buf); io_destroy(ctx); close(fd); close(efd); remove (TEST_FILE); return 0; } |
编译执行,OK无误(特别注意:上面示例代码仅仅是演示aio+eventfd+epoll的使用,而细节部分是有严重bug的,比如所有请求共用一个缓存区buf):
[root@www 1]# gcc t.c -laio [root@www 1]# ./a.out |
上面示例采用了libaio库,试试syscall简单封装(由上面示例修改而来):
/**
 * gcc aio_eventfd_epoll.c -o aio_eventfd_epoll
 * modified by: http://lenky.info/
 *
 * Demo: kernel native AIO (raw syscalls, no libaio) + eventfd + epoll.
 * Submits NUM_EVENTS O_DIRECT reads, has the kernel signal an eventfd on
 * each completion, and waits for that eventfd through epoll.
 */
#define _GNU_SOURCE
#define __STDC_FORMAT_MACROS
#include <sys/epoll.h>
#include <sys/types.h>      /* ssize_t */
#include <stdio.h>          /* for perror() */
#include <unistd.h>         /* for syscall() */
#include <sys/syscall.h>    /* for __NR_* definitions */
#include <linux/aio_abi.h>  /* for AIO types and constants */
#include <fcntl.h>          /* O_RDWR */
#include <string.h>         /* memset() */
#include <inttypes.h>       /* uint64_t */
#include <stdint.h>         /* uintptr_t */
#include <stdlib.h>
#include <errno.h>
#include <time.h>           /* struct timespec */

#define TEST_FILE "aio_test_file"
#define TEST_FILE_SIZE (128 * 1024)
#define NUM_EVENTS 128
#define ALIGN_SIZE 512
#define RD_WR_SIZE 1024

/*
 * Thin syscall wrappers. Each returns what syscall() returns: -1 with errno
 * set on failure. They are `static inline` because a plain `inline`
 * definition provides no external definition in C99 and later, which leaves
 * the calls unresolved at link time when the compiler does not inline them.
 */
static inline int io_setup(unsigned nr, aio_context_t *ctxp)
{
    return (int)syscall(__NR_io_setup, nr, ctxp);
}

static inline int io_submit(aio_context_t ctx, long nr, struct iocb **iocbpp)
{
    return (int)syscall(__NR_io_submit, ctx, nr, iocbpp);
}

static inline int io_getevents(aio_context_t ctx, long min_nr, long max_nr,
                               struct io_event *events, struct timespec *timeout)
{
    return (int)syscall(__NR_io_getevents, ctx, min_nr, max_nr, events, timeout);
}

static inline int io_destroy(aio_context_t ctx)
{
    return (int)syscall(__NR_io_destroy, ctx);
}

/* flags are passed straight to eventfd2(2); main() passes O_NONBLOCK | O_CLOEXEC. */
static inline int eventfd2(unsigned int initval, int flags)
{
    return (int)syscall(__NR_eventfd2, initval, flags);
}

/*
 * A request as tracked by this program. The kernel iocb must stay the first
 * member: the completion callback receives a `struct iocb *` and casts it
 * back to `struct custom_iocb *`.
 */
struct custom_iocb {
    struct iocb iocb;
    int nth_request;   /* 1-based submission index, for display only */
};

/* Signature of the completion callback stored in iocb.aio_data. */
typedef void io_callback_t(aio_context_t ctx, struct iocb *iocb, long res, long res2);

/*
 * Completion callback: prints one line describing the finished request.
 *
 * ctx   AIO context the request was submitted on (unused here)
 * iocb  the completed request; actually the `iocb` member of a custom_iocb
 * res   result of the operation as reported in the io_event
 * res2  secondary result as reported in the io_event
 */
void aio_callback(aio_context_t ctx, struct iocb *iocb, long res, long res2)
{
    struct custom_iocb *iocbp = (struct custom_iocb *)iocb;

    (void)ctx;
    /* aio_offset/aio_nbytes are 64-bit kernel types; cast to match the format. */
    printf("nth_request: %d, request_type: %s, offset: %lld, length: %llu, res: %ld, res2: %ld\n",
           iocbp->nth_request,
           (iocb->aio_lio_opcode == IOCB_CMD_PREAD) ? "READ" : "WRITE",
           (long long)iocb->aio_offset,
           (unsigned long long)iocb->aio_nbytes, res, res2);
}

/*
 * Returns 0 on success, or a small positive code identifying the step that
 * failed (2 eventfd2, 3 open, 4 io_setup, 5 posix_memalign, 6 io_submit,
 * 7 epoll_create, 8 epoll_ctl, 9 epoll_wait, 10 read, 11 ftruncate,
 * 12 io_getevents).
 */
int main(int argc, char *argv[])
{
    int efd = -1, fd = -1, epfd = -1;
    int ctx_ready = 0;
    int rc = 0;
    aio_context_t ctx = 0;
    struct timespec tms;
    struct io_event events[NUM_EVENTS];
    struct custom_iocb iocbs[NUM_EVENTS];
    struct iocb *iocbps[NUM_EVENTS];
    struct custom_iocb *iocbp;
    int i, j, r;
    void *buf = NULL;
    void *aio_buf;
    struct epoll_event epevent;

    (void)argc;
    (void)argv;

    efd = eventfd2(0, O_NONBLOCK | O_CLOEXEC);
    if (efd == -1) {
        perror("eventfd2");
        rc = 2;
        goto out;
    }

    fd = open(TEST_FILE, O_RDWR | O_CREAT | O_DIRECT, 0644);
    if (fd == -1) {
        perror("open");
        rc = 3;
        goto out;
    }

    if (ftruncate(fd, TEST_FILE_SIZE) == -1) {
        perror("ftruncate");
        rc = 11;
        goto out;
    }

    if (io_setup(NUM_EVENTS, &ctx)) {
        perror("io_setup");
        rc = 4;
        goto out;
    }
    ctx_ready = 1;

    /* posix_memalign() returns the error code rather than setting errno. */
    r = posix_memalign(&buf, ALIGN_SIZE, RD_WR_SIZE * NUM_EVENTS);
    if (r != 0) {
        buf = NULL;
        errno = r;
        perror("posix_memalign");
        rc = 5;
        goto out;
    }
    printf("buf: %p\n", buf);

    for (i = 0, iocbp = iocbs; i < NUM_EVENTS; ++i, ++iocbp) {
        /* Each request gets its own RD_WR_SIZE slice of the buffer. */
        aio_buf = (void *)((char *)buf + (i * RD_WR_SIZE));
        memset(aio_buf, 0, RD_WR_SIZE);

        /*
         * The array lives on the stack, so clear the iocb first: fields not
         * assigned below (including the reserved ones) must not hold garbage
         * when the request is handed to the kernel.
         */
        memset(&iocbp->iocb, 0, sizeof(iocbp->iocb));

        /* equivalent of io_prep_pread() */
        iocbp->iocb.aio_fildes = (__u32)fd;
        iocbp->iocb.aio_lio_opcode = IOCB_CMD_PREAD;
        iocbp->iocb.aio_buf = (__u64)(uintptr_t)aio_buf;
        iocbp->iocb.aio_offset = (__s64)i * RD_WR_SIZE;
        iocbp->iocb.aio_nbytes = RD_WR_SIZE;

        /* equivalent of io_set_eventfd() */
        iocbp->iocb.aio_flags = IOCB_FLAG_RESFD;
        iocbp->iocb.aio_resfd = (__u32)efd;

        /* equivalent of io_set_callback(): comes back as io_event.data */
        iocbp->iocb.aio_data = (__u64)(uintptr_t)aio_callback;

        iocbp->nth_request = i + 1;
        iocbps[i] = &iocbp->iocb;
    }

    r = io_submit(ctx, NUM_EVENTS, iocbps);
    if (r != NUM_EVENTS) {
        if (r < 0)
            perror("io_submit");
        else
            fprintf(stderr, "io_submit: only %d of %d requests accepted\n",
                    r, NUM_EVENTS);
        rc = 6;
        goto out;
    }

    epfd = epoll_create(1);
    if (epfd == -1) {
        perror("epoll_create");
        rc = 7;
        goto out;
    }

    epevent.events = EPOLLIN | EPOLLET;
    epevent.data.ptr = NULL;
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, efd, &epevent)) {
        perror("epoll_ctl");
        rc = 8;
        goto out;
    }

    i = 0;
    while (i < NUM_EVENTS) {
        uint64_t finished_aio;
        ssize_t nread;

        r = epoll_wait(epfd, &epevent, 1, -1);
        if (r != 1) {
            if (r == -1 && errno == EINTR)
                continue;
            perror("epoll_wait");
            rc = 9;
            goto out;
        }

        /* The eventfd counter is the number of completions since the last read. */
        nread = read(efd, &finished_aio, sizeof(finished_aio));
        if (nread != (ssize_t)sizeof(finished_aio)) {
            if (nread == -1 && errno == EAGAIN)
                continue;   /* nothing new yet; go back to waiting */
            perror("read");
            rc = 10;
            goto out;
        }
        printf("finished io number: %" PRIu64 "\n", finished_aio);

        while (finished_aio > 0) {
            /*
             * Never reap more events than the counter we just read. More
             * requests may have completed since the read(); taking them now
             * would make the unsigned counter wrap below and the loop spin
             * forever. They are announced by the next eventfd read instead.
             */
            long want = (finished_aio < (uint64_t)NUM_EVENTS)
                            ? (long)finished_aio : (long)NUM_EVENTS;

            tms.tv_sec = 0;
            tms.tv_nsec = 0;
            r = io_getevents(ctx, 1, want, events, &tms);
            if (r < 0) {
                if (errno == EINTR)
                    continue;
                perror("io_getevents");
                rc = 12;
                goto out;
            }

            /* data carries the callback, obj the address of the submitted iocb. */
            for (j = 0; j < r; ++j) {
                ((io_callback_t *)(uintptr_t)events[j].data)(
                    ctx, (struct iocb *)(uintptr_t)events[j].obj,
                    (long)events[j].res, (long)events[j].res2);
            }
            i += r;
            finished_aio -= (uint64_t)r;
        }
    }

out:
    if (epfd != -1)
        close(epfd);
    /* Tear the context down before freeing the buffers it may still target. */
    if (ctx_ready)
        io_destroy(ctx);
    free(buf);
    if (fd != -1) {
        close(fd);
        remove(TEST_FILE);
    }
    if (efd != -1)
        close(efd);
    return rc;
}