Add buffer support to split, join, replace and parse streams

This commit is contained in:
Sami Turcotte 2018-12-03 09:18:49 -05:00
parent 407bb79260
commit 63c3681211
4 changed files with 53 additions and 9 deletions

View File

@ -1,6 +1,6 @@
{ {
"name": "mhysa", "name": "mhysa",
"version": "1.0.0", "version": "1.0.1",
"description": "Streams and event emitter utils for Node.js", "description": "Streams and event emitter utils for Node.js",
"keywords": [ "keywords": [
"promise", "promise",

12
samples/split.js Normal file
View File

@ -0,0 +1,12 @@
const mhysa = require("mhysa");
let stream = mhysa.split(",");
const buf = Buffer.from("a,b,c");
stream.on("data", function(data) {
console.log(data);
});
for (let i = 0; i < buf.length; ++i) {
stream.write(buf.slice(i, i + 1));
}
stream.end();

View File

@ -495,6 +495,30 @@ test.cb("split() splits chunks using the specified separator", t => {
source.push(null); source.push(null);
}); });
test.cb(
"split() splits utf-8 encoded buffers using the specified separator",
t => {
t.plan(3);
const expectedElements = ["a", "b", "c"];
let i = 0;
const through = split(",");
const buf = Buffer.from("a,b,c");
through
.on("data", element => {
expect(element).to.equal(expectedElements[i]);
i++;
t.pass();
})
.on("error", t.end)
.on("end", t.end);
for (let j = 0; j < buf.length; ++j) {
through.write(buf.slice(j, j + 1));
}
through.end();
},
);
test.cb("join() joins chunks using the specified separator", t => { test.cb("join() joins chunks using the specified separator", t => {
t.plan(9); t.plan(9);
const source = new Readable({ objectMode: true }); const source = new Readable({ objectMode: true });

View File

@ -208,8 +208,10 @@ export function split(
return new Transform({ return new Transform({
readableObjectMode: true, readableObjectMode: true,
writableObjectMode: true, writableObjectMode: true,
async transform(chunk: string, encoding, callback) { transform(chunk: string | Buffer, encoding, callback) {
const splitted = chunk.split(separator); const asString =
chunk instanceof Buffer ? chunk.toString(encoding) : chunk;
const splitted = asString.split(separator);
if (buffered.length > 0 && splitted.length > 1) { if (buffered.length > 0 && splitted.length > 1) {
splitted[0] = buffered.concat(splitted[0]); splitted[0] = buffered.concat(splitted[0]);
buffered = ""; buffered = "";
@ -233,11 +235,13 @@ export function join(separator: string): NodeJS.ReadWriteStream {
return new Transform({ return new Transform({
readableObjectMode: true, readableObjectMode: true,
writableObjectMode: true, writableObjectMode: true,
async transform(chunk: string, encoding, callback) { async transform(chunk: string | Buffer, encoding, callback) {
const asString =
chunk instanceof Buffer ? chunk.toString(encoding) : chunk;
if (!isFirstChunk) { if (!isFirstChunk) {
this.push(separator); this.push(separator);
} }
this.push(chunk); this.push(asString);
isFirstChunk = false; isFirstChunk = false;
callback(); callback();
}, },
@ -257,8 +261,10 @@ export function replace(
return new Transform({ return new Transform({
readableObjectMode: true, readableObjectMode: true,
writableObjectMode: true, writableObjectMode: true,
transform(chunk: string, encoding, callback) { transform(chunk: string | Buffer, encoding, callback) {
callback(undefined, chunk.replace(searchValue, replaceValue)); const asString =
chunk instanceof Buffer ? chunk.toString(encoding) : chunk;
callback(undefined, asString.replace(searchValue, replaceValue));
}, },
}); });
} }
@ -270,10 +276,12 @@ export function parse(): NodeJS.ReadWriteStream {
return new Transform({ return new Transform({
readableObjectMode: true, readableObjectMode: true,
writableObjectMode: true, writableObjectMode: true,
async transform(chunk: string, encoding, callback) { async transform(chunk: string | Buffer, encoding, callback) {
try { try {
const asString =
chunk instanceof Buffer ? chunk.toString(encoding) : chunk;
// Using await causes parsing errors to be emitted // Using await causes parsing errors to be emitted
callback(undefined, await JSON.parse(chunk)); callback(undefined, await JSON.parse(asString));
} catch (err) { } catch (err) {
callback(err); callback(err);
} }