Browse Source

wip

wip-new-parallel
Alex Crichton 5 years ago
parent
commit
b4c0bb797c
  1. 5
      Cargo.toml
  2. 129
      src/lib.rs

5
Cargo.toml

@ -17,10 +17,11 @@ categories = ["development-tools::build-utils"]
exclude = ["/.travis.yml", "/appveyor.yml"]
[dependencies]
rayon = { version = "1.0", optional = true }
num_cpus = { version = "1.10", optional = true }
jobserver = { version = "0.1.16", optional = true }
[features]
parallel = ["rayon"]
parallel = ["num_cpus", "jobserver"]
[dev-dependencies]
tempdir = "0.3"

129
src/lib.rs

@ -58,9 +58,6 @@
#![allow(deprecated)]
#![deny(missing_docs)]
#[cfg(feature = "parallel")]
extern crate rayon;
use std::collections::HashMap;
use std::env;
use std::ffi::{OsStr, OsString};
@ -945,21 +942,125 @@ impl Build {
#[cfg(feature = "parallel")]
fn compile_objects(&self, objs: &[Object]) -> Result<(), Error> {
use self::rayon::prelude::*;
use std::sync::Once;
use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
if let Some(amt) = self.getenv("NUM_JOBS") {
if let Ok(amt) = amt.parse() {
let _ = rayon::ThreadPoolBuilder::new()
.num_threads(amt)
.build_global();
// When compiling objects in parallel we do a few dirty tricks to speed
// things up:
//
// * First is that we use the `jobserver` crate to limit the parallelism
// of this build script. The `jobserver` crate will use a jobserver
// configured by Cargo for build scripts to ensure that parallelism is
// coordinated across C compilations and Rust compilations. Before we
// compile anything we make sure to wait until we acquire a token.
//
// Note that this jobserver is cached globally so we only used one per
// process and only worry about creating it once.
//
// * Next we use a raw `thread::spawn` per thread to actually compile
// objects in parallel. We only actually spawn a thread after we've
// acquired a token to perform some work
//
// * Finally though we want to keep the dependencies of this crate
// pretty light, so we avoid using a safe abstraction like `rayon` and
// instead rely on some bits of `unsafe` code. We know that this stack
// frame persists while everything is compiling so we use all the
// stack-allocated objects without cloning/reallocating. We use a
// transmute to `State` with a `'static` lifetime to persist
// everything we need across the boundary, and the join-on-drop
// semantics of `JoinOnDrop` should ensure that our stack frame is
// alive while threads are alive.
//
// With all that in mind we compile all objects in a loop here, after we
// acquire the appropriate tokens, Once all objects have been compiled
// we join on all the threads and propagate the results of compilation.
//
// Note that as a slight optimization we try to break out as soon as
// possible as soon as any compilation fails to ensure that errors get
// out to the user as fast as possible.
let server = jobserver();
let error = AtomicBool::new(false);
let mut threads = Vec::new();
for obj in objs {
if error.load(SeqCst) {
break;
}
let token = server.acquire()?;
let state = State {
build: self,
obj,
error: &error,
};
let state = unsafe {
std::mem::transmute::<State, State<'static>>(state)
};
let thread = thread::spawn(move || {
let state: State = state; // erase the `'static` lifetime
let result = state.build.compile_object(state.obj);
if result.is_err() {
state.error.store(true, SeqCst);
}
drop(token);
return result;
});
threads.push(JoinOnDrop(Some(thread)));
}
// Check for any errors and return the first one found.
objs.par_iter()
.with_max_len(1)
.map(|obj| self.compile_object(obj))
.collect()
for mut thread in threads {
if let Some(thread) = thread.0.take() {
thread.join().expect("thread should not panic")?;
}
}
return Ok(());
struct State<'a> {
build: &'a Build,
obj: &'a Object,
error: &'a AtomicBool,
}
//
fn jobserver() -> &'static jobserver::Client {
static INIT: Once = Once::new();
static mut JOBSERVER: Option<jobserver::Client> = None;
fn _assert_sync<T: Sync>() {}
_assert_sync::<jobserver::Client>();
unsafe {
INIT.call_once(|| {
let server = default_jobserver();
JOBSERVER = Some(server);
});
JOBSERVER.as_ref().unwrap()
}
}
unsafe fn default_jobserver() -> jobserver::Client {
if let Some(client) = jobserver::Client::from_env() {
return client;
}
let mut parallelism = num_cpus::get();
if let Ok(amt) = env::var("NUM_JOBS") {
if let Ok(amt) = amt.parse() {
parallelism = amt;
}
}
jobserver::Client::new(parallelism).expect("failed to create jobserver")
}
struct JoinOnDrop(Option<thread::JoinHandle<Result<(), Error>>>);
impl Drop for JoinOnDrop {
fn drop(&mut self) {
if let Some(thread) = self.0.take() {
drop(thread.join());
}
}
}
}
#[cfg(not(feature = "parallel"))]

Loading…
Cancel
Save