Skip to content

Commit

Permalink
📝 refine kqueue select code
Browse files Browse the repository at this point in the history
  • Loading branch information
Xudong-Huang committed Dec 23, 2024
1 parent f509d42 commit df3a37e
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 100 deletions.
185 changes: 85 additions & 100 deletions src/io/sys/unix/kqueue.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::os::unix::io::RawFd;
use std::os::fd::{AsRawFd, FromRawFd, RawFd};
use std::os::unix::io::OwnedFd;
use std::sync::atomic::Ordering;
use std::sync::Arc;
#[cfg(feature = "io_timeout")]
Expand Down Expand Up @@ -34,32 +35,35 @@ macro_rules! kevent {
}

struct SingleSelector {
kqfd: RawFd,
kqfd: OwnedFd,
#[cfg(feature = "io_timeout")]
timer_list: TimerList,
free_ev: Queue<Arc<EventData>>,
}

impl AsRawFd for SingleSelector {
fn as_raw_fd(&self) -> RawFd {
self.kqfd.as_raw_fd()
}
}

impl SingleSelector {
pub fn new() -> io::Result<Self> {
let kqfd = unsafe { libc::kqueue() };
if kqfd < 0 {
return Err(io::Error::last_os_error());
}
let kqfd = unsafe { OwnedFd::from_raw_fd(libc::kqueue()) };
syscall!(fcntl(kqfd.as_raw_fd(), libc::F_SETFD, libc::FD_CLOEXEC))?;

let kev = libc::kevent {
let mut kev = libc::kevent {
ident: NOTIFY_IDENT,
filter: libc::EVFILT_USER,
flags: libc::EV_ADD | libc::EV_CLEAR,
flags: libc::EV_ADD | libc::EV_CLEAR | libc::EV_RECEIPT,
fflags: 0,
data: 0,
udata: ptr::null_mut(),
};

let ret = unsafe { libc::kevent(kqfd, &kev, 1, ptr::null_mut(), 0, ptr::null()) };
if ret < 0 {
unsafe { libc::close(kqfd) };
return Err(io::Error::last_os_error());
syscall!(kevent(kqfd.as_raw_fd(), &kev, 1, &mut kev, 1, ptr::null()))?;
if kev.flags & libc::EV_ERROR != 0 && kev.data != 0 {
return Err(io::Error::from_raw_os_error(kev.data as i32));
}

Ok(SingleSelector {
Expand All @@ -71,12 +75,6 @@ impl SingleSelector {
}
}

impl Drop for SingleSelector {
fn drop(&mut self) {
let _ = unsafe { libc::close(self.kqfd) };
}
}

pub struct Selector {
// 128 should be fine for max io threads
vec: SmallVec<[SingleSelector; 128]>,
Expand Down Expand Up @@ -117,39 +115,28 @@ impl Selector {
let timeout = timeout_spec
.as_ref()
.map(|s| s as *const _)
.unwrap_or(ptr::null_mut());
.unwrap_or(ptr::null());
#[cfg(not(feature = "io_timeout"))]
let timeout = ptr::null_mut();
// info!("select; timeout={:?}", timeout_ms);
let timeout = ptr::null();
// debug!("select; timeout={:?}", timeout_spec);

let single_selector = unsafe { self.vec.get_unchecked(id) };

// Wait for kqueue events for at most timeout_ms milliseconds
let kqfd = single_selector.kqfd;
let n = unsafe {
libc::kevent(
kqfd,
ptr::null(),
0,
events.as_mut_ptr(),
events.len() as libc::c_int,
timeout,
)
};

if n < 0 {
return Err(io::Error::last_os_error());
}
let kqfd = single_selector.as_raw_fd();
let n = syscall!(kevent(
kqfd,
ptr::null(),
0,
events.as_mut_ptr(),
events.len() as libc::c_int,
timeout,
))?;

let n = n as usize;

for event in unsafe { events.get_unchecked(..n) } {
if event.udata.is_null() {
// this is just a wakeup event, ignore it
// let mut buf = [0u8; 8];
// clear the eventfd, ignore the result
// read(self.vec[id].evfd, &mut buf).ok();
// info!("got wakeup event in select, id={}", id);
if event.ident == NOTIFY_IDENT && event.filter == libc::EVFILT_USER {
scheduler.collect_global(id);
continue;
}
Expand Down Expand Up @@ -200,82 +187,80 @@ impl Selector {
// this will post an os event so that we can wakeup the event loop
#[inline]
pub fn wakeup(&self, id: usize) {
let kqfd = unsafe { self.vec.get_unchecked(id) }.kqfd;
let kev = libc::kevent {
let selector = unsafe { self.vec.get_unchecked(id) };
let kqfd = selector.as_raw_fd();
let mut kev = libc::kevent {
ident: NOTIFY_IDENT,
filter: libc::EVFILT_USER,
flags: 0,
flags: libc::EV_ADD | libc::EV_RECEIPT,
fflags: libc::NOTE_TRIGGER,
data: 0,
udata: ptr::null_mut(),
};

let ret = unsafe { libc::kevent(kqfd, &kev, 1, ptr::null_mut(), 0, ptr::null()) };
syscall!(kevent(kqfd, &kev, 1, &mut kev, 1, ptr::null())).unwrap();
assert!(kev.flags & libc::EV_ERROR == 0 || kev.data == 0);

trace!("wakeup id={:?}, ret={:?}", id, ret);
trace!("wakeup id={:?}", id);
}

// register io event to the selector
#[inline]
pub fn add_fd(&self, io_data: IoData) -> io::Result<IoData> {
let fd = io_data.fd;
let id = fd as usize % self.vec.len();
let kqfd = unsafe { self.vec.get_unchecked(id) }.kqfd;
info!("add fd to kqueue select, fd={:?}", fd);
let kqfd = unsafe { self.vec.get_unchecked(id) }.as_raw_fd();

let flags = libc::EV_ADD | libc::EV_CLEAR;
let flags = libc::EV_ADD | libc::EV_CLEAR | libc::EV_RECEIPT;
let udata = io_data.as_ref() as *const _;
let changes = [
let mut changes = [
kevent!(fd, libc::EVFILT_READ, flags, udata),
kevent!(fd, libc::EVFILT_WRITE, flags, udata),
];

let n = unsafe {
libc::kevent(
kqfd,
changes.as_ptr(),
changes.len() as libc::c_int,
ptr::null_mut(),
0,
ptr::null(),
)
};
if n < 0 {
return Err(io::Error::last_os_error());
}

syscall!(kevent(
kqfd,
changes.as_ptr(),
changes.len() as libc::c_int,
changes.as_mut_ptr(),
changes.len() as libc::c_int,
ptr::null(),
))?;

debug!("add fd to kqueue select, fd={:?}", fd);
Ok(io_data)
}

#[inline]
pub fn mod_fd(&self, io_data: &IoData, is_read: bool) -> io::Result<()> {
let fd = io_data.fd;
let id = fd as usize % self.vec.len();
let kqfd = unsafe { self.vec.get_unchecked(id) }.kqfd;
info!("add fd to kqueue select, fd={:?}", fd);
let kqfd = unsafe { self.vec.get_unchecked(id) }.as_raw_fd();

let flags = libc::EV_DELETE;
let flags = libc::EV_CLEAR | libc::EV_RECEIPT;
let udata = io_data.as_ref() as *const _;
let changes = if is_read {
[kevent!(fd, libc::EVFILT_WRITE, flags, udata)]
let mut changes = if is_read {
[
kevent!(fd, libc::EVFILT_WRITE, flags | libc::EV_DELETE, udata),
kevent!(fd, libc::EVFILT_READ, flags | libc::EV_ADD, udata),
]
} else {
[kevent!(fd, libc::EVFILT_READ, flags, udata)]
};

let n = unsafe {
libc::kevent(
kqfd,
changes.as_ptr(),
changes.len() as libc::c_int,
ptr::null_mut(),
0,
ptr::null(),
)
[
kevent!(fd, libc::EVFILT_WRITE, flags | libc::EV_ADD, udata),
kevent!(fd, libc::EVFILT_READ, flags | libc::EV_DELETE, udata),
]
};
if n < 0 {
return Err(io::Error::last_os_error());
}

syscall!(kevent(
kqfd,
changes.as_ptr(),
changes.len() as libc::c_int,
changes.as_mut_ptr(),
changes.len() as libc::c_int,
ptr::null(),
))?;

debug!("modify fd to kqueue select, fd={:?}", fd);
Ok(())
}

Expand All @@ -295,26 +280,26 @@ impl Selector {
let fd = io_data.fd;
let id = fd as usize % self.vec.len();
let single_selector = unsafe { self.vec.get_unchecked(id) };
let kqfd = single_selector.kqfd;
info!("del fd from kqueue select, fd={:?}", fd);
let kqfd = single_selector.as_raw_fd();

let filter = libc::EV_DELETE;
let changes = [
let filter = libc::EV_DELETE | libc::EV_RECEIPT;
let mut changes = [
kevent!(fd, libc::EVFILT_READ, filter, ptr::null_mut()),
kevent!(fd, libc::EVFILT_WRITE, filter, ptr::null_mut()),
];
// ignore the error
unsafe {
libc::kevent(
kqfd,
changes.as_ptr(),
changes.len() as libc::c_int,
ptr::null_mut(),
0,
ptr::null(),
);
}

// ignore the error, the fd may already closed
syscall!(kevent(
kqfd,
changes.as_ptr(),
changes.len() as libc::c_int,
changes.as_mut_ptr(),
changes.len() as libc::c_int,
ptr::null(),
))
.ok();

debug!("del fd from kqueue select, fd={:?}", fd);
// after EpollCtlDel push the unused event data
single_selector.free_ev.push((*io_data).clone());
}
Expand Down
13 changes: 13 additions & 0 deletions src/io/sys/unix/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
#[allow(unused_macros)]
macro_rules! syscall {
($fn: ident ( $($arg: expr),* $(,)* ) ) => {{
#[allow(unused_unsafe)]
let res = unsafe { libc::$fn($($arg, )*) };
if res < 0 {
Err(std::io::Error::last_os_error())
} else {
Ok(res)
}
}};
}

#[cfg(any(target_os = "linux", target_os = "android"))]
#[path = "epoll.rs"]
mod select;
Expand Down

0 comments on commit df3a37e

Please sign in to comment.