diff --git a/lib/std/os/linux.zig b/lib/std/os/linux.zig
index d224a45a8fc9..b05ea412c5d8 100644
--- a/lib/std/os/linux.zig
+++ b/lib/std/os/linux.zig
@@ -5806,6 +5806,9 @@ pub const IORING_OP = enum(u8) {
     FUTEX_WAITV,
     FIXED_FD_INSTALL,
     FTRUNCATE,
+    BIND,
+    LISTEN,
+    RECV_ZC,
     _,
 };
 
@@ -5930,6 +5933,8 @@ pub const IORING_CQE_F_MORE = 1 << 1;
 pub const IORING_CQE_F_SOCK_NONEMPTY = 1 << 2;
 /// Set for notification CQEs. Can be used to distinct them from sends.
 pub const IORING_CQE_F_NOTIF = 1 << 3;
+/// If set, the buffer ID set in the completion will get more completions.
+pub const IORING_CQE_F_BUF_MORE = 1 << 4;
 
 pub const IORING_CQE_BUFFER_SHIFT = 16;
 
@@ -6135,26 +6140,32 @@ pub const IO_URING_OP_SUPPORTED = 1 << 0;
 
 pub const io_uring_probe_op = extern struct {
     op: IORING_OP,
-
     resv: u8,
-
     /// IO_URING_OP_* flags
     flags: u16,
-
     resv2: u32,
+
+    pub fn is_supported(self: @This()) bool {
+        return self.flags & IO_URING_OP_SUPPORTED != 0;
+    }
 };
 
 pub const io_uring_probe = extern struct {
-    /// last opcode supported
+    /// Last opcode supported
     last_op: IORING_OP,
-
-    /// Number of io_uring_probe_op following
+    /// Length of ops[] array below
     ops_len: u8,
-
     resv: u16,
     resv2: [3]u32,
-
-    // Followed by up to `ops_len` io_uring_probe_op structures
+    ops: [256]io_uring_probe_op,
+
+    /// Is the operation supported on the running kernel.
+    pub fn is_supported(self: @This(), op: IORING_OP) bool {
+        const i = @intFromEnum(op);
+        if (i > @intFromEnum(self.last_op) or i >= self.ops_len)
+            return false;
+        return self.ops[i].is_supported();
+    }
 };
 
 pub const io_uring_restriction = extern struct {
@@ -6190,6 +6201,13 @@ pub const IORING_RESTRICTION = enum(u16) {
     _,
 };
 
+pub const IO_URING_SOCKET_OP = enum(u16) {
+    SIOCINQ = 0,
+    SIOCOUTQ = 1,
+    GETSOCKOPT = 2,
+    SETSOCKOPT = 3,
+};
+
 pub const io_uring_buf = extern struct {
     addr: u64,
     len: u32,
@@ -6209,8 +6227,15 @@ pub const io_uring_buf_reg = extern struct {
     ring_addr: u64,
     ring_entries: u32,
     bgid: u16,
-    pad: u16,
+    flags: Flags,
     resv: [3]u64,
+
+    pub const Flags = packed struct {
+        _0: u1 = 0,
+        /// Incremental buffer consumption.
+        inc: bool,
+        _: u14 = 0,
+    };
 };
 
 pub const io_uring_getevents_arg = extern struct {
diff --git a/lib/std/os/linux/IoUring.zig b/lib/std/os/linux/IoUring.zig
index f26b2ce8bf96..2daf29fd0535 100644
--- a/lib/std/os/linux/IoUring.zig
+++ b/lib/std/os/linux/IoUring.zig
@@ -1272,6 +1272,16 @@ pub fn unregister_buffers(self: *IoUring) !void {
     }
 }
 
+/// Returns an io_uring_probe which is used to probe the capabilities of the
+/// io_uring subsystem of the running kernel. The io_uring_probe contains the
+/// list of supported operations.
+pub fn get_probe(self: *IoUring) !linux.io_uring_probe {
+    var probe = mem.zeroInit(linux.io_uring_probe, .{});
+    const res = linux.io_uring_register(self.fd, .REGISTER_PROBE, &probe, probe.ops.len);
+    try handle_register_buf_ring_result(res);
+    return probe;
+}
+
 fn handle_registration_result(res: usize) !void {
     switch (linux.E.init(res)) {
         .SUCCESS => {},
@@ -1356,6 +1366,102 @@ pub fn socket_direct_alloc(
     return sqe;
 }
 
+/// Queues (but does not submit) an SQE to perform a `bind(2)` on a socket.
+/// Returns a pointer to the SQE.
+/// Available since 6.11
+pub fn bind(
+    self: *IoUring,
+    user_data: u64,
+    fd: posix.fd_t,
+    addr: *const posix.sockaddr,
+    addrlen: posix.socklen_t,
+    flags: u32,
+) !*linux.io_uring_sqe {
+    const sqe = try self.get_sqe();
+    sqe.prep_bind(fd, addr, addrlen, flags);
+    sqe.user_data = user_data;
+    return sqe;
+}
+
+/// Queues (but does not submit) an SQE to perform a `listen(2)` on a socket.
+/// Returns a pointer to the SQE.
+/// Available since 6.11
+pub fn listen(
+    self: *IoUring,
+    user_data: u64,
+    fd: posix.fd_t,
+    backlog: usize,
+    flags: u32,
+) !*linux.io_uring_sqe {
+    const sqe = try self.get_sqe();
+    sqe.prep_listen(fd, backlog, flags);
+    sqe.user_data = user_data;
+    return sqe;
+}
+
+/// Prepares a cmd request for a socket.
+/// See: https://man7.org/linux/man-pages/man3/io_uring_prep_cmd.3.html
+/// Available since 6.7.
+pub fn cmd_sock(
+    self: *IoUring,
+    user_data: u64,
+    cmd_op: linux.IO_URING_SOCKET_OP,
+    fd: linux.fd_t,
+    level: u32, // linux.SOL
+    optname: u32, // linux.SO
+    optval: u64, // pointer to the option value
+    optlen: u32, // size of the option value
+) !*linux.io_uring_sqe {
+    const sqe = try self.get_sqe();
+    sqe.prep_cmd_sock(cmd_op, fd, level, optname, optval, optlen);
+    sqe.user_data = user_data;
+    return sqe;
+}
+
+/// Prepares a set socket option command for the optname argument, at the protocol
+/// level specified by the level argument.
+/// Available since 6.7.
+pub fn setsockopt(
+    self: *IoUring,
+    user_data: u64,
+    fd: linux.fd_t,
+    level: u32, // linux.SOL
+    optname: u32, // linux.SO
+    opt: []const u8,
+) !*linux.io_uring_sqe {
+    return try self.cmd_sock(
+        user_data,
+        .SETSOCKOPT,
+        fd,
+        level,
+        optname,
+        @intFromPtr(opt.ptr),
+        @intCast(opt.len),
+    );
+}
+
+/// Prepares a get socket option command to retrieve the value for the option specified by
+/// the option_name argument for the socket specified by the fd argument.
+/// Available since 6.7.
+pub fn getsockopt(
+    self: *IoUring,
+    user_data: u64,
+    fd: linux.fd_t,
+    level: u32, // linux.SOL
+    optname: u32, // linux.SO
+    opt: []u8,
+) !*linux.io_uring_sqe {
+    return try self.cmd_sock(
+        user_data,
+        .GETSOCKOPT,
+        fd,
+        level,
+        optname,
+        @intFromPtr(opt.ptr),
+        @intCast(opt.len),
+    );
+}
+
 pub const SubmissionQueue = struct {
     head: *u32,
     tail: *u32,
@@ -1488,28 +1594,34 @@ pub const BufferGroup = struct {
     buffers: []u8,
     /// Size of each buffer in buffers.
     buffer_size: u32,
-    // Number of buffers in `buffers`, number of `io_uring_buf structures` in br.
+    /// Number of buffers in `buffers`, number of `io_uring_buf structures` in br.
     buffers_count: u16,
+    /// Head of unconsumed part of each buffer, if incremental consumption is enabled.
+    heads: []u32,
     /// ID of this group, must be unique in ring.
     group_id: u16,
 
     pub fn init(
         ring: *IoUring,
+        allocator: mem.Allocator,
         group_id: u16,
-        buffers: []u8,
         buffer_size: u32,
         buffers_count: u16,
     ) !BufferGroup {
-        assert(buffers.len == buffers_count * buffer_size);
+        const buffers = try allocator.alloc(u8, buffer_size * buffers_count);
+        errdefer allocator.free(buffers);
+        const heads = try allocator.alloc(u32, buffers_count);
+        errdefer allocator.free(heads);
 
-        const br = try setup_buf_ring(ring.fd, buffers_count, group_id);
+        const br = try setup_buf_ring(ring.fd, buffers_count, group_id, .{ .inc = true });
         buf_ring_init(br);
 
         const mask = buf_ring_mask(buffers_count);
         var i: u16 = 0;
         while (i < buffers_count) : (i += 1) {
-            const start = buffer_size * i;
-            const buf = buffers[start .. start + buffer_size];
+            const pos = buffer_size * i;
+            const buf = buffers[pos .. pos + buffer_size];
+            heads[i] = 0;
             buf_ring_add(br, buf, i, mask, i);
         }
         buf_ring_advance(br, buffers_count);
@@ -1519,11 +1631,18 @@ pub const BufferGroup = struct {
             .group_id = group_id,
             .br = br,
             .buffers = buffers,
+            .heads = heads,
             .buffer_size = buffer_size,
             .buffers_count = buffers_count,
         };
     }
 
+    pub fn deinit(self: *BufferGroup, allocator: mem.Allocator) void {
+        free_buf_ring(self.ring.fd, self.br, self.buffers_count, self.group_id);
+        allocator.free(self.buffers);
+        allocator.free(self.heads);
+    }
+
     // Prepare recv operation which will select buffer from this group.
     pub fn recv(self: *BufferGroup, user_data: u64, fd: posix.fd_t, flags: u32) !*linux.io_uring_sqe {
         var sqe = try self.ring.get_sqe();
@@ -1543,33 +1662,34 @@
     }
 
     // Get buffer by id.
-    pub fn get(self: *BufferGroup, buffer_id: u16) []u8 {
-        const head = self.buffer_size * buffer_id;
-        return self.buffers[head .. head + self.buffer_size];
+    fn get_by_id(self: *BufferGroup, buffer_id: u16) []u8 {
+        const pos = self.buffer_size * buffer_id;
+        return self.buffers[pos .. pos + self.buffer_size][self.heads[buffer_id]..];
     }
 
     // Get buffer by CQE.
-    pub fn get_cqe(self: *BufferGroup, cqe: linux.io_uring_cqe) ![]u8 {
+    pub fn get(self: *BufferGroup, cqe: linux.io_uring_cqe) ![]u8 {
         const buffer_id = try cqe.buffer_id();
         const used_len = @as(usize, @intCast(cqe.res));
-        return self.get(buffer_id)[0..used_len];
-    }
-
-    // Release buffer to the kernel.
-    pub fn put(self: *BufferGroup, buffer_id: u16) void {
-        const mask = buf_ring_mask(self.buffers_count);
-        const buffer = self.get(buffer_id);
-        buf_ring_add(self.br, buffer, buffer_id, mask, 0);
-        buf_ring_advance(self.br, 1);
+        return self.get_by_id(buffer_id)[0..used_len];
     }
 
     // Release buffer from CQE to the kernel.
-    pub fn put_cqe(self: *BufferGroup, cqe: linux.io_uring_cqe) !void {
-        self.put(try cqe.buffer_id());
-    }
+    pub fn put(self: *BufferGroup, cqe: linux.io_uring_cqe) !void {
+        const buffer_id = try cqe.buffer_id();
+        if (cqe.flags & linux.IORING_CQE_F_BUF_MORE == linux.IORING_CQE_F_BUF_MORE) {
+            // Incremental consumption active, kernel will write to this buffer again
+            const used_len = @as(u32, @intCast(cqe.res));
+            // Track what part of the buffer is used
+            self.heads[buffer_id] += used_len;
+            return;
+        }
+        self.heads[buffer_id] = 0;
 
-    pub fn deinit(self: *BufferGroup) void {
-        free_buf_ring(self.ring.fd, self.br, self.buffers_count, self.group_id);
+        // Release buffer to the kernel.
+        const mask = buf_ring_mask(self.buffers_count);
+        buf_ring_add(self.br, self.get_by_id(buffer_id), buffer_id, mask, 0);
+        buf_ring_advance(self.br, 1);
     }
 };
 
@@ -1578,7 +1698,12 @@
 /// `fd` is IO_Uring.fd for which the provided buffer ring is being registered.
 /// `entries` is the number of entries requested in the buffer ring, must be power of 2.
 /// `group_id` is the chosen buffer group ID, unique in IO_Uring.
-pub fn setup_buf_ring(fd: posix.fd_t, entries: u16, group_id: u16) !*align(page_size_min) linux.io_uring_buf_ring {
+pub fn setup_buf_ring(
+    fd: posix.fd_t,
+    entries: u16,
+    group_id: u16,
+    flags: linux.io_uring_buf_reg.Flags,
+) !*align(page_size_min) linux.io_uring_buf_ring {
     if (entries == 0 or entries > 1 << 15) return error.EntriesNotInRange;
     if (!std.math.isPowerOfTwo(entries)) return error.EntriesNotPowerOfTwo;
 
@@ -1595,22 +1720,30 @@ pub fn setup_buf_ring(fd: posix.fd_t, entries: u16, group_id: u16) !*align(page_
     assert(mmap.len == mmap_size);
 
     const br: *align(page_size_min) linux.io_uring_buf_ring = @ptrCast(mmap.ptr);
-    try register_buf_ring(fd, @intFromPtr(br), entries, group_id);
+    try register_buf_ring(fd, @intFromPtr(br), entries, group_id, flags);
     return br;
 }
 
-fn register_buf_ring(fd: posix.fd_t, addr: u64, entries: u32, group_id: u16) !void {
+fn register_buf_ring(
+    fd: posix.fd_t,
+    addr: u64,
+    entries: u32,
+    group_id: u16,
+    flags: linux.io_uring_buf_reg.Flags,
+) !void {
     var reg = mem.zeroInit(linux.io_uring_buf_reg, .{
         .ring_addr = addr,
        .ring_entries = entries,
         .bgid = group_id,
+        .flags = flags,
     });
-    const res = linux.io_uring_register(
-        fd,
-        .REGISTER_PBUF_RING,
-        @as(*const anyopaque, @ptrCast(&reg)),
-        1,
-    );
+    var res = linux.io_uring_register(fd, .REGISTER_PBUF_RING, @as(*const anyopaque, @ptrCast(&reg)), 1);
+    if (linux.E.init(res) == .INVAL and reg.flags.inc) {
+        // Retry without incremental buffer consumption.
+        // It is available since kernel 6.12; older kernels return INVAL.
+        reg.flags.inc = false;
+        res = linux.io_uring_register(fd, .REGISTER_PBUF_RING, @as(*const anyopaque, @ptrCast(&reg)), 1);
+    }
     try handle_register_buf_ring_result(res);
 }
 
@@ -3054,7 +3187,7 @@ test "provide_buffers: read" {
     const cqe = try ring.copy_cqe();
     switch (cqe.err()) {
         // Happens when the kernel is < 5.7
-        .INVAL => return error.SkipZigTest,
+        .INVAL, .BADF => return error.SkipZigTest,
         .SUCCESS => {},
         else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
     }
@@ -3181,7 +3314,7 @@ test "remove_buffers" {
 
     const cqe = try ring.copy_cqe();
     switch (cqe.err()) {
-        .INVAL => return error.SkipZigTest,
+        .INVAL, .BADF => return error.SkipZigTest,
         .SUCCESS => {},
         else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
     }
@@ -3935,12 +4068,10 @@ test BufferGroup {
     const group_id: u16 = 1; // buffers group id
     const buffers_count: u16 = 1; // number of buffers in buffer group
     const buffer_size: usize = 128; // size of each buffer in group
-    const buffers = try testing.allocator.alloc(u8, buffers_count * buffer_size);
-    defer testing.allocator.free(buffers);
 
     var buf_grp = BufferGroup.init(
         &ring,
+        testing.allocator,
         group_id,
-        buffers,
         buffer_size,
         buffers_count,
     ) catch |err| switch (err) {
         error.ArgumentsInvalid => return error.SkipZigTest,
         else => return err,
     };
-    defer buf_grp.deinit();
+    defer buf_grp.deinit(testing.allocator);
 
     // Create client/server fds
     const fds = try createSocketTestHarness(&ring);
@@ -3979,14 +4110,11 @@
         try testing.expectEqual(posix.E.SUCCESS, cqe.err());
         try testing.expectEqual(data.len, @as(usize, @intCast(cqe.res))); // cqe.res holds received data len
 
-        // Read buffer_id and used buffer len from cqe
-        const buffer_id = try cqe.buffer_id();
-        const len: usize = @intCast(cqe.res);
-
         // Get buffer from pool
-        const buf = buf_grp.get(buffer_id)[0..len];
+        const buf = try buf_grp.get(cqe);
         try testing.expectEqualSlices(u8, &data, buf);
 
         // Release buffer to the kernel when application is done with it
-        buf_grp.put(buffer_id);
+        try buf_grp.put(cqe);
     }
 }
 
@@ -4004,12 +4132,10 @@ test "ring mapped buffers recv" {
     const group_id: u16 = 1; // buffers group id
     const buffers_count: u16 = 2; // number of buffers in buffer group
     const buffer_size: usize = 4; // size of each buffer in group
-    const buffers = try testing.allocator.alloc(u8, buffers_count * buffer_size);
-    defer testing.allocator.free(buffers);
 
     var buf_grp = BufferGroup.init(
         &ring,
+        testing.allocator,
         group_id,
-        buffers,
         buffer_size,
         buffers_count,
     ) catch |err| switch (err) {
         error.ArgumentsInvalid => return error.SkipZigTest,
         else => return err,
     };
-    defer buf_grp.deinit();
+    defer buf_grp.deinit(testing.allocator);
 
     // create client/server fds
     const fds = try createSocketTestHarness(&ring);
@@ -4039,14 +4165,18 @@
            if (cqe_send.err() == .INVAL) return error.SkipZigTest;
            try testing.expectEqual(linux.io_uring_cqe{ .user_data = user_data, .res = data.len, .flags = 0 }, cqe_send);
        }
-
-        // server reads data into provided buffers
-        // there are 2 buffers of size 4, so each read gets only chunk of data
-        // we read four chunks of 4, 4, 4, 3 bytes each
-        var chunk: []const u8 = data[0..buffer_size]; // first chunk
-        const id1 = try expect_buf_grp_recv(&ring, &buf_grp, fds.server, rnd.int(u64), chunk);
-        chunk = data[buffer_size .. buffer_size * 2]; // second chunk
-        const id2 = try expect_buf_grp_recv(&ring, &buf_grp, fds.server, rnd.int(u64), chunk);
+        var pos: usize = 0;
+
+        // read first chunk
+        const cqe1 = try buf_grp_recv_submit_get_cqe(&ring, &buf_grp, fds.server, rnd.int(u64));
+        var buf = try buf_grp.get(cqe1);
+        try testing.expectEqualSlices(u8, data[pos..][0..buf.len], buf);
+        pos += buf.len;
+        // second chunk
+        const cqe2 = try buf_grp_recv_submit_get_cqe(&ring, &buf_grp, fds.server, rnd.int(u64));
+        buf = try buf_grp.get(cqe2);
+        try testing.expectEqualSlices(u8, data[pos..][0..buf.len], buf);
+        pos += buf.len;
 
         // both buffers provided to the kernel are used so we get error
         // 'no more buffers', until we put buffers to the kernel
@@ -4063,16 +4193,17 @@
        }
 
         // put buffers back to the kernel
-        buf_grp.put(id1);
-        buf_grp.put(id2);
-
-        chunk = data[buffer_size * 2 .. buffer_size * 3]; // third chunk
-        const id3 = try expect_buf_grp_recv(&ring, &buf_grp, fds.server, rnd.int(u64), chunk);
-        buf_grp.put(id3);
-
-        chunk = data[buffer_size * 3 ..]; // last chunk
-        const id4 = try expect_buf_grp_recv(&ring, &buf_grp, fds.server, rnd.int(u64), chunk);
-        buf_grp.put(id4);
+        try buf_grp.put(cqe1);
+        try buf_grp.put(cqe2);
+
+        // read remaining data
+        while (pos < data.len) {
+            const cqe = try buf_grp_recv_submit_get_cqe(&ring, &buf_grp, fds.server, rnd.int(u64));
+            buf = try buf_grp.get(cqe);
+            try testing.expectEqualSlices(u8, data[pos..][0..buf.len], buf);
+            pos += buf.len;
+            try buf_grp.put(cqe);
+        }
     }
 }
 
@@ -4090,12 +4221,10 @@ test "ring mapped buffers multishot recv" {
     const group_id: u16 = 1; // buffers group id
     const buffers_count: u16 = 2; // number of buffers in buffer group
     const buffer_size: usize = 4; // size of each buffer in group
-    const buffers = try testing.allocator.alloc(u8, buffers_count * buffer_size);
-    defer testing.allocator.free(buffers);
 
     var buf_grp = BufferGroup.init(
         &ring,
+        testing.allocator,
         group_id,
-        buffers,
         buffer_size,
         buffers_count,
     ) catch |err| switch (err) {
         error.ArgumentsInvalid => return error.SkipZigTest,
         else => return err,
     };
-    defer buf_grp.deinit();
+    defer buf_grp.deinit(testing.allocator);
 
     // create client/server fds
     const fds = try createSocketTestHarness(&ring);
@@ -4116,7 +4245,7 @@
     var round: usize = 4; // repeat send/recv cycle round times
     while (round > 0) : (round -= 1) {
         // client sends data
-        const data = [_]u8{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0xa, 0xb, 0xc, 0xd, 0xe };
+        const data = [_]u8{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0xa, 0xb, 0xc, 0xd, 0xe, 0xf };
         {
             const user_data = rnd.int(u64);
             _ = try ring.send(user_data, fds.client, data[0..], 0);
@@ -4133,7 +4262,7 @@
 
         // server reads data into provided buffers
         // there are 2 buffers of size 4, so each read gets only chunk of data
-        // we read four chunks of 4, 4, 4, 3 bytes each
+        // we read four chunks of 4, 4, 4, 4 bytes each
         var chunk: []const u8 = data[0..buffer_size]; // first chunk
         const cqe1 = try expect_buf_grp_cqe(&ring, &buf_grp, recv_user_data, chunk);
         try testing.expect(cqe1.flags & linux.IORING_CQE_F_MORE > 0);
@@ -4157,8 +4286,8 @@
         }
 
         // put buffers back to the kernel
-        buf_grp.put(try cqe1.buffer_id());
-        buf_grp.put(try cqe2.buffer_id());
+        try buf_grp.put(cqe1);
+        try buf_grp.put(cqe2);
 
         // restart multishot
         recv_user_data = rnd.int(u64);
@@ -4168,12 +4297,12 @@
         chunk = data[buffer_size * 2 .. buffer_size * 3]; // third chunk
         const cqe3 = try expect_buf_grp_cqe(&ring, &buf_grp, recv_user_data, chunk);
         try testing.expect(cqe3.flags & linux.IORING_CQE_F_MORE > 0);
-        buf_grp.put(try cqe3.buffer_id());
+        try buf_grp.put(cqe3);
 
         chunk = data[buffer_size * 3 ..]; // last chunk
         const cqe4 = try expect_buf_grp_cqe(&ring, &buf_grp, recv_user_data, chunk);
         try testing.expect(cqe4.flags & linux.IORING_CQE_F_MORE > 0);
-        buf_grp.put(try cqe4.buffer_id());
+        try buf_grp.put(cqe4);
 
         // cancel pending multishot recv operation
         {
@@ -4217,23 +4346,26 @@
     }
 }
 
-// Prepare and submit recv using buffer group.
-// Test that buffer from group, pointed by cqe, matches expected.
-fn expect_buf_grp_recv(
+// Prepare, submit recv and get cqe using buffer group.
+fn buf_grp_recv_submit_get_cqe(
     ring: *IoUring,
     buf_grp: *BufferGroup,
     fd: posix.fd_t,
     user_data: u64,
-    expected: []const u8,
-) !u16 {
-    // prepare and submit read
+) !linux.io_uring_cqe {
+    // prepare and submit recv
     const sqe = try buf_grp.recv(user_data, fd, 0);
     try testing.expect(sqe.flags & linux.IOSQE_BUFFER_SELECT == linux.IOSQE_BUFFER_SELECT);
     try testing.expect(sqe.buf_index == buf_grp.group_id);
     try testing.expectEqual(@as(u32, 1), try ring.submit()); // submit
+    // get cqe, expect success
+    const cqe = try ring.copy_cqe();
+    try testing.expectEqual(user_data, cqe.user_data);
+    try testing.expect(cqe.res >= 0); // success
+    try testing.expectEqual(posix.E.SUCCESS, cqe.err());
+    try testing.expect(cqe.flags & linux.IORING_CQE_F_BUFFER == linux.IORING_CQE_F_BUFFER); // IORING_CQE_F_BUFFER flag is set
 
-    const cqe = try expect_buf_grp_cqe(ring, buf_grp, user_data, expected);
-    return try cqe.buffer_id();
+    return cqe;
 }
 
 fn expect_buf_grp_cqe(
@@ -4253,7 +4385,7 @@
     // get buffer from pool
     const buffer_id = try cqe.buffer_id();
     const len = @as(usize, @intCast(cqe.res));
-    const buf = buf_grp.get(buffer_id)[0..len];
+    const buf = buf_grp.get_by_id(buffer_id)[0..len];
     try testing.expectEqualSlices(u8, expected, buf);
 
     return cqe;
@@ -4305,3 +4437,137 @@ test "copy_cqes with wrapping sq.cqes buffer" {
         try testing.expectEqual(2 + 4 * i, ring.cq.head.*);
     }
 }
+
+test "bind/listen/connect" {
+    var ring = IoUring.init(4, 0) catch |err| switch (err) {
+        error.SystemOutdated => return error.SkipZigTest,
+        error.PermissionDenied => return error.SkipZigTest,
+        else => return err,
+    };
+    defer ring.deinit();
+
+    const probe = ring.get_probe() catch return error.SkipZigTest;
+    // LISTEN is the highest opcode required by this test
+    if (!probe.is_supported(.LISTEN)) return error.SkipZigTest;
+
+    var addr = net.Address.initIp4([4]u8{ 127, 0, 0, 1 }, 0);
+    const proto: u32 = if (addr.any.family == linux.AF.UNIX) 0 else linux.IPPROTO.TCP;
+
+    const listen_fd = brk: {
+        // Create socket
+        _ = try ring.socket(1, addr.any.family, linux.SOCK.STREAM | linux.SOCK.CLOEXEC, proto, 0);
+        try testing.expectEqual(1, try ring.submit());
+        var cqe = try ring.copy_cqe();
+        try testing.expectEqual(1, cqe.user_data);
+        try testing.expectEqual(posix.E.SUCCESS, cqe.err());
+        const listen_fd: posix.fd_t = @intCast(cqe.res);
+        try testing.expect(listen_fd > 2);
+
+        // Prepare: set socket option * 2, bind, listen
+        var optval: u32 = 1;
+        (try ring.setsockopt(2, listen_fd, linux.SOL.SOCKET, linux.SO.REUSEADDR, mem.asBytes(&optval))).link_next();
+        (try ring.setsockopt(3, listen_fd, linux.SOL.SOCKET, linux.SO.REUSEPORT, mem.asBytes(&optval))).link_next();
+        (try ring.bind(4, listen_fd, &addr.any, addr.getOsSockLen(), 0)).link_next();
+        _ = try ring.listen(5, listen_fd, 1, 0);
+        // Submit 4 operations
+        try testing.expectEqual(4, try ring.submit());
+        // Expect all to succeed
+        for (2..6) |user_data| {
+            cqe = try ring.copy_cqe();
+            try testing.expectEqual(user_data, cqe.user_data);
+            try testing.expectEqual(posix.E.SUCCESS, cqe.err());
+        }
+
+        // Check that socket option is set
+        optval = 0;
+        _ = try ring.getsockopt(5, listen_fd, linux.SOL.SOCKET, linux.SO.REUSEADDR, mem.asBytes(&optval));
+        try testing.expectEqual(1, try ring.submit());
+        cqe = try ring.copy_cqe();
+        try testing.expectEqual(5, cqe.user_data);
+        try testing.expectEqual(posix.E.SUCCESS, cqe.err());
+        try testing.expectEqual(1, optval);
+
+        // Read system assigned port into addr
+        var addr_len: posix.socklen_t = addr.getOsSockLen();
+        try posix.getsockname(listen_fd, &addr.any, &addr_len);
+
+        break :brk listen_fd;
+    };
+
+    const connect_fd = brk: {
+        // Create connect socket
+        _ = try ring.socket(6, addr.any.family, linux.SOCK.STREAM | linux.SOCK.CLOEXEC, proto, 0);
+        try testing.expectEqual(1, try ring.submit());
+        const cqe = try ring.copy_cqe();
+        try testing.expectEqual(6, cqe.user_data);
+        try testing.expectEqual(posix.E.SUCCESS, cqe.err());
+        // Get connect socket fd
+        const connect_fd: posix.fd_t = @intCast(cqe.res);
+        try testing.expect(connect_fd > 2 and connect_fd != listen_fd);
+        break :brk connect_fd;
+    };
+
+    // Prepare accept/connect operations
+    _ = try ring.accept(7, listen_fd, null, null, 0);
+    _ = try ring.connect(8, connect_fd, &addr.any, addr.getOsSockLen());
+    try testing.expectEqual(2, try ring.submit());
+    // Get listener accepted socket
+    var accept_fd: posix.socket_t = 0;
+    for (0..2) |_| {
+        const cqe = try ring.copy_cqe();
+        try testing.expectEqual(posix.E.SUCCESS, cqe.err());
+        if (cqe.user_data == 7) {
+            accept_fd = @intCast(cqe.res);
+        } else {
+            try testing.expectEqual(8, cqe.user_data);
+        }
+    }
+    try testing.expect(accept_fd > 2 and accept_fd != listen_fd and accept_fd != connect_fd);
+
+    // Communicate
+    try testSendRecv(&ring, connect_fd, accept_fd);
+    try testSendRecv(&ring, accept_fd, connect_fd);
+
+    // Shutdown and close all sockets
+    for ([_]posix.socket_t{ connect_fd, accept_fd, listen_fd }) |fd| {
+        (try ring.shutdown(9, fd, posix.SHUT.RDWR)).link_next();
+        _ = try ring.close(10, fd);
+        try testing.expectEqual(2, try ring.submit());
+        for (0..2) |i| {
+            const cqe = try ring.copy_cqe();
+            try testing.expectEqual(posix.E.SUCCESS, cqe.err());
+            try testing.expectEqual(9 + i, cqe.user_data);
+        }
+    }
+}
+
+fn testSendRecv(ring: *IoUring, send_fd: posix.socket_t, recv_fd: posix.socket_t) !void {
+    const buffer_send = "0123456789abcdf" ** 10;
+    var buffer_recv: [buffer_send.len * 2]u8 = undefined;
+
+    // 2 sends
+    _ = try ring.send(1, send_fd, buffer_send, linux.MSG.WAITALL);
+    _ = try ring.send(2, send_fd, buffer_send, linux.MSG.WAITALL);
+    try testing.expectEqual(2, try ring.submit());
+    for (0..2) |i| {
+        const cqe = try ring.copy_cqe();
+        try testing.expectEqual(1 + i, cqe.user_data);
+        try testing.expectEqual(posix.E.SUCCESS, cqe.err());
+        try testing.expectEqual(buffer_send.len, @as(usize, @intCast(cqe.res)));
+    }
+
+    // receive
+    var recv_len: usize = 0;
+    while (recv_len < buffer_send.len * 2) {
+        _ = try ring.recv(3, recv_fd, .{ .buffer = buffer_recv[recv_len..] }, 0);
+        try testing.expectEqual(1, try ring.submit());
+        const cqe = try ring.copy_cqe();
+        try testing.expectEqual(3, cqe.user_data);
+        try testing.expectEqual(posix.E.SUCCESS, cqe.err());
+        recv_len += @intCast(cqe.res);
+    }
+
+    // inspect recv buffer
+    try testing.expectEqualSlices(u8, buffer_send, buffer_recv[0..buffer_send.len]);
+    try testing.expectEqualSlices(u8, buffer_send, buffer_recv[buffer_send.len..]);
+}
diff --git a/lib/std/os/linux/io_uring_sqe.zig b/lib/std/os/linux/io_uring_sqe.zig
index 19dae3e8fae9..5658206a66a8 100644
--- a/lib/std/os/linux/io_uring_sqe.zig
+++ b/lib/std/os/linux/io_uring_sqe.zig
@@ -619,4 +619,61 @@ pub const io_uring_sqe = extern struct {
         sqe.rw_flags = flags;
         sqe.splice_fd_in = @bitCast(options);
     }
+
+    pub fn prep_bind(
+        sqe: *linux.io_uring_sqe,
+        fd: linux.fd_t,
+        addr: *const linux.sockaddr,
+        addrlen: linux.socklen_t,
+        flags: u32,
+    ) void {
+        sqe.prep_rw(.BIND, fd, @intFromPtr(addr), 0, addrlen);
+        sqe.rw_flags = flags;
+    }
+
+    pub fn prep_listen(
+        sqe: *linux.io_uring_sqe,
+        fd: linux.fd_t,
+        backlog: usize,
+        flags: u32,
+    ) void {
+        sqe.prep_rw(.LISTEN, fd, 0, backlog, 0);
+        sqe.rw_flags = flags;
+    }
+
+    pub fn prep_cmd_sock(
+        sqe: *linux.io_uring_sqe,
+        cmd_op: linux.IO_URING_SOCKET_OP,
+        fd: linux.fd_t,
+        level: u32,
+        optname: u32,
+        optval: u64,
+        optlen: u32,
+    ) void {
+        sqe.prep_rw(.URING_CMD, fd, 0, 0, 0);
+        // off is overloaded with cmd_op, https://github.com/axboe/liburing/blob/e1003e496e66f9b0ae06674869795edf772d5500/src/include/liburing/io_uring.h#L39
+        sqe.off = @intFromEnum(cmd_op);
+        // addr is overloaded, https://github.com/axboe/liburing/blob/e1003e496e66f9b0ae06674869795edf772d5500/src/include/liburing/io_uring.h#L46
+        sqe.addr = @bitCast(packed struct {
+            level: u32,
+            optname: u32,
+        }{
+            .level = level,
+            .optname = optname,
+        });
+        // splice_fd_in is overloaded u32 -> i32
+        sqe.splice_fd_in = @bitCast(optlen);
+        // addr3 is overloaded, https://github.com/axboe/liburing/blob/e1003e496e66f9b0ae06674869795edf772d5500/src/include/liburing/io_uring.h#L102
+        sqe.addr3 = optval;
+    }
+
+    pub fn set_flags(sqe: *linux.io_uring_sqe, flags: u8) void {
+        sqe.flags |= flags;
+    }
+
+    /// This SQE forms a link with the next SQE in the submission ring. Next SQE
+    /// will not be started before this one completes. Forms a chain of SQEs.
+    pub fn link_next(sqe: *linux.io_uring_sqe) void {
+        sqe.flags |= linux.IOSQE_IO_LINK;
+    }
 };
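
Usage note (not part of the patch): a minimal sketch of how the new pieces fit together, assuming a kernel new enough to support ring-mapped buffers; the function name demoBufferGroupRecv, the group id 1, and the sizes 4096/8 are illustrative only.

const std = @import("std");
const linux = std.os.linux;
const IoUring = linux.IoUring;

// Hypothetical helper: probe the running kernel, then receive one message
// through a ring-mapped BufferGroup that now owns its own memory.
fn demoBufferGroupRecv(allocator: std.mem.Allocator, socket_fd: std.posix.fd_t) !void {
    var ring = try IoUring.init(8, 0);
    defer ring.deinit();

    // Skip gracefully if the new opcodes are not available on this kernel.
    const probe = try ring.get_probe();
    if (!probe.is_supported(.LISTEN)) return error.SkipDemo;

    // 8 buffers of 4096 bytes each, allocated and owned by the group.
    var buf_grp = try IoUring.BufferGroup.init(&ring, allocator, 1, 4096, 8);
    defer buf_grp.deinit(allocator);

    // Queue a recv that picks a buffer from the group, then reap its CQE.
    _ = try buf_grp.recv(0x42, socket_fd, 0);
    _ = try ring.submit();
    const cqe = try ring.copy_cqe();
    if (cqe.err() != .SUCCESS) return error.RecvFailed;

    const data = try buf_grp.get(cqe); // slice of the selected buffer, cqe.res bytes long
    std.debug.print("received {d} bytes\n", .{data.len});
    try buf_grp.put(cqe); // advance the head (incremental consumption) or recycle the buffer
}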