打造自己的 Node.js Transform Stream

熟悉並學習實作 Node.js Stream,在 Node.js 開發者生涯裡是一件很重要的事,尤其在資料處理的工作上更是需要運用 Stream。在這些應該用的情境下,若不懂得使用 Stream,我們所開發出來的程式其執行效能及穩定性會相當令人擔心。

而如果你從未自己實作過 Stream,從 Transform Stream 開始入手是一個好選擇,也是一個非常實用的開發技巧。

更多關於 Stream 的說明,可以參閱 Node.js 官網上的文件:https://nodejs.org/api/stream.html

什麼是 Transform Stream?

你可能知道 Node.js 裡有多種 Stream 的機制,但其實主要是 ReadableStream 和 WritableStream 兩種基本 Stream 的組成和變化。而對一般開發者來說,最常自己實作的是 Transform Stream,你可以想像這是一個產品生產線上的加工器,進入 Transform Stream 的資料會被加工後輸出。

而以一個 Stream 而言,Transform Stream 同時具有 ReadableStream(讀入)和 WritableStream(輸出)的特性,俗話說「左耳進右耳出」就是其最佳的寫照。

舉一個 Node.js 官方的例子,利用 Gzip 的 Transform Stream 將通過的資料流進行壓縮:

const zlib = require('zlib');
const gzip = zlib.createGzip();
const fs = require('fs');
const inp = fs.createReadStream('input.txt');
const out = fs.createWriteStream('input.txt.gz');

inp.pipe(gzip).pipe(out);

實作第一個 Transform Stream

先不必暸解太多 Stream 的專有名詞和機制,若想要實作一個標準的「耳邊風」Stream,程式碼如下:

const Transform = require('stream').Transform;
const util = require('util');

const MyTransform = module.exports = function(options) {

    // 直接呼叫時建立一個實例
    if (!(this instanceof MyTransform))
        return new Parser(options);

    // 呼叫原始 Transform 的 constructor
    Transform.call(this, options);
};

// 繼承 Transform
util.inherits(MyTransform, Transform);

// 當每一筆資料進來時
MyTransform.prototype._transform = function(data, encoding, callback) {

    // 將輸入進來的資料直接推送出去
    this.push(data);
    
    // 完成這筆資料的處理工作
    callback();
};

實際結合檔案讀取寫入,以使用這個 Transform Stream:

const fs = require('fs');
const myStream = new MyTransform();

// 讀取檔案
const input = fs.createReadStream('/my/file');
const output = fs.createWriteStream('/my/file.out');

// 導入 myStream,輸出後寫入 file.out
input.pipe(myStream).pipe(output);

範例:累積並打包一批資料輸出

有些實際應用中,我們需要累積一定量的資料,然後打包成一包輸出,尤其像是我們在做開放資料的處理時,總是會將整理好的資料一批批的批次寫入資料庫。會這樣做的原因,是因為一筆寫一次太耗時(與資料庫來回的時間),一次 50、100 或更多筆資料寫入資料庫,會有更好的效率。

這時我們可以寫一個自己的 Transform Stream 來做到這件事,如每輸入 10 筆資料後,打包成一個陣列輸出:

const Transform = require('stream').Transform;
const util = require('util');

const BatchStream = module.exports = function(options) {

    // 直接呼叫時建立一個實例
    if (!(this instanceof MyTransform))
        return new Parser(options);

    // 啟用 object mode,讓此 Stream 不是用文字或 Binary 格式,而是以物件形式輸入、輸出資料
    let opts = Object.assign(options, {
        objectMode: true
    });

    // 呼叫原始 Transform 的 constructor
    Transform.call(this, options);
    
    // 建立一個暫存區陣列
    this.batch = [];
};

// 繼承 Transform
util.inherits(BatchStream, Transform);

// 當每一筆資料進來時
BatchStream.prototype._transform = function(data, encoding, callback) {

    // 放入暫存區
    this.batch.push(data);
    
    // 每 10 筆推送出去一次
    if (this.batch.length === 10) {
        this.push(this.batch);
        
        // 清空暫存區
        this.batch = [];
    }
    
    // 完成這筆資料
    callback();
};

// 當前一個 Stream 的資料輸入已經全部完成時
BatchStream.prototype._flush = function(callback) {

    // 將尚未推送出去的資料送出去
    if (this.batch.length > 0) {
        this.push(this.batch);
        
        // 清空暫存區
        this.batch = [];
    }

    // 完成
    callback();
};

實用的 Transform Stream 簡易用法

如果每一個 Transform Stream 都要先設計定義一個原型物件後才能使用,那也太煩人,在實際開發上會相當不便。這時可以運用簡單的方法,建立一個客製化的 Transform Stream:

const fs = require('fs');
const Transform = require('stream').Transform;
const myStream = new MyTransform();

// 讀取檔案
const input = fs.createReadStream('/my/file');
const output = fs.createWriteStream('/my/file.out');

// 導入 myStream,輸出後寫入 file.out
input
    .pipe(new Transform({
        transform(data, encoding, callback) {
        
            // 將輸入進來的資料直接推送出去
            this.push(data);
            
            // 完成這筆資料的處理工作
            callback();
        }
    })
    .pipe(output);

精簡!使用 ECMAScript 新支援的 class 關鍵字

在各種新一代的 JavaScript 引擎上,已經可以使用 class 關鍵字來定義物件了,我們也可以使用 class 來定義自己的 Transform Stream,程式碼會看起來精簡許多:

const Transform = require('stream').Transform;

class MyTransform extends Transform {

    constructor(options) {
        super(options)
    }

    // 當每一筆資料進來時
    _transform(data, encoding, callback) {
    
        // 將輸入進來的資料直接推送出去
        this.push(data);
        
        // 完成這筆資料的處理工作
        callback();
    }
}

再精簡一點的技巧:用 callback 推送資料

this.push() 來推送一筆資料出去,有時確實還太囉唆,我們可以用 callback() 一次搞定:

const Transform = require('stream').Transform;

class MyTransform extends Transform {

    constructor(options) {
        super(options)
    }

    // 當每一筆資料進來時
    _transform(data, encoding, callback) {
    
        // 完成這筆資料的處理工作,同時將輸入進來的資料直接推送出去
        callback(null, data);
    }
}

疑難排解:怪 Bug?為什麼程式會提前結束、資料有漏?

很多人在玩弄 Stream 時,實作自己的 Transform Stream 時會發現,時常掉資料,或是資料還沒跑完,應用程式就提前結束,感覺相當不穩定。

通常,這得提到 Stream 本身的機制才能夠很完善的說明原因,但簡單來說,Stream 本身會在資料滿載處理不過來時暫停運作(可參考 Node.js Stream 的 highWaterMark 設定),所以如果 Stream 後面沒有下一家 Stream 接手消化資料,這條資料流就會堵塞卡死。

所以,通常發生這樣的情況,肯定是因為你沒有幫自己的 Transform Stream 設定下一家該往哪去,例如:

input.pipe(myStream); // 下面沒有了

尤其是,當 Stream 暫停運作後,事件引擎就沒有新的事件在跑。眾所皆知,當 JavaScript 的事件引擎沒有事件時,Node.js 整個應用程式自然就會結束。

此外,Node.js Stream 的設計上,除了用 .pipe() 來設定下一家是誰之外,還有另一種辦法可排泄資料,那就是使用 .on() 監聽 data 事件:

myStream.on('data', function(data) {
    // ...
});

但通常不建議這樣使用,尤其是當你接到資料時,需要做許多非同步(Asynchronous)的複雜工作時。當資料量大時,這樣監聽事件的做法並無法做資料節流,會導致你一瞬間觸發許多非同步工作,進而將你的系統資源耗盡。若在分秒算錢的雲端系統上,你會得到爆量的結果,不是伺服器負荷不過來,就是把你的錢燒盡。

註:如果你想實作一個沒有後面又可以運作的 Stream,你必須要參考 WritableStream 的實作方式,理論上做法大同小異。

後記

之前有人抱怨,搞不太懂 Stream 在做什麼事,或到底怎麼實際開發使用。

其實 Stream 是一個看似單純,但細節很多的機制,而且開發過程中會在觀察者與非觀察者切換,對許多初學者來說更是一大挑戰,更別說會碰上一些掉資料等看似奇怪的行為。所以,很能理解不少人為什麼看不太懂網路上各種說明 Stream 機制的文章,也看不到太多人使用 Stream 機制在實際的開發上。

所以本文以較通俗簡單的範例和方法來說明 Stream 的使用,至於比較嚴格的定義或原理性的說明,就留給讀者自己去翻閱官方相關文件了。:-)

這個網誌中的熱門文章

Web 技術中的 Session 是什麼?

淺談 USB 通訊架構之定義(一)

淺談 USB 通訊架構之定義(二)

NodeJS 與 MongoDB 的邂逅

Koa 2 起手式!