From 9af673f5233593ee3a04105e589042b29e5255a9 Mon Sep 17 00:00:00 2001
From: Sami Turcotte
Date: Tue, 4 Dec 2018 00:01:10 -0500
Subject: [PATCH] Add support for multi-byte characters to split, join and
 replace streams

---
 src/index.spec.ts | 85 +++++++++++++++++++++++++++++++++++++++++++++--
 src/index.ts      | 75 +++++++++++++++++++++++++++--------------
 2 files changed, 132 insertions(+), 28 deletions(-)

diff --git a/src/index.spec.ts b/src/index.spec.ts
index 6066ae6..a15050d 100644
--- a/src/index.spec.ts
+++ b/src/index.spec.ts
@@ -479,7 +479,7 @@ test.cb("split() splits chunks using the specified separator", t => {
     let i = 0;
     source
         .pipe(split("|"))
-        .on("data", part => {
+        .on("data", (part: string) => {
             expect(part).to.equal(expectedParts[i]);
             t.pass();
             i++;
@@ -496,7 +496,7 @@ test.cb("split() splits chunks using the specified separator", t => {
 });
 
 test.cb(
-    "split() splits utf-8 encoded buffers using the specified separator",
+    "split() splits utf8 encoded buffers using the specified separator",
     t => {
         t.plan(3);
         const expectedElements = ["a", "b", "c"];
@@ -519,6 +519,30 @@ test.cb(
     },
 );
 
+test.cb(
+    "split() splits utf8 encoded buffers with multi-byte characters using the specified separator",
+    t => {
+        t.plan(3);
+        const expectedElements = ["一", "一", "一"];
+        let i = 0;
+        const through = split(",");
+        const buf = Buffer.from("一,一,一"); // Those are multi-byte utf8 characters (code: 4E00)
+        through
+            .on("data", element => {
+                expect(element).to.equal(expectedElements[i]);
+                i++;
+                t.pass();
+            })
+            .on("error", t.end)
+            .on("end", t.end);
+
+        for (let j = 0; j < buf.length; ++j) {
+            through.write(buf.slice(j, j + 1));
+        }
+        through.end();
+    },
+);
+
 test.cb("join() joins chunks using the specified separator", t => {
     t.plan(9);
     const source = new Readable({ objectMode: true });
@@ -542,6 +566,35 @@ test.cb("join() joins chunks using the specified separator", t => {
     source.push(null);
 });
 
+test.cb(
+    "join() joins chunks using the specified separator without breaking up multi-byte characters " +
+        "spanning multiple chunks",
+    t => {
+        t.plan(5);
+        const source = new Readable({ objectMode: true });
+        const expectedParts = ["ø", "|", "ö", "|", "一"];
+        let i = 0;
+        source
+            .pipe(join("|"))
+            .on("data", part => {
+                expect(part).to.equal(expectedParts[i]);
+                t.pass();
+                i++;
+            })
+            .on("error", t.end)
+            .on("end", t.end);
+
+        source.push(Buffer.from("ø").slice(0, 1)); // 2-byte character spanning two chunks
+        source.push(Buffer.from("ø").slice(1, 2));
+        source.push(Buffer.from("ö").slice(0, 1)); // 2-byte character spanning two chunks
+        source.push(Buffer.from("ö").slice(1, 2));
+        source.push(Buffer.from("一").slice(0, 1)); // 3-byte character spanning three chunks
+        source.push(Buffer.from("一").slice(1, 2));
+        source.push(Buffer.from("一").slice(2, 3));
+        source.push(null);
+    },
+);
+
 test.cb(
     "replace() replaces occurrences of the given string in the streamed elements with the specified " +
         "replacement string",
@@ -592,6 +645,32 @@ test.cb(
     },
 );
 
+test.cb(
+    "replace() replaces occurrences of the given multi-byte character even if it spans multiple chunks",
+    t => {
+        t.plan(3);
+        const source = new Readable({ objectMode: true });
+        const expectedElements = ["ø", "O", "a"];
+        let i = 0;
+        source
+            .pipe(replace("ö", "O"))
+            .on("data", part => {
+                expect(part).to.equal(expectedElements[i]);
+                t.pass();
+                i++;
+            })
+            .on("error", t.end)
+            .on("end", t.end);
+
+        source.push(Buffer.from("ø").slice(0, 1)); // 2-byte character spanning two chunks
+        source.push(Buffer.from("ø").slice(1, 2));
+        source.push(Buffer.from("ö").slice(0, 1)); // 2-byte character spanning two chunks
+        source.push(Buffer.from("ö").slice(1, 2));
+        source.push("a");
+        source.push(null);
+    },
+);
+
 test.cb("parse() parses the streamed elements as JSON", t => {
     t.plan(3);
     const source = new Readable({ objectMode: true });
@@ -623,7 +702,7 @@ test.cb("parse() emits errors on invalid JSON", t => {
         .on("end", t.end);
 
     source.push("{}");
-    source.push("");
+    source.push({});
     source.push([]);
     source.push(null);
 });
diff --git a/src/index.ts b/src/index.ts
index 2147cc3..6da9256 100644
--- a/src/index.ts
+++ b/src/index.ts
@@ -1,5 +1,6 @@
 import { Transform, Readable, Writable, Duplex } from "stream";
 import { ChildProcess } from "child_process";
+import { StringDecoder } from "string_decoder";
 
 export interface ThroughOptions {
     objectMode?: boolean;
@@ -8,6 +9,9 @@ export interface TransformOptions {
     readableObjectMode?: boolean;
     writableObjectMode?: boolean;
 }
+export interface WithEncoding {
+    encoding: string;
+}
 
 /**
  * Convert an array into a Readable stream of its elements
@@ -200,19 +204,22 @@ export function reduce(
 /**
  * Return a ReadWrite stream that splits streamed chunks using the given separator
  * @param separator Separator to split by, defaulting to "\n"
+ * @param options
+ * @param options.encoding Encoding written chunks are assumed to use
  */
 export function split(
     separator: string | RegExp = "\n",
+    options: WithEncoding = { encoding: "utf8" },
 ): NodeJS.ReadWriteStream {
-    let buffered: string = "";
+    let buffered = "";
+    const decoder = new StringDecoder(options.encoding);
+
     return new Transform({
         readableObjectMode: true,
-        writableObjectMode: true,
-        transform(chunk: string | Buffer, encoding, callback) {
-            const asString =
-                chunk instanceof Buffer ? chunk.toString(encoding) : chunk;
+        transform(chunk: Buffer, encoding, callback) {
+            const asString = decoder.write(chunk);
             const splitted = asString.split(separator);
-            if (buffered.length > 0 && splitted.length > 1) {
+            if (splitted.length > 1) {
                 splitted[0] = buffered.concat(splitted[0]);
                 buffered = "";
             }
@@ -221,7 +228,7 @@ export function split(
             callback();
         },
         flush(callback) {
-            callback(undefined, buffered);
+            callback(undefined, buffered + decoder.end());
         },
     });
 }
@@ -229,20 +236,27 @@ export function split(
 /**
  * Return a ReadWrite stream that joins streamed chunks using the given separator
  * @param separator Separator to join with
+ * @param options
+ * @param options.encoding Encoding written chunks are assumed to use
  */
-export function join(separator: string): NodeJS.ReadWriteStream {
+export function join(
+    separator: string,
+    options: WithEncoding = { encoding: "utf8" },
+): NodeJS.ReadWriteStream {
     let isFirstChunk = true;
+    const decoder = new StringDecoder(options.encoding);
     return new Transform({
         readableObjectMode: true,
-        writableObjectMode: true,
-        async transform(chunk: string | Buffer, encoding, callback) {
-            const asString =
-                chunk instanceof Buffer ? chunk.toString(encoding) : chunk;
-            if (!isFirstChunk) {
-                this.push(separator);
+        async transform(chunk: Buffer, encoding, callback) {
+            const asString = decoder.write(chunk);
+            // Take care not to break up multi-byte characters spanning multiple chunks
+            if (asString !== "" || chunk.length === 0) {
+                if (!isFirstChunk) {
+                    this.push(separator);
+                }
+                this.push(asString);
+                isFirstChunk = false;
             }
-            this.push(asString);
-            isFirstChunk = false;
             callback();
         },
     });
@@ -253,33 +267,44 @@ export function join(separator: string): NodeJS.ReadWriteStream {
  * the streamed chunks with the specified replacement string
  * @param searchValue Search string to use
  * @param replaceValue Replacement string to use
+ * @param options
+ * @param options.encoding Encoding written chunks are assumed to use
  */
 export function replace(
     searchValue: string | RegExp,
     replaceValue: string,
+    options: WithEncoding = { encoding: "utf8" },
 ): NodeJS.ReadWriteStream {
+    const decoder = new StringDecoder(options.encoding);
     return new Transform({
         readableObjectMode: true,
-        writableObjectMode: true,
-        transform(chunk: string | Buffer, encoding, callback) {
-            const asString =
-                chunk instanceof Buffer ? chunk.toString(encoding) : chunk;
-            callback(undefined, asString.replace(searchValue, replaceValue));
+        transform(chunk: Buffer, encoding, callback) {
+            const asString = decoder.write(chunk);
+            // Take care not to break up multi-byte characters spanning multiple chunks
+            if (asString !== "" || chunk.length === 0) {
+                callback(
+                    undefined,
+                    asString.replace(searchValue, replaceValue),
+                );
+            } else {
+                callback();
+            }
         },
     });
 }
 
 /**
- * Return a ReadWrite stream that parses the streamed chunks as JSON
+ * Return a ReadWrite stream that parses the streamed chunks as JSON. Each streamed chunk
+ * must be a fully defined JSON string.
  */
 export function parse(): NodeJS.ReadWriteStream {
+    const decoder = new StringDecoder("utf8"); // JSON must be utf8
     return new Transform({
         readableObjectMode: true,
         writableObjectMode: true,
-        async transform(chunk: string | Buffer, encoding, callback) {
+        async transform(chunk: Buffer, encoding, callback) {
             try {
-                const asString =
-                    chunk instanceof Buffer ? chunk.toString(encoding) : chunk;
+                const asString = decoder.write(chunk);
                 // Using await causes parsing errors to be emitted
                 callback(undefined, await JSON.parse(asString));
             } catch (err) {