Browse Source

Add progress events (#322)

no-retry
Vadim Demedes 8 years ago
committed by Sindre Sorhus
parent
commit
be7b80252a
  1. 171
      index.js
  2. 6
      package.json
  3. 31
      readme.md
  4. 189
      test/progress.js

171
index.js

@ -3,9 +3,12 @@ const EventEmitter = require('events');
const http = require('http'); const http = require('http');
const https = require('https'); const https = require('https');
const PassThrough = require('stream').PassThrough; const PassThrough = require('stream').PassThrough;
const Transform = require('stream').Transform;
const urlLib = require('url'); const urlLib = require('url');
const fs = require('fs');
const querystring = require('querystring'); const querystring = require('querystring');
const duplexer3 = require('duplexer3'); const duplexer3 = require('duplexer3');
const intoStream = require('into-stream');
const isStream = require('is-stream'); const isStream = require('is-stream');
const getStream = require('get-stream'); const getStream = require('get-stream');
const timedOut = require('timed-out'); const timedOut = require('timed-out');
@ -13,17 +16,51 @@ const urlParseLax = require('url-parse-lax');
const urlToOptions = require('url-to-options'); const urlToOptions = require('url-to-options');
const lowercaseKeys = require('lowercase-keys'); const lowercaseKeys = require('lowercase-keys');
const decompressResponse = require('decompress-response'); const decompressResponse = require('decompress-response');
const mimicResponse = require('mimic-response');
const isRetryAllowed = require('is-retry-allowed'); const isRetryAllowed = require('is-retry-allowed');
const Buffer = require('safe-buffer').Buffer; const Buffer = require('safe-buffer').Buffer;
const isURL = require('isurl'); const isURL = require('isurl');
const isPlainObj = require('is-plain-obj'); const isPlainObj = require('is-plain-obj');
const PCancelable = require('p-cancelable'); const PCancelable = require('p-cancelable');
const pTimeout = require('p-timeout'); const pTimeout = require('p-timeout');
const pify = require('pify');
const pkg = require('./package'); const pkg = require('./package');
const getMethodRedirectCodes = new Set([300, 301, 302, 303, 304, 305, 307, 308]); const getMethodRedirectCodes = new Set([300, 301, 302, 303, 304, 305, 307, 308]);
const allMethodRedirectCodes = new Set([300, 303, 307, 308]); const allMethodRedirectCodes = new Set([300, 303, 307, 308]);
const isFormData = body => isStream(body) && typeof body.getBoundary === 'function';
const getBodySize = opts => {
const body = opts.body;
if (opts.headers['content-length']) {
return Number(opts.headers['content-length']);
}
if (!body && !opts.stream) {
return 0;
}
if (typeof body === 'string') {
return Buffer.byteLength(body);
}
if (isFormData(body)) {
return pify(body.getLength.bind(body))();
}
if (body instanceof fs.ReadStream) {
return pify(fs.stat)(body.path).then(stat => stat.size);
}
if (isStream(body) && Buffer.isBuffer(body._buffer)) {
return body._buffer.length;
}
return null;
};
function requestAsEventEmitter(opts) { function requestAsEventEmitter(opts) {
opts = opts || {}; opts = opts || {};
@ -32,6 +69,8 @@ function requestAsEventEmitter(opts) {
const redirects = []; const redirects = [];
let retryCount = 0; let retryCount = 0;
let redirectUrl; let redirectUrl;
let uploadBodySize;
let uploaded = 0;
const get = opts => { const get = opts => {
if (opts.protocol !== 'http:' && opts.protocol !== 'https:') { if (opts.protocol !== 'http:' && opts.protocol !== 'https:') {
@ -46,7 +85,17 @@ function requestAsEventEmitter(opts) {
fn = electron.net || electron.remote.net; fn = electron.net || electron.remote.net;
} }
let progressInterval;
const req = fn.request(opts, res => { const req = fn.request(opts, res => {
clearInterval(progressInterval);
ee.emit('uploadProgress', {
percent: 1,
transferred: uploaded,
total: uploadBodySize
});
const statusCode = res.statusCode; const statusCode = res.statusCode;
res.url = redirectUrl || requestUrl; res.url = redirectUrl || requestUrl;
@ -85,22 +134,65 @@ function requestAsEventEmitter(opts) {
return; return;
} }
const downloadBodySize = Number(res.headers['content-length']) || null;
let downloaded = 0;
setImmediate(() => { setImmediate(() => {
const progressStream = new Transform({
transform(chunk, encoding, callback) {
downloaded += chunk.length;
const percent = downloadBodySize ? downloaded / downloadBodySize : 0;
// Let flush() be responsible for emitting the last event
if (percent < 1) {
ee.emit('downloadProgress', {
percent,
transferred: downloaded,
total: downloadBodySize
});
}
callback(null, chunk);
},
flush(callback) {
ee.emit('downloadProgress', {
percent: 1,
transferred: downloaded,
total: downloadBodySize
});
callback();
}
});
mimicResponse(res, progressStream);
progressStream.redirectUrls = redirects;
const response = opts.decompress === true && const response = opts.decompress === true &&
typeof decompressResponse === 'function' && typeof decompressResponse === 'function' &&
req.method !== 'HEAD' ? decompressResponse(res) : res; req.method !== 'HEAD' ? decompressResponse(progressStream) : progressStream;
if (!opts.decompress && ['gzip', 'deflate'].indexOf(res.headers['content-encoding']) !== -1) { if (!opts.decompress && ['gzip', 'deflate'].indexOf(res.headers['content-encoding']) !== -1) {
opts.encoding = null; opts.encoding = null;
} }
response.redirectUrls = redirects;
ee.emit('response', response); ee.emit('response', response);
ee.emit('downloadProgress', {
percent: 0,
transferred: 0,
total: downloadBodySize
});
res.pipe(progressStream);
}); });
}); });
req.once('error', err => { req.once('error', err => {
clearInterval(progressInterval);
const backoff = opts.retries(++retryCount, err); const backoff = opts.retries(++retryCount, err);
if (backoff) { if (backoff) {
@ -111,7 +203,44 @@ function requestAsEventEmitter(opts) {
ee.emit('error', new got.RequestError(err, opts)); ee.emit('error', new got.RequestError(err, opts));
}); });
ee.on('request', req => {
ee.emit('uploadProgress', {
percent: 0,
transferred: 0,
total: uploadBodySize
});
req.connection.on('connect', () => {
const uploadEventFrequency = 150;
progressInterval = setInterval(() => {
const lastUploaded = uploaded;
const headersSize = Buffer.byteLength(req._header);
uploaded = req.connection.bytesWritten - headersSize;
// Prevent the known issue of `bytesWritten` being larger than body size
if (uploadBodySize && uploaded > uploadBodySize) {
uploaded = uploadBodySize;
}
// Don't emit events with unchanged progress and
// prevent last event from being emitted, because
// it's emitted when `response` is emitted
if (uploaded === lastUploaded || uploaded === uploadBodySize) {
return;
}
ee.emit('uploadProgress', {
percent: uploadBodySize ? uploaded / uploadBodySize : 0,
transferred: uploaded,
total: uploadBodySize
});
}, uploadEventFrequency);
});
});
if (opts.gotTimeout) { if (opts.gotTimeout) {
clearInterval(progressInterval);
timedOut(req, opts.gotTimeout); timedOut(req, opts.gotTimeout);
} }
@ -121,8 +250,16 @@ function requestAsEventEmitter(opts) {
}; };
setImmediate(() => { setImmediate(() => {
get(opts); Promise.resolve(getBodySize(opts))
.then(size => {
uploadBodySize = size;
get(opts);
})
.catch(err => {
ee.emit('error', err);
});
}); });
return ee; return ee;
} }
@ -131,7 +268,9 @@ function asPromise(opts) {
pTimeout(requestPromise, opts.gotTimeout.request, new got.RequestError({message: 'Request timed out', code: 'ETIMEDOUT'}, opts)) : pTimeout(requestPromise, opts.gotTimeout.request, new got.RequestError({message: 'Request timed out', code: 'ETIMEDOUT'}, opts)) :
requestPromise; requestPromise;
return timeoutFn(new PCancelable((onCancel, resolve, reject) => { const proxy = new EventEmitter();
const promise = timeoutFn(new PCancelable((onCancel, resolve, reject) => {
const ee = requestAsEventEmitter(opts); const ee = requestAsEventEmitter(opts);
let cancelOnRequest = false; let cancelOnRequest = false;
@ -191,10 +330,21 @@ function asPromise(opts) {
}); });
ee.on('error', reject); ee.on('error', reject);
ee.on('uploadProgress', proxy.emit.bind(proxy, 'uploadProgress'));
ee.on('downloadProgress', proxy.emit.bind(proxy, 'downloadProgress'));
})); }));
promise.on = (name, fn) => {
proxy.on(name, fn);
return promise;
};
return promise;
} }
function asStream(opts) { function asStream(opts) {
opts.stream = true;
const input = new PassThrough(); const input = new PassThrough();
const output = new PassThrough(); const output = new PassThrough();
const proxy = duplexer3(input, output); const proxy = duplexer3(input, output);
@ -256,6 +406,8 @@ function asStream(opts) {
ee.on('redirect', proxy.emit.bind(proxy, 'redirect')); ee.on('redirect', proxy.emit.bind(proxy, 'redirect'));
ee.on('error', proxy.emit.bind(proxy, 'error')); ee.on('error', proxy.emit.bind(proxy, 'error'));
ee.on('uploadProgress', proxy.emit.bind(proxy, 'uploadProgress'));
ee.on('downloadProgress', proxy.emit.bind(proxy, 'downloadProgress'));
return proxy; return proxy;
} }
@ -320,7 +472,7 @@ function normalizeArguments(url, opts) {
throw new TypeError('options.body must be a plain Object or Array when options.form or options.json is used'); throw new TypeError('options.body must be a plain Object or Array when options.form or options.json is used');
} }
if (isStream(body) && typeof body.getBoundary === 'function') { if (isFormData(body)) {
// Special case for https://github.com/form-data/form-data // Special case for https://github.com/form-data/form-data
headers['content-type'] = headers['content-type'] || `multipart/form-data; boundary=${body.getBoundary()}`; headers['content-type'] = headers['content-type'] || `multipart/form-data; boundary=${body.getBoundary()}`;
} else if (opts.form && canBodyBeStringified) { } else if (opts.form && canBodyBeStringified) {
@ -336,6 +488,13 @@ function normalizeArguments(url, opts) {
headers['content-length'] = length; headers['content-length'] = length;
} }
// Convert buffer to stream to receive upload progress events
// see https://github.com/sindresorhus/got/pull/322
if (Buffer.isBuffer(body)) {
opts.body = intoStream(body);
opts.body._buffer = body;
}
opts.method = (opts.method || 'POST').toUpperCase(); opts.method = (opts.method || 'POST').toUpperCase();
} else { } else {
opts.method = (opts.method || 'GET').toUpperCase(); opts.method = (opts.method || 'GET').toUpperCase();

6
package.json

@ -53,13 +53,16 @@
"decompress-response": "^3.2.0", "decompress-response": "^3.2.0",
"duplexer3": "^0.1.4", "duplexer3": "^0.1.4",
"get-stream": "^3.0.0", "get-stream": "^3.0.0",
"into-stream": "^3.1.0",
"is-plain-obj": "^1.1.0", "is-plain-obj": "^1.1.0",
"is-retry-allowed": "^1.0.0", "is-retry-allowed": "^1.0.0",
"is-stream": "^1.0.0", "is-stream": "^1.0.0",
"isurl": "^1.0.0-alpha5", "isurl": "^1.0.0-alpha5",
"lowercase-keys": "^1.0.0", "lowercase-keys": "^1.0.0",
"mimic-response": "^1.0.0",
"p-cancelable": "^0.3.0", "p-cancelable": "^0.3.0",
"p-timeout": "^1.1.1", "p-timeout": "^1.1.1",
"pify": "^3.0.0",
"safe-buffer": "^5.0.1", "safe-buffer": "^5.0.1",
"timed-out": "^4.0.0", "timed-out": "^4.0.0",
"url-parse-lax": "^1.0.0", "url-parse-lax": "^1.0.0",
@ -70,10 +73,9 @@
"coveralls": "^2.11.4", "coveralls": "^2.11.4",
"form-data": "^2.1.1", "form-data": "^2.1.1",
"get-port": "^3.0.0", "get-port": "^3.0.0",
"into-stream": "^3.0.0",
"nyc": "^11.0.2", "nyc": "^11.0.2",
"pem": "^1.4.4", "pem": "^1.4.4",
"pify": "^3.0.0", "slow-stream": "0.0.4",
"tempfile": "^2.0.0", "tempfile": "^2.0.0",
"tempy": "^0.1.0", "tempy": "^0.1.0",
"universal-url": "1.0.0-alpha", "universal-url": "1.0.0-alpha",

31
readme.md

@ -21,6 +21,7 @@ Created because [`request`](https://github.com/request/request) is bloated *(sev
- [Request cancelation](#aborting-the-request) - [Request cancelation](#aborting-the-request)
- [Follows redirects](#followredirect) - [Follows redirects](#followredirect)
- [Retries on network failure](#retries) - [Retries on network failure](#retries)
- [Progress events](#onuploadprogress-progress)
- [Handles gzip/deflate](#decompress) - [Handles gzip/deflate](#decompress)
- [Timeout handling](#timeout) - [Timeout handling](#timeout)
- [Errors with metadata](#errors) - [Errors with metadata](#errors)
@ -202,6 +203,36 @@ got.stream('github.com')
`redirect` event to get the response object of a redirect. The second argument is options for the next request to the redirect location. `redirect` event to get the response object of a redirect. The second argument is options for the next request to the redirect location.
##### .on('uploadProgress', progress)
##### .on('downloadProgress', progress)
Progress events for uploading (sending request) and downloading (receiving response). The `progress` argument is an object like:
```js
{
percent: 0.1,
transferred: 1024,
total: 10240
}
```
If it's not possible to retrieve the body size (can happen when streaming), `total` will be `null`.
**Note**: Progress events can also be used with promises.
```js
got('todomvc.com')
.on('downloadProgress', progress => {
// Report download progress
})
.on('uploadProgress', progress => {
// Report upload progress
})
.then(response => {
// Done
});
```
##### .on('error', error, body, response) ##### .on('error', error, body, response)
`error` event emitted in case of protocol error (like `ENOTFOUND` etc.) or status error (4xx or 5xx). The second argument is the body of the server response in case of status error. The third argument is response object. `error` event emitted in case of protocol error (like `ENOTFOUND` etc.) or status error (4xx or 5xx). The second argument is the body of the server response in case of status error. The third argument is response object.

189
test/progress.js

@ -0,0 +1,189 @@
import fs from 'fs';
import SlowStream from 'slow-stream';
import intoStream from 'into-stream';
import getStream from 'get-stream';
import FormData from 'form-data';
import tempfile from 'tempfile';
import pify from 'pify';
import test from 'ava';
import got from '..';
import {createServer} from './helpers/server';
const checkEvents = (t, events, bodySize = null) => {
t.true(events.length >= 2);
const hasBodySize = typeof bodySize === 'number';
let lastEvent = events.shift();
if (!hasBodySize) {
t.is(lastEvent.percent, 0);
}
for (const [index, event] of events.entries()) {
if (hasBodySize) {
t.is(event.percent, event.transferred / bodySize);
t.true(event.percent > lastEvent.percent);
} else {
const isLastEvent = index === events.length - 1;
t.is(event.percent, isLastEvent ? 1 : 0);
}
t.true(event.transferred >= lastEvent.transferred);
t.is(event.total, bodySize);
lastEvent = event;
}
};
const file = Buffer.alloc(1024 * 1024 * 2);
let s;
test.before('setup', async () => {
s = await createServer();
s.on('/download', (req, res) => {
res.setHeader('content-length', file.length);
intoStream(file)
.pipe(new SlowStream({maxWriteInterval: 50}))
.pipe(res);
});
s.on('/download/no-total', (req, res) => {
res.write('hello');
res.end();
});
s.on('/upload', (req, res) => {
req
.pipe(new SlowStream({maxWriteInterval: 100}))
.on('end', () => res.end());
});
await s.listen(s.port);
});
test('download progress', async t => {
const events = [];
const res = await got(`${s.url}/download`, {encoding: null})
.on('downloadProgress', e => events.push(e));
checkEvents(t, events, res.body.length);
});
test('download progress - missing total size', async t => {
const events = [];
await got(`${s.url}/download/no-total`)
.on('downloadProgress', e => events.push(e));
checkEvents(t, events);
});
test('download progress - stream', async t => {
const events = [];
const stream = got.stream(`${s.url}/download`, {encoding: null})
.on('downloadProgress', e => events.push(e));
await getStream(stream);
checkEvents(t, events, file.length);
});
test('upload progress - file', async t => {
const events = [];
await got.post(`${s.url}/upload`, {body: file})
.on('uploadProgress', e => events.push(e));
checkEvents(t, events, file.length);
});
test('upload progress - file stream', async t => {
const path = tempfile();
fs.writeFileSync(path, file);
const events = [];
await got.post(`${s.url}/upload`, {body: fs.createReadStream(path)})
.on('uploadProgress', e => events.push(e));
checkEvents(t, events, file.length);
});
test('upload progress - form data', async t => {
const events = [];
const body = new FormData();
body.append('key', 'value');
body.append('file', file);
const size = await pify(body.getLength.bind(body))();
await got.post(`${s.url}/upload`, {body})
.on('uploadProgress', e => events.push(e));
checkEvents(t, events, size);
});
test('upload progress - json', async t => {
const body = JSON.stringify({key: 'value'});
const size = Buffer.byteLength(body);
const events = [];
await got.post(`${s.url}/upload`, {body})
.on('uploadProgress', e => events.push(e));
checkEvents(t, events, size);
});
test('upload progress - stream with known body size', async t => {
const events = [];
const options = {
headers: {'content-length': file.length}
};
const req = got.stream.post(`${s.url}/upload`, options)
.on('uploadProgress', e => events.push(e));
await getStream(intoStream(file).pipe(req));
checkEvents(t, events, file.length);
});
test('upload progress - stream with unknown body size', async t => {
const events = [];
const req = got.stream.post(`${s.url}/upload`)
.on('uploadProgress', e => events.push(e));
await getStream(intoStream(file).pipe(req));
checkEvents(t, events);
});
test('upload progress - no body', async t => {
const events = [];
await got.post(`${s.url}/upload`)
.on('uploadProgress', e => events.push(e));
t.deepEqual(events, [
{
percent: 0,
transferred: 0,
total: 0
},
{
percent: 1,
transferred: 0,
total: 0
}
]);
});
test.after('cleanup', async () => {
await s.close();
});
Loading…
Cancel
Save