Skip to content

Commit

Permalink
Merge pull request #58 from rakivo/dev
Browse files Browse the repository at this point in the history
Major "waiting model" PR
  • Loading branch information
rakivo authored Feb 23, 2025
2 parents d73ea90 + 132124c commit 75c3a92
Show file tree
Hide file tree
Showing 13 changed files with 451 additions and 691 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ tramer = { version = "0.1.0", optional = true }
nix = { version = "0.29.0", features = ["poll"] }
dashmap = { version = "6.1.0", features = ["inline"] }
fxhash = { git = "https://github.com/rakivo/fxhash", rev = "2b61551" }
rayon = "1.10.0"
129 changes: 52 additions & 77 deletions src/command.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,25 @@
use crate::flags::Flags;
use crate::graph::Graph;
use crate::parser::comp::Job;
use crate::parser::comp::Edge;
use crate::types::StrDashMap;
use crate::consts::CLEAN_TARGET;
use crate::poll::{Poller, Subprocess};

use std::io;
use std::ptr;
use std::mem;
use std::fs::File;
use std::sync::Arc;
use std::path::Path;
use std::borrow::Cow;
use std::ffi::CString;
use std::{io, ptr, mem};
use std::time::SystemTime;
use std::os::fd::{AsFd, AsRawFd};
use std::sync::atomic::{Ordering};
use std::os::fd::{IntoRawFd, FromRawFd};

use dashmap::DashMap;
use fxhash::FxBuildHasher;
use crossbeam_channel::Sender;
use nix::poll::{PollFd, PollFlags};

#[cfg_attr(feature = "dbg", derive(Debug))]
pub struct Command<'a> {
pub target: &'a str,
pub command: &'a str,
pub description: Option::<&'a str>
pub target: Cow::<'a, str>,
pub command: Cow::<'a, str>,
pub description: Option::<Cow::<'a, str>>
}

// Custom implementation of `Command` to avoid fork + exec overhead
Expand Down Expand Up @@ -64,24 +57,14 @@ impl<'a> Command<'a> {
Ok((r, w))
}

pub fn execute(
&self,
poller: &Poller,
poll_fds_sender: Sender::<PollFd>,
) -> io::Result::<()> {
let cmd = CString::new(self.command.as_bytes())?;
let args = [
c"/bin/sh".as_ptr(),
c"-c".as_ptr(),
cmd.as_ptr(),
ptr::null(),
];

let (stdout_reader, stdout_writer) = Self::create_pipe()?;
let (stderr_reader, stderr_writer) = Self::create_pipe()?;

let stdout_reader_fd = stdout_reader.as_raw_fd();
let stderr_reader_fd = stderr_reader.as_raw_fd();
pub fn execute(&self) -> io::Result::<Output> {
let ref command = self.command;

let (mut stdout_reader, stdout_writer) = Self::create_pipe()?;
let (mut stderr_reader, stderr_writer) = Self::create_pipe()?;

let cmd = CString::new(command.as_bytes())?;
let args = [c"/bin/sh".as_ptr(), c"-c".as_ptr(), cmd.as_ptr(), ptr::null()];

let stdout_writer_fd = stdout_writer.into_raw_fd();
let stderr_writer_fd = stderr_writer.into_raw_fd();
Expand All @@ -99,6 +82,7 @@ impl<'a> Command<'a> {
}

let env = [c"PATH=/usr/bin:/bin".as_ptr(), ptr::null()];

let mut pid = 0;
let ret = unsafe {
libc::posix_spawn(
Expand All @@ -107,7 +91,7 @@ impl<'a> Command<'a> {
&file_actions,
&attr,
args.as_ptr() as *const *mut _,
env.as_ptr() as *const *mut _,
env.as_ptr() as *const *mut _
)
};

Expand All @@ -120,55 +104,46 @@ impl<'a> Command<'a> {
libc::close(stderr_writer_fd);
}

{
let target = Box::from(self.target);
let command = Box::from(self.command);
let description = self.description.map(Box::from);
let subprocess = Arc::new(Subprocess {pid, target, command, description});

poller.fd_to_subprocess.insert(stdout_reader_fd, Arc::clone(&subprocess));
poller.fd_to_subprocess.insert(stderr_reader_fd, subprocess);
}
let stdout = io::read_to_string(&mut stdout_reader)?;
let stderr = io::read_to_string(&mut stderr_reader)?;

poller.active_fds.fetch_add(1, Ordering::Relaxed);
let mut status = 0;
unsafe {
libc::waitpid(pid, &mut status, 0);
}

// TODO: dont leak here
let stdout_poll_fd = {
let stdout_reader_fd: &'static _ = Box::leak(Box::new(stdout_reader));
PollFd::new(stdout_reader_fd.as_fd(), PollFlags::POLLIN)
};
unsafe {
libc::posix_spawn_file_actions_destroy(&mut file_actions);
libc::posix_spawnattr_destroy(&mut attr);
}

let stderr_poll_fd = {
let stderr_reader_fd: &'static _ = Box::leak(Box::new(stderr_reader));
PollFd::new(stderr_reader_fd.as_fd(), PollFlags::POLLIN)
};
Ok(Output {status, stdout, stderr})
}
}

#[cfg(feature = "dbg_hardcore")] {
{
let stdout_fd = stdout_poll_fd.as_fd().as_raw_fd();
let mut stdout = format!("sending: FD: {stdout_fd:?} ");
if let Some(revents) = stdout_poll_fd.revents() {
let revents = format!("revents: {revents:?}");
stdout.push_str(&revents)
}
println!("{stdout}");
}
pub struct Output {
pub status: i32,
pub stdout: String,
pub stderr: String,
}

{
let stderr_fd = stderr_poll_fd.as_fd().as_raw_fd();
let mut stderr = format!("sending: FD: {stderr_fd:?} ");
if let Some(revents) = stdout_poll_fd.revents() {
let revents = format!("revents: {revents:?}");
stderr.push_str(&revents)
}
println!("{stderr}");
}
impl Output {
#[inline]
pub fn to_string(&self, flags: &Flags) -> String {
if flags.quiet() {
return const { String::new() }
}

_ = poll_fds_sender.send(stdout_poll_fd);
_ = poll_fds_sender.send(stderr_poll_fd);

Ok(())
let Output { stdout, stderr, .. } = self;
let n = stdout.len() + stderr.len();
let mut buf = String::with_capacity(n);
if !stdout.is_empty() {
buf.push_str(&stdout);
}
if !stderr.is_empty() {
buf.push_str(&stderr);
}
buf
}
}

Expand Down Expand Up @@ -198,15 +173,15 @@ impl<'a> MetadataCache<'a> {
}

#[inline]
pub fn needs_rebuild(&self, job: &Job<'a>, transitive_deps: &Graph<'a>) -> bool {
pub fn needs_rebuild(&self, edge: &Edge<'a>, transitive_deps: &Graph<'a>) -> bool {
// TODO: do something here if dependent file does not exist
let mtimes = unsafe {
transitive_deps.get(job.target).unwrap_unchecked()
transitive_deps.get(edge.target).unwrap_unchecked()
}.iter().filter_map(|dep| {
self.mtime(*dep).ok()
}).collect::<Vec::<_>>();

let Ok(target_mtime) = self.mtime(job.target) else {
let Ok(target_mtime) = self.mtime(edge.target) else {
return true
};

Expand Down
Loading

0 comments on commit 75c3a92

Please sign in to comment.