Rework module structure and improve typings

This commit is contained in:
Sami Turcotte 2018-11-29 01:19:18 -05:00
parent 9694ee88e4
commit ce89d6df3e
8 changed files with 257 additions and 250 deletions

1
.gitignore vendored
View File

@ -2,3 +2,4 @@
node_modules
dist
sample_output
yarn-error.log

View File

@ -41,14 +41,12 @@ main();
## API
### { stream }
```ts
/**
* Convert an array into a readable stream of its elements
* @param array The array of elements to stream
*/
export declare function fromArray(array: any[]): NodeJS.ReadableStream;
fromArray(array: any[]): NodeJS.ReadableStream;
/**
* Return a ReadWrite stream that maps streamed chunks
@ -57,15 +55,9 @@ export declare function fromArray(array: any[]): NodeJS.ReadableStream;
* @param options.readableObjectMode Whether this stream should behave as a readable stream of objects
* @param options.writableObjectMode Whether this stream should behave as a writable stream of objects
*/
export declare function map<T, R>(
map<T, R>(
mapper: (chunk: T, encoding: string) => R,
{
readableObjectMode,
writableObjectMode,
}?: {
readableObjectMode?: boolean | undefined;
writableObjectMode?: boolean | undefined;
},
options?: ThroughOptions,
): NodeJS.ReadWriteStream;
/**
@ -75,49 +67,57 @@ export declare function map<T, R>(
* @param options.readableObjectMode Whether this stream should behave as a readable stream of objects
* @param options.writableObjectMode Whether this stream should behave as a writable stream of objects
*/
export declare function flatMap<T, R>(
flatMap<T, R>(
mapper:
| ((chunk: T, encoding: string) => R[])
| ((chunk: T, encoding: string) => Promise<R[]>),
{
readableObjectMode,
writableObjectMode,
}?: {
readableObjectMode?: boolean | undefined;
writableObjectMode?: boolean | undefined;
},
options?: ThroughOptions,
): NodeJS.ReadWriteStream;
/**
* Return a ReadWrite stream that splits streamed chunks using the given separator
* @param separator The separator to split by, defaulting to "\n"
*/
export declare function split(separator?: string): NodeJS.ReadWriteStream;
split(
separator?: string | RegExp,
): NodeJS.ReadWriteStream;
/**
* Return a ReadWrite stream that joins streamed chunks using the given separator
* @param separator The separator to join with
*/
export declare function join(separator: string): NodeJS.ReadWriteStream;
join(separator: string): NodeJS.ReadWriteStream;
/**
* Return a ReadWrite stream that collects streamed objects or bytes into an array or buffer
* Return a ReadWrite stream that collects streamed chunks into an array or buffer
* @param options
* @param options.objectMode Whether this stream should behave as a stream of objects
*/
export declare function collect({
objectMode,
}?: {
objectMode?: boolean | undefined;
}): NodeJS.ReadWriteStream;
collect(
options?: ReadableOptions,
): NodeJS.ReadWriteStream;
/**
* Return a stream of readable streams concatenated together
* @param streams The readable streams to concatenate
*/
export declare function concat(
concat(
...streams: NodeJS.ReadableStream[]
): NodeJS.ReadableStream;
```
### Interfaces
```ts
interface ReadableOptions {
objectMode?: boolean;
}
interface ThroughOptions {
readableObjectMode?: boolean;
writableObjectMode?: boolean;
}
```
### { utils }
@ -128,7 +128,7 @@ export declare function concat(
*
* @param ms The number of milliseconds to wait
*/
export declare function sleep(ms: number): Promise<{}>;
sleep(ms: number): Promise<{}>;
/**
* Resolve a value after the given delay in milliseconds
@ -136,7 +136,7 @@ export declare function sleep(ms: number): Promise<{}>;
* @param value Value to resolve
* @param ms Number of milliseconds to wait
*/
export declare function delay<T>(value: T, ms: number): Promise<T>;
delay<T>(value: T, ms: number): Promise<T>;
/**
* Resolve once the given event emitter emits the specified event
@ -144,7 +144,7 @@ export declare function delay<T>(value: T, ms: number): Promise<T>;
* @param emitter Event emitter to watch
* @param event Event to watch
*/
export declare function once<T>(
once<T>(
emitter: NodeJS.EventEmitter,
event: string,
): Promise<T>;

View File

@ -1,6 +1,6 @@
{
"name": "mhysa",
"version": "0.5.0-beta.0",
"version": "0.5.0-beta.1",
"description": "Promise, Stream and EventEmitter utils for Node.js",
"keywords": [
"promise",
@ -14,7 +14,7 @@
},
"license": "MIT",
"main": "dist/index.js",
"types": "**/*.d.ts",
"types": "dist/index.d.ts",
"files": [
"dist"
],

View File

@ -1,15 +1,8 @@
import test from "ava";
import { expect } from "chai";
import { Readable } from "stream";
import {
fromArray,
map,
flatMap,
split,
join,
collect,
concat,
} from "./stream";
import { fromArray, map, flatMap, split, join, collect, concat } from "./";
test.cb("fromArray() streams array elements in flowing mode", t => {
t.plan(3);
const elements = ["a", "b", "c"];

View File

@ -1,6 +1,217 @@
import * as utils from "./utils";
import * as stream from "./stream";
export = {
utils,
...stream,
import { Transform, Readable } from "stream";
import * as _utils from "./utils";
export const utils = _utils;
export interface ReadableOptions {
objectMode?: boolean;
}
export interface ThroughOptions {
readableObjectMode?: boolean;
writableObjectMode?: boolean;
}
/**
* Convert an array into a readable stream of its elements
* @param array The array of elements to stream
*/
export function fromArray(array: any[]): NodeJS.ReadableStream {
let cursor = 0;
return new Readable({
objectMode: true,
read() {
if (cursor < array.length) {
this.push(array[cursor]);
cursor++;
} else {
this.push(null);
}
},
});
}
/**
* Return a ReadWrite stream that maps streamed chunks
* @param mapper The mapper function, mapping each (chunk, encoding) to a new chunk (or a promise of such)
* @param options
* @param options.readableObjectMode Whether this stream should behave as a readable stream of objects
* @param options.writableObjectMode Whether this stream should behave as a writable stream of objects
*/
export function map<T, R>(
mapper: (chunk: T, encoding: string) => R,
options: ThroughOptions = {
readableObjectMode: true,
writableObjectMode: true,
},
): NodeJS.ReadWriteStream {
return new Transform({
...options,
async transform(chunk, encoding, callback) {
let isPromise = false;
try {
const mapped = mapper(chunk, encoding);
isPromise = mapped instanceof Promise;
callback(undefined, await mapped);
} catch (err) {
if (isPromise) {
// Calling the callback asynchronously with an error wouldn't emit the error, so emit directly
this.emit("error", err);
callback();
} else {
callback(err);
}
}
},
});
}
/**
* Return a ReadWrite stream that flat maps streamed chunks
* @param mapper The mapper function, mapping each (chunk, encoding) to an array of new chunks (or a promise of such)
* @param options
* @param options.readableObjectMode Whether this stream should behave as a readable stream of objects
* @param options.writableObjectMode Whether this stream should behave as a writable stream of objects
*/
export function flatMap<T, R>(
mapper:
| ((chunk: T, encoding: string) => R[])
| ((chunk: T, encoding: string) => Promise<R[]>),
options: ThroughOptions = {
readableObjectMode: true,
writableObjectMode: true,
},
): NodeJS.ReadWriteStream {
return new Transform({
...options,
async transform(chunk, encoding, callback) {
let isPromise = false;
try {
const mapped = mapper(chunk, encoding);
isPromise = mapped instanceof Promise;
(await mapped).forEach(c => this.push(c));
callback();
} catch (err) {
if (isPromise) {
// Calling the callback asynchronously with an error wouldn't emit the error, so emit directly
this.emit("error", err);
callback();
} else {
callback(err);
}
}
},
});
}
/**
* Return a ReadWrite stream that splits streamed chunks using the given separator
* @param separator The separator to split by, defaulting to "\n"
*/
export function split(
separator: string | RegExp = "\n",
): NodeJS.ReadWriteStream {
let buffered: string = "";
return new Transform({
readableObjectMode: true,
writableObjectMode: true,
async transform(chunk: string, encoding, callback) {
const splitted = chunk.split(separator);
if (buffered.length > 0 && splitted.length > 1) {
splitted[0] = buffered.concat(splitted[0]);
buffered = "";
}
buffered += splitted[splitted.length - 1];
splitted.slice(0, -1).forEach((part: string) => this.push(part));
callback();
},
flush(callback) {
callback(undefined, buffered);
},
});
}
/**
* Return a ReadWrite stream that joins streamed chunks using the given separator
* @param separator The separator to join with
*/
export function join(separator: string): NodeJS.ReadWriteStream {
let isFirstChunk = true;
return new Transform({
readableObjectMode: true,
writableObjectMode: true,
async transform(chunk, encoding, callback) {
if (!isFirstChunk) {
this.push(separator);
}
this.push(chunk);
isFirstChunk = false;
callback();
},
});
}
/**
* Return a ReadWrite stream that collects streamed chunks into an array or buffer
* @param options
* @param options.objectMode Whether this stream should behave as a stream of objects
*/
export function collect(
options: ReadableOptions = { objectMode: false },
): NodeJS.ReadWriteStream {
const collected: any[] = [];
return new Transform({
readableObjectMode: options.objectMode,
writableObjectMode: options.objectMode,
transform(data, encoding, callback) {
collected.push(data);
callback();
},
flush(callback) {
this.push(
options.objectMode ? collected : Buffer.concat(collected),
);
callback();
},
});
}
/**
* Return a stream of readable streams concatenated together
* @param streams The readable streams to concatenate
*/
export function concat(
...streams: NodeJS.ReadableStream[]
): NodeJS.ReadableStream {
let isStarted = false;
let currentStreamIndex = 0;
const startCurrentStream = () => {
if (currentStreamIndex >= streams.length) {
wrapper.push(null);
} else {
streams[currentStreamIndex]
.on("data", chunk => {
if (!wrapper.push(chunk)) {
streams[currentStreamIndex].pause();
}
})
.on("error", err => wrapper.emit("error", err))
.on("end", () => {
currentStreamIndex++;
startCurrentStream();
});
}
};
const wrapper = new Readable({
objectMode: true,
read() {
if (!isStarted) {
isStarted = true;
startCurrentStream();
}
if (currentStreamIndex < streams.length) {
streams[currentStreamIndex].resume();
}
},
});
return wrapper;
}

View File

@ -1,199 +0,0 @@
import { Transform, Readable } from "stream";
/**
* Convert an array into a readable stream of its elements
* @param array The array of elements to stream
*/
export function fromArray(array: any[]): NodeJS.ReadableStream {
let cursor = 0;
return new Readable({
objectMode: true,
read() {
if (cursor < array.length) {
this.push(array[cursor]);
cursor++;
} else {
this.push(null);
}
},
});
}
/**
* Return a ReadWrite stream that maps streamed chunks
* @param mapper The mapper function, mapping each (chunk, encoding) to a new chunk (or a promise of such)
* @param options
* @param options.readableObjectMode Whether this stream should behave as a readable stream of objects
* @param options.writableObjectMode Whether this stream should behave as a writable stream of objects
*/
export function map<T, R>(
mapper: (chunk: T, encoding: string) => R,
{ readableObjectMode = true, writableObjectMode = true } = {},
): NodeJS.ReadWriteStream {
return new Transform({
readableObjectMode,
writableObjectMode,
async transform(chunk, encoding, callback) {
let isPromise = false;
try {
const mapped = mapper(chunk, encoding);
isPromise = mapped instanceof Promise;
callback(undefined, await mapped);
} catch (err) {
if (isPromise) {
// Calling the callback asynchronously with an error wouldn't emit the error, so emit directly
this.emit("error", err);
callback();
} else {
callback(err);
}
}
},
});
}
/**
* Return a ReadWrite stream that flat maps streamed chunks
* @param mapper The mapper function, mapping each (chunk, encoding) to an array of new chunks (or a promise of such)
* @param options
* @param options.readableObjectMode Whether this stream should behave as a readable stream of objects
* @param options.writableObjectMode Whether this stream should behave as a writable stream of objects
*/
export function flatMap<T, R>(
mapper:
| ((chunk: T, encoding: string) => R[])
| ((chunk: T, encoding: string) => Promise<R[]>),
{ readableObjectMode = true, writableObjectMode = true } = {},
): NodeJS.ReadWriteStream {
return new Transform({
readableObjectMode,
writableObjectMode,
async transform(chunk, encoding, callback) {
let isPromise = false;
try {
const mapped = mapper(chunk, encoding);
isPromise = mapped instanceof Promise;
(await mapped).forEach(c => this.push(c));
callback();
} catch (err) {
if (isPromise) {
// Calling the callback asynchronously with an error wouldn't emit the error, so emit directly
this.emit("error", err);
callback();
} else {
callback(err);
}
}
},
});
}
/**
* Return a ReadWrite stream that splits streamed chunks using the given separator
* @param separator The separator to split by, defaulting to "\n"
*/
export function split(
separator: string | RegExp = "\n",
): NodeJS.ReadWriteStream {
let buffered: string = "";
return new Transform({
readableObjectMode: true,
writableObjectMode: true,
async transform(chunk: string, encoding, callback) {
const splitted = chunk.split(separator);
if (buffered.length > 0 && splitted.length > 1) {
splitted[0] = buffered.concat(splitted[0]);
buffered = "";
}
buffered += splitted[splitted.length - 1];
splitted.slice(0, -1).forEach((part: string) => this.push(part));
callback();
},
flush(callback) {
callback(undefined, buffered);
},
});
}
/**
* Return a ReadWrite stream that joins streamed chunks using the given separator
* @param separator The separator to join with
*/
export function join(separator: string): NodeJS.ReadWriteStream {
let isFirstChunk = true;
return new Transform({
readableObjectMode: true,
writableObjectMode: true,
async transform(chunk, encoding, callback) {
if (!isFirstChunk) {
this.push(separator);
}
this.push(chunk);
isFirstChunk = false;
callback();
},
});
}
/**
* Return a ReadWrite stream that collects streamed chunks into an array or buffer
* @param options
* @param options.objectMode Whether this stream should behave as a stream of objects
*/
export function collect({ objectMode = false } = {}): NodeJS.ReadWriteStream {
const collected: any[] = [];
return new Transform({
readableObjectMode: objectMode,
writableObjectMode: objectMode,
transform(data, encoding, callback) {
collected.push(data);
callback();
},
flush(callback) {
this.push(objectMode ? collected : Buffer.concat(collected));
callback();
},
});
}
/**
* Return a stream of readable streams concatenated together
* @param streams The readable streams to concatenate
*/
export function concat(
...streams: NodeJS.ReadableStream[]
): NodeJS.ReadableStream {
let isStarted = false;
let currentStreamIndex = 0;
const startCurrentStream = () => {
if (currentStreamIndex >= streams.length) {
wrapper.push(null);
} else {
streams[currentStreamIndex]
.on("data", chunk => {
if (!wrapper.push(chunk)) {
streams[currentStreamIndex].pause();
}
})
.on("error", err => wrapper.emit("error", err))
.on("end", () => {
currentStreamIndex++;
startCurrentStream();
});
}
};
const wrapper = new Readable({
objectMode: true,
read() {
if (!isStarted) {
isStarted = true;
startCurrentStream();
}
if (currentStreamIndex < streams.length) {
streams[currentStreamIndex].resume();
}
},
});
return wrapper;
}

View File

@ -10,7 +10,7 @@
"outDir": "./dist",
"module": "commonjs",
"target": "es5",
"lib": ["es2017"],
"lib": ["es2016"],
"sourceMap": true,
"declaration": true
},

View File

@ -8,6 +8,7 @@
"no-console": false,
"no-implicit-dependencies": [true, "dev"],
"prettier": true,
"ordered-imports": false
"ordered-imports": false,
"interface-name": false
}
}