Browse Source

Rejigger parallel compilation support

This commit reimplements parallel compilation support with a few goals
in mind:

* Primarily the `jobserver` crate is now used to limit parallelism
  across build scripts
* The `rayon` crate is no longer used to ensure this is a pretty
  lightweight dependency crate.

It's hoped that after this bakes for a bit we may be able to turn this
on by default!
gh-actions
Alex Crichton 5 years ago
parent
commit
4fda8db89e
  1. 5
      Cargo.toml
  2. 137
      src/lib.rs

5
Cargo.toml

@ -17,10 +17,11 @@ categories = ["development-tools::build-utils"]
exclude = ["/.travis.yml", "/appveyor.yml"]
[dependencies]
rayon = { version = "1.0", optional = true }
num_cpus = { version = "1.10", optional = true }
jobserver = { version = "0.1.16", optional = true }
[features]
parallel = ["rayon"]
parallel = ["num_cpus", "jobserver"]
[dev-dependencies]
tempdir = "0.3"

137
src/lib.rs

@ -58,9 +58,6 @@
#![allow(deprecated)]
#![deny(missing_docs)]
#[cfg(feature = "parallel")]
extern crate rayon;
use std::collections::HashMap;
use std::env;
use std::ffi::{OsStr, OsString};
@ -944,22 +941,132 @@ impl Build {
}
#[cfg(feature = "parallel")]
fn compile_objects(&self, objs: &[Object]) -> Result<(), Error> {
use self::rayon::prelude::*;
fn compile_objects<'me>(&'me self, objs: &[Object]) -> Result<(), Error> {
use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
use std::sync::Once;
// When compiling objects in parallel we do a few dirty tricks to speed
// things up:
//
// * First is that we use the `jobserver` crate to limit the parallelism
// of this build script. The `jobserver` crate will use a jobserver
// configured by Cargo for build scripts to ensure that parallelism is
// coordinated across C compilations and Rust compilations. Before we
// compile anything we make sure to wait until we acquire a token.
//
// Note that this jobserver is cached globally so we only used one per
// process and only worry about creating it once.
//
// * Next we use a raw `thread::spawn` per thread to actually compile
// objects in parallel. We only actually spawn a thread after we've
// acquired a token to perform some work
//
// * Finally though we want to keep the dependencies of this crate
// pretty light, so we avoid using a safe abstraction like `rayon` and
// instead rely on some bits of `unsafe` code. We know that this stack
// frame persists while everything is compiling so we use all the
// stack-allocated objects without cloning/reallocating. We use a
// transmute to `State` with a `'static` lifetime to persist
// everything we need across the boundary, and the join-on-drop
// semantics of `JoinOnDrop` should ensure that our stack frame is
// alive while threads are alive.
//
// With all that in mind we compile all objects in a loop here, after we
// acquire the appropriate tokens, Once all objects have been compiled
// we join on all the threads and propagate the results of compilation.
//
// Note that as a slight optimization we try to break out as soon as
// possible as soon as any compilation fails to ensure that errors get
// out to the user as fast as possible.
let server = jobserver();
let error = AtomicBool::new(false);
let mut threads = Vec::new();
for obj in objs {
if error.load(SeqCst) {
break;
}
let token = server.acquire()?;
let state = State {
build: self,
obj,
error: &error,
};
let state = unsafe { std::mem::transmute::<State, State<'static>>(state) };
let thread = thread::spawn(|| {
let state: State<'me> = state; // erase the `'static` lifetime
let result = state.build.compile_object(state.obj);
if result.is_err() {
state.error.store(true, SeqCst);
}
drop(token); // make sure our jobserver token is released after the compile
return result;
});
threads.push(JoinOnDrop(Some(thread)));
}
if let Some(amt) = self.getenv("NUM_JOBS") {
if let Ok(amt) = amt.parse() {
let _ = rayon::ThreadPoolBuilder::new()
.num_threads(amt)
.build_global();
for mut thread in threads {
if let Some(thread) = thread.0.take() {
thread.join().expect("thread should not panic")?;
}
}
// Check for any errors and return the first one found.
objs.par_iter()
.with_max_len(1)
.map(|obj| self.compile_object(obj))
.collect()
return Ok(());
/// Shared state from the parent thread to the child thread. This
/// package of pointers is temporarily transmuted to a `'static`
/// lifetime to cross the thread boundary and then once the thread is
/// running we erase the `'static` to go back to an anonymous lifetime.
struct State<'a> {
build: &'a Build,
obj: &'a Object,
error: &'a AtomicBool,
}
/// Returns a suitable `jobserver::Client` used to coordinate
/// parallelism between build scripts.
fn jobserver() -> &'static jobserver::Client {
static INIT: Once = Once::new();
static mut JOBSERVER: Option<jobserver::Client> = None;
fn _assert_sync<T: Sync>() {}
_assert_sync::<jobserver::Client>();
unsafe {
INIT.call_once(|| {
let server = default_jobserver();
JOBSERVER = Some(server);
});
JOBSERVER.as_ref().unwrap()
}
}
unsafe fn default_jobserver() -> jobserver::Client {
// Try to use the environmental jobserver which Cargo typically
// initializes for us...
if let Some(client) = jobserver::Client::from_env() {
return client;
}
// ... but if that fails for whatever reason fall back to the number
// of cpus on the system or the `NUM_JOBS` env var.
let mut parallelism = num_cpus::get();
if let Ok(amt) = env::var("NUM_JOBS") {
if let Ok(amt) = amt.parse() {
parallelism = amt;
}
}
jobserver::Client::new(parallelism).expect("failed to create jobserver")
}
struct JoinOnDrop(Option<thread::JoinHandle<Result<(), Error>>>);
impl Drop for JoinOnDrop {
fn drop(&mut self) {
if let Some(thread) = self.0.take() {
drop(thread.join());
}
}
}
}
#[cfg(not(feature = "parallel"))]

Loading…
Cancel
Save