最新消息:

Linux native AIO与eventfd、epoll的结合使用

epoll admin 4261浏览 0评论

在前面的示例libaio_test.c和native_aio_test.c中,可以看到对磁盘aio请求(本文的aio都指此类)的使用有阻塞等待,这明显之处为对io_getevents()函数(当然,其它函数,比如io_submit()也有一定程度的阻塞)的调用,它会等待并获取已完成的io请求,如果当前没有或少于指定数目的io请求完成,那么就会等待直到timeout。

io_getevents()函数的等待会导致整个进程的阻塞使得程序无法继续向下执行,如果程序还有其它阻塞点,那么有必要想办法把这多处等待合而为一同时进行,从而提高并行性,也就是通常所说的select/epoll等这类多路复用技术。

本文就以epoll为例,介绍一下在linux下,如何把aio结合并应用到epoll机制里。我们知道,epoll机制的最大好处就是它能够在同一时刻对多个文件描述符(通常是由众多套接字形成的描述符集合)进行监听,并将其上发生的读/写(或错误等)事件通知给应用程序,也就是做到时间上的复用。如果能够把aio也放到epoll机制里,即把aio当作epoll机制里的“一路io”,那么就能使得aio与其它可能的等待操作(比如:读/写套接字)共同工作,从而达到时间复用的目的。

作为epoll机制里的“一路io”,需要一个文件描述符来反馈对应的发生事件,而对于纯aio而言,是没有文件描述符作为代表的,因此linux系统上多出了一个eventfd()的系统调用:

#include <sys/eventfd.h>
int eventfd(unsigned int initval, int flags);

当然,这个系统调用是否就是因此原因才出现,我不得而知(也没去细查),但要把aio应用到epoll机制里,的确少不了它。从man手册http://man7.org/linux/man-pages/man2/eventfd.2.html可以看到,eventfd()函数的作用是提供一种让内核通知应用程序有事件发生的机制。根据给定参数的不同,对eventfd进行read()的语义也有所不同,看本文aio应用的场景情况:

int efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);

对该描述符efd进行read(),如果读取成功,那么将返回8-byte的整型数据,而该数据也就是表示已经完成的aio请求个数。

充当中间桥梁的eventfd有了,并且eventfd()函数返回的描述符可以添加到epoll机制内,因此剩下需要做的就是把eventfd与aio联系起来,而目前aio当然已经有了这个支持,不过,由于native aio的相关结构体有两套封装,即一种是libaio的封装,一种是内核的直接封装(便于直接使用aio),比如iocb:
libaio的封装(来之:/usr/include/libaio.h):

struct io_iocb_common {
    PADDEDptr(void  *buf, __pad1);
    PADDEDul(nbytes, __pad2);
    long long   offset;
    long long   __pad3;
    unsigned    flags;
    unsigned    resfd;
};  /* result code is the amount read or -'ve errno */
struct io_iocb_vector {
    const struct iovec  *vec;
    int         nr;
    long long       offset;
};  /* result code is the amount read or -'ve errno */
struct iocb {
    PADDEDptr(void *data, __pad1);  /* Return in the io completion event */
    PADDED(unsigned key, __pad2);   /* For use in identifying io requests */
    short       aio_lio_opcode;
    short       aio_reqprio;
    int     aio_fildes;
    union {
        struct io_iocb_common       c;
        struct io_iocb_vector       v;
        struct io_iocb_poll     poll;
        struct io_iocb_sockaddr saddr;
    } u;
};

内核的封装(来之:/usr/include/linux/aio_abi.h或/usr/src/linux-2.6.38.8/include/linux/aio_abi.h):

/*
 * we always use a 64bit off_t when communicating
 * with userland.  its up to libraries to do the
 * proper padding and aio_error abstraction
 */
struct iocb {
    /* these are internal to the kernel/libc. */
    __u64   aio_data;   /* data to be returned in event's data */
    __u32   PADDED(aio_key, aio_reserved1);
                /* the kernel sets aio_key to the req # */
    /* common fields */
    __u16   aio_lio_opcode; /* see IOCB_CMD_ above */
    __s16   aio_reqprio;
    __u32   aio_fildes;
    __u64   aio_buf;
    __u64   aio_nbytes;
    __s64   aio_offset;
    /* extra parameters */
    __u64   aio_reserved2;  /* TODO: use this for a (struct sigevent *) */
    /* flags for the "struct iocb" */
    __u32   aio_flags;
    /*
     * if the IOCB_FLAG_RESFD flag of "aio_flags" is set, this is an
     * eventfd to signal <span class="wp_keywordlink_affiliate"><a href="http://www.lenky.info/archives/tag/aio" title="View all posts in AIO">AIO</a></span> readiness to
     */
    __u32   aio_resfd;
}; /* 64 bytes */

两个结构体是等价的,只是字段名称有所不同而已,此处仅看内核封装的情况(后续将提到nginx对aio的使用实现,而nginx是采用的就是syscall手动封装),有一段很明显的英文注释出卖了aio对eventfd的使用支持,即两个字段:aio_flags与aio_resfd,详细来说就是将aio_flags打上IOCB_FLAG_RESFD标记并且将eventfd()函数返回的描述符设置到aio_resfd即可。

废话少说,看两个示例,第一个来之:http://blog.sina.com.cn/s/blog_6b19f21d0100znza.html

#define _GNU_SOURCE
#define __STDC_FORMAT_MACROS
#include <stdio.h>
#include <errno.h>
#include <libaio.h>
#include <sys/eventfd.h>
#include <sys/epoll.h>
#include <stdlib.h>
#include <sys/types.h>
#include <unistd.h>
#include <stdint.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <inttypes.h>
#define TEST_FILE   "aio_test_file"
#define TEST_FILE_SIZE  (127 * 1024)
#define NUM_EVENTS  128
#define ALIGN_SIZE  512
#define RD_WR_SIZE  1024
struct custom_iocb
{
    struct iocb iocb;
    int nth_request;
};
void aio_callback(io_context_t ctx, struct iocb *iocb, long res, long res2)
{
    struct custom_iocb *iocbp = (struct custom_iocb *)iocb;
    printf("nth_request: %d, request_type: %s, offset: %lld, length: %lu, res: %ld, res2: %ldn",
            iocbp->nth_request, (iocb->aio_lio_opcode == IO_CMD_PREAD) ? "READ" : "WRITE",
            iocb->u.c.offset, iocb->u.c.nbytes, res, res2);
}
int main(int argc, char *argv[])
{
    int efd, fd, epfd;
    io_context_t ctx;
    struct timespec tms;
    struct io_event events[NUM_EVENTS];
    struct custom_iocb iocbs[NUM_EVENTS];
    struct iocb *iocbps[NUM_EVENTS];
    struct custom_iocb *iocbp;
    int i, j, r;
    void *buf;
    struct epoll_event epevent;
    efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    if (efd == -1) {
        perror("eventfd");
        return 2;
    }
    fd = open(TEST_FILE, O_RDWR | O_CREAT | O_DIRECT, 0644);
    if (fd == -1) {
        perror("open");
        return 3;
    }
    ftruncate(fd, TEST_FILE_SIZE);
    
    ctx = 0;
    if (io_setup(8192, &ctx)) {
        perror("io_setup");
        return 4;
    }
    if (posix_memalign(&buf, ALIGN_SIZE, RD_WR_SIZE)) {
        perror("posix_memalign");
        return 5;
    }
    printf("buf: %pn", buf);
    for (i = 0, iocbp = iocbs; i < NUM_EVENTS; ++i, ++iocbp) {
        iocbps[i] = &iocbp->iocb;
        io_prep_pread(&iocbp->iocb, fd, buf, RD_WR_SIZE, i * RD_WR_SIZE);
        io_set_eventfd(&iocbp->iocb, efd);
        io_set_callback(&iocbp->iocb, aio_callback);
        iocbp->nth_request = i + 1;
    }
    if (io_submit(ctx, NUM_EVENTS, iocbps) != NUM_EVENTS) {
        perror("io_submit");
        return 6;
    }
    epfd = epoll_create(1);
    if (epfd == -1) {
        perror("epoll_create");
        return 7;
    }
    epevent.events = EPOLLIN | EPOLLET;
    epevent.data.ptr = NULL;
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, efd, &epevent)) {
        perror("epoll_ctl");
        return 8;
    }
    i = 0;
    while (i < NUM_EVENTS) {
        uint64_t finished_aio;
        if (epoll_wait(epfd, &epevent, 1, -1) != 1) {
            perror("epoll_wait");
            return 9;
        }
        if (read(efd, &finished_aio, sizeof(finished_aio)) != sizeof(finished_aio)) {
            perror("read");
            return 10;
        }
        printf("finished io number: %"PRIu64"n", finished_aio);
    
        while (finished_aio > 0) {
            tms.tv_sec = 0;
            tms.tv_nsec = 0;
            r = io_getevents(ctx, 1, NUM_EVENTS, events, &tms);
            if (r > 0) {
                for (j = 0; j < r; ++j) {
                    ((io_callback_t)(events[j].data))(ctx, events[j].obj, events[j].res, events[j].res2);
                }
                i += r;
                finished_aio -= r;
            }
        }
    }
    
    close(epfd);
    free(buf);
    io_destroy(ctx);
    close(fd);
    close(efd);
    remove(TEST_FILE);
    return 0;
}

编译执行,OK无误(特别注意:上面示例代码仅只是演示aio+eventfd+epoll的使用,而细节部分是有严重bug的,比如所有请求共用一个缓存区buf):

[root@www 1]# gcc t.c -laio
[root@www 1]# ./a.out

上面示例采用了libaio库,试试syscall简单封装(由上面示例修改而来):

/**
 * gcc aio_eventfd_epoll.c -o aio_eventfd_epoll
 * modified by: http://lenky.info/
 */
#define _GNU_SOURCE
#define __STDC_FORMAT_MACROS
#include <sys/epoll.h>
#include <stdio.h>            /* for perror() */
#include <unistd.h>           /* for syscall() */
#include <sys/syscall.h>      /* for __NR_* definitions */
#include <linux/aio_abi.h>    /* for AIO types and constants */
#include <fcntl.h>            /* O_RDWR */
#include <string.h>           /* memset() */
#include <inttypes.h>         /* uint64_t */
#include <stdlib.h>
#define TEST_FILE   "aio_test_file"
#define TEST_FILE_SIZE  (128 * 1024)
#define NUM_EVENTS  128
#define ALIGN_SIZE  512
#define RD_WR_SIZE  1024
inline int io_setup(unsigned nr, aio_context_t *ctxp)
{
    return syscall(__NR_io_setup, nr, ctxp);
}
inline int io_submit(aio_context_t ctx, long nr,  struct iocb **iocbpp)
{
    return syscall(__NR_io_submit, ctx, nr, iocbpp);
}
inline int io_getevents(aio_context_t ctx, long min_nr, long max_nr,
        struct io_event *events, struct timespec *timeout)
{
    return syscall(__NR_io_getevents, ctx, min_nr, max_nr, events, timeout);
}
inline int io_destroy(aio_context_t ctx)
{
    return syscall(__NR_io_destroy, ctx);
}
inline int eventfd2(unsigned int initval, int flags)
{
    return syscall(__NR_eventfd2, initval, flags);
}
struct custom_iocb
{
    struct iocb iocb;
    int nth_request;
};
typedef void io_callback_t(aio_context_t ctx, struct iocb *iocb, long res, long res2);
void aio_callback(aio_context_t ctx, struct iocb *iocb, long res, long res2)
{
    struct custom_iocb *iocbp = (struct custom_iocb *)iocb;
    printf("nth_request: %d, request_type: %s, offset: %lld, length: %lu, res: %ld, res2: %ldn",
            iocbp->nth_request, (iocb->aio_lio_opcode == IOCB_CMD_PREAD) ? "READ" : "WRITE",
            iocb->aio_offset, iocb->aio_nbytes, res, res2);
}
int main(int argc, char *argv[])
{
    int efd, fd, epfd;
    aio_context_t ctx;
    struct timespec tms;
    struct io_event events[NUM_EVENTS];
    struct custom_iocb iocbs[NUM_EVENTS];
    struct iocb *iocbps[NUM_EVENTS];
    struct custom_iocb *iocbp;
    int i, j, r;
    void *buf;
    void *aio_buf;
    struct epoll_event epevent;
    efd = eventfd2(0, O_NONBLOCK | O_CLOEXEC);
    if (efd == -1) {
        perror("eventfd2");
        return 2;
    }
    fd = open(TEST_FILE, O_RDWR | O_CREAT | O_DIRECT, 0644);
    if (fd == -1) {
        perror("open");
        return 3;
    }
    ftruncate(fd, TEST_FILE_SIZE);
    
    ctx = 0;
    if (io_setup(NUM_EVENTS, &ctx)) {
        perror("io_setup");
        return 4;
    }
    if (posix_memalign(&buf, ALIGN_SIZE, RD_WR_SIZE * NUM_EVENTS)) {
        perror("posix_memalign");
        return 5;
    }
    printf("buf: %pn", buf);
    for (i = 0, iocbp = iocbs; i < NUM_EVENTS; ++i, ++iocbp) {
        aio_buf = (void *)((char *)buf + (i*RD_WR_SIZE));
        memset(aio_buf, 0, RD_WR_SIZE);
        //io_prep_pread(&iocbp->iocb, fd, buf, RD_WR_SIZE, i * RD_WR_SIZE);
        iocbp->iocb.aio_fildes = fd;
        iocbp->iocb.aio_lio_opcode = IOCB_CMD_PREAD;
        iocbp->iocb.aio_buf = (uint64_t)aio_buf;
        iocbp->iocb.aio_offset = i * RD_WR_SIZE;
        iocbp->iocb.aio_nbytes = RD_WR_SIZE;
        //io_set_eventfd(&iocbp->iocb, efd);
        iocbp->iocb.aio_flags = IOCB_FLAG_RESFD;
        iocbp->iocb.aio_resfd = efd;
        
        //io_set_callback(&iocbp->iocb, aio_callback);
        iocbp->iocb.aio_data = (__u64)aio_callback;
        iocbp->nth_request = i + 1;
        iocbps[i] = &iocbp->iocb;
    }
    if (io_submit(ctx, NUM_EVENTS, iocbps) != NUM_EVENTS) {
        perror("io_submit");
        return 6;
    }
    epfd = epoll_create(1);
    if (epfd == -1) {
        perror("epoll_create");
        return 7;
    }
    epevent.events = EPOLLIN | EPOLLET;
    epevent.data.ptr = NULL;
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, efd, &epevent)) {
        perror("epoll_ctl");
        return 8;
    }
    i = 0;
    while (i < NUM_EVENTS) {
        uint64_t finished_aio;
        if (epoll_wait(epfd, &epevent, 1, -1) != 1) {
            perror("epoll_wait");
            return 9;
        }
        if (read(efd, &finished_aio, sizeof(finished_aio)) != sizeof(finished_aio)) {
            perror("read");
            return 10;
        }
        printf("finished io number: %"PRIu64"n", finished_aio);
    
        while (finished_aio > 0) {
            tms.tv_sec = 0;
            tms.tv_nsec = 0;
            r = io_getevents(ctx, 1, NUM_EVENTS, events, &tms);
            if (r > 0) {
                for (j = 0; j < r; ++j) {
                    ((io_callback_t *)(events[j].data))(ctx, (struct iocb *)events[j].obj, events[j].res, events[j].res2);
                }
                i += r;
                finished_aio -= r;
            }
        }
    }
    
    close(epfd);
    free(buf);
    io_destroy(ctx);
    close(fd);
    close(efd);
    remove(TEST_FILE);
    return 0;
}

 

转载请注明:爱开源 » Linux native AIO与eventfd、epoll的结合使用

您必须 登录 才能发表评论!