With `io_uring_prep_recv_multishot`, `IORING_RECVSEND_BUNDLE`, and `IOU_PBUF_RING_INC`, data is only received once!

Open AomaYple opened this issue 1 year ago • 16 comments

As stated in the title, io_uring_prep_recv_multishot must be submitted again to continue receiving data.

AomaYple avatar Jan 04 '25 06:01 AomaYple

You're going to need to be a LOT more specific on what you're talking about here.

axboe avatar Jan 04 '25 16:01 axboe

#include <arpa/inet.h>
#include <errno.h>
#include <liburing.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

int main() {
    const unsigned entries = 32768;
    struct io_uring ring;
    int result = io_uring_queue_init(entries, &ring, 0);
    if (result != 0) {
        printf("Failed to initialize io_uring queue: %s\n", strerror(-result));
        exit(1);
    }
    const int ringBufferId = 0;
    int error;
    struct io_uring_buf_ring *ringBuffer =
        io_uring_setup_buf_ring(&ring, entries, ringBufferId, IOU_PBUF_RING_INC, &error);
    if (ringBuffer == NULL) {
        printf("Failed to setup buffer ring: %s\n", strerror(-error));
        exit(1);
    }

    char buffer0[16], buffer1[16], buffer2[16];
    unsigned buffer0Offset = 0, buffer1Offset = 0, buffer2Offset = 0;
    const unsigned bufferLength = sizeof(buffer0);
    io_uring_buf_ring_add(ringBuffer, buffer0, bufferLength, 0, io_uring_buf_ring_mask(entries), 0);
    io_uring_buf_ring_add(ringBuffer, buffer1, bufferLength, 1, io_uring_buf_ring_mask(entries), 1);
    io_uring_buf_ring_add(ringBuffer, buffer2, bufferLength, 2, io_uring_buf_ring_mask(entries), 2);
    io_uring_buf_ring_advance(ringBuffer, 3);

    int listenFd = socket(AF_INET, SOCK_STREAM, 0);
    if (listenFd == -1) {
        printf("Failed to create socket: %s\n", strerror(errno));
        exit(1);
    }

    int option = 1;
    result = setsockopt(listenFd, SOL_SOCKET, SO_REUSEADDR, &option, sizeof(option));
    if (result == -1) {
        printf("Failed to set socket option: %s\n", strerror(errno));
        exit(1);
    }

    struct sockaddr_in address = {0};
    address.sin_addr.s_addr = INADDR_ANY;
    address.sin_family = AF_INET;
    address.sin_port = htons(8080);

    result = bind(listenFd, (struct sockaddr *) &address, sizeof(address));
    if (result == -1) {
        printf("Failed to bind socket: %s\n", strerror(errno));
        exit(1);
    }

    result = listen(listenFd, SOMAXCONN);
    if (result == -1) {
        printf("Failed to listen on socket: %s\n", strerror(errno));
        exit(1);
    }

    int clientFd = accept(listenFd, NULL, NULL);
    if (clientFd == -1) {
        printf("Failed to accept connection: %s\n", strerror(errno));
        exit(1);
    }

    for (int i = 0; i != 3; ++i) { // loop 3 times
        struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
        if (sqe == NULL) {
            printf("no sqe available\n");
            exit(1);
        }

        io_uring_prep_recv_multishot(sqe, clientFd, NULL, 0, 0);
        io_uring_sqe_set_flags(sqe, IOSQE_BUFFER_SELECT);
        sqe->ioprio = IORING_RECVSEND_BUNDLE;
        sqe->buf_group = ringBufferId;

        result = io_uring_submit_and_wait(&ring, 1);
        if (result < 0) {
            printf("Failed to submit and wait: %s\n", strerror(-result));
            exit(1);
        }

        int count = 0;
        struct io_uring_cqe *cqe;
        unsigned head;
        io_uring_for_each_cqe(&ring, head, cqe) {
            printf("received %d bytes\n",
                   cqe->res);    // prints only once per iteration; without IORING_RECVSEND_BUNDLE it prints 3 times

            ++count;
        }
        io_uring_cq_advance(&ring, count);
    }

    result = io_uring_free_buf_ring(&ring, ringBuffer, entries, ringBufferId);
    if (result != 0) {
        printf("Failed to free buffer ring: %s\n", strerror(-result));
        exit(1);
    }

    io_uring_queue_exit(&ring);
}

The program above is the receiver.

It adds three buffers, each 16 bytes in size.

A separate sender program then sends 40 bytes to it.

The receiver loops three times.

Each iteration submits an io_uring_prep_recv_multishot request.

But each io_uring_for_each_cqe pass yields only one CQE and prints only once.

The first two iterations print 16 bytes each and the last prints 8 bytes, at which point everything has been received.

So why is the data not all received by a single request? Why do I have to resubmit manually?

AomaYple avatar Jan 05 '25 08:01 AomaYple

You should only generate and submit a new recv multishot, when the previous CQE for that multishot request doesn't have IORING_CQE_F_MORE set. If it does, then more completions will be generated by that request.
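As a minimal sketch of that rule (not liburing code): `multishot_needs_rearm` is a hypothetical helper name, and the flag value is copied from the io_uring uapi header so the snippet compiles without liburing installed. Drop the `#define` when including `<liburing.h>`.

```c
#include <stdbool.h>
#include <stdint.h>

/* Value copied from the io_uring uapi header (assumption: unchanged
 * in your headers; remove this line if you include <liburing.h>). */
#define IORING_CQE_F_MORE (1U << 1)

/* Hypothetical helper: returns true when the multishot request that
 * posted this CQE has terminated, so a new SQE must be submitted.
 * While IORING_CQE_F_MORE is set, the original request stays armed. */
static bool multishot_needs_rearm(uint32_t cqe_flags) {
    return (cqe_flags & IORING_CQE_F_MORE) == 0;
}
```

In a completion loop this would be called as `multishot_needs_rearm(cqe->flags)` before deciding whether to prepare another recv multishot.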

axboe avatar Jan 05 '25 18:01 axboe

That said, the behavior does sound odd. I'll take a look.

axboe avatar Jan 05 '25 18:01 axboe

Oh, well I think this one is pretty simple. When you do:

sqe->ioprio = IORING_RECVSEND_BUNDLE;

you're overwriting the multishot flag. Change that to a |= and I suspect it'll look more like what you expect.
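To make the difference concrete, here is a small sketch using a stand-in struct rather than a real `struct io_uring_sqe`. `demo_sqe`, `set_bundle_assign`, and `set_bundle_or` are hypothetical names, and the two flag values are copied from the io_uring uapi header so the snippet builds without liburing.

```c
#include <stdint.h>

/* Values copied from the io_uring uapi header (remove these when
 * including <liburing.h>). */
#define IORING_RECV_MULTISHOT  (1U << 1)
#define IORING_RECVSEND_BUNDLE (1U << 4)

/* Stand-in for the one SQE field that matters here. */
struct demo_sqe {
    uint16_t ioprio;
};

/* Plain assignment: throws away the multishot bit that the prep
 * helper stored in ioprio, leaving a single-shot bundle recv. */
static void set_bundle_assign(struct demo_sqe *sqe) {
    sqe->ioprio = IORING_RECVSEND_BUNDLE;
}

/* OR-assignment: keeps the multishot bit and adds the bundle bit. */
static void set_bundle_or(struct demo_sqe *sqe) {
    sqe->ioprio |= IORING_RECVSEND_BUNDLE;
}
```

Starting from `ioprio == IORING_RECV_MULTISHOT`, the first helper leaves only the bundle bit set, while the second leaves both bits set.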

axboe avatar Jan 05 '25 18:01 axboe

#include <arpa/inet.h>
#include <errno.h>
#include <liburing.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

void addBuffer(struct io_uring_buf_ring *ringBuffer, int bufferId, int mask);
void submitRecvMultishot(struct io_uring *ring, int clientFd, int ringBufferId);
void forEachCqe(struct io_uring *ring, struct io_uring_buf_ring *ringBuffer, int *bufferId, int mask);

int main() {
    const unsigned entries = 32768;

    struct io_uring ring;
    int result = io_uring_queue_init(entries, &ring, 0);
    if (result != 0) {
        printf("Failed to initialize io_uring queue: %s\n", strerror(-result));
        exit(1);
    }

    const int ringBufferId = 0;
    int error;
    struct io_uring_buf_ring *ringBuffer =
        io_uring_setup_buf_ring(&ring, entries, ringBufferId, IOU_PBUF_RING_INC, &error);
    if (ringBuffer == NULL) {
        printf("Failed to setup buffer ring: %s\n", strerror(-error));
        exit(1);
    }
    int bufferId = 0;

    const int listenFd = socket(AF_INET, SOCK_STREAM, 0);
    if (listenFd == -1) {
        printf("Failed to create socket: %s\n", strerror(errno));
        exit(1);
    }

    const int option = 1;
    result = setsockopt(listenFd, SOL_SOCKET, SO_REUSEADDR, &option, sizeof(option));
    if (result == -1) {
        printf("Failed to set socket option: %s\n", strerror(errno));
        exit(1);
    }

    struct sockaddr_in address = {0};
    address.sin_addr.s_addr = INADDR_ANY;
    address.sin_family = AF_INET;
    address.sin_port = htons(8080);

    result = bind(listenFd, (struct sockaddr *) &address, sizeof(address));
    if (result == -1) {
        printf("Failed to bind socket: %s\n", strerror(errno));
        exit(1);
    }

    result = listen(listenFd, SOMAXCONN);
    if (result == -1) {
        printf("Failed to listen on socket: %s\n", strerror(errno));
        exit(1);
    }

    const int clientFd = accept(listenFd, NULL, NULL);
    if (clientFd == -1) {
        printf("Failed to accept connection: %s\n", strerror(errno));
        exit(1);
    }

    // the client sends 20 bytes

    submitRecvMultishot(&ring, clientFd, ringBufferId);
    forEachCqe(&ring, ringBuffer, &bufferId, io_uring_buf_ring_mask(entries));
    // first: generates ENOBUFS, then a buffer is added

    submitRecvMultishot(&ring, clientFd, ringBufferId);
    forEachCqe(&ring, ringBuffer, &bufferId, io_uring_buf_ring_mask(entries));
    // second: receives 16 bytes, but not everything
    // without IORING_RECVSEND_BUNDLE: two CQEs, one for 16 bytes and one ENOBUFS, then a buffer is added
    // with IORING_RECVSEND_BUNDLE: one CQE, 16 bytes

    submitRecvMultishot(&ring, clientFd, ringBufferId);
    forEachCqe(&ring, ringBuffer, &bufferId, io_uring_buf_ring_mask(entries));
    // third
    // without IORING_RECVSEND_BUNDLE: one CQE, 4 bytes
    // with IORING_RECVSEND_BUNDLE: one CQE, ENOBUFS, then a buffer is added

    submitRecvMultishot(&ring, clientFd, ringBufferId);
    forEachCqe(&ring, ringBuffer, &bufferId, io_uring_buf_ring_mask(entries));
    // with IORING_RECVSEND_BUNDLE: one CQE, 4 bytes

    result = io_uring_free_buf_ring(&ring, ringBuffer, entries, ringBufferId);
    if (result != 0) {
        printf("Failed to free buffer ring: %s\n", strerror(-result));
        exit(1);
    }

    io_uring_queue_exit(&ring);
}

void addBuffer(struct io_uring_buf_ring *const ringBuffer, const int bufferId, const int mask) {
    void *buffer = malloc(16);    // allocate 16 bytes
    io_uring_buf_ring_add(ringBuffer, buffer, 16, bufferId, mask, 0);
    io_uring_buf_ring_advance(ringBuffer, 1);
}

void submitRecvMultishot(struct io_uring *const ring, const int clientFd, const int ringBufferId) {
    struct io_uring_sqe *const sqe = io_uring_get_sqe(ring);
    if (sqe == NULL) {
        printf("no sqe available\n");
        exit(1);
    }

    io_uring_prep_recv_multishot(sqe, clientFd, NULL, 0, 0);
    io_uring_sqe_set_flags(sqe, IOSQE_BUFFER_SELECT);
    sqe->ioprio |= IORING_RECVSEND_BUNDLE;
    sqe->buf_group = ringBufferId;

    const int result = io_uring_submit_and_wait(ring, 1);
    if (result < 0) {
        printf("Failed to submit and wait: %s\n", strerror(-result));
        exit(1);
    }
}

void forEachCqe(struct io_uring *const ring, struct io_uring_buf_ring *const ringBuffer, int *const bufferId,
                const int mask) {
    struct io_uring_cqe *cqe;
    unsigned head;
    io_uring_for_each_cqe(ring, head, cqe) {
        printf("received %d bytes\n", cqe->res);
        if (cqe->res == -ENOBUFS) {
            printf("ENOBUFS\n");
            addBuffer(ringBuffer, (*bufferId)++, mask);
        }

        if ((cqe->flags & IORING_CQE_F_BUFFER) != 0) {
            const unsigned currentBufferId = cqe->flags >> IORING_CQE_BUFFER_SHIFT;
            printf("buffer id: %u\n", currentBufferId);
            printf("is more data: %d\n", (cqe->flags & IORING_CQE_F_BUF_MORE) != 0);
        }

        io_uring_cq_advance(ring, 1);
    }

    printf("loop end\n");
}

The client sends 20 bytes to the server. Each buffer in the server-side io_uring_buf_ring is 16 bytes, so at least two buffers are needed to receive everything.

Call one io_uring_prep_recv_multishot submission followed by one io_uring_for_each_cqe pass a single execution.

Ignore buffer reuse.

There are two scenarios:

  1. IORING_RECVSEND_BUNDLE not set.

    1. The first execution produces one CQE with -ENOBUFS because there is no buffer. A buffer is then added.
    2. The second execution produces two CQEs: the first for 16 bytes received and the second for -ENOBUFS. A buffer is then added.
    3. The third execution produces one CQE for 4 bytes received.
  2. IORING_RECVSEND_BUNDLE set.

    1. The first execution produces one CQE with -ENOBUFS because there is no buffer. A buffer is then added.
    2. The second execution produces one CQE for 16 bytes received.
    3. The third execution produces one CQE with -ENOBUFS because there is no buffer. A buffer is then added.
    4. The fourth execution produces one CQE for 4 bytes received.

So why do the two scenarios differ? Why does IORING_RECVSEND_BUNDLE need one extra execution?

AomaYple avatar Jan 06 '25 10:01 AomaYple

I asked this before but you did not answer - what kernel are you using?

axboe avatar Jan 06 '25 15:01 axboe

Arch WSL 6.12.8

AomaYple avatar Jan 07 '25 01:01 AomaYple

Here's what I see:

IORING_RECVSEND_BUNDLE not set:
received -105 bytes
ENOBUFS
loop end
received 16 bytes
buffer id: 0
is more data: 0
received -105 bytes
ENOBUFS
loop end
received 4 bytes
buffer id: 1
is more data: 1
received 0 bytes
loop end
received 0 bytes
loop end
IORING_RECVSEND_BUNDLE set:
received -105 bytes
ENOBUFS
loop end
received 16 bytes
buffer id: 0
is more data: 0
loop end
received -105 bytes
ENOBUFS
loop end
received 4 bytes
buffer id: 1
is more data: 1
loop end

which looks as expected? I appreciate the reports, but you never seem to clearly mention how what you're seeing is differing from your expectations. Any bug or issue report should include that, to avoid the other end needing to guess.

axboe avatar Jan 07 '25 19:01 axboe

It looks like you're getting the same results as me. My question is: with IORING_RECVSEND_BUNDLE set, why does the second receive generate only one CQE and no ENOBUFS?

In my other code, adding buffers is mostly triggered by ENOBUFS, so if the second receive doesn't generate ENOBUFS, that socket never finishes receiving the rest of the data.

AomaYple avatar Jan 08 '25 01:01 AomaYple

@AomaYple I've just tested this and reproduced the same result as you. Note, though, that the second CQE, the one that receives 16 bytes without an ENOBUFS, has a flags value of 5, which is IORING_CQE_F_BUFFER | IORING_CQE_F_SOCK_NONEMPTY.

IORING_CQE_F_SOCK_NONEMPTY   If set, more data to read after socket recv

So you can tell there is more data to receive on this socket without getting ENOBUFS.
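For reference, a flags value like this can be decoded with a few bit tests. This is only a sketch: `cqe_flags_view` and `decode_cqe_flags` are hypothetical names, and the constants are copied from the io_uring uapi header so the snippet compiles without liburing (remove them when including `<liburing.h>`).

```c
#include <stdbool.h>
#include <stdint.h>

/* Values copied from the io_uring uapi header. */
#define IORING_CQE_F_BUFFER        (1U << 0)
#define IORING_CQE_F_MORE          (1U << 1)
#define IORING_CQE_F_SOCK_NONEMPTY (1U << 2)
#define IORING_CQE_F_BUF_MORE      (1U << 4)
#define IORING_CQE_BUFFER_SHIFT    16

/* Hypothetical, unpacked view of cqe->flags. */
struct cqe_flags_view {
    bool has_buffer;    /* a provided buffer was used; buffer_id is valid */
    bool more;          /* the multishot request is still armed */
    bool sock_nonempty; /* the socket still had data after this recv */
    bool buf_more;      /* incremental buffer is not yet fully consumed */
    uint16_t buffer_id; /* upper 16 bits of the flags word */
};

static struct cqe_flags_view decode_cqe_flags(uint32_t flags) {
    struct cqe_flags_view v;
    v.has_buffer = (flags & IORING_CQE_F_BUFFER) != 0;
    v.more = (flags & IORING_CQE_F_MORE) != 0;
    v.sock_nonempty = (flags & IORING_CQE_F_SOCK_NONEMPTY) != 0;
    v.buf_more = (flags & IORING_CQE_F_BUF_MORE) != 0;
    v.buffer_id = (uint16_t)(flags >> IORING_CQE_BUFFER_SHIFT);
    return v;
}
```

For a flags value of 5, `has_buffer` and `sock_nonempty` are true, `more` and `buf_more` are false, and `buffer_id` is 0: the request has ended, but the socket still holds data.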

SidongYang avatar Feb 22 '25 14:02 SidongYang

Hi.

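Your problem may be that the buffer ring is not set up properly. The nentries value passed to io_uring_setup_buf_ring must match the number of buffers you then add with io_uring_buf_ring_add: whatever nentries is at setup, you must call io_uring_buf_ring_add nentries times, and the count passed to io_uring_buf_ring_advance must also be nentries. Only once the ring has been filled this way can it cycle properly in later use.

If the ring is not filled, ENOBUFS cannot be handled. If it is full, you don't need to worry when ENOBUFS shows up, because it only means no buffer is currently free; as soon as the buffers in use are handed back with io_uring_buf_ring_advance, the whole buffer ring is usable again.

With IORING_RECVSEND_BUNDLE set, things work normally on my side: it uses buffers consecutively, which reduces the number of CQEs.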
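To illustrate how one bundle CQE can stand in for several per-buffer CQEs, here is a minimal sketch. It assumes equal-sized buffers, no IOU_PBUF_RING_INC, and buffer IDs that equal their ring slot index. Both helper names are hypothetical and not part of liburing.

```c
/* Hypothetical helper: number of equal-sized buffers that hold data
 * for a bundle completion of `res` bytes. Returns 0 for errors and
 * for empty completions. */
static unsigned bundle_buffer_count(int res, unsigned buf_len) {
    if (res <= 0 || buf_len == 0)
        return 0;
    return ((unsigned)res + buf_len - 1) / buf_len; /* round up */
}

/* Hypothetical helper: ID of the i-th buffer in the bundle, assuming
 * IDs follow ring order and wrap around with the ring mask
 * (mask = nentries - 1, nentries a power of two). */
static unsigned bundle_buffer_id(unsigned first_bid, unsigned i,
                                 unsigned ring_mask) {
    return (first_bid + i) & ring_mask;
}
```

With 16-byte buffers, a single bundle completion of 40 bytes would span three buffers, the last one only partly filled.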

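~~Also, I see you are using WSL too. WSL is not reliable. My WSL kernel version is 6.13.6-microsoft-standard-WSL2 and I have run into many problems: when doing an echo with multishot recv and more than 3072 bytes of data, I send A and get back B, which is baffling. Direct allocation of fixed fds does not work well either; the socket closes by itself.~~

The WSL echo problem is solved: it was caused by networkingMode=mirrored. Oddly, it only shows up with multishot recv. The symptom is a repeated BID: the same BID three times in a row instead of incrementing. After turning mirrored mode off, automatic allocation of fixed IDs works normally too.

None of these problems occur under Hyper-V.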

wamshawn avatar Apr 03 '25 14:04 wamshawn

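A few more notes from my experience with io_uring_prep_recv_multishot. So far, giving each connection its own buffer ring works without any problems. Sharing one buffer ring among multiple connections is harder to control: different kernel versions behave differently, and it depends strongly on the size of the buffer ring, so it is hard to say how many connections can share a ring of what size while being both completely safe and not wasteful.

Note that this is without IOU_PBUF_RING_INC. I did not want to deal with it, and it does not seem to get along with IORING_RECVSEND_BUNDLE.

The buffer ring should be set up as described above: fill it completely.

On handling CQEs:

  1. Errors, i.e. res is less than 0. Decide whether the request needs to be resubmitted; ENOBUFS, for example, requires resubmission. Most other errors mean the connection is gone.

  2. Success, i.e. res is greater than -1. IORING_CQE_F_BUFFER will be set here; without it there is no BID, so check for it to make sure the BID is valid. Then handle res: if data was received, process it by finding the group of buffers that the BID and res correspond to. Yes, the BID is only the starting buffer, and res determines how many buffers were used. Then hand back that many buffers with io_uring_buf_ring_advance. Also note that if there is a BID and res is 0, one buffer still has to be handed back.

Do not return from the function at this point, or you will miss resubmitting the request.

Next, check the flags, because flags are present even when res is greater than 0. Check whether IORING_CQE_F_MORE is set; if it is, return, because the request is still active. If it is not, check whether res is 0: 0 can be treated as EOF; otherwise the request needs to be resubmitted, the same as for ENOBUFS. The reason is that the buffer ring may have run out. Indeed, running out does not always show up as ENOBUFS; there are other possibilities when the buffer ring is shared.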
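The decision procedure in this comment could be sketched roughly as follows. This reflects the commenter's rules rather than anything mandated by liburing: `cqe_action` and `next_action` are hypothetical names, and the flag value is copied from the io_uring uapi header so the snippet builds without liburing.

```c
#include <errno.h>
#include <stdint.h>

/* Value copied from the io_uring uapi header (remove when including
 * <liburing.h>). */
#define IORING_CQE_F_MORE (1U << 1)

/* Hypothetical result type for the per-CQE decision. */
enum cqe_action {
    CQE_WAIT_FOR_MORE, /* request still armed, nothing to submit */
    CQE_RESUBMIT,      /* request ended, submit a new recv multishot */
    CQE_EOF,           /* peer closed the connection */
    CQE_CLOSE          /* unrecoverable error, drop the connection */
};

/* Decide what to do after a recv multishot CQE. Any received data
 * and buffer recycling should be handled before acting on the result. */
static enum cqe_action next_action(int res, uint32_t flags) {
    if (res < 0)
        return res == -ENOBUFS ? CQE_RESUBMIT : CQE_CLOSE;
    if (flags & IORING_CQE_F_MORE)
        return CQE_WAIT_FOR_MORE;
    /* No MORE flag: the request has terminated. */
    return res == 0 ? CQE_EOF : CQE_RESUBMIT;
}
```

Treating every error other than -ENOBUFS as fatal follows the comment's rule of thumb; a real server may want to distinguish more cases.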

wamshawn avatar Apr 04 '25 15:04 wamshawn

@wamshawn I ignored it yesterday, but let's not make a habit of using non-english here. Please translate and post instead.

axboe avatar Apr 04 '25 15:04 axboe

OOPS...

@axboe I was just communicating with AomaYple, so I used Chinese for convenience.

Next time I will use English.

In short, it is just about the usage of recv_multishot and buffer rings.

wamshawn avatar Apr 11 '25 14:04 wamshawn

Thanks! Because the thing is, you're not just communicating with 1 other person, it's for everyone to read and learn from (or be able to comment on). This isn't a private email, it's a public forum.

axboe avatar Apr 11 '25 15:04 axboe