Published Mar 10, 2017

Streams: My Favorite Tool

Before we talk about why Streams is my favorite tool in the world, let's look at a couple of scenarios.

Scenario 1

Problem: Help! I need to load a 300 GB file, process it, and then insert each item into SQL!

Solution: Streams

Scenario 2

Problem: I mapped over an array about 300 times and need to return the final result

Solution: Streams

Scenario 3

Problem: I'm looking to iterate over a buffer, but I don't know when I'll get the next item, and I don't particularly want to create some crazy logic tree to wait for it.

Solution: Streams

Scenario 4

Problem: I have a series of mapping and write functions that I need to run on an endless generator function, an array, and an EventEmitter.

Solution: Streams

I think you get my point. In all the scenarios I've described, Streams can be the solution.

Streams is one of the things that I can't live without. The pipe method is a perfect example of how Streams does things right.

//streams at their prime
var rawData = createRawStream();
var digestRaw = createRawToJSValues();
var handleData = createJSONMapper();
var prepareForOutput = createOutputPreparation();
var output = createWriteStream();

rawData.pipe(digestRaw).pipe(handleData).pipe(prepareForOutput).pipe(output);

A little History

Here's the Wikipedia page on the Stream Processing Paradigm. Its authors mention GPUs and other high-intensity processing. While that is an understandable reason for people to use Streams, it's hard to track down why Node.js chose to build its fs implementation on Streams. I suspect the choice was heavily influenced by Unix pipes, since it uses the pipe syntax.

Pipe syntax can be fun to play with sometimes, but is mostly quite useless unless you're just experimenting with it. For browsers, one would assume that the closest implementation has been MediaStreams. However, they have not been standardized. The Web Audio API, while not using the Node.js Stream API, certainly implements a similar one: it uses AudioSourceNode and AudioDestinationNode instead of ReadStreams and WriteStreams. Hopefully we will have standardized Streams available in the browser.

Subtle Aspects

Push and Pull Streams Sources

There are two main forms of Stream sources. A pull source lets Stream consumers (write or transform) retrieve items as they need them. A push source sends data down the pipe whether or not the consumer is ready. In most cases the difference won't matter to you, but once backpressure becomes a problem, or being "live" matters more than ensuring "success," it soon becomes apparent what each one is for.
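A minimal sketch of the difference, using Node's built-in Readable. The names pullSource, pushSource, and collect, and the 'item' and 'done' event names, are hypothetical and only serve the illustration.

```javascript
const { Readable } = require('stream');
const { EventEmitter } = require('events');

// Pull source: nothing is produced until the consumer asks via read().
function pullSource(items) {
  let index = 0;
  return new Readable({
    objectMode: true,
    read() {
      // Called only when the internal buffer has room for more.
      this.push(index < items.length ? items[index++] : null);
    }
  });
}

// Push source: the emitter decides when data arrives; the stream relays it.
function pushSource(emitter) {
  const readable = new Readable({
    objectMode: true,
    read() {} // demand from the consumer is ignored
  });
  emitter.on('item', (item) => {
    // push() returns false once the buffer is full. A push source has to
    // choose between buffering, dropping, or pausing the producer.
    readable.push(item);
  });
  emitter.once('done', () => readable.push(null));
  return readable;
}

// Helper: gather every item of a stream into an array.
function collect(readable) {
  return new Promise((resolve, reject) => {
    const items = [];
    readable.on('data', (item) => items.push(item));
    readable.on('end', () => resolve(items));
    readable.on('error', reject);
  });
}
```

The pull source can never outrun its consumer, while the push source keeps buffering when the consumer is slow. That unbounded buffer is exactly where backpressure problems show up.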

Processing as you need it

fs.createReadStream is a great example of processing items as you need them. Here, we can directly specify how many bytes should be skipped and use the documented read method to take chunks at a time. It's important to note that calling read directly is not recommended. However, this sort of processing can be done in alternative ways. If you have a source that lets you read a chunk at a time through a method, here is a way to implement it.

const { Readable } = require('stream');

class OnDemandMinimalChunks extends Readable {
  // readFn(start, end) must return a promise for the data in [start, end)
  constructor(chunkSize, readFn, initialOffset, limit) {
    super();
    this.chunkSize = chunkSize;
    this.offset = initialOffset;
    this.limit = limit;
    this.readFn = readFn;
    this.isReading = false;
  }
  _read(size) {
    if (this.isReading) {
      return;
    }
    if (this.offset >= this.limit) {
      // Nothing left to read: signal the end of the stream
      this.push(null);
      return;
    }
    this.isReading = true;
    const end = Math.min(this.offset + this.chunkSize, this.limit);
    this.readFn(this.offset, end).then((value) => {
      this.offset = end;
      // Clear the flag before pushing, since push() may trigger another _read()
      this.isReading = false;
      this.push(value);
      if (this.offset >= this.limit) {
        this.push(null);
      }
    }, (err) => this.destroy(err));
  }
}
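For comparison, here is a hedged sketch of the built-in route: fs.createReadStream accepts start, end (inclusive), and highWaterMark options, which together skip bytes and cap the chunk size. The helper name readRange is hypothetical.

```javascript
const fs = require('fs');

// Read bytes start..end (inclusive) of a file in chunks of at most
// chunkSize bytes, resolving with the array of chunks.
function readRange(path, start, end, chunkSize) {
  return new Promise((resolve, reject) => {
    const chunks = [];
    fs.createReadStream(path, { start, end, highWaterMark: chunkSize })
      .on('data', (chunk) => chunks.push(chunk))
      .on('end', () => resolve(chunks))
      .on('error', reject);
  });
}
```

In a real pipeline you would pipe the stream onward instead of collecting it. Collecting here only makes the chunking visible.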

Merging multiple sources

This may sound like a cute gimmick, but it has very real possibilities. A merged Stream can act as a collective logger, as a game event emitter, or even join diffs of a buffered string from multiple sources. It's tempting to write off a merged Stream as ludicrous, but I'm confident you'll understand when you need it. Though merging everything at once may be fine, you may also be interested in ordered merges.
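As a rough sketch of an unordered merge, several sources can be piped into one PassThrough with the end option disabled, so the merged stream closes only after every source has ended. The function name mergeStreams is hypothetical, and the sketch assumes object-mode items.

```javascript
const { PassThrough } = require('stream');

// Merge several readable streams into one. Items arrive in whatever
// order the sources happen to produce them.
function mergeStreams(sources) {
  const merged = new PassThrough({ objectMode: true });
  let open = sources.length;
  if (open === 0) {
    merged.end();
  }
  sources.forEach((source) => {
    // end: false keeps the merged stream open until every source is done.
    source.pipe(merged, { end: false });
    source.once('end', () => {
      open -= 1;
      if (open === 0) {
        merged.end();
      }
    });
    // An error in any source tears down the merged stream.
    source.once('error', (err) => merged.destroy(err));
  });
  return merged;
}
```

An ordered merge would need extra bookkeeping, such as a sequence number per item, which this sketch leaves out.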

Ensuring no data is lost

Since we don't know when the read stream will begin emitting, it's generally a good idea to pipe it to a PassThrough unless you consume it synchronously, especially if you're creating the consumer asynchronously. While the read stream does buffer, you may need to pause it in order to prevent any loss. I have personally run into issues where initial data was lost.

Here's an example of what I have implemented in the past:

const { Duplex } = require('stream');

class StreamBuffer extends Duplex {
  constructor() {
    super({ allowHalfOpen: true });
    this.bufferedData = Buffer.alloc(0);
    // true when the reader asked for data we did not have yet
    this.waiting = false;
    this.hasEnded = false;
  }
  _write(chunk, encoding, callback) {
    const data = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding);
    if (this.waiting) {
      // A read is pending: hand the data straight to the reader
      this.waiting = false;
      this.push(data);
    } else {
      // Nobody is asking yet: hold on to the data
      this.bufferedData = Buffer.concat([this.bufferedData, data]);
    }
    callback();
  }
  // Called once the writable side has received everything
  _final(callback) {
    this.hasEnded = true;
    if (this.waiting) {
      this.waiting = false;
      this.push(null);
    }
    callback();
  }
  _read(size) {
    if (this.bufferedData.length === 0) {
      if (this.hasEnded) {
        this.push(null);
        return;
      }
      // Remember the request; the next _write() will satisfy it
      this.waiting = true;
      return;
    }
    const chunk = this.bufferedData.slice(0, size);
    this.bufferedData = this.bufferedData.slice(chunk.length);
    this.push(chunk);
  }
}
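The built-in PassThrough can play a similar holding role. A minimal sketch, with the hypothetical helpers holdUntilConsumed and consumeLater: the source is piped into a PassThrough immediately, and a consumer that attaches later still receives the data from the start.

```javascript
const { PassThrough } = require('stream');

// Pipe the source into a PassThrough right away so that a consumer
// attached later still sees the data from the beginning.
function holdUntilConsumed(source) {
  const holder = new PassThrough();
  source.pipe(holder);
  return holder;
}

// Hypothetical late consumer: starts listening only after delayMs.
function consumeLater(source, delayMs) {
  const held = holdUntilConsumed(source);
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      const chunks = [];
      held.on('data', (chunk) => chunks.push(chunk));
      held.on('end', () => resolve(Buffer.concat(chunks).toString()));
      held.on('error', reject);
    }, delayMs);
  });
}
```

Unlike a buffer that accepts everything, a PassThrough holds data only up to its highWaterMark and then applies backpressure to the source, so a slow consumer pauses the producer instead of growing memory.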

Versatility of the duplex

A duplex is both a read stream and a write stream. Sure, that alone sounds awesome and dandy, but the possibilities aren't limited to "transform streams".

Only processing the "latest"

First off, you will need to "itemize" your Streams so you can know how late is too late. But transforming data doesn't just mean mapping it or filtering it. It can also mean flow control. This includes LIFO as well as only processing the latest item found. You can adapt it to keep multiple items by raising the limit.

const { Duplex } = require('stream');

class OnlyLatestBuffer extends Duplex {
  // fastForward is accepted but currently unused
  constructor(limit, fastForward) {
    // objectMode so that each chunk is treated as one item
    super({ allowHalfOpen: true, objectMode: true });
    this.limit = limit || 1;
    this.bufferedItems = [];
    this.waiting = false;
    this.hasEnded = false;
  }
  _write(chunk, encoding, callback) {
    if (this.waiting) {
      // The reader is already waiting: deliver immediately
      this.waiting = false;
      this.push(chunk);
    } else {
      if (this.bufferedItems.length === this.limit) {
        // Buffer is full: drop the oldest item
        this.bufferedItems.shift();
      }
      this.bufferedItems.push(chunk);
    }
    callback();
  }
  // Called once the writable side has received everything
  _final(callback) {
    this.hasEnded = true;
    if (this.waiting) {
      this.waiting = false;
      this.push(null);
    }
    callback();
  }
  _read(size) {
    if (this.bufferedItems.length === 0) {
      if (this.hasEnded) {
        this.push(null);
        return;
      }
      this.waiting = true;
      return;
    }
    this.push(this.bufferedItems.shift());
  }
}

IO

Oftentimes, a duplex will act as an IO to an external source; a TCP connection is a great example. My favorite websocket library is websocket-driver-node, simply because it isn't a black-box server and you don't have to wrap it around the body and socket in a constructor. Instead, you pipe all the TCP data to the websocket driver, which transforms it and emits the data as message events. It also transforms and sends all the data down the socket you direct it to send to. I especially appreciate the purity of the implementation, even if it's not the fastest.

var http = require('http'),
    websocket = require('websocket-driver');

var server = http.createServer();

server.on('upgrade', function(request, socket, body) {
  if (!websocket.isWebSocket(request)) return;

  var driver = websocket.http(request);
  driver.io.write(body);
  socket.pipe(driver.io).pipe(socket);

  driver.messages.on('data', function(message) {
    console.log('Got a message', message);
  });

  driver.start();
});

Transform Handle

While most transforms are implemented with the expectation that they will run in the context they were created in, it's possible to let transforms hand work off to other APIs or even configurations. A simple example is the transform API: you can call this.push(item) and/or the callback at any time, so you can also run the function offsite through an AJAX call.

const { Transform } = require('stream');

new Transform({
  transform(chunk, encoding, callback){
    fetch("https://domain.com/some/api", { method: "POST", body: chunk })
    .then(function(resp){
      // Read the body first, then decide whether it is a result or an error
      return resp.text().then(function(str){
        if(resp.status !== 200){
          throw new Error(str);
        }
        return str;
      });
    })
    // Success: callback(null, result). Failure: callback(error).
    .then(function(str){ callback(null, str); }, callback);
  }
})

This can also be reapplied to target a series of workers or even to pipe transforms on demand, e.g. with Node-RED.

Alias Repiping

Another neat thing about transforms is that they can pipe results back into themselves until a result can't be repiped. One use for this is an alias system, in which "some name" might be an alias for "closer to real", which may in turn be an alias for "/some/ridiculous/path". If you simply piped it through two or three times, you would miss any chain longer than that.

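const { Transform } = require('stream');

var alias_map = {};
new Transform({
  readableObjectMode: true, writableObjectMode: true,
  transform(chunk, encoding, callback){
    if(chunk in alias_map){
      // Still an alias: feed the name it points to back into this stream
      this.write(alias_map[chunk]);
    } else{
      // Not an alias: this is a final result
      this.push(chunk);
    }
    callback();
  }
})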
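Writing back into the same stream can fail once the writable side has ended, and a circular alias would loop forever. One hedged alternative, which swaps the repiping for a loop inside transform, follows the chain directly and guards against cycles. The name createAliasResolver is hypothetical.

```javascript
const { Transform } = require('stream');

// Build a transform that follows alias chains until it reaches a name
// that is not itself an alias. aliasMap is a plain object: alias -> target.
function createAliasResolver(aliasMap) {
  return new Transform({
    objectMode: true,
    transform(name, encoding, callback) {
      const seen = new Set();
      let current = name;
      while (Object.prototype.hasOwnProperty.call(aliasMap, current)) {
        if (seen.has(current)) {
          // The chain came back to a name it already visited.
          callback(new Error('Alias cycle detected at "' + current + '"'));
          return;
        }
        seen.add(current);
        current = aliasMap[current];
      }
      callback(null, current);
    }
  });
}
```

With the aliases from the text, writing "some name" yields "/some/ridiculous/path", and a name with no alias passes through unchanged.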

Writestreams

Promising A Finish Line

While stream-to-promise handles both write streams and read streams, you might only be interested in the way write stream handles it (unless you don't like Streams at all...but I hope you'll fall in love with it! 😉). This makes it possible for promises and streams to coexist.

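const streamToPromise = require('stream-to-promise');

getReadableStream().then(function(readable){
  // Each stage needs its own transform instance
  readable.pipe(firstTransform).pipe(secondTransform).pipe(writable);
  // Resolves once the writable has finished
  return streamToPromise(writable);
}).then(function(){
  console.log("done");
})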
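Recent Node.js versions ship a similar promise bridge. A minimal sketch using pipeline from the built-in stream/promises module; the runPipeline helper and its upper-casing transform are hypothetical stand-ins for real stages.

```javascript
const { Readable, Transform, Writable } = require('stream');
const { pipeline } = require('stream/promises');

// Run a small pipeline and resolve with everything the writable received.
function runPipeline(items) {
  const received = [];
  const upperCase = new Transform({
    objectMode: true,
    transform(chunk, encoding, callback) {
      callback(null, String(chunk).toUpperCase());
    }
  });
  const sink = new Writable({
    objectMode: true,
    write(chunk, encoding, callback) {
      received.push(chunk);
      callback();
    }
  });
  // pipeline() resolves once the last stream has finished and rejects
  // if any stream in the chain errors.
  return pipeline(Readable.from(items), upperCase, sink).then(() => received);
}
```

Unlike chained pipe calls, pipeline also destroys every stream in the chain when one of them fails, so errors do not leave half-open streams behind.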

Streamed UI

React is certainly a wonderful system. However, in order to use Streams with React, your best bet is to use Redux. Otherwise, you can also turn your write stream into an updatable list, which emits an event on each write and unpipes and ends on componentWillUnmount.

// How it should be (assumes components emitted lifecycle events, which React does not provide)
const { Writable } = require('stream');

class ComponentWritable extends Writable {
  constructor(component, key){
    super({ objectMode: true });
    this.component = component;
    this.key = key;
    this.list = [];
    component.on("willUnmount", ()=>{
      this.end();
    });
    component.on("willMount", ()=>{
      this.component.setState(this.createState());
    });
  }
  _write(chunk, encoding, callback){
    this.list = this.list.concat([ chunk ]);
    // Listen once per write so that handlers do not pile up
    this.component.once("didUpdate", function(){
      setTimeout(callback, 0);
    });
    this.component.setState(this.createState());
  }
  createState(){
    return {
      [this.key]: this.list
    }
  }
}

// how it be

// Writable is require('stream').Writable; Component comes from React
function mountWritable(component, stateKey){
  if(!component.writableStreams){
    component.writableStreams = {};
  }
  var stream = new Writable({
    objectMode: true,
    write: (chunk, encoding, callback)=>{
      stream.val = stream.val.concat([ chunk ]);
      // Hold the callback until the component has re-rendered
      stream.cb = callback;
      component.setState({
        [stateKey]: stream.val
      });
    }
  });
  stream.val = [];
  stream.cb = false;
  component.writableStreams[stateKey] = stream;
}

// On unmount: release any pending write, end every stream, drop the references
function handleUnmount(component){
  component.writableStreams &&
  Object.keys(component.writableStreams).forEach(function(key){
    var stream = component.writableStreams[key];
    var cb = stream.cb;
    stream.cb = false;
    cb && cb();
    stream.end();
    delete component.writableStreams[key];
  });
}

// After each render: release the pending write so the next chunk can flow
function handleUpdate(component){
  component.writableStreams &&
  Object.keys(component.writableStreams).forEach(function(key){
    var stream = component.writableStreams[key];
    var cb = stream.cb;
    stream.cb = false;
    cb && cb();
  });
}

class WritableComponent extends Component {
  componentWillMount(){
    var stateKey = "some-key";
    mountWritable(this, stateKey);
  }
  componentWillUnmount(){
    handleUnmount(this);
    
  }
  componentDidUpdate(){
    handleUpdate(this)
  }
}

Keeping it simple

RxJS

This seems pretty standard for Angular. While I haven't looked into it too deeply, it is on my list of "standards-to-learn."

Highland.js

Highland has been in development for a long time; however, I haven't decided to bite the bullet and use it to replace Streams.

Through2

While creating Streams is fun and all, keeping things clean without creating classes is often difficult. Through2 allows you to make things fast and easy.

Events Stream

This is a library that transforms a WHATWG EventDispatcher into a readable Stream.
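The idea can be sketched without any library. The following is an assumption-laden illustration, not that library's API: it uses the standard EventTarget interface, available as a global in recent Node.js versions, and the hypothetical helper name eventsToReadable.

```javascript
const { Readable } = require('stream');

// Turn events of one type from an EventTarget into an object-mode Readable.
// Destroying the returned stream removes the listener from the target.
function eventsToReadable(target, type) {
  const listener = (event) => readable.push(event);
  const readable = new Readable({
    objectMode: true,
    read() {}, // events are pushed as they happen, not pulled
    destroy(err, callback) {
      target.removeEventListener(type, listener);
      callback(err);
    }
  });
  target.addEventListener(type, listener);
  return readable;
}
```

This is a push source: events that arrive faster than they are consumed simply pile up in the stream's buffer.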