From ce89d6df3eff74161abb5b4d8536c835c4554add Mon Sep 17 00:00:00 2001 From: Sami Turcotte Date: Thu, 29 Nov 2018 01:19:18 -0500 Subject: [PATCH] Rework module structure and improve typings --- .gitignore | 3 +- README.md | 62 +++---- package.json | 4 +- src/{stream.spec.ts => index.spec.ts} | 11 +- src/index.ts | 223 +++++++++++++++++++++++++- src/stream.ts | 199 ----------------------- tsconfig.json | 2 +- tslint.json | 3 +- 8 files changed, 257 insertions(+), 250 deletions(-) rename src/{stream.spec.ts => index.spec.ts} (99%) delete mode 100644 src/stream.ts diff --git a/.gitignore b/.gitignore index da196ce..e14fcc3 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ .vscode node_modules dist -sample_output \ No newline at end of file +sample_output +yarn-error.log \ No newline at end of file diff --git a/README.md b/README.md index 655c9a4..27ee6d3 100644 --- a/README.md +++ b/README.md @@ -41,14 +41,12 @@ main(); ## API -### { stream } - ```ts /** * Convert an array into a readable stream of its elements * @param array The array of elements to stream */ -export declare function fromArray(array: any[]): NodeJS.ReadableStream; +fromArray(array: any[]): NodeJS.ReadableStream; /** * Return a ReadWrite stream that maps streamed chunks @@ -57,15 +55,9 @@ export declare function fromArray(array: any[]): NodeJS.ReadableStream; * @param options.readableObjectMode Whether this stream should behave as a readable stream of objects * @param options.writableObjectMode Whether this stream should behave as a writable stream of objects */ -export declare function map( +map( mapper: (chunk: T, encoding: string) => R, - { - readableObjectMode, - writableObjectMode, - }?: { - readableObjectMode?: boolean | undefined; - writableObjectMode?: boolean | undefined; - }, + options?: ThroughOptions, ): NodeJS.ReadWriteStream; /** @@ -75,49 +67,57 @@ export declare function map( * @param options.readableObjectMode Whether this stream should behave as a readable stream of objects * @param options.writableObjectMode Whether this stream should behave as a writable stream of objects */ -export declare function flatMap( +flatMap( mapper: | ((chunk: T, encoding: string) => R[]) | ((chunk: T, encoding: string) => Promise), - { - readableObjectMode, - writableObjectMode, - }?: { - readableObjectMode?: boolean | undefined; - writableObjectMode?: boolean | undefined; - }, + options?: ThroughOptions, ): NodeJS.ReadWriteStream; /** * Return a ReadWrite stream that splits streamed chunks using the given separator * @param separator The separator to split by, defaulting to "\n" */ -export declare function split(separator?: string): NodeJS.ReadWriteStream; +split( + separator?: string | RegExp, +): NodeJS.ReadWriteStream; /** * Return a ReadWrite stream that joins streamed chunks using the given separator * @param separator The separator to join with */ -export declare function join(separator: string): NodeJS.ReadWriteStream; +join(separator: string): NodeJS.ReadWriteStream; /** - * Return a ReadWrite stream that collects streamed objects or bytes into an array or buffer + * Return a ReadWrite stream that collects streamed chunks into an array or buffer * @param options * @param options.objectMode Whether this stream should behave as a stream of objects */ -export declare function collect({ - objectMode, -}?: { - objectMode?: boolean | undefined; -}): NodeJS.ReadWriteStream; +collect( + options?: ReadableOptions, +): NodeJS.ReadWriteStream; /** * Return a stream of readable streams concatenated together * @param streams The readable streams to concatenate */ -export declare function concat( +concat( ...streams: NodeJS.ReadableStream[] ): NodeJS.ReadableStream; + +``` + +### Interfaces + +```ts +interface ReadableOptions { + objectMode?: boolean; +} + +interface ThroughOptions { + readableObjectMode?: boolean; + writableObjectMode?: boolean; +} ``` ### { utils } @@ -128,7 +128,7 @@ export declare function concat( * * @param ms The number of milliseconds to wait */ -export declare function sleep(ms: number): Promise<{}>; +sleep(ms: number): Promise<{}>; /** * Resolve a value after the given delay in milliseconds @@ -136,7 +136,7 @@ export declare function sleep(ms: number): Promise<{}>; * @param value Value to resolve * @param ms Number of milliseconds to wait */ -export declare function delay(value: T, ms: number): Promise; +delay(value: T, ms: number): Promise; /** * Resolve once the given event emitter emits the specified event @@ -144,7 +144,7 @@ export declare function delay(value: T, ms: number): Promise; * @param emitter Event emitter to watch * @param event Event to watch */ -export declare function once( +once( emitter: NodeJS.EventEmitter, event: string, ): Promise; diff --git a/package.json b/package.json index c50c96a..55d8f48 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "mhysa", - "version": "0.5.0-beta.0", + "version": "0.5.0-beta.1", "description": "Promise, Stream and EventEmitter utils for Node.js", "keywords": [ "promise", @@ -14,7 +14,7 @@ }, "license": "MIT", "main": "dist/index.js", - "types": "**/*.d.ts", + "types": "dist/index.d.ts", "files": [ "dist" ], diff --git a/src/stream.spec.ts b/src/index.spec.ts similarity index 99% rename from src/stream.spec.ts rename to src/index.spec.ts index 1cb3f35..035667b 100644 --- a/src/stream.spec.ts +++ b/src/index.spec.ts @@ -1,15 +1,8 @@ import test from "ava"; import { expect } from "chai"; import { Readable } from "stream"; -import { - fromArray, - map, - flatMap, - split, - join, - collect, - concat, -} from "./stream"; +import { fromArray, map, flatMap, split, join, collect, concat } from "./"; + test.cb("fromArray() streams array elements in flowing mode", t => { t.plan(3); const elements = ["a", "b", "c"]; diff --git a/src/index.ts b/src/index.ts index ac82103..8eca646 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,6 +1,217 @@ -import * as utils from "./utils"; -import * as stream from "./stream"; -export = { - utils, - ...stream, -}; +import { Transform, Readable } from "stream"; +import * as _utils from "./utils"; +export const utils = _utils; + +export interface ReadableOptions { + objectMode?: boolean; +} +export interface ThroughOptions { + readableObjectMode?: boolean; + writableObjectMode?: boolean; +} + +/** + * Convert an array into a readable stream of its elements + * @param array The array of elements to stream + */ +export function fromArray(array: any[]): NodeJS.ReadableStream { + let cursor = 0; + return new Readable({ + objectMode: true, + read() { + if (cursor < array.length) { + this.push(array[cursor]); + cursor++; + } else { + this.push(null); + } + }, + }); +} + +/** + * Return a ReadWrite stream that maps streamed chunks + * @param mapper The mapper function, mapping each (chunk, encoding) to a new chunk (or a promise of such) + * @param options + * @param options.readableObjectMode Whether this stream should behave as a readable stream of objects + * @param options.writableObjectMode Whether this stream should behave as a writable stream of objects + */ +export function map( + mapper: (chunk: T, encoding: string) => R, + options: ThroughOptions = { + readableObjectMode: true, + writableObjectMode: true, + }, +): NodeJS.ReadWriteStream { + return new Transform({ + ...options, + async transform(chunk, encoding, callback) { + let isPromise = false; + try { + const mapped = mapper(chunk, encoding); + isPromise = mapped instanceof Promise; + callback(undefined, await mapped); + } catch (err) { + if (isPromise) { + // Calling the callback asynchronously with an error wouldn't emit the error, so emit directly + this.emit("error", err); + callback(); + } else { + callback(err); + } + } + }, + }); +} + +/** + * Return a ReadWrite stream that flat maps streamed chunks + * @param mapper The mapper function, mapping each (chunk, encoding) to an array of new chunks (or a promise of such) + * @param options + * @param options.readableObjectMode Whether this stream should behave as a readable stream of objects + * @param options.writableObjectMode Whether this stream should behave as a writable stream of objects + */ +export function flatMap( + mapper: + | ((chunk: T, encoding: string) => R[]) + | ((chunk: T, encoding: string) => Promise), + options: ThroughOptions = { + readableObjectMode: true, + writableObjectMode: true, + }, +): NodeJS.ReadWriteStream { + return new Transform({ + ...options, + async transform(chunk, encoding, callback) { + let isPromise = false; + try { + const mapped = mapper(chunk, encoding); + isPromise = mapped instanceof Promise; + (await mapped).forEach(c => this.push(c)); + callback(); + } catch (err) { + if (isPromise) { + // Calling the callback asynchronously with an error wouldn't emit the error, so emit directly + this.emit("error", err); + callback(); + } else { + callback(err); + } + } + }, + }); +} + +/** + * Return a ReadWrite stream that splits streamed chunks using the given separator + * @param separator The separator to split by, defaulting to "\n" + */ +export function split( + separator: string | RegExp = "\n", +): NodeJS.ReadWriteStream { + let buffered: string = ""; + return new Transform({ + readableObjectMode: true, + writableObjectMode: true, + async transform(chunk: string, encoding, callback) { + const splitted = chunk.split(separator); + if (buffered.length > 0 && splitted.length > 1) { + splitted[0] = buffered.concat(splitted[0]); + buffered = ""; + } + buffered += splitted[splitted.length - 1]; + splitted.slice(0, -1).forEach((part: string) => this.push(part)); + callback(); + }, + flush(callback) { + callback(undefined, buffered); + }, + }); +} + +/** + * Return a ReadWrite stream that joins streamed chunks using the given separator + * @param separator The separator to join with + */ +export function join(separator: string): NodeJS.ReadWriteStream { + let isFirstChunk = true; + return new Transform({ + readableObjectMode: true, + writableObjectMode: true, + async transform(chunk, encoding, callback) { + if (!isFirstChunk) { + this.push(separator); + } + this.push(chunk); + isFirstChunk = false; + callback(); + }, + }); +} + +/** + * Return a ReadWrite stream that collects streamed chunks into an array or buffer + * @param options + * @param options.objectMode Whether this stream should behave as a stream of objects + */ +export function collect( + options: ReadableOptions = { objectMode: false }, +): NodeJS.ReadWriteStream { + const collected: any[] = []; + return new Transform({ + readableObjectMode: options.objectMode, + writableObjectMode: options.objectMode, + transform(data, encoding, callback) { + collected.push(data); + callback(); + }, + flush(callback) { + this.push( + options.objectMode ? collected : Buffer.concat(collected), + ); + callback(); + }, + }); +} + +/** + * Return a stream of readable streams concatenated together + * @param streams The readable streams to concatenate + */ +export function concat( + ...streams: NodeJS.ReadableStream[] +): NodeJS.ReadableStream { + let isStarted = false; + let currentStreamIndex = 0; + const startCurrentStream = () => { + if (currentStreamIndex >= streams.length) { + wrapper.push(null); + } else { + streams[currentStreamIndex] + .on("data", chunk => { + if (!wrapper.push(chunk)) { + streams[currentStreamIndex].pause(); + } + }) + .on("error", err => wrapper.emit("error", err)) + .on("end", () => { + currentStreamIndex++; + startCurrentStream(); + }); + } + }; + + const wrapper = new Readable({ + objectMode: true, + read() { + if (!isStarted) { + isStarted = true; + startCurrentStream(); + } + if (currentStreamIndex < streams.length) { + streams[currentStreamIndex].resume(); + } + }, + }); + return wrapper; +} diff --git a/src/stream.ts b/src/stream.ts deleted file mode 100644 index 9b9e2e3..0000000 --- a/src/stream.ts +++ /dev/null @@ -1,199 +0,0 @@ -import { Transform, Readable } from "stream"; - -/** - * Convert an array into a readable stream of its elements - * @param array The array of elements to stream - */ -export function fromArray(array: any[]): NodeJS.ReadableStream { - let cursor = 0; - return new Readable({ - objectMode: true, - read() { - if (cursor < array.length) { - this.push(array[cursor]); - cursor++; - } else { - this.push(null); - } - }, - }); -} - -/** - * Return a ReadWrite stream that maps streamed chunks - * @param mapper The mapper function, mapping each (chunk, encoding) to a new chunk (or a promise of such) - * @param options - * @param options.readableObjectMode Whether this stream should behave as a readable stream of objects - * @param options.writableObjectMode Whether this stream should behave as a writable stream of objects - */ -export function map( - mapper: (chunk: T, encoding: string) => R, - { readableObjectMode = true, writableObjectMode = true } = {}, -): NodeJS.ReadWriteStream { - return new Transform({ - readableObjectMode, - writableObjectMode, - async transform(chunk, encoding, callback) { - let isPromise = false; - try { - const mapped = mapper(chunk, encoding); - isPromise = mapped instanceof Promise; - callback(undefined, await mapped); - } catch (err) { - if (isPromise) { - // Calling the callback asynchronously with an error wouldn't emit the error, so emit directly - this.emit("error", err); - callback(); - } else { - callback(err); - } - } - }, - }); -} - -/** - * Return a ReadWrite stream that flat maps streamed chunks - * @param mapper The mapper function, mapping each (chunk, encoding) to an array of new chunks (or a promise of such) - * @param options - * @param options.readableObjectMode Whether this stream should behave as a readable stream of objects - * @param options.writableObjectMode Whether this stream should behave as a writable stream of objects - */ -export function flatMap( - mapper: - | ((chunk: T, encoding: string) => R[]) - | ((chunk: T, encoding: string) => Promise), - { readableObjectMode = true, writableObjectMode = true } = {}, -): NodeJS.ReadWriteStream { - return new Transform({ - readableObjectMode, - writableObjectMode, - async transform(chunk, encoding, callback) { - let isPromise = false; - try { - const mapped = mapper(chunk, encoding); - isPromise = mapped instanceof Promise; - (await mapped).forEach(c => this.push(c)); - callback(); - } catch (err) { - if (isPromise) { - // Calling the callback asynchronously with an error wouldn't emit the error, so emit directly - this.emit("error", err); - callback(); - } else { - callback(err); - } - } - }, - }); -} - -/** - * Return a ReadWrite stream that splits streamed chunks using the given separator - * @param separator The separator to split by, defaulting to "\n" - */ -export function split( - separator: string | RegExp = "\n", -): NodeJS.ReadWriteStream { - let buffered: string = ""; - return new Transform({ - readableObjectMode: true, - writableObjectMode: true, - async transform(chunk: string, encoding, callback) { - const splitted = chunk.split(separator); - if (buffered.length > 0 && splitted.length > 1) { - splitted[0] = buffered.concat(splitted[0]); - buffered = ""; - } - buffered += splitted[splitted.length - 1]; - splitted.slice(0, -1).forEach((part: string) => this.push(part)); - callback(); - }, - flush(callback) { - callback(undefined, buffered); - }, - }); -} - -/** - * Return a ReadWrite stream that joins streamed chunks using the given separator - * @param separator The separator to join with - */ -export function join(separator: string): NodeJS.ReadWriteStream { - let isFirstChunk = true; - return new Transform({ - readableObjectMode: true, - writableObjectMode: true, - async transform(chunk, encoding, callback) { - if (!isFirstChunk) { - this.push(separator); - } - this.push(chunk); - isFirstChunk = false; - callback(); - }, - }); -} - -/** - * Return a ReadWrite stream that collects streamed chunks into an array or buffer - * @param options - * @param options.objectMode Whether this stream should behave as a stream of objects - */ -export function collect({ objectMode = false } = {}): NodeJS.ReadWriteStream { - const collected: any[] = []; - return new Transform({ - readableObjectMode: objectMode, - writableObjectMode: objectMode, - transform(data, encoding, callback) { - collected.push(data); - callback(); - }, - flush(callback) { - this.push(objectMode ? collected : Buffer.concat(collected)); - callback(); - }, - }); -} - -/** - * Return a stream of readable streams concatenated together - * @param streams The readable streams to concatenate - */ -export function concat( - ...streams: NodeJS.ReadableStream[] -): NodeJS.ReadableStream { - let isStarted = false; - let currentStreamIndex = 0; - const startCurrentStream = () => { - if (currentStreamIndex >= streams.length) { - wrapper.push(null); - } else { - streams[currentStreamIndex] - .on("data", chunk => { - if (!wrapper.push(chunk)) { - streams[currentStreamIndex].pause(); - } - }) - .on("error", err => wrapper.emit("error", err)) - .on("end", () => { - currentStreamIndex++; - startCurrentStream(); - }); - } - }; - - const wrapper = new Readable({ - objectMode: true, - read() { - if (!isStarted) { - isStarted = true; - startCurrentStream(); - } - if (currentStreamIndex < streams.length) { - streams[currentStreamIndex].resume(); - } - }, - }); - return wrapper; -} diff --git a/tsconfig.json b/tsconfig.json index 5c40fb3..8df64fa 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -10,7 +10,7 @@ "outDir": "./dist", "module": "commonjs", "target": "es5", - "lib": ["es2017"], + "lib": ["es2016"], "sourceMap": true, "declaration": true }, diff --git a/tslint.json b/tslint.json index 97f5194..b1c37a5 100644 --- a/tslint.json +++ b/tslint.json @@ -8,6 +8,7 @@ "no-console": false, "no-implicit-dependencies": [true, "dev"], "prettier": true, - "ordered-imports": false + "ordered-imports": false, + "interface-name": false } }