The Missing Guide to Node.js Streams Part 1: Pipe, Pipeline and Writing a Transform
Simplest Example
Save the image above to a directory, then copy the code below into a file next to it and run it.
// index.mjs in the same directory as image.png
// (ES module syntax: use the .mjs extension or set "type": "module" in package.json)
import zlib from 'zlib'
import fs from 'fs'
const src = fs.createReadStream('image.png')
const dest = fs.createWriteStream('image.png.gz')
// Read image.png, gzip it, and write the result to image.png.gz
src.pipe(zlib.createGzip()).pipe(dest)
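If you want to know when the copy has finished and confirm that the output really is valid gzip, one option is to wait for the destination stream to close and then decompress the file again. The sketch below assumes Node.js 14.8+ with ES modules (for top-level await) and uses a generated file in the OS temp directory as a stand-in for image.png; the file name pipe-demo.bin is made up.

```javascript
// Sketch: the same pipe chain on a generated temp file, then a check of the result.
import zlib from 'zlib'
import fs from 'fs'
import os from 'os'
import path from 'path'

// Hypothetical stand-in for image.png: 100 kB of repeated text in the temp directory
const input = path.join(os.tmpdir(), 'pipe-demo.bin')
fs.writeFileSync(input, Buffer.alloc(100_000, 'abc'))

const src = fs.createReadStream(input)
const dest = fs.createWriteStream(input + '.gz')
src.pipe(zlib.createGzip()).pipe(dest)

// 'close' fires after all data is written and the file descriptor is closed
await new Promise((resolve, reject) => {
  dest.on('close', resolve)
  dest.on('error', reject)
})

// Decompress the result and compare it with the original bytes
const restored = zlib.gunzipSync(fs.readFileSync(input + '.gz'))
console.log('round trip ok:', restored.equals(fs.readFileSync(input)))
```

Note that errors on src or the gzip step are not handled here; that is exactly the gap pipeline fills later in this article.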
Most Common Operation: Transform
Normally the Readable stream and the Writable stream come from somewhere else, e.g. the network, a file, or a database.
So the most common thing you need to write yourself is the intermediate step: a Transform stream that does something with the data passing through, e.g. caching, logging, or counting.
Below is the simplest way to implement a Transform stream:
import { Transform } from 'stream'
const logger = new Transform({
  transform(chunk, encoding, callback) {
    // callback must be called to pass the chunk on and continue
    console.log("transform", chunk);
    callback(null, chunk);
  },
});
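To see the logger run without any file or network, the sketch below feeds it a few in-memory strings with Readable.from and collects what comes out in a small Writable. The three input strings and the sink are illustrative assumptions, not part of the original example; it assumes Node.js 14.8+ with ES modules.

```javascript
// Sketch: run the logger Transform on in-memory data instead of a file.
import { Readable, Transform, Writable } from 'stream'

const logger = new Transform({
  transform(chunk, encoding, callback) {
    console.log('transform', chunk)
    callback(null, chunk) // pass the chunk on unchanged
  },
})

// Hypothetical sink that remembers what it received
const received = []
const sink = new Writable({
  write(chunk, encoding, callback) {
    received.push(chunk)
    callback()
  },
})

const finished = new Promise((resolve) => sink.on('finish', resolve))
Readable.from(['a', 'b', 'c']).pipe(logger).pipe(sink)
await finished
console.log(Buffer.concat(received).toString()) // abc
```

Because the Transform's writable side converts strings to Buffers by default, the logged chunks appear as Buffers even though the inputs were strings.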
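Note: if you are not familiar with the method shorthand, the two lines below are equivalent. (An arrow function is not, because it does not receive the stream as this, which the next example relies on.)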
transform(chunk, encoding, callback) {}
transform: function(chunk, encoding, callback) {}
// SAME
You can keep variables on the stream itself by overriding construct (available since Node.js 15) and storing them on this.
Also wrap the body in try-catch so that an error is passed on with callback(error), while callback(null, chunk) is called on success.
Now the code becomes:
const loggerAndCounter = new Transform({
  // Runs once before the first chunk; set up per-stream state here
  construct(callback) {
    this.counter = 0;
    callback(null);
  },
  transform(chunk, encoding, callback) {
    try {
      // callback must be called to pass the chunk on and continue
      console.log("transform", this.counter++, chunk);
      callback(null, chunk);
    } catch (error) {
      // Hand the error to the stream instead of throwing
      callback(error);
    }
  },
});
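The sketch below exercises the error path: a hypothetical rule rejects any chunk equal to 'bad', the catch block hands the error to callback(error), and the promise-based pipeline rejects with it. It assumes Node.js 15+ (for construct and stream/promises); the rule and the names makeChecker, discard, and run are made up for this example.

```javascript
// Sketch: an error caught in transform and passed to callback(error) fails the pipeline.
import { Readable, Transform, Writable } from 'stream'
import { pipeline } from 'stream/promises'

function makeChecker() {
  return new Transform({
    construct(callback) {
      this.counter = 0 // per-stream state, set up before any chunk arrives
      callback()
    },
    transform(chunk, encoding, callback) {
      try {
        // Hypothetical validation rule, only here to trigger the error path
        if (chunk.toString() === 'bad') throw new Error(`bad chunk #${this.counter}`)
        this.counter++
        callback(null, chunk)
      } catch (error) {
        callback(error) // forward the error instead of crashing the process
      }
    },
  })
}

// Hypothetical Writable that discards everything
const discard = () => new Writable({ write(chunk, encoding, callback) { callback() } })

async function run(items) {
  try {
    await pipeline(Readable.from(items), makeChecker(), discard())
    return 'ok'
  } catch (err) {
    return err.message
  }
}

console.log(await run(['a', 'b']))
console.log(await run(['a', 'bad', 'c']))
```

The first run should report ok and the second the error message for the chunk at index 1, since the counter only advances on accepted chunks.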
Then we can plug it into the chain with pipe, as below:
src.pipe(zlib.createGzip()).pipe(loggerAndCounter).pipe(dest)
In the terminal you will see log lines like these:
transform 0 <Buffer 1f 8b 08 ... 16334 more bytes>
transform 1 <Buffer 2c 99 be ... 16334 more bytes>
transform 2 <Buffer 8e c2 82 ... 16334 more bytes>
...
Pipe vs. Pipeline (Callback) vs. Pipeline (Promises)
You can connect streams with the built-in pipe, but if you want proper error handling you are better off using pipeline.
There are two types of pipeline:
2.1) Pipeline with a callback, the original API exported from stream
2.2) Pipeline without a callback, which returns a Promise, exported from stream/promises
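// Pick one option per file: the two imports of pipeline would clash
// OPTION 1: plain pipe (errors are not forwarded between streams)
readable.pipe(transform).pipe(writable)
// OPTION 2: pipeline from 'stream/promises' returns a Promise
import { pipeline } from 'stream/promises'
try {
  await pipeline(readable, transform, writable)
} catch (err) {
  // handle error
}
// OPTION 3: pipeline from 'stream' needs a final callback
import { pipeline } from 'stream'
pipeline(readable, transform, writable, (err) => {
  if (err) {
    // handle error
  }
})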
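Here is a runnable comparison of options 2 and 3 on in-memory data: both compress a string and decompress it again, so the output should equal the input. Readable.from, the hypothetical collector helper, and the sample text are assumptions for illustration; the article itself uses file streams. It assumes Node.js 15+ with ES modules.

```javascript
// Sketch: the promise form (option 2) and the callback form (option 3) side by side.
import zlib from 'zlib'
import { Readable, Writable, pipeline as pipelineWithCallback } from 'stream'
import { pipeline } from 'stream/promises'

// Hypothetical helper: a Writable that keeps everything written to it
function collector() {
  const chunks = []
  const sink = new Writable({
    write(chunk, encoding, callback) {
      chunks.push(chunk)
      callback()
    },
  })
  return { sink, text: () => Buffer.concat(chunks).toString('utf8') }
}

// Option 2: a failure rejects the Promise, so the await would throw
async function roundTripPromise(input) {
  const { sink, text } = collector()
  await pipeline(Readable.from([input]), zlib.createGzip(), zlib.createGunzip(), sink)
  return text()
}

// Option 3: the callback receives the error, or undefined on success
function roundTripCallback(input) {
  return new Promise((resolve, reject) => {
    const { sink, text } = collector()
    pipelineWithCallback(Readable.from([input]), zlib.createGzip(), zlib.createGunzip(), sink, (err) => {
      if (err) reject(err)
      else resolve(text())
    })
  })
}

const sample = 'hello pipeline '.repeat(1000)
console.log((await roundTripPromise(sample)) === sample)
console.log((await roundTripCallback(sample)) === sample)
```

The promise form reads more naturally inside async code, while the callback form is what you need when you also want the returned stream (see the bonus section).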
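With plain pipe, errors are not forwarded from one stream to the next, so you need to attach an on('error') handler to every step:
readable
  .on('error', (err) => { /* handle error */ })
  .pipe(transform)
  .on('error', (err) => { /* handle error */ })
  .pipe(writable)
  .on('error', (err) => { /* handle error */ })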
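This is hard to read, so pipeline is preferred.
pipeline also destroys every stream in the chain when one of them fails, which pipe does not do.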
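A small experiment makes the cleanup difference visible. Under the assumptions below (an endless source and a Transform that fails on its first chunk, both hypothetical), pipe leaves the source alive after the error, while pipeline destroys it before calling the callback. This sketch assumes Node.js 16+ with ES modules.

```javascript
// Sketch: compare what pipe and pipeline do to the source when a later step fails.
import { Readable, Transform, Writable, pipeline } from 'stream'

// Hypothetical endless source: pushes 'x' every time more data is requested
const endlessSource = () => new Readable({ read() { this.push('x') } })

// Hypothetical Transform that fails on the first chunk it sees
const failing = () => new Transform({
  transform(chunk, encoding, callback) {
    callback(new Error('boom'))
  },
})

// Writable that throws the data away
const sink = () => new Writable({ write(chunk, encoding, callback) { callback() } })

function withPipe() {
  return new Promise((resolve) => {
    const src = endlessSource()
    const step = failing()
    // With pipe we have to listen for the error ourselves
    step.on('error', () => resolve(src.destroyed))
    src.pipe(step).pipe(sink())
  })
}

function withPipeline() {
  return new Promise((resolve) => {
    const src = endlessSource()
    pipeline(src, failing(), sink(), (err) => {
      resolve({ message: err && err.message, srcDestroyed: src.destroyed })
    })
  })
}

console.log('pipe: source destroyed?', await withPipe())
console.log('pipeline:', await withPipeline())
```

With pipe, the source is merely unpiped and paused, so it is your job to destroy it; with pipeline the callback only runs after every stream has been torn down.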
Bonus: Composing Pipelines
You can also feed one pipeline into another.
Take the example below:
src.pipe(zlib.createGzip()).pipe(loggerAndCounter).pipe(dest);
This can be broken into two pipelines.
The pipeline function from stream returns the last stream passed to it, so if that stream is readable you can keep reading from it:
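import { pipeline } from 'stream'
// The callback also runs on success (with err undefined), so only log real errors
const logError = (err) => { if (err) console.error(err) }
const returnedReadable = pipeline(src, zlib.createGzip(), logError)
pipeline(returnedReadable, loggerAndCounter, dest, logError)
// Note: this uses pipeline from 'stream', not 'stream/promises',
// so the final argument must be a callback function
// (the 'stream/promises' version returns a Promise, not the stream)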
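The sketch below checks both claims on in-memory data: pipeline hands back the same object as its last argument, and a second pipeline can read from it. The input text, the counter, and the sink are hypothetical stand-ins for src, loggerAndCounter, and dest; it assumes Node.js 14.8+ with ES modules.

```javascript
// Sketch: pipeline from 'stream' returns its last stream, which another pipeline can read.
import zlib from 'zlib'
import { Readable, Transform, Writable, pipeline } from 'stream'

const logError = (err) => { if (err) console.error(err) }

// Hypothetical stand-ins for src, loggerAndCounter and dest
const src = Readable.from(['some text to compress'])
let chunkCount = 0
const counter = new Transform({
  transform(chunk, encoding, callback) {
    chunkCount++
    callback(null, chunk)
  },
})
const dest = new Writable({ write(chunk, encoding, callback) { callback() } })

const gzip = zlib.createGzip()
const returnedReadable = pipeline(src, gzip, logError)
console.log('same object as gzip?', returnedReadable === gzip)

// The second pipeline consumes the gzip output; resolve once dest has finished
const done = new Promise((resolve, reject) => {
  pipeline(returnedReadable, counter, dest, (err) => (err ? reject(err) : resolve(chunkCount)))
})
console.log('chunks that reached the counter:', await done)
```

If any stream in the second pipeline fails, it destroys returnedReadable too, and the first pipeline then reports the failure through its own callback.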
Mixing pipeline with pipe is also possible, but errors from the streams attached with pipe are then not reported to logError:
import { pipeline } from 'stream'
// Only log when there is an actual error
const logError = (err) => { if (err) console.error(err) }
const returnedReadable = pipeline(src, zlib.createGzip(), logError)
returnedReadable.pipe(loggerAndCounter).pipe(dest)
Hope this helps!