Viewing File: /home/ubuntu/efiexchange-node-base/node_modules/stream-chain/index.js
'use strict';
const {Readable, Writable, Duplex, Transform} = require('stream');
const none = Symbol.for('object-stream.none');
const finalSymbol = Symbol.for('object-stream.final');
const manySymbol = Symbol.for('object-stream.many');
const final = value => ({[finalSymbol]: value});
const many = values => ({[manySymbol]: values});
const isFinal = o => o && typeof o == 'object' && finalSymbol in o;
const isMany = o => o && typeof o == 'object' && manySymbol in o;
const getFinalValue = o => o[finalSymbol];
const getManyValues = o => o[manySymbol];
const runAsyncGenerator = async (gen, stream) => {
for (;;) {
let data = gen.next();
if (data && typeof data.then == 'function') {
data = await data;
}
if (data.done) break;
let value = data.value;
if (value && typeof value.then == 'function') {
value = await value;
}
Chain.sanitize(value, stream);
}
};
const wrapFunction = fn =>
new Transform({
writableObjectMode: true,
readableObjectMode: true,
transform(chunk, encoding, callback) {
try {
const result = fn.call(this, chunk, encoding);
if (result && typeof result.then == 'function') {
// thenable
result.then(
result => (Chain.sanitize(result, this), callback(null)),
error => callback(error)
);
return;
}
if (result && typeof result.next == 'function') {
// generator
runAsyncGenerator(result, this).then(
() => callback(null),
error => callback(error)
);
return;
}
Chain.sanitize(result, this);
callback(null);
} catch (error) {
callback(error);
}
}
});
const wrapArray = fns =>
new Transform({
writableObjectMode: true,
readableObjectMode: true,
transform(chunk, encoding, callback) {
try {
let value = chunk;
for (let i = 0; i < fns.length; ++i) {
const result = fns[i].call(this, value, encoding);
if (result === Chain.none) {
callback(null);
return;
}
if (Chain.isFinal(result)) {
value = Chain.getFinalValue(result);
break;
}
value = result;
}
Chain.sanitize(value, this);
callback(null);
} catch (error) {
callback(error);
}
}
});
// is*NodeStream functions taken from https://github.com/nodejs/node/blob/master/lib/internal/streams/utils.js
const isReadableNodeStream = obj =>
obj &&
typeof obj.pipe === 'function' &&
typeof obj.on === 'function' &&
(!obj._writableState || (typeof obj._readableState === 'object' ? obj._readableState.readable : null) !== false) && // Duplex
(!obj._writableState || obj._readableState); // Writable has .pipe.
const isWritableNodeStream = obj =>
obj &&
typeof obj.write === 'function' &&
typeof obj.on === 'function' &&
(!obj._readableState || (typeof obj._writableState === 'object' ? obj._writableState.writable : null) !== false); // Duplex
const isDuplexNodeStream = obj =>
obj && typeof obj.pipe === 'function' && obj._readableState && typeof obj.on === 'function' && typeof obj.write === 'function';
class Chain extends Duplex {
constructor(fns, options) {
super(options || {writableObjectMode: true, readableObjectMode: true});
if (!(fns instanceof Array) || !fns.length) {
throw Error("Chain's argument should be a non-empty array.");
}
this.streams = fns
.filter(fn => fn)
.map((fn, index, fns) => {
if (typeof fn === 'function' || fn instanceof Array) return Chain.convertToTransform(fn);
if (isDuplexNodeStream(fn) || (!index && isReadableNodeStream(fn)) || (index === fns.length - 1 && isWritableNodeStream(fn))) {
return fn;
}
throw Error('Arguments should be functions, arrays or streams.');
})
.filter(s => s);
this.input = this.streams[0];
this.output = this.streams.reduce((output, stream) => (output && output.pipe(stream)) || stream);
if (!isWritableNodeStream(this.input)) {
this._write = (_1, _2, callback) => callback(null);
this._final = callback => callback(null); // unavailable in Node 6
this.input.on('end', () => this.end());
}
if (isReadableNodeStream(this.output)) {
this.output.on('data', chunk => !this.push(chunk) && this.output.pause());
this.output.on('end', () => this.push(null));
} else {
this._read = () => {}; // nop
this.resume();
this.output.on('finish', () => this.push(null));
}
// connect events
if (!options || !options.skipEvents) {
this.streams.forEach(stream => stream.on('error', error => this.emit('error', error)));
}
}
_write(chunk, encoding, callback) {
let error = null;
try {
this.input.write(chunk, encoding, e => callback(e || error));
} catch (e) {
error = e;
}
}
_final(callback) {
let error = null;
try {
this.input.end(null, null, e => callback(e || error));
} catch (e) {
error = e;
}
}
_read() {
this.output.resume();
}
static make(fns, options) {
return new Chain(fns, options);
}
static sanitize(result, stream) {
if (Chain.isFinal(result)) {
result = Chain.getFinalValue(result);
} else if (Chain.isMany(result)) {
result = Chain.getManyValues(result);
}
if (result !== undefined && result !== null && result !== Chain.none) {
if (result instanceof Array) {
result.forEach(value => value !== undefined && value !== null && stream.push(value));
} else {
stream.push(result);
}
}
}
static convertToTransform(fn) {
if (typeof fn === 'function') return wrapFunction(fn);
if (fn instanceof Array) return fn.length ? wrapArray(fn) : null;
return null;
}
}
Chain.none = none;
Chain.final = final;
Chain.isFinal = isFinal;
Chain.getFinalValue = getFinalValue;
Chain.many = many;
Chain.isMany = isMany;
Chain.getManyValues = getManyValues;
Chain.chain = Chain.make;
Chain.make.Constructor = Chain;
module.exports = Chain;
Back to Directory
File Manager