From c133171567fe3a81f817d0ea159bd9229d75291c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Igor=20Anic=CC=81?=
Date: Mon, 3 Mar 2025 14:37:52 +0100
Subject: [PATCH] io_uring: incremental provided buffer consumption

[Incremental provided buffer consumption](https://github.com/axboe/liburing/wiki/What's-new-with-io_uring-in-6.11-and-6.12#incremental-provided-buffer-consumption)
support was added in kernel 6.12. IoUring.BufferGroup now uses incremental
consumption whenever the kernel supports it.

Previously, a provided buffer was wholly consumed when picked: each cqe
pointed to a different buffer. With incremental consumption, a cqe points
to a part of a buffer, multiple cqe's can reuse the same buffer, and
appropriate sizing of buffers becomes less important.

This slightly changes the BufferGroup interface, since it now has to track
the current receive position (head) of each buffer. init takes an allocator
instead of a buffers slice and allocates both the buffers slice and the
heads slice. get and put now take a cqe, because its flags carry the
information whether the buffer will be reused.
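Typical usage of the changed interface, as a minimal sketch (assumes an
initialized `ring`, a readable socket `fd`, an `allocator`, and a
caller-chosen `user_data` token; the group id, buffer size, and buffer
count are illustrative):

```zig
// Group 1 with 16 buffers of 4096 bytes, registered with the ring.
var buf_grp = try IoUring.BufferGroup.init(&ring, allocator, 1, 4096, 16);
defer buf_grp.deinit(allocator);

// Prepare a recv that lets the kernel pick a buffer from the group.
_ = try buf_grp.recv(user_data, fd, 0);
_ = try ring.submit();

// The cqe identifies the buffer and how many bytes were received.
const cqe = try ring.copy_cqe();
const buf = try buf_grp.get(cqe);
// ... process buf ...

// Advances the buffer head while the kernel keeps using the buffer
// (IORING_CQE_F_BUF_MORE set); otherwise returns it to the kernel.
try buf_grp.put(cqe);
```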
---
 lib/std/os/linux.zig         |   9 +-
 lib/std/os/linux/IoUring.zig | 181 +++++++++++++++++++----------
 2 files changed, 106 insertions(+), 84 deletions(-)

diff --git a/lib/std/os/linux.zig b/lib/std/os/linux.zig
index 0cf2c60db1db..ebdcd3f94a28 100644
--- a/lib/std/os/linux.zig
+++ b/lib/std/os/linux.zig
@@ -5933,6 +5933,8 @@ pub const IORING_CQE_F_MORE = 1 << 1;
 pub const IORING_CQE_F_SOCK_NONEMPTY = 1 << 2;
 /// Set for notification CQEs. Can be used to distinct them from sends.
 pub const IORING_CQE_F_NOTIF = 1 << 3;
+/// If set, the buffer ID set in the completion will get more completions.
+pub const IORING_CQE_F_BUF_MORE = 1 << 4;
 
 pub const IORING_CQE_BUFFER_SHIFT = 16;
 
@@ -6222,8 +6224,13 @@ pub const io_uring_buf_reg = extern struct {
     ring_addr: u64,
     ring_entries: u32,
     bgid: u16,
-    pad: u16,
+    flags: u16,
     resv: [3]u64,
+
+    pub const FLAG = struct {
+        // Incremental buffer consumption.
+        pub const INC: u16 = 2;
+    };
 };
 
 pub const io_uring_getevents_arg = extern struct {
diff --git a/lib/std/os/linux/IoUring.zig b/lib/std/os/linux/IoUring.zig
index 323e36e6d6e5..368bd0fb5993 100644
--- a/lib/std/os/linux/IoUring.zig
+++ b/lib/std/os/linux/IoUring.zig
@@ -1594,28 +1594,34 @@ pub const BufferGroup = struct {
     buffers: []u8,
     /// Size of each buffer in buffers.
     buffer_size: u32,
-    // Number of buffers in `buffers`, number of `io_uring_buf structures` in br.
+    /// Number of buffers in `buffers`, number of `io_uring_buf structures` in br.
     buffers_count: u16,
+    /// Head of the unconsumed part of each buffer, when incremental consumption is enabled.
+    heads: []u32,
     /// ID of this group, must be unique in ring.
     group_id: u16,
 
     pub fn init(
         ring: *IoUring,
+        allocator: mem.Allocator,
         group_id: u16,
-        buffers: []u8,
         buffer_size: u32,
         buffers_count: u16,
     ) !BufferGroup {
-        assert(buffers.len == buffers_count * buffer_size);
+        const buffers = try allocator.alloc(u8, buffer_size * buffers_count);
+        errdefer allocator.free(buffers);
+        const heads = try allocator.alloc(u32, buffers_count);
+        errdefer allocator.free(heads);
 
-        const br = try setup_buf_ring(ring.fd, buffers_count, group_id);
+        const br = try setup_buf_ring(ring.fd, buffers_count, group_id, linux.io_uring_buf_reg.FLAG.INC);
         buf_ring_init(br);
 
         const mask = buf_ring_mask(buffers_count);
         var i: u16 = 0;
         while (i < buffers_count) : (i += 1) {
-            const start = buffer_size * i;
-            const buf = buffers[start .. start + buffer_size];
+            const pos = buffer_size * i;
+            const buf = buffers[pos .. pos + buffer_size];
+            heads[i] = 0;
             buf_ring_add(br, buf, i, mask, i);
         }
         buf_ring_advance(br, buffers_count);
@@ -1625,11 +1631,18 @@ pub const BufferGroup = struct {
             .group_id = group_id,
             .br = br,
             .buffers = buffers,
+            .heads = heads,
             .buffer_size = buffer_size,
             .buffers_count = buffers_count,
         };
     }
 
+    pub fn deinit(self: *BufferGroup, allocator: mem.Allocator) void {
+        free_buf_ring(self.ring.fd, self.br, self.buffers_count, self.group_id);
+        allocator.free(self.buffers);
+        allocator.free(self.heads);
+    }
+
     // Prepare recv operation which will select buffer from this group.
     pub fn recv(self: *BufferGroup, user_data: u64, fd: posix.fd_t, flags: u32) !*linux.io_uring_sqe {
         var sqe = try self.ring.get_sqe();
@@ -1649,33 +1662,34 @@ pub const BufferGroup = struct {
     }
 
     // Get buffer by id.
-    pub fn get(self: *BufferGroup, buffer_id: u16) []u8 {
-        const head = self.buffer_size * buffer_id;
-        return self.buffers[head .. head + self.buffer_size];
+    fn get_by_id(self: *BufferGroup, buffer_id: u16) []u8 {
+        const pos = self.buffer_size * buffer_id;
+        return self.buffers[pos .. pos + self.buffer_size][self.heads[buffer_id]..];
     }
 
     // Get buffer by CQE.
-    pub fn get_cqe(self: *BufferGroup, cqe: linux.io_uring_cqe) ![]u8 {
+    pub fn get(self: *BufferGroup, cqe: linux.io_uring_cqe) ![]u8 {
         const buffer_id = try cqe.buffer_id();
         const used_len = @as(usize, @intCast(cqe.res));
-        return self.get(buffer_id)[0..used_len];
-    }
-
-    // Release buffer to the kernel.
-    pub fn put(self: *BufferGroup, buffer_id: u16) void {
-        const mask = buf_ring_mask(self.buffers_count);
-        const buffer = self.get(buffer_id);
-        buf_ring_add(self.br, buffer, buffer_id, mask, 0);
-        buf_ring_advance(self.br, 1);
+        return self.get_by_id(buffer_id)[0..used_len];
     }
 
     // Release buffer from CQE to the kernel.
-    pub fn put_cqe(self: *BufferGroup, cqe: linux.io_uring_cqe) !void {
-        self.put(try cqe.buffer_id());
-    }
+    pub fn put(self: *BufferGroup, cqe: linux.io_uring_cqe) !void {
+        const buffer_id = try cqe.buffer_id();
+        if (cqe.flags & linux.IORING_CQE_F_BUF_MORE == linux.IORING_CQE_F_BUF_MORE) {
+            // Incremental consumption is active; the kernel will write to this buffer again.
+            const used_len = @as(u32, @intCast(cqe.res));
+            // Track how much of the buffer is used.
+            self.heads[buffer_id] += used_len;
+            return;
+        }
+        self.heads[buffer_id] = 0;
 
-    pub fn deinit(self: *BufferGroup) void {
-        free_buf_ring(self.ring.fd, self.br, self.buffers_count, self.group_id);
+        // Release buffer to the kernel.
+        const mask = buf_ring_mask(self.buffers_count);
+        buf_ring_add(self.br, self.get_by_id(buffer_id), buffer_id, mask, 0);
+        buf_ring_advance(self.br, 1);
     }
 };
 
@@ -1684,7 +1698,7 @@ pub const BufferGroup = struct {
 /// `fd` is IO_Uring.fd for which the provided buffer ring is being registered.
 /// `entries` is the number of entries requested in the buffer ring, must be power of 2.
 /// `group_id` is the chosen buffer group ID, unique in IO_Uring.
-pub fn setup_buf_ring(fd: posix.fd_t, entries: u16, group_id: u16) !*align(page_size_min) linux.io_uring_buf_ring {
+pub fn setup_buf_ring(fd: posix.fd_t, entries: u16, group_id: u16, flags: u16) !*align(page_size_min) linux.io_uring_buf_ring {
     if (entries == 0 or entries > 1 << 15) return error.EntriesNotInRange;
     if (!std.math.isPowerOfTwo(entries)) return error.EntriesNotPowerOfTwo;
 
@@ -1701,22 +1715,24 @@ pub fn setup_buf_ring(fd: posix.fd_t, entries: u16, group_id: u16) !*align(page_
     assert(mmap.len == mmap_size);
 
     const br: *align(page_size_min) linux.io_uring_buf_ring = @ptrCast(mmap.ptr);
-    try register_buf_ring(fd, @intFromPtr(br), entries, group_id);
+    try register_buf_ring(fd, @intFromPtr(br), entries, group_id, flags);
     return br;
 }
 
-fn register_buf_ring(fd: posix.fd_t, addr: u64, entries: u32, group_id: u16) !void {
+fn register_buf_ring(fd: posix.fd_t, addr: u64, entries: u32, group_id: u16, flags: u16) !void {
     var reg = mem.zeroInit(linux.io_uring_buf_reg, .{
         .ring_addr = addr,
         .ring_entries = entries,
         .bgid = group_id,
+        .flags = flags,
     });
-    const res = linux.io_uring_register(
-        fd,
-        .REGISTER_PBUF_RING,
-        @as(*const anyopaque, @ptrCast(&reg)),
-        1,
-    );
+    var res = linux.io_uring_register(fd, .REGISTER_PBUF_RING, @as(*const anyopaque, @ptrCast(&reg)), 1);
+    if (linux.E.init(res) == .INVAL and reg.flags & linux.io_uring_buf_reg.FLAG.INC > 0) {
+        // Retry without incremental buffer consumption.
+        // It is available since kernel 6.12; older kernels return INVAL.
+        reg.flags &= ~linux.io_uring_buf_reg.FLAG.INC;
+        res = linux.io_uring_register(fd, .REGISTER_PBUF_RING, @as(*const anyopaque, @ptrCast(&reg)), 1);
+    }
     try handle_register_buf_ring_result(res);
 }
 
@@ -4041,12 +4057,10 @@ test BufferGroup {
     const group_id: u16 = 1; // buffers group id
     const buffers_count: u16 = 1; // number of buffers in buffer group
     const buffer_size: usize = 128; // size of each buffer in group
-    const buffers = try testing.allocator.alloc(u8, buffers_count * buffer_size);
-    defer testing.allocator.free(buffers);
 
     var buf_grp = BufferGroup.init(
         &ring,
+        testing.allocator,
         group_id,
-        buffers,
         buffer_size,
         buffers_count,
     ) catch |err| switch (err) {
@@ -4054,7 +4068,7 @@ test BufferGroup {
         error.ArgumentsInvalid => return error.SkipZigTest,
         else => return err,
     };
-    defer buf_grp.deinit();
+    defer buf_grp.deinit(testing.allocator);
 
     // Create client/server fds
     const fds = try createSocketTestHarness(&ring);
@@ -4085,14 +4099,11 @@ test BufferGroup {
         try testing.expectEqual(posix.E.SUCCESS, cqe.err());
         try testing.expectEqual(data.len, @as(usize, @intCast(cqe.res))); // cqe.res holds received data len
-        // Read buffer_id and used buffer len from cqe
-        const buffer_id = try cqe.buffer_id();
-        const len: usize = @intCast(cqe.res);
 
         // Get buffer from pool
-        const buf = buf_grp.get(buffer_id)[0..len];
+        const buf = try buf_grp.get(cqe);
         try testing.expectEqualSlices(u8, &data, buf);
 
         // Release buffer to the kernel when application is done with it
-        buf_grp.put(buffer_id);
+        try buf_grp.put(cqe);
     }
 }
 
@@ -4110,12 +4121,10 @@ test "ring mapped buffers recv" {
     const group_id: u16 = 1; // buffers group id
     const buffers_count: u16 = 2; // number of buffers in buffer group
     const buffer_size: usize = 4; // size of each buffer in group
-    const buffers = try testing.allocator.alloc(u8, buffers_count * buffer_size);
-    defer testing.allocator.free(buffers);
 
     var buf_grp = BufferGroup.init(
         &ring,
+        testing.allocator,
         group_id,
-        buffers,
         buffer_size,
         buffers_count,
     ) catch |err| switch (err) {
@@ -4123,7 +4132,7 @@ test "ring mapped buffers recv" {
         error.ArgumentsInvalid => return error.SkipZigTest,
         else => return err,
     };
-    defer buf_grp.deinit();
+    defer buf_grp.deinit(testing.allocator);
 
     // create client/server fds
     const fds = try createSocketTestHarness(&ring);
@@ -4145,14 +4154,18 @@ test "ring mapped buffers recv" {
             if (cqe_send.err() == .INVAL) return error.SkipZigTest;
             try testing.expectEqual(linux.io_uring_cqe{ .user_data = user_data, .res = data.len, .flags = 0 }, cqe_send);
         }
-
-        // server reads data into provided buffers
-        // there are 2 buffers of size 4, so each read gets only chunk of data
-        // we read four chunks of 4, 4, 4, 3 bytes each
-        var chunk: []const u8 = data[0..buffer_size]; // first chunk
-        const id1 = try expect_buf_grp_recv(&ring, &buf_grp, fds.server, rnd.int(u64), chunk);
-        chunk = data[buffer_size .. buffer_size * 2]; // second chunk
-        const id2 = try expect_buf_grp_recv(&ring, &buf_grp, fds.server, rnd.int(u64), chunk);
+        var pos: usize = 0;
+
+        // read first chunk
+        const cqe1 = try buf_grp_recv_submit_get_cqe(&ring, &buf_grp, fds.server, rnd.int(u64));
+        var buf = try buf_grp.get(cqe1);
+        try testing.expectEqualSlices(u8, data[pos..][0..buf.len], buf);
+        pos += buf.len;
+        // second chunk
+        const cqe2 = try buf_grp_recv_submit_get_cqe(&ring, &buf_grp, fds.server, rnd.int(u64));
+        buf = try buf_grp.get(cqe2);
+        try testing.expectEqualSlices(u8, data[pos..][0..buf.len], buf);
+        pos += buf.len;
 
         // both buffers provided to the kernel are used so we get error
         // 'no more buffers', until we put buffers to the kernel
@@ -4169,16 +4182,17 @@ test "ring mapped buffers recv" {
         }
 
         // put buffers back to the kernel
-        buf_grp.put(id1);
-        buf_grp.put(id2);
-
-        chunk = data[buffer_size * 2 .. buffer_size * 3]; // third chunk
-        const id3 = try expect_buf_grp_recv(&ring, &buf_grp, fds.server, rnd.int(u64), chunk);
-        buf_grp.put(id3);
-
-        chunk = data[buffer_size * 3 ..]; // last chunk
-        const id4 = try expect_buf_grp_recv(&ring, &buf_grp, fds.server, rnd.int(u64), chunk);
-        buf_grp.put(id4);
+        try buf_grp.put(cqe1);
+        try buf_grp.put(cqe2);
+
+        // read remaining data
+        while (pos < data.len) {
+            const cqe = try buf_grp_recv_submit_get_cqe(&ring, &buf_grp, fds.server, rnd.int(u64));
+            buf = try buf_grp.get(cqe);
+            try testing.expectEqualSlices(u8, data[pos..][0..buf.len], buf);
+            pos += buf.len;
+            try buf_grp.put(cqe);
+        }
     }
 }
 
@@ -4196,12 +4210,10 @@ test "ring mapped buffers multishot recv" {
     const group_id: u16 = 1; // buffers group id
     const buffers_count: u16 = 2; // number of buffers in buffer group
     const buffer_size: usize = 4; // size of each buffer in group
-    const buffers = try testing.allocator.alloc(u8, buffers_count * buffer_size);
-    defer testing.allocator.free(buffers);
 
     var buf_grp = BufferGroup.init(
         &ring,
+        testing.allocator,
         group_id,
-        buffers,
         buffer_size,
         buffers_count,
     ) catch |err| switch (err) {
@@ -4209,7 +4221,7 @@ test "ring mapped buffers multishot recv" {
         error.ArgumentsInvalid => return error.SkipZigTest,
         else => return err,
     };
-    defer buf_grp.deinit();
+    defer buf_grp.deinit(testing.allocator);
 
     // create client/server fds
     const fds = try createSocketTestHarness(&ring);
@@ -4222,7 +4234,7 @@ test "ring mapped buffers multishot recv" {
     var round: usize = 4; // repeat send/recv cycle round times
     while (round > 0) : (round -= 1) {
         // client sends data
-        const data = [_]u8{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0xa, 0xb, 0xc, 0xd, 0xe };
+        const data = [_]u8{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0xa, 0xb, 0xc, 0xd, 0xe, 0xf };
         {
             const user_data = rnd.int(u64);
             _ = try ring.send(user_data, fds.client, data[0..], 0);
@@ -4239,7 +4251,7 @@ test "ring mapped buffers multishot recv" {
 
         // server reads data into provided buffers
         // there are 2 buffers of size 4, so each read gets only chunk of data
-        // we read four chunks of 4, 4, 4, 3 bytes each
+        // we read four chunks of 4, 4, 4, 4 bytes each
         var chunk: []const u8 = data[0..buffer_size]; // first chunk
         const cqe1 = try expect_buf_grp_cqe(&ring, &buf_grp, recv_user_data, chunk);
         try testing.expect(cqe1.flags & linux.IORING_CQE_F_MORE > 0);
@@ -4263,8 +4275,8 @@ test "ring mapped buffers multishot recv" {
         }
 
         // put buffers back to the kernel
-        buf_grp.put(try cqe1.buffer_id());
-        buf_grp.put(try cqe2.buffer_id());
+        try buf_grp.put(cqe1);
+        try buf_grp.put(cqe2);
 
         // restart multishot
         recv_user_data = rnd.int(u64);
@@ -4274,12 +4286,12 @@ test "ring mapped buffers multishot recv" {
         chunk = data[buffer_size * 2 .. buffer_size * 3]; // third chunk
         const cqe3 = try expect_buf_grp_cqe(&ring, &buf_grp, recv_user_data, chunk);
         try testing.expect(cqe3.flags & linux.IORING_CQE_F_MORE > 0);
-        buf_grp.put(try cqe3.buffer_id());
+        try buf_grp.put(cqe3);
 
         chunk = data[buffer_size * 3 ..]; // last chunk
         const cqe4 = try expect_buf_grp_cqe(&ring, &buf_grp, recv_user_data, chunk);
         try testing.expect(cqe4.flags & linux.IORING_CQE_F_MORE > 0);
-        buf_grp.put(try cqe4.buffer_id());
+        try buf_grp.put(cqe4);
 
         // cancel pending multishot recv operation
         {
@@ -4323,23 +4335,26 @@ test "ring mapped buffers multishot recv" {
     }
 }
 
-// Prepare and submit recv using buffer group.
-// Test that buffer from group, pointed by cqe, matches expected.
-fn expect_buf_grp_recv(
+// Prepare, submit recv and get cqe using buffer group.
+fn buf_grp_recv_submit_get_cqe(
     ring: *IoUring,
     buf_grp: *BufferGroup,
     fd: posix.fd_t,
     user_data: u64,
-    expected: []const u8,
-) !u16 {
-    // prepare and submit read
+) !linux.io_uring_cqe {
+    // prepare and submit recv
     const sqe = try buf_grp.recv(user_data, fd, 0);
     try testing.expect(sqe.flags & linux.IOSQE_BUFFER_SELECT == linux.IOSQE_BUFFER_SELECT);
     try testing.expect(sqe.buf_index == buf_grp.group_id);
     try testing.expectEqual(@as(u32, 1), try ring.submit()); // submit
+    // get cqe, expect success
+    const cqe = try ring.copy_cqe();
+    try testing.expectEqual(user_data, cqe.user_data);
+    try testing.expect(cqe.res >= 0); // success
+    try testing.expectEqual(posix.E.SUCCESS, cqe.err());
+    try testing.expect(cqe.flags & linux.IORING_CQE_F_BUFFER == linux.IORING_CQE_F_BUFFER); // IORING_CQE_F_BUFFER flag is set
 
-    const cqe = try expect_buf_grp_cqe(ring, buf_grp, user_data, expected);
-    return try cqe.buffer_id();
+    return cqe;
 }
 
 fn expect_buf_grp_cqe(
@@ -4359,7 +4374,7 @@ fn expect_buf_grp_cqe(
     // get buffer from pool
     const buffer_id = try cqe.buffer_id();
     const len = @as(usize, @intCast(cqe.res));
-    const buf = buf_grp.get(buffer_id)[0..len];
+    const buf = buf_grp.get_by_id(buffer_id)[0..len];
     try testing.expectEqualSlices(u8, expected, buf);
 
     return cqe;