diff --git a/Cargo.toml b/Cargo.toml index e29002d..123ac39 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,10 +17,11 @@ categories = ["development-tools::build-utils"] exclude = ["/.travis.yml", "/appveyor.yml"] [dependencies] -rayon = { version = "1.0", optional = true } +num_cpus = { version = "1.10", optional = true } +jobserver = { version = "0.1.16", optional = true } [features] -parallel = ["rayon"] +parallel = ["num_cpus", "jobserver"] [dev-dependencies] tempdir = "0.3" diff --git a/src/lib.rs b/src/lib.rs index f5bfd64..dd86925 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -58,9 +58,6 @@ #![allow(deprecated)] #![deny(missing_docs)] -#[cfg(feature = "parallel")] -extern crate rayon; - use std::collections::HashMap; use std::env; use std::ffi::{OsStr, OsString}; @@ -944,22 +941,132 @@ impl Build { } #[cfg(feature = "parallel")] - fn compile_objects(&self, objs: &[Object]) -> Result<(), Error> { - use self::rayon::prelude::*; + fn compile_objects<'me>(&'me self, objs: &[Object]) -> Result<(), Error> { + use std::sync::atomic::{AtomicBool, Ordering::SeqCst}; + use std::sync::Once; + + // When compiling objects in parallel we do a few dirty tricks to speed + // things up: + // + // * First is that we use the `jobserver` crate to limit the parallelism + // of this build script. The `jobserver` crate will use a jobserver + // configured by Cargo for build scripts to ensure that parallelism is + // coordinated across C compilations and Rust compilations. Before we + // compile anything we make sure to wait until we acquire a token. + // + // Note that this jobserver is cached globally so we only used one per + // process and only worry about creating it once. + // + // * Next we use a raw `thread::spawn` per thread to actually compile + // objects in parallel. We only actually spawn a thread after we've + // acquired a token to perform some work + // + // * Finally though we want to keep the dependencies of this crate + // pretty light, so we avoid using a safe abstraction like `rayon` and + // instead rely on some bits of `unsafe` code. We know that this stack + // frame persists while everything is compiling so we use all the + // stack-allocated objects without cloning/reallocating. We use a + // transmute to `State` with a `'static` lifetime to persist + // everything we need across the boundary, and the join-on-drop + // semantics of `JoinOnDrop` should ensure that our stack frame is + // alive while threads are alive. + // + // With all that in mind we compile all objects in a loop here, after we + // acquire the appropriate tokens, Once all objects have been compiled + // we join on all the threads and propagate the results of compilation. + // + // Note that as a slight optimization we try to break out as soon as + // possible as soon as any compilation fails to ensure that errors get + // out to the user as fast as possible. + let server = jobserver(); + let error = AtomicBool::new(false); + let mut threads = Vec::new(); + for obj in objs { + if error.load(SeqCst) { + break; + } + let token = server.acquire()?; + let state = State { + build: self, + obj, + error: &error, + }; + let state = unsafe { std::mem::transmute::>(state) }; + let thread = thread::spawn(|| { + let state: State<'me> = state; // erase the `'static` lifetime + let result = state.build.compile_object(state.obj); + if result.is_err() { + state.error.store(true, SeqCst); + } + drop(token); // make sure our jobserver token is released after the compile + return result; + }); + threads.push(JoinOnDrop(Some(thread))); + } - if let Some(amt) = self.getenv("NUM_JOBS") { - if let Ok(amt) = amt.parse() { - let _ = rayon::ThreadPoolBuilder::new() - .num_threads(amt) - .build_global(); + for mut thread in threads { + if let Some(thread) = thread.0.take() { + thread.join().expect("thread should not panic")?; } } - // Check for any errors and return the first one found. - objs.par_iter() - .with_max_len(1) - .map(|obj| self.compile_object(obj)) - .collect() + return Ok(()); + + /// Shared state from the parent thread to the child thread. This + /// package of pointers is temporarily transmuted to a `'static` + /// lifetime to cross the thread boundary and then once the thread is + /// running we erase the `'static` to go back to an anonymous lifetime. + struct State<'a> { + build: &'a Build, + obj: &'a Object, + error: &'a AtomicBool, + } + + /// Returns a suitable `jobserver::Client` used to coordinate + /// parallelism between build scripts. + fn jobserver() -> &'static jobserver::Client { + static INIT: Once = Once::new(); + static mut JOBSERVER: Option = None; + + fn _assert_sync() {} + _assert_sync::(); + + unsafe { + INIT.call_once(|| { + let server = default_jobserver(); + JOBSERVER = Some(server); + }); + JOBSERVER.as_ref().unwrap() + } + } + + unsafe fn default_jobserver() -> jobserver::Client { + // Try to use the environmental jobserver which Cargo typically + // initializes for us... + if let Some(client) = jobserver::Client::from_env() { + return client; + } + + // ... but if that fails for whatever reason fall back to the number + // of cpus on the system or the `NUM_JOBS` env var. + let mut parallelism = num_cpus::get(); + if let Ok(amt) = env::var("NUM_JOBS") { + if let Ok(amt) = amt.parse() { + parallelism = amt; + } + } + jobserver::Client::new(parallelism).expect("failed to create jobserver") + } + + struct JoinOnDrop(Option>>); + + impl Drop for JoinOnDrop { + fn drop(&mut self) { + if let Some(thread) = self.0.take() { + drop(thread.join()); + } + } + } } #[cfg(not(feature = "parallel"))]