A transform stream that collects chunks and passes them on as batches.
Let's say you want to stream documents from a MongoDB collection to ElasticSearch. For every document in the collection, the read stream will emit a data event that you use to call the ElasticSearch index function. That means that 100,000 documents will result in 100,000 API calls.
The ElasticSearch library has a function to bulk index, but since the stream emits each document as a separate chunk, we cannot group multiple index operations together.
The batch2 transform stream can help by buffering the chunks/docs and passing them on as batches. For example, we can now create batches of 500 docs each and reduce the number of API calls to ElasticSearch from 100,000 to 200, which greatly improves throughput.
```js
mongoReadStream()
  .pipe(batch2.obj({ size: 5 })) // transforms multiple chunks (mongo docs) into [chunk, chunk, chunk, chunk, chunk]
  .pipe(transformToElasticBulkOperation())
  .pipe(elasticWriteStream())
```
```sh
$ npm install batch2
```
`batch2([options], [transformFunction])`
Consult the `stream.Transform` documentation for the exact rules of the `transformFunction` (i.e. `this._transform`) and the optional `flushFunction` (i.e. `this._flush`).
The `options` argument is optional and is passed straight through to `stream.Transform`, so you can use `objectMode: true` if you are processing non-binary streams (or just use `batch2.obj()`).
The `transformFunction` must have the following signature: `function (chunk, encoding, callback) {}`. A minimal implementation should call the `callback` function to indicate that the transformation is done, even if that transformation means discarding the chunk. To queue a new chunk, call `this.push(chunk)`; this can be called as many times as required before the `callback()` if you have multiple pieces to send on. Alternatively, you may use `callback(err, chunk)` as shorthand for emitting a single chunk or an error.
If you do not provide a `transformFunction` then you will get a simple pass-through stream.
If you would like to help out with some code, check the details.
Not a coder, but still want to support? Have a look at the options available to donate.
Licensed under MIT.