use libc::c_int; use crate::FromEnvErrorInner; use std::fs::{File, OpenOptions}; use std::io::{self, Read, Write}; use std::mem; use std::mem::MaybeUninit; use std::os::unix::prelude::*; use std::path::Path; use std::process::Command; use std::ptr; use std::sync::{ atomic::{AtomicBool, Ordering}, Arc, Once, }; use std::thread::{self, Builder, JoinHandle}; use std::time::Duration; #[derive(Debug)] /// This preserves the `--jobserver-auth` type at creation time, /// so auth type will be passed down to and inherit from sub-Make processes correctly. /// /// See for details. enum ClientCreationArg { Fds { read: c_int, write: c_int }, Fifo(Box), } #[derive(Debug)] pub struct Client { read: File, write: File, creation_arg: ClientCreationArg, /// It is set to `None` if the pipe is shared with other processes, so it /// cannot support non-blocking mode. /// /// If it is set to `Some`, then it can only go from /// `Some(false)` -> `Some(true)` but not the other way around, /// since that could cause a race condition. is_non_blocking: Option, } #[derive(Debug)] pub struct Acquired { byte: u8, } impl Client { pub fn new(mut limit: usize) -> io::Result { let client = unsafe { Client::mk()? }; // I don't think the character written here matters, but I could be // wrong! const BUFFER: [u8; 128] = [b'|'; 128]; let mut write = &client.write; set_nonblocking(write.as_raw_fd(), true)?; while limit > 0 { let n = limit.min(BUFFER.len()); write.write_all(&BUFFER[..n])?; limit -= n; } set_nonblocking(write.as_raw_fd(), false)?; Ok(client) } unsafe fn mk() -> io::Result { let mut pipes = [0; 2]; // Attempt atomically-create-with-cloexec if we can on Linux, // detected by using the `syscall` function in `libc` to try to work // with as many kernels/glibc implementations as possible. #[cfg(target_os = "linux")] { static PIPE2_AVAILABLE: AtomicBool = AtomicBool::new(true); if PIPE2_AVAILABLE.load(Ordering::SeqCst) { match libc::syscall(libc::SYS_pipe2, pipes.as_mut_ptr(), libc::O_CLOEXEC) { -1 => { let err = io::Error::last_os_error(); if err.raw_os_error() == Some(libc::ENOSYS) { PIPE2_AVAILABLE.store(false, Ordering::SeqCst); } else { return Err(err); } } _ => return Ok(Client::from_fds(pipes[0], pipes[1])), } } } cvt(libc::pipe(pipes.as_mut_ptr()))?; drop(set_cloexec(pipes[0], true)); drop(set_cloexec(pipes[1], true)); Ok(Client::from_fds(pipes[0], pipes[1])) } pub(crate) unsafe fn open(s: &str, check_pipe: bool) -> Result { if let Some(client) = Self::from_fifo(s)? { return Ok(client); } if let Some(client) = Self::from_pipe(s, check_pipe)? { return Ok(client); } Err(FromEnvErrorInner::CannotParse(format!( "expected `fifo:PATH` or `R,W`, found `{s}`" ))) } /// `--jobserver-auth=fifo:PATH` fn from_fifo(s: &str) -> Result, FromEnvErrorInner> { let mut parts = s.splitn(2, ':'); if parts.next().unwrap() != "fifo" { return Ok(None); } let path_str = parts.next().ok_or_else(|| { FromEnvErrorInner::CannotParse("expected a path after `fifo:`".to_string()) })?; let path = Path::new(path_str); let open_file = || { // Opening with read write is necessary, since opening with // read-only or write-only could block the thread until another // thread opens it with write-only or read-only (or RDWR) // correspondingly. OpenOptions::new() .read(true) .write(true) .open(path) .map_err(|err| FromEnvErrorInner::CannotOpenPath(path_str.to_string(), err)) }; Ok(Some(Client { read: open_file()?, write: open_file()?, creation_arg: ClientCreationArg::Fifo(path.into()), is_non_blocking: Some(AtomicBool::new(false)), })) } /// `--jobserver-auth=R,W` unsafe fn from_pipe(s: &str, check_pipe: bool) -> Result, FromEnvErrorInner> { let mut parts = s.splitn(2, ','); let read = parts.next().unwrap(); let write = match parts.next() { Some(w) => w, None => return Ok(None), }; let read = read .parse() .map_err(|e| FromEnvErrorInner::CannotParse(format!("cannot parse `read` fd: {e}")))?; let write = write .parse() .map_err(|e| FromEnvErrorInner::CannotParse(format!("cannot parse `write` fd: {e}")))?; // If either or both of these file descriptors are negative, // it means the jobserver is disabled for this process. if read < 0 { return Err(FromEnvErrorInner::NegativeFd(read)); } if write < 0 { return Err(FromEnvErrorInner::NegativeFd(write)); } let creation_arg = ClientCreationArg::Fds { read, write }; // Ok so we've got two integers that look like file descriptors, but // for extra sanity checking let's see if they actually look like // valid files and instances of a pipe if feature enabled before we // return the client. // // If we're called from `make` *without* the leading + on our rule // then we'll have `MAKEFLAGS` env vars but won't actually have // access to the file descriptors. // // `NotAPipe` is a worse error, return it if it's reported for any of the two fds. match (fd_check(read, check_pipe), fd_check(write, check_pipe)) { (read_err @ Err(FromEnvErrorInner::NotAPipe(..)), _) => read_err?, (_, write_err @ Err(FromEnvErrorInner::NotAPipe(..))) => write_err?, (read_err, write_err) => { read_err?; write_err?; // Optimization: Try converting it to a fifo by using /dev/fd // // On linux, opening `/dev/fd/$fd` returns a fd with a new file description, // so we can set `O_NONBLOCK` on it without affecting other processes. // // On macOS, opening `/dev/fd/$fd` seems to be the same as `File::try_clone`. // // I tested this on macOS 14 and Linux 6.5.13 #[cfg(target_os = "linux")] if let (Ok(read), Ok(write)) = ( File::open(format!("/dev/fd/{}", read)), OpenOptions::new() .write(true) .open(format!("/dev/fd/{}", write)), ) { return Ok(Some(Client { read, write, creation_arg, is_non_blocking: Some(AtomicBool::new(false)), })); } } } Ok(Some(Client { read: clone_fd_and_set_cloexec(read)?, write: clone_fd_and_set_cloexec(write)?, creation_arg, is_non_blocking: None, })) } unsafe fn from_fds(read: c_int, write: c_int) -> Client { Client { read: File::from_raw_fd(read), write: File::from_raw_fd(write), creation_arg: ClientCreationArg::Fds { read, write }, is_non_blocking: None, } } pub fn acquire(&self) -> io::Result { // Ignore interrupts and keep trying if that happens loop { if let Some(token) = self.acquire_allow_interrupts()? { return Ok(token); } } } /// Block waiting for a token, returning `None` if we're interrupted with /// EINTR. fn acquire_allow_interrupts(&self) -> io::Result> { // We don't actually know if the file descriptor here is set in // blocking or nonblocking mode. AFAIK all released versions of // `make` use blocking fds for the jobserver, but the unreleased // version of `make` doesn't. In the unreleased version jobserver // fds are set to nonblocking and combined with `pselect` // internally. // // Here we try to be compatible with both strategies. We optimistically // try to read from the file descriptor which then may block, return // a token or indicate that polling is needed. // Blocking reads (if possible) allows the kernel to be more selective // about which readers to wake up when a token is written to the pipe. // // We use `poll` here to block this thread waiting for read // readiness, and then afterwards we perform the `read` itself. If // the `read` returns that it would block then we start over and try // again. // // Also note that we explicitly don't handle EINTR here. That's used // to shut us down, so we otherwise punt all errors upwards. unsafe { let mut fd: libc::pollfd = mem::zeroed(); let mut read = &self.read; fd.fd = read.as_raw_fd(); fd.events = libc::POLLIN; loop { let mut buf = [0]; match read.read(&mut buf) { Ok(1) => return Ok(Some(Acquired { byte: buf[0] })), Ok(_) => { return Err(io::Error::new( io::ErrorKind::UnexpectedEof, "early EOF on jobserver pipe", )); } Err(e) => match e.kind() { io::ErrorKind::WouldBlock => { /* fall through to polling */ } io::ErrorKind::Interrupted => return Ok(None), _ => return Err(e), }, } loop { fd.revents = 0; if libc::poll(&mut fd, 1, -1) == -1 { let e = io::Error::last_os_error(); return match e.kind() { io::ErrorKind::Interrupted => Ok(None), _ => Err(e), }; } if fd.revents != 0 { break; } } } } } pub fn try_acquire(&self) -> io::Result> { let mut buf = [0]; let mut fifo = &self.read; if let Some(is_non_blocking) = self.is_non_blocking.as_ref() { if !is_non_blocking.load(Ordering::Relaxed) { set_nonblocking(fifo.as_raw_fd(), true)?; is_non_blocking.store(true, Ordering::Relaxed); } } else { return Err(io::ErrorKind::Unsupported.into()); } loop { match fifo.read(&mut buf) { Ok(1) => break Ok(Some(Acquired { byte: buf[0] })), Ok(_) => { break Err(io::Error::new( io::ErrorKind::UnexpectedEof, "early EOF on jobserver pipe", )) } Err(e) if e.kind() == io::ErrorKind::WouldBlock => break Ok(None), Err(e) if e.kind() == io::ErrorKind::Interrupted => continue, Err(err) => break Err(err), } } } pub fn release(&self, data: Option<&Acquired>) -> io::Result<()> { // Note that the fd may be nonblocking but we're going to go ahead // and assume that the writes here are always nonblocking (we can // always quickly release a token). If that turns out to not be the // case we'll get an error anyway! let byte = data.map(|d| d.byte).unwrap_or(b'+'); match (&self.write).write(&[byte])? { 1 => Ok(()), _ => Err(io::Error::new( io::ErrorKind::Other, "failed to write token back to jobserver", )), } } pub fn string_arg(&self) -> String { match &self.creation_arg { ClientCreationArg::Fifo(path) => format!("fifo:{}", path.display()), ClientCreationArg::Fds { read, write } => format!("{},{}", read, write), } } pub fn available(&self) -> io::Result { let mut len = MaybeUninit::::uninit(); cvt(unsafe { libc::ioctl(self.read.as_raw_fd(), libc::FIONREAD, len.as_mut_ptr()) })?; Ok(unsafe { len.assume_init() } as usize) } pub fn configure(&self, cmd: &mut Command) { if matches!(self.creation_arg, ClientCreationArg::Fifo { .. }) { // We `File::open`ed it when inheriting from environment, // so no need to set cloexec for fifo. return; } // Here we basically just want to say that in the child process // we'll configure the read/write file descriptors to *not* be // cloexec, so they're inherited across the exec and specified as // integers through `string_arg` above. let read = self.read.as_raw_fd(); let write = self.write.as_raw_fd(); unsafe { cmd.pre_exec(move || { set_cloexec(read, false)?; set_cloexec(write, false)?; Ok(()) }); } } } #[derive(Debug)] pub struct Helper { thread: JoinHandle<()>, state: Arc, } pub(crate) fn spawn_helper( client: crate::Client, state: Arc, mut f: Box) + Send>, ) -> io::Result { static USR1_INIT: Once = Once::new(); let mut err = None; USR1_INIT.call_once(|| unsafe { let mut new: libc::sigaction = mem::zeroed(); #[cfg(target_os = "aix")] { new.sa_union.__su_sigaction = sigusr1_handler; } #[cfg(not(target_os = "aix"))] { new.sa_sigaction = sigusr1_handler as usize; } new.sa_flags = libc::SA_SIGINFO as _; if libc::sigaction(libc::SIGUSR1, &new, ptr::null_mut()) != 0 { err = Some(io::Error::last_os_error()); } }); if let Some(e) = err.take() { return Err(e); } let state2 = state.clone(); let thread = Builder::new().spawn(move || { state2.for_each_request(|helper| loop { match client.inner.acquire_allow_interrupts() { Ok(Some(data)) => { break f(Ok(crate::Acquired { client: client.inner.clone(), data, disabled: false, })); } Err(e) => break f(Err(e)), Ok(None) if helper.lock().producer_done => break, Ok(None) => {} } }); })?; Ok(Helper { thread, state }) } impl Helper { pub fn join(self) { let dur = Duration::from_millis(10); let mut state = self.state.lock(); debug_assert!(state.producer_done); // We need to join our helper thread, and it could be blocked in one // of two locations. First is the wait for a request, but the // initial drop of `HelperState` will take care of that. Otherwise // it may be blocked in `client.acquire()`. We actually have no way // of interrupting that, so resort to `pthread_kill` as a fallback. // This signal should interrupt any blocking `read` call with // `io::ErrorKind::Interrupt` and cause the thread to cleanly exit. // // Note that we don't do this forever though since there's a chance // of bugs, so only do this opportunistically to make a best effort // at clearing ourselves up. for _ in 0..100 { if state.consumer_done { break; } unsafe { // Ignore the return value here of `pthread_kill`, // apparently on OSX if you kill a dead thread it will // return an error, but on other platforms it may not. In // that sense we don't actually know if this will succeed or // not! libc::pthread_kill(self.thread.as_pthread_t() as _, libc::SIGUSR1); } state = self .state .cvar .wait_timeout(state, dur) .unwrap_or_else(|e| e.into_inner()) .0; thread::yield_now(); // we really want the other thread to run } // If we managed to actually see the consumer get done, then we can // definitely wait for the thread. Otherwise it's... off in the ether // I guess? if state.consumer_done { drop(self.thread.join()); } } } unsafe fn fcntl_check(fd: c_int) -> Result<(), FromEnvErrorInner> { match libc::fcntl(fd, libc::F_GETFD) { -1 => Err(FromEnvErrorInner::CannotOpenFd( fd, io::Error::last_os_error(), )), _ => Ok(()), } } unsafe fn fd_check(fd: c_int, check_pipe: bool) -> Result<(), FromEnvErrorInner> { if check_pipe { let mut stat = mem::zeroed(); if libc::fstat(fd, &mut stat) == -1 { let last_os_error = io::Error::last_os_error(); fcntl_check(fd)?; Err(FromEnvErrorInner::NotAPipe(fd, Some(last_os_error))) } else { // On android arm and i686 mode_t is u16 and st_mode is u32, // this generates a type mismatch when S_IFIFO (declared as mode_t) // is used in operations with st_mode, so we use this workaround // to get the value of S_IFIFO with the same type of st_mode. #[allow(unused_assignments)] let mut s_ififo = stat.st_mode; s_ififo = libc::S_IFIFO as _; if stat.st_mode & s_ififo == s_ififo { return Ok(()); } Err(FromEnvErrorInner::NotAPipe(fd, None)) } } else { fcntl_check(fd) } } fn clone_fd_and_set_cloexec(fd: c_int) -> Result { // Safety: fd is a valid fd dand it remains open until returns unsafe { BorrowedFd::borrow_raw(fd) } .try_clone_to_owned() .map(File::from) .map_err(|err| FromEnvErrorInner::CannotOpenFd(fd, err)) } fn set_cloexec(fd: c_int, set: bool) -> io::Result<()> { unsafe { let previous = cvt(libc::fcntl(fd, libc::F_GETFD))?; let new = if set { previous | libc::FD_CLOEXEC } else { previous & !libc::FD_CLOEXEC }; if new != previous { cvt(libc::fcntl(fd, libc::F_SETFD, new))?; } Ok(()) } } fn set_nonblocking(fd: c_int, set: bool) -> io::Result<()> { let status_flag = if set { libc::O_NONBLOCK } else { 0 }; unsafe { cvt(libc::fcntl(fd, libc::F_SETFL, status_flag))?; } Ok(()) } fn cvt(t: c_int) -> io::Result { if t == -1 { Err(io::Error::last_os_error()) } else { Ok(t) } } extern "C" fn sigusr1_handler( _signum: c_int, _info: *mut libc::siginfo_t, _ptr: *mut libc::c_void, ) { // nothing to do } #[cfg(test)] mod test { use super::Client as ClientImp; use crate::{test::run_named_fifo_try_acquire_tests, Client}; use std::{ fs::File, io::{self, Write}, os::unix::io::AsRawFd, sync::Arc, }; fn from_imp_client(imp: ClientImp) -> Client { Client { inner: Arc::new(imp), } } fn new_client_from_fifo() -> (Client, String) { let file = tempfile::NamedTempFile::new().unwrap(); let fifo_path = file.path().to_owned(); file.close().unwrap(); // Remove the NamedTempFile to create fifo nix::unistd::mkfifo(&fifo_path, nix::sys::stat::Mode::S_IRWXU).unwrap(); let arg = format!("fifo:{}", fifo_path.to_str().unwrap()); ( ClientImp::from_fifo(&arg) .unwrap() .map(from_imp_client) .unwrap(), arg, ) } fn new_client_from_pipe() -> (Client, String) { let (read, write) = nix::unistd::pipe().unwrap(); let read = File::from(read); let mut write = File::from(write); write.write_all(b"1").unwrap(); let arg = format!("{},{}", read.as_raw_fd(), write.as_raw_fd()); ( unsafe { ClientImp::from_pipe(&arg, true) } .unwrap() .map(from_imp_client) .unwrap(), arg, ) } #[test] fn test_try_acquire_named_fifo() { run_named_fifo_try_acquire_tests(&new_client_from_fifo().0); } #[test] fn test_try_acquire_annoymous_pipe_linux_specific_optimization() { #[cfg(not(target_os = "linux"))] assert_eq!( new_client_from_pipe().0.try_acquire().unwrap_err().kind(), io::ErrorKind::Unsupported ); #[cfg(target_os = "linux")] { let client = new_client_from_pipe().0; client.acquire().unwrap().drop_without_releasing(); run_named_fifo_try_acquire_tests(&client); } } #[test] fn test_string_arg() { let (client, arg) = new_client_from_fifo(); assert_eq!(client.inner.string_arg(), arg); let (client, arg) = new_client_from_pipe(); assert_eq!(client.inner.string_arg(), arg); } }