diff --git a/lib/index.js b/lib/index.js index ddd33e4..2c9426a 100644 --- a/lib/index.js +++ b/lib/index.js @@ -7,10 +7,14 @@ import EventEmitter from 'events'; import { basename, resolve } from 'path'; import { stat, readFile } from 'fs-promise'; import resumer from 'resumer'; +import splitArray from 'split-array'; // limit of size of files to find const ONEMB = bytes('1mb'); +// how many concurrent HTTP/2 stream uploads +const MAX_CONCURRENT = 10; + export default class Now extends EventEmitter { constructor (token, { forceNew = false, debug = false }) { super(); @@ -100,37 +104,49 @@ export default class Now extends EventEmitter { } upload () { - Promise.all(this._missing.map((sha) => retry(async (bail) => { - const file = this._files.get(sha); - const { data, name } = file; + const parts = splitArray(this._missing, MAX_CONCURRENT); - if (this._debug) console.time(`> [debug] /sync ${name}`); + if (this._debug) { + console.log('> [debug] Will upload ' + + `${this._missing.length} files in ${parts.length} ` + + `steps of ${MAX_CONCURRENT} uploads.`); + } - const stream = resumer().queue(data).end(); - const res = await this._fetch('/sync', { - method: 'POST', - headers: { - 'Content-Type': 'application/octet-stream', - 'Content-Length': data.length, - 'x-now-deployment-id': this._id, - 'x-now-sha': sha, - 'x-now-file': toRelative(name, this._path), - 'x-now-size': data.length - }, - body: stream - }); - if (this._debug) console.timeEnd(`> [debug] /sync ${name}`); + const uploadChunk = () => { + Promise.all(parts.shift().map((sha) => retry(async (bail) => { + const file = this._files.get(sha); + const { data, name } = file; + + if (this._debug) console.time(`> [debug] /sync ${name}`); + + const stream = resumer().queue(data).end(); + const res = await this._fetch('/sync', { + method: 'POST', + headers: { + 'Content-Type': 'application/octet-stream', + 'Content-Length': data.length, + 'x-now-deployment-id': this._id, + 'x-now-sha': sha, + 'x-now-file': toRelative(name, this._path), + 'x-now-size': data.length + }, + body: stream + }); + if (this._debug) console.timeEnd(`> [debug] /sync ${name}`); + + // no retry on 403 + if (403 === res.status) { + if (this._debug) console.log('> [debug] bailing on creating due to 403'); + return bail(responseError(res)); + } - // no retry on 403 - if (403 === res.status) { - if (this._debug) console.log('> [debug] bailing on creating due to 403'); - return bail(responseError(res)); - } + this.emit('upload', file); + }, { retries: 5, randomize: true, onRetry: this._onRetry }))) + .then(() => parts.length ? uploadChunk() : this.emit('complete')) + .catch((err) => this.emit('error', err)); + }; - this.emit('upload', file); - }, { retries: 5, randomize: true, onRetry: this._onRetry }))) - .then(() => this.emit('complete')) - .catch((err) => this.emit('error', err)); + uploadChunk(); } _onRetry (err) { diff --git a/package.json b/package.json index 23e5da1..aa74f8f 100644 --- a/package.json +++ b/package.json @@ -25,7 +25,8 @@ "progress": "1.1.8", "resumer": "0.0.0", "retry": "0.9.0", - "spdy": "3.2.3" + "spdy": "3.2.3", + "split-array": "1.0.1" }, "devDependencies": { "alpha-sort": "1.0.2",