11 months ago

串流是啥,事實上這個東東,我們每天都有使用,簡單的說,它是一種傳送內容的技術,在沒有使用串流技術時,我們想要在網路上看影片,需要將它下載下來才能播放,但如果使用串流技術那傳送影片,它會將一小短小短的資料,一直傳送給網頁,所以我們可以直接進行觀看,並且在觀看時它還會繼續傳送後面的片段過來。

串流的優點

在node js中,傳送內容基本上有分兩種形式,一種是緩衝而另一種就是串流,我們先來看看緩衝在處理資料傳送時,它是如何處理。

緩衝它的基本概念就是將所有的資料先收集到緩衝區裡,當資料已經完整的讀取完,在傳送給接受者,如下圖所示,它會將HELLO MARK這所有的資料先讀取完,然後才能傳送給接受者。

然後我們來看看串流,它每個時間點一接受到資料就會直接發送給接受者,所以如下圖所示,在時間點t1時,它會接受到HELLO這串資料,然後在t2時會接受到剩下的MARK資料。

那這樣有什麼好處呢 ? 簡單的說有兩個優點,一個是空間效率,因為如果使用緩衝的方法來進行個10gb以上的資料傳輸,就代表這你需要10gb的緩衝空間,那這樣記憶體一定爆掉。而第二個優點就是時間效率,就如同最上面的影片例子來說明,使用串流,你可以直接看影片,然後再看影片時,它還會繼續傳送剩下的影片片段,節省了等待時間。

緩衝與串流的程式碼實做比較

我們簡單寫一個檔案下載的伺服器,然後分別以緩衝與串流的程式碼,來進行實作,在nodejs中,有個 module 名為fs,它是專門用來處理檔案傳輸的工具,它也同時繼承了node 中的串流模組stream

緩衝讀取檔案實作

這是一個下載aaa.avi檔案的伺服器,但假設該檔案大小如果大於1024mb(舊版本node)的話,它會直接死掉,因為它緩衝爆掉了。

基本上這種類形的api被稱為Bulk I/O

const http = require('http');
const fs = require('fs');

const server = http.createServer((req, res) => {
    fs.readFile('aaa.avi', (err, data) => {
        if(err) {
            console.log(err); 
            res.writeHead(500);
            res.end(err.message);
        } else{
            res.writeHead(200, { 'Content-Type': 'video/avi' });
            res.end(data);
        }    
    });
});

server.listen(3000, () => {
    console.log('server up !');
});

串流讀取檔案實作

下列程式碼就是使用串流來實作的讀取檔案伺服器,這樣如果檔案大小在大,它都可以進行處理,因為它是將大檔案分割成小塊小塊,然後一直傳輸。

const http = require('http');
const fs = require('fs');

const server = http.createServer((req, res) => {
    res.writeHead(200, { 'Content-Type': 'video/avi' });
    fs.createReadStream('aaa.avi').pipe(res)
        .on('finish', () => {
            console.log('done');
        });
});

server.listen(3000, () => {
    console.log('server up !');
});

Node.js 的串流工具說明

在串流的世界中,有分為可讀取串流另一種為可寫入串流

可讀取串流

可讀取串流就是指資料的來源,我們可以利用在stream核心模組中的Readable,來抽像化類別進行實作,基本上像fsprocess.stdin都有繼承Readable這模組,所以我們在fs中差可以使用串流的技術來讀取檔案。

接下來我們來實作個Readable串流。

首先建立一個名為Read的類別,並且繼承Readable,在這個類別中,我們需要實作_read這個方法,這個方法最主要的功能就是從你的資源(ex.檔案或文字)中取得資料,並執行this.push來丟到一個internal queue中,你可以想成水缸,每當this.push被呼叫時,Readable就會觸發readable事件,白話文就是和使用者說,這裡有資料喔 ~ 快來拿啊,這邊可以想成打開水缸開關給人拿。

下列程式碼中,我們在_read模擬每次會丟入this.push為一個字元,並且至到字元結束時,就停止丟入this.push(null),然後我們會使用markStream.read()這段用來實際取得現在internal queue內有的資料,我們也可以使用markStream.read(size)來指定每次取得時,要取得多大的資料。

const Readable = require('stream').Readable;

class Read extends Readable {
    constructor(data) {
        super();
        this.data = data;
        this.dataLength= data.length;
        this.count=0;
    }

    _read(){
        const chunk = this.data.slice(this.count,this.count+1);
        if(this.count === this.dataLength){
            this.push(null);
        }else{
            console.log('Pushing chunk of size:' + chunk.length);
            this.push(chunk, 'utf8');
            this.count++;
        }
    }
}

const testStr = new Array(10).join('喝');

const markStream = new Read(testStr);
markStream.on('readable', function(){
    let chunk ;
    while ((chunk = markStream.read()) !== null){
        console.log("chunk received:" + chunk.toString()); 
    }
});

可寫入串流

可寫入串流就是指資料的接受者,例入我們要將資料寫入到一個檔案中,那個檔案就是一個接受者。

下面的程式碼,我們一樣實作一個Writable,首先建立一個類別Write並且繼承stream.Writable,在這裡我們要實作_write方法,這支方法是要告訴stream要如何將資料寫入到接受者內,這範例中,我們的接受者就是一個檔案,存放位置為test.txt

Writable中,我們使用者可以使用write(chunk)來將資料寫入到stream中,最後當使用者呼叫end()時就是告訴stream已經沒有資料了,這時它會觸發finish事件來通知使用者。

const stream = require('stream');
const fs = require('fs');
const path = require('path');
const mkdirp = require('mkdirp');

class Write extends stream.Writable {
    constructor() {
        super();
    }

    _write(chunk, encoding, cb) {
        const data_path = "test.txt";
        mkdirp(path.dirname(data_path), function(err) {
            if (err) {
                return cb(err);
            }
            fs.writeFile(data_path, chunk, cb);
        });
    }
}

const markStream = new Write();
markStream.write("Hello mark , how are you today?");
markStream.end(()=>{
    console.log('finish');
});

回壓 (Backpressure)

stream就如同水管中的水流動,但是水管大小一定要限制,如果水量過大,而出口過小一定會發生問題,這在資料傳送時也一定會發生,如果寫入資料快於串流能消化的速度時就一定會遇到。

而回壓這個機製就是為了解決這個問題。

我們先來寫段程式碼,來模擬水管爆掉的情況,基本上我們使用和上面的範例相同,都是寫入資料到檔案中,_watch沒有變動,比較不同的是執行寫入時的下面段,在執行write()時除了寫入到stream中,它事實上還會回傳一個布林值,用來告訴我們這個水管是否滿了,如果滿了,它會回傳true

對了還有我們本次測試時使用chance模組,它可以隨機產生字串,而likelihood: 95這段代表chance.bool95%的機率會回傳true,而我們為了盡可能的模擬出水管爆滿的情況,所以我們將丟入的資料大小增加到16KB左右。

有個預設的highWaterMark限制為16KB,這就是水管的大小。

const testStream = new Write();
function generateMoreData() {
    while (chance.bool({likelihood: 95})) {
        let is_continue_write_data = testStream.write(
            chance.string({length: (16* 1024) -1}) 
        ); 

        if(!is_continue_write_data){
            console.log('Backpressure');
            
            // 這行先別看
            //return testStream.once('drain', generateMoreData);
        }
    }
}

generateMoreData();

這是全部的程式碼,Write的地方完全相同。

const stream = require('stream');
const fs = require('fs');
const path = require('path');
const mkdirp = require('mkdirp');
const chance = require('chance').Chance();

class Write extends stream.Writable {
    constructor() {
        super();
    }

    _write(chunk, encoding, cb) {
        const data_path = "test.txt";
        mkdirp(path.dirname(data_path), function(err) {
            if (err) {
                return cb(err);
            }
            console.log('push')
            fs.writeFile(data_path, chunk, cb);
        });
    }
}


const testStream = new Write();
function generateMoreData() {
    while (chance.bool({likelihood: 95})) {
        let is_continue_write_data = testStream.write(
            chance.string({length: (16* 1024) -1}) 
        ); 

        if(!is_continue_write_data){
            console.log('Backpressure');
            //return testStream.once('drain', generateMoreData);
        }
    }
}

generateMoreData();

然後我們來看看執行後的結果,下面的意思就是說,我們丟第一次資料時水管就滿了,所以接下來丟的資料都進行不去,但這時我們還是硬給他一直丟(一堆Backpressure),這時就只是浪費社會資源。

Backpressure
Backpressure
Backpressure
Backpressure
Backpressure
Backpressure
Backpressure
Backpressure
Backpressure
Backpressure
Backpressure
Backpressure
Push
Push
Push
Push

那這時我們要如何解決呢,有一個事件叫drain,它就是用來通知我們水管可以丟了,所以我們現在就是監聽drain事件,當它通知我們時,就代表我們可以在丟資料了generateMoreData

const testStream = new Write();
function generateMoreData() {
    while (chance.bool({likelihood: 95})) {
        let is_continue_write_data = testStream.write(
            chance.string({length: (16* 1024) -1}) 
        ); 

        if(!is_continue_write_data){
            console.log('Backpressure');
            
              // 這邊 !
            return testStream.once('drain', generateMoreData);
        }
    }
}

generateMoreData();

我們來看看增加這行的執行結果,這樣好多了,不會一直往水管中丟資料。

Backpressure
push
push
Backpressure
push
push
Backpressure
push
push

參考資料

← 排序之桶子排序法(Bucket Sort) Node.js 的串流之旅之雙工串流與管道 →
 
comments powered by Disqus