The Missing Guide on Node.js Stream Part 1: Pipe, Pipeline and Writing Transform

NodeJS Jul 14, 2023
credit: labmanager.com/study-identifies-gaps-in-monitoring-of-streams-27990

Simplest Example

Save image above to some directory copy the code and run code below

// index.js in the same image.png directory

import zlib from 'zlib'
import { pipeline } from 'stream'
import fs from 'fs'

const src = fs.createReadStream('image.png')
const dest = fs.createWriteStream('image.png.gz')

src.pipe(zlib.createGzip()).pipe(dest)


Most Common Operation: Transform

Normally the Readable stream and Writable stream in this mode is obtained from other source e.g., network, file, database and other.

So the most common things you might need to do is write the intermediate, Transform to do something with the stream e.g., collect cache, log, count etc.

Below is the simplest way to implement the Transform stream

const logger = new Transform({
  transform(chunk, encoding, callback) {
    // callback this must be call to continue
    console.log("transform", chunk);
    callback(null, chunk);
  },
});

Not that if you are not familiar with shorthand two below is the same

transform(chunk, encoding, callback) {}
transform: function(chunk, encoding, callback) {}
// SAME

You can have variable inside this function by adding the construct override.

Also adding try-catch to make the error passable by callback(error) and callback(null) if succeed.

Now the code turned to

const loggerAndCounter = new Transform({
  construct(callback) {
    this.counter = 0;
    callback(null);
  },
  transform(chunk, encoding, callback) {
    try {
      // callback this must be call to continue
      console.log("transform", this.counter++, chunk);
      callback(null, chunk);
    } catch (error) {
      callback(error);
    }
  },
});

Then we can consume this with pipe as below

src.pipe(zlib.createGzip()).pipe(loggerAndCounter).pipe(dest)

At the terminal you can see the console is loggin like below

transform 0 <Buffer 1f 8b 08 ... 16334 more bytes>
transform 1 <Buffer 2c 99 be ... 16334 more bytes>
transform 2 <Buffer 8e c2 82 ... 16334 more bytes>
...

Pipe VS Pipeline Original VS Pipeline Promises

You can pipe the stream with default pipe but if you want good error handling option you would better use pipeline

And there is two types of pipeline

2.1) Pipeline with callback — the default from stream

2.2) Pipeline without callback — the subset of stream/promises

// OPTION 1
readable.pipe(transform).pipe(writable)

// OPTION 2
import { pipeline } from 'stream/promises'
pipeline(readable, transform, writable)

// OPTION 3
import { pipeline } from 'stream'
pipeline(readable, transform, writable, (err) => {
  if(err) // handle error
  console.log('Callback function is needed for this api')
})

Otherwise you need to attach on('error') to each step

readable.on('error', () => {...}
.pipe(transform)
.on('error', () => {...}
.pipe(writable)
.on('error', () => {...}

This is hard to read so pipeline is prefer.

Bonus: Composing pipeline

You can also pipelining the pipeline

See the example like below

src.pipe(zlib.createGzip()).pipe(loggerAndCounter).pipe(dest);

This can be breakable into two pipeline

pipeline command return as stream so if it is readable you can read from it
const logError = (err) => { console.log(err) }
const returnedReadable = pipeline(src, zlib.createGzip(), logError)
pipeline(returnedReadable, loggerAndCounter, dest, logError)

// Note that I use pipeline from 'stream'
// So you need the final callback as function

Or mixing as pipe is also possible

const logError = (err) => { console.log(err) }
const returnedReadable = pipeline(src, zlib.createGzip(), logError)
returnedReadable.pipe(loggerAndCounter).pipe(dest)

Hope this help !

Tags

TeamCMD

We are CODEMONDAY team and provide a variety of content about Business , technology, and Programming. Let's enjoy it with us.