我们都知道异步IO的作用,就是可以提高我们程序的并发能力,尤其在网络模型中。在linux中有aio的一系列异步IO的函数接口,但是这类函数都是glibc库中的函数,是基于多线程实现,不是真正的异步IO,在内核中有真正的异步IO函数接口。下边我们来学习一下linux内核中的异步IO。
首先我们来看一下内核中异步IO的主要函数接口:
int io_setup(unsigned nr_events, aio_context_t *ctxp);
功能:用来初始化异步IO的上下文。
其中参数ctxp用来描述异步IO上下文, 参数nr_events表示小可处理的异步IO事件的个数。
int io_submit(io_context_t ctx, long nr, struct iocb *iocbs[]);
功能:提交初始化好的异步读写事件。
其中ctx是上文的描述句柄, nr表示提交的异步事件个数。Iocbs是异步事件的结构体。
int io_getevents(io_context_t ctx, long nr, struct io_event *events[], struct timespec *timeout);
功能:获得已经完成的异步IO事件。
其中参数ctx是上下文的句柄,nr 表示期望获得异步IO事件个数,events用来存放已经完成的异步事件的数据,timeout为超时事件。
int io_destroy(aio_context_t ctx);
功能:用于销毁异步IO事件句柄。
但是内核的异步IO通常和epoll等IO多路复用配合使用来完成一些异步事件,那么就需要使用epoll来监听一个可以通知异步IO完成的描述符,那么就需要使用eventfd函数来获得一个这样的描述符。
下边附上一个epoll和内核异步IO配合使用的示例代码:
#define TEST_FILE "aio_test_file"
#define TEST_FILE_SIZE (127 * 1024)
#define NUM_EVENTS 128
#define ALIGN_SIZE 512
#define RD_WR_SIZE 1024
struct custom_iocb
{
struct iocb iocb;
int nth_request;
};
//异步IO的回调函数
void aio_callback(io_context_t ctx, struct iocb *iocb, long res, long res2)
{
struct custom_iocb *iocbp = (struct custom_iocb *)iocb;
printf("nth_request: %d, request_type: %s, offset: %lld, length: %lu, res: %ld, res2: %ld\n", iocbp->nth_request, (iocb->aio_lio_opcode == IO_CMD_PREAD) ? "READ" : "WRITE",iocb->u.c.offset, iocb->u.c.nbytes, res, res2);
}
int main(int argc, char *argv[])
{
int efd, fd, epfd;
io_context_t ctx;
struct timespec tms;
struct io_event events[NUM_EVENTS];
struct custom_iocb iocbs[NUM_EVENTS];
struct iocb *iocbps[NUM_EVENTS];
struct custom_iocb *iocbp;
int i, j, r;
void *buf;
struct epoll_event epevent;
//创建用于获取异步事件的通知描述符
efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
if (efd == -1) {
perror("eventfd");
return 2;
}
fd = open(TEST_FILE, O_RDWR | O_CREAT | O_DIRECT , 0644);
if (fd == -1) {
perror("open");
return 3;
}
ftruncate(fd, TEST_FILE_SIZE);
ctx = 0;
//创建异步IO的句柄
if (io_setup(8192, &ctx)) {
perror("io_setup");
return 4;
}
//申请空间
if (posix_memalign(&buf, ALIGN_SIZE, RD_WR_SIZE)) {
perror("posix_memalign");
return 5;
}
printf("buf: %p\n", buf);
for (i = 0, iocbp = iocbs; i < NUM_EVENTS; ++i, ++iocbp) {
iocbps[i] = &iocbp->iocb;
//设置异步IO读事件
io_prep_pread(&iocbp->iocb, fd, buf, RD_WR_SIZE, i * RD_WR_SIZE);
//关联通知描述符
io_set_eventfd(&iocbp->iocb, efd);
//设置回调函数
io_set_callback(&iocbp->iocb, aio_callback);
iocbp->nth_request = i + 1;
}
//提交异步IO事件
if (io_submit(ctx, NUM_EVENTS, iocbps) != NUM_EVENTS) {
perror("io_submit");
return 6;
}
epfd = epoll_create(1);
if (epfd == -1) {
perror("epoll_create");
return 7;
}
epevent.events = EPOLLIN | EPOLLET;
epevent.data.ptr = NULL;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, efd, &epevent)) {
perror("epoll_ctl");
return 8;
}
i = 0;
while (i < NUM_EVENTS) {
uint64_t finished_aio;
//监听通知描述符
if (epoll_wait(epfd, &epevent, 1, -1) != 1) {
perror("epoll_wait");
return 9;
}
//读取完成的异步IO事件个数
if (read(efd, &finished_aio, sizeof(finished_aio)) != sizeof(finished_aio)) {
perror("read");
return 10;
}
printf("finished io number: %"PRIu64"\n", finished_aio);
while (finished_aio > 0) {
tms.tv_sec = 0;
tms.tv_nsec = 0;
//获取完成的异步IO事件
r = io_getevents(ctx, 1, NUM_EVENTS, events, &tms);
if (r > 0) {
for (j = 0; j < r; ++j) {
//调用回调函数
//events[j].data的数据和设置的iocb结构体中的data数据是一致。
((io_callback_t)(events[j].data))(ctx, events[j].obj, events[j].res, events[j].res2);
}
i += r;
finished_aio -= r;
}
}
}
close(epfd);
free(buf);
io_destroy(ctx);
close(fd);
close(efd);
remove(TEST_FILE);
return 0;
}
Copyright © 2004-2024 华清远见教育科技集团 版权所有
京ICP备16055225号-5,京公海网安备11010802025203号