5 days ago

在上一篇文章中,我們說明了如何的設計像 line 的聊天群的架構設計,而這一篇我們要來說明聊天室的架構設計,這東西和上一篇有什麼差別 ?

通常聊天群是會由用戶提出申請,然後管理者來加入到該群裡,而聊天室則不相同,它是用戶可以自由自在的加入或退出,這也代表這,通常聊天群會限制人數,像 line 好像就限制 500 人,而聊天室則否,他通常不會限制人數。

那這也代表我們要面對什麼問題呢 ? 我目前想想主要有兩個 :

  1. 由於沒有限制人數,所以通常我們的架構要考慮擴展性。
  2. 訊息的即時性非常的要求,如果一個訊息傳輸慢了,會導致其它人無法理解上下文。

最簡單的聊天室架構 V-1

基本上和聊天群的架構相同,都是一個Business Server和一個Message Server,其中前者做的事情是為所有需要使用 http 協議的工作,更正確的說是 http 短連接的工作,如新增聊天室、登入、登出、註冊這類事情的,都屬於 Business Server ,而所有使用 websocket 協議的都是屬於 Message Server 的工作。

聊天室 V-2

上面的架構有沒有啥問題呢 ? 有的 ! 請想像一個情境 :

用戶 A 從 business server 登入後,然後再去 message server 建立連線,但問題是 message server 怎麼知道這條連線是用戶 A 呢 ?

在一般的 web 應用中,每當 client 連結 server 時,server 會產生唯一個 sessionId ,並用它來連結 server 內的存放空間,然後會將 sessionId 存放到 cookie 中,這樣每一次 client 進行請求時,server 都會去 cookie 中取得 sessionId 然後再去 session 取得資料。

從上面的說明可知 session 是存放在 server 中,所以上面的情境變成 business server 產生 session 但問題是 message server 沒有 session。

所以我們將架構修改成如下 :

我們會新增一個 redis 用來專門存放 session 當使用者登入後產生 session 資訊存放入 redis 中,接下來 client 要與 message server 建立連結時,由於它是使用 http 進行連線,所以它會有包含 cookie ,內含已加密過的 sessionId 進來,最後到 message server 時他會用這個 sessionId 再去 redis 取得使用者資訊。而每當登出時,business server 相同的也會更新 redis 裡面的資料。

V-2 版本大致上就是如此 ~

聊天室 V-3

上面的架構理論上基本應該可以運行,但根據我們的要求,用戶數有可能會激增,所以我們這邊的Message Server需要考慮擴展,會優先考慮擴展他主要原因在於,每一個 websocket 都代表這一條 tcp 連線,而由於他是持久連接,白話文就是它會一直佔著一定的資源,它並不像 http 用完一定時間就收,所以如果一個 message server 有 1 萬人在聊天室,就也代表這需要同時維持 1萬條 tcp 連線,會上西天的,所以說我們需要一個代理器來做 Load balance 可以幫助我們將流量分散,架構變成如下 :

其中 Proxy 可以做很多的事情,其中最重要的就是要幫助我們分散流量,它會將我們 http 的請求 (登入、登出) 導到 business server。 websocket 的連接,則導到其中一個message server

這時我們來模擬一下,用戶在聊天室時發送訊息的流程 :

  1. client 發送訊息 。
  2. proxy 收到請求,並將它導至 message server A 。
  3. 然後 message server A 將該訊息 broadcast 到聊天室的用戶 。

目前是建議 proxy 使用 nginx 來處理,他本身就提供了 load balance 的功能,而且它也可以幫助我們來防禦一些 ddos 攻擊。

這邊有一個重點要記得:

它建立websocket 通道是長成:

client ==== proxy ==== message server
而不是
client ==== message server

聊天室 V-4

嗯嗯 ~ 看上去都沒問題。

但這是有前提假設的,那個假設就是 :

那個聊天室的所有用戶,都在同一個 message server 裡 。

你想想如果有個用戶不在 message server A 裡,那 broadcast 時不就代表那個用戶不會收到訊息 ?

那這邊要如何解決呢 ? 我目前想到的二個方法就是 :

  1. 在 load balance 時,將同一聊天室的用戶都導到同一個 message server 裡 (QQ)。
  2. 建立一個 pub/sub 的redis 來處理。

我先說說第一個的方法,我們使用 load balance 根據聊天室來將 websocket 連線導至某台 message server ,也就是說同一個聊天室的用戶都放在相同的 message server 裡,所以當有用戶發訊息到聊天室時,那個特定的 message server 就會自動的 bordcast 出去。

而當用戶要加入某個聊天室,而想發出xxxx加入聊天室時,因為那個聊天室在那個 message server 所以只要往那個 message server 發送訊息。但這種方法的缺點就在於你要如何分配聊天室在那個 message server , 分配不好,大部份流量都往那個 message server 去,反而失去我們擴展 message server 的用因。

這方法我覺得還有一個問題,那就是假設我們其中一台 message server 上西天了,那不就代表該聊天室的用戶都無法進行連線了 ? 除非我們的 load balance 演算法有設計好,不然問題很大。

目前比較推第二個方法,就是架構個 pub/sub 功能的 redis ,如下圖 :

然後我們來說一下它的運作流程,我們先看看下圖,當用戶 A、B、C 加入某個叫 kkbox 的聊天室時,當在建立連線時 message server 會去 redis 進行 kkbox這個 channel 的訂閱 (subscribe)。

接下來當用戶 A 發送訊息時,會前往 redis 對應的 kkbox channel 進行訊息 pub ,然後存在用戶 B、C 的 message server 收到訂閱的敲門,就會將消息推送到用戶 B、C了。

聊天室 V-5

上圖為我們到目前的架構,基本上可以動,理論上應該運行的不錯,但實際上呢 ?
不知道 !,現在環境難以預測,所以可能現實環境會發生下面的事情 :

啊 ! proxy gg 了,所有的請求都不能進來了 !

由於我們 proxy 那裡是使用 nginx 來做 load balance ,所以如果我們那裡掛掉,那我們的還有服務事實上就說掰掰了,所以為了避免這種狀況,我們需要用到keepalive的功能。

keepalive是啥 ? 它主要的功用就是如下 :

在一個集群中,會隨時的檢查每一台機器的健康狀態,保證該集群可以服務。

所以說,我們可以在 proxy 建立一個集群,也代表這可能有多個 nginx,然後我們在搭配 使用 keepalive,這樣每當有一台 nginx 上西天後,keepalive 就會檢查到,然後會立即的轉換成另一台可以用的 nginx,這樣也確保了我們的整體服務不會因為一台 nginx 上西天就全死了。

聊天室 V-6 (在想想)

基本上,上面的架構的確可以運行,但是呢 ~ 我們有沒有辦法確定訊息的即時性是正確的,例如下面的情況 :

聊天室中有 A 然後他發了三句話 :

A(最早) : 肚子餓了吃啥 ?

B(第二) : 吃拉麵如何 ?

C(最後) : 還是吃飯 !

但有沒有可能在實際的聊天室看到的是變成這樣 ?

C(最後) : 還是吃飯 !

B(第二) : 吃拉麵如何 ?

A(最早) : 肚子餓了吃啥 ?

會出現這種狀況,最有可能的場景就是分散式的問題,因為每一個服務都是不同的機器上,而不同的機器上會有自已的本地時鐘。

像假設我們 message server A 在台灣、message server B 在日本,這時他們的本地時鐘就是不相同的,所以我們在判斷訊息的先後順序不能使用 server 端時鐘,同理 client 端的時鐘也不行。

我這邊簡單的整理一下,很難保證時間順序性的原因 :

時鐘不一致 :

就像我們上面說的範例,你放在不同的機器,會有不同的本地時鐘。

多用戶端 :

假設我們有兩個用戶 A、 B ,一個 server,然後 A 先發送訊息,接下來是 B 再發送訊息,但因為網路傳輸的問題,我們不能保證 server 先收到 A 。

多伺服器端 :

假設我們有一個用戶 A ,然後兩個 server A 與 B ,然後先發送訊息到 server A、在發送訊息到 server B ,但因為兩台機器時鐘不一定相同,所以可能會導致時間不正確。這邊你可以想成,假設有 load balance 時,第一次訊息它導至 A,而第二次訊息導至 B。

不好意思,請參考這篇文章 如何保证IM实时消息的“时序性”与“一致性”? ,不過這篇文章中有幾個解法我是有點疑問的,像它裡面有個在單點 server 上,生成有順序的 id ,但問題在於如果你進來的順序就已經亂掉了(網路問題),那你生成的這 id 事實上也沒什麼意思,除非你可以確保,你進到 server 時的順序是正確的,這 id 才能使用。

所以這方面的問題,改天在另生一篇文章出來寫寫,這邊就先降 ~

參考資料

由於本篇參考不少資料,所以以下只列出所參考資料出處。

 
9 days ago

本篇文章中,我們講要說明,如何開發一個簡單的聊天群系統,這個東東雖然我們很常見到,到和我們平常開發的一些 WEB 有很大的差別。

差別在那呢 ? 假設我們開一個todolist功能,事實上大部份的工作就是crud的事情,每當要新增一個 todo 時,只要發送 http 到後端新增資料到資料庫裡去,然後在回傳結果就好了,但聊天群這種,如果你每發送一個訊息都使用 http 那一定爆掉的。

像聊天群這樣類型的,我們稱為InstantMessaging IM中文為即時通訊,本篇文章我們將會說明要建立這種IM應用所需要的基本知識。

開始吧 ~

從 Web 到 IM 的通信過程轉變

在最開始時瀏覽器它沒有辦法直接連接到另一個瀏覽器的通信功能,也就是說你不能從 A client 直接傳送訊息到 B client 去,我們只能在它們的中間,建立一個 server ,來將 A 要傳送的訊息儲放起來,然後 B 在自已去 server 取得資料,如下圖 :

這種做不行嗎 ?

說實話,功能是有做出來 ~ 但浪費太多的資源,你想想,根據上面的說法,當 A 發送訊息到 server 後,你 B 要如何知道 server 有你的訊息 ? 記好 http 只能從client發送到server,不能反之,所以這也代表這你 B 只能定時的去 server 問問看,說有沒有我的資料啊 ~

很明顯的,你可能問了十次,只有一次才有你要的訊息,那其它九次,不就都浪費掉了,這也代表你的 IM 系統有 90 % 的效能在處理沒用的事情

當然中間處理的其它方法先不說,後來 html5 提出了一個應用層的協議websocket,來解決這事兒 ~

Hello WebSocket

這個協議可以幫助我們可以實現,從 server 端推送資料到 client 端,而且從建立的通道是持久連接,在 http 1.0 時你每發送一次請求,都需要做一次 tcp 握手,而 http 1.1 時,則可以多次使用一個 tcp,但在 websocket 你就只要建立一次,當完成握手後,就會產生一個全雙工的通道。

全雙工代表這,可以從任何一方傳送資料或接受資料。

它的 server 與 client 的運作圖如下 :

最簡單的聊天群架構

從上面的概念中我們知道,我們這邊主要需要使用的東西是 websocket,接下來我們可以來開始的規畫我們聊天群的架構。

我們先從最簡單的來看,如下圖 :

Business Server : 用來處理用戶需要用到 http 的操作,例如用戶登入、登出、加入聊天群、離開聊天群之類的,並且每當使用者要加入聊天群時,會發送訊息給 message server 然後它會在往 client 端推送說 ~ 某用戶加入聊天群囉 ~

Message Server : 用來發送訊息與接受訊息。

上面的架構很簡單,就是 client 對一個 server,但我們來想想會發生什麼問題 ? 首先假設我們這個系統有多少人,server 就需要建立幾條的 websocket,那我想問,一台 server 可以接受幾個 websocket 連線呢 ?

一台 server 可以有多少連線呢 ?

這個問題說實話很難回答,因為還要考慮 server 性能、程式碼撰寫等,不過我這邊會簡單的根據,來大概的算出可能的連線數。

要理解這個問題,我們需要先理解下面這個知識,

file hanle 的限制

在 unix 中每一個 tcp 連線都要占用一個file descriptor,而它有一定的限制數量,當使用完後,新的 tcp 連線到來就會發生錯誤以下的錯誤訊息 :

Socket/File: Can't open so many files。

那一個 process 我們可以開啟幾個檔案呢 ? 我們可以用以下的指令來看看 :

ulimit -n

像我這台 mac pro 的預設是 :

4864

而如果是AWS EC2那在最基本版本,什麼都沒動過的則為 :

1024

所以白話文的說,如果我沒有修改預設,我在這邊最多同時間,只能連線4864個 tcp 。
它當然可以改預設,不過每一個系統一定都是有上限的,記憶中有些系統的上線大約 1百萬 到2百萬之間,這邊到不確定。

簡單的總結一下,一個 server 可以有多少連線呢 ? 直說我很難回答正確的答案,但我只能說,如果你沒調整預設的 process 限制,那必定只能連這個系統的上限(大概)。

但是每個系統有設定限制,就代表它預設一定有它的理由,所以不確定修改是不是個好方法。

聊天群架構 V-2

上面的架構非常非常的簡單,但它有什麼問題呢 ? 我們想想以下這個場景 :

我們某個聊天群裡總共 4 人,其中 A、B、C 三人目前在線上,而 D 不在線上,那麼 ~
要著麼確保 D 上線時,可以看到其它三人所發送的訊息呢 ?

目前大概的想法為,增加兩個服務,其中第一個為使用 redies 建立的用來保存用戶狀態(上線與未上線)的服務,每當使用者登入或登出時,都會透過business server發送訊息來更新redies狀態。

然後每當有訊息準備要傳送時,message server會先到redies取得聊天群內的用戶資訊,然後如果在線上的,則進行傳送,不在線上的則將知存放到temp message server裡。

然後每當用戶上線時,business server會發個訊息到messge server去,然後去看看temp message server裡有沒有離線訊息,有的話就傳送出去。

聊天群架構 V-3

上面的架構看似沒問題,但是它事實上有個缺點,有沒有注意到,很多的邏輯運算都在message server裡面做,我原本是將它定義成只用來收發訊息,但是我現在很多判斷都在裡面做,雖然在量不大時,還沒什麼問題,但如果量很大的話message server會上天堂的。

所以我決定在增加一個服務,就取名為Logical Server,架構圖如下 :

它主要的工作就是要處理所有要送出與收到的邏輯運算,像我們剛剛上面說到,要更新使用者狀態也改成從這裡運作,而決定那些訊息要存放到離線 server 也是在這處理,而在每一次使用者登入時,要傳送的訊息也都是從這裡處理好,然後在去通知message server和他說,該送貨囉 ~~

聊天群架構 V-4

上面的架構中,我們基本上已經可以處理不少需求了,但我們在思考看看,有沒有那個地方可以改進,我們從一個角度來想,流量,我們仔細想一下那一個地方的流量是最大的 ? 嗯就是message server,假設我們一台message serverhold 不住要著麼辦呢 ?

這時就是針對message server進行擴充,架構會變成如下 :

上面新增加了兩個服務 :

Proxy : 用來決定要使用那台message serverclient進行連線,事實上就是 load balance 的功能。

Dispatch : 用來決定訊息要傳送到那一個message server

我們每一次要建立 websocket 連線時,都會需要先連到proxy由它來決定你要去那個message server建立 websocket 連線。

然後每當有訊息進來後,會到一個名為dispatch的服務,先將該訊息進行包裝,並且也會注明該訊息是從那個 message server 過來的,然後再傳送到 logical server 中,判斷那將該訊息送到那些用戶上,最後在回送到dispatch上,由它來決定要送到那個message server 上,最後再由它回傳給用戶上。

結論

在這篇文章中,我們所討論到的聊天群架構,事實上到 V-4 版本,已經可以處理大致上WEB!聊天群會用的功能,下一篇文章中我們將要來討論如果來設計聊天室

參考資料

 
12 days ago

在前面的幾篇有說到,不同的process間,我們可以使用 IPC 通信來進行溝通,但如果是不同電腦呢 ? 要如何溝通呢 ? 我們這時就可以使用 socket 來進行溝通。

在開始說明 socket 前,我們需要先準備一些基本知識,那就是常聽到的 tcp/ip

TCP/IP 通訊模型

tcp/ip 它是一種網路協定,它定義了點對點如何的傳輸,如何將資料封裝、定址、傳輸、路由以及在目的地如何接受,全部都加以標準化,它基本上可以分為四層應用層傳輸層網路互連層網路介面層,它常被視為簡化的七層 OSI 模型。

圖片來源:鳥哥

在了解 socket 前,我們需要了解應用層傳輸層的基本概念。

應用層

這個層級主要是定義 :

應用程式的溝通協定,也可以理解為不同應用程式如何協同工作。

在這個層級的協定,大部份都會使用到兩個傳輸協定tcpudp,至於何時使用 tcp 或 udp 取決於,該協定是否保證資料完整的傳送到另一端,這邊我們只要記得tcp可靠udp不可靠這兩件事情就夠了。

我們常用的 http 就是屬於這一層協定,smtp 也屬於這層,我們簡單的來說明一下 http 的概念。

HTTP (超文字傳輸協定)

它是一種應用層的傳輸協定,它主要定義了下面的事情 :

它是一個用戶端與伺服器端請求和應答的標準

通常 http 用戶端的發出一個請求,它會建立一個到伺服器端的 TCP 連線 。

傳輸層

這個層級主要是定義 :

定義點到點如何傳輸

其中tcp、udp就是這一層,我們簡單的來說明一下 tcp 的工作,就會知道這個層級主要是做啥事情。

TCP (傳輸控制協定)

它是根據傳輸層的定義,所完成的協定,這個協定宗旨在於 :

提供一個可靠(不會掉資料)的資料流傳送服務

那它用什麼方法來處理可靠的問題呢 ? 答案就是tcp三次對話

我們簡單的用下面例子來說明,假設 A 和 B 兩台電腦要傳輸資料了,這時就開始要準備建立 tcp 連線,但在連線之前需要有三次對話 :

A : 我想發資料給你(B),好嗎 ?

B : 喔好啊,我會發送一個同意連接要求同步(ack1)的給你喔 。

A : 好我收到了,然後我回發了一個同意要求同步(ack2)的給你喔。

經過以上三次對話,A 才能正式的傳送資料給 B。

只要是對可靠性要求的傳輸,都必須使用 tcp 協定。

那 Socket 是啥 ?

在簡單的理解完上面的網路概念後,我們就可以來理解,什麼是 socket 。

socket 是在應用層傳輸層之間的一個抽象層,它是一組接口,隱藏了底層的複雜操作,同時你也可以把他想成一個雙向的 endpoint ,可以給人連線或連線它人,而且由於它有綁定一個特定的 port 所以這也代表傳輸層它們那邊可以用它來定位應用程式。

你可以吧 socket 想成一個兩個節點的節點的溝通機制的概念,然後在由其它語言來實作這個概念。

而他的運作流程圖如下 :

Node 的 Socket 實作

在上面大概理解了 socket 機制後,我們簡單的使用 nodejs 來建立兩個 socket 節點,一個為 server ,另一個則為 client。

Server端

我們 server socket 主要是用來接受 socket client 的資料。

var net = require('net');

var HOST = '127.0.0.1';
var PORT = '61111';


net.createServer(function(sock){
    console.log('Server open !');

    sock.on('data',function(data){
        console.log('I receved data from client :' + data);
    });

    sock.on('close',function(){
        console.log('close');
    });
}).listen(PORT, HOST);

Client端

而我們的 client 端則為每一秒傳輸資料到 server 端去。

var net = require('net');

var HOST = '127.0.0.1';
var PORT = '61111';

var client = net.Socket();
client.connect(PORT, HOST, function(){
    console.log('client connected');

    setInterval(function(){
        client.write('I am Mark');
    },1000)
});

client.on('close', function(){
    console.log('close client');
});

Socket VS Websocket

不一樣的東西,前面有提到,socket 基本上是屬於傳輸層與應用層的抽象層,而 websocket 它基本是屬於應用層的協定了。

ws://example.com/wsapi
wss://secure.example.com/

上面先說結論,我們先簡單的來看一下什麼是 websocket 。

輪詢

在 web 開發時,我們有時後會遇到這樣的需求,就是當 server 端資料有變動時,client 端畫面會變動,很古老的做法是輪詢,也就是說定時的去 server 端問問看有沒有空的資料,但很明顯的,這浪費很多資源,有可能 10 次呼叫,只有一次才真的有新的資料。

Comet

再來出現的是comet它是一種推播技術,也就是 server 那可以更新資訊時傳送到 client 端,它有一種實現方式長時間輪詢 long-polling,它是 client 和 server 進行溝通後,它的連線會先留一段時間,等某段時間沒資料時,再發送請求到 server 端,
但他還是有缺點,那就是當 server 沒有資料時,那個連線還會繼續連接,會造成 server 資源浪費的。

Websocket

最後是websocket,它是 html5 規範發布的新協議,等同於應用層(ex. http),它的基本概念為 server 端與 client 端的建立是持久連接,使用這項協議後, server 端可以主動傳送資料給 client 端,而且它 tcp 握手只要一次,不像 http1.0 每次使用都需要 1 次的 tcp 握手,但 http1.1 時,則可以在一次的連接處理多個請求,但還是比不上 websocket 的一次。

這種東東很適合用來處理聊天室報價系統這類型的應用。

NodeJS 中的 Socket.io

socket.io是一個 nodejs 的套件,它做了什麼事情呢 ? 它將上面提的溝通方式全部的整合在一起,讓我們前端與後端可以處理推播功能,它有支援以下的傳輸方式 :

  • xhr-polling
  • xhr-multipart
  • htmlfile
  • websocket
  • flashsocket
  • jsonp-polling

上面對種類中有polling字樣就是我們上面所說的溝通方式,我們直接拿官網的程式碼來看看使用的方法,下面為 server 端的程式碼。

var io = require('socket.io').listen(8080);

io.sockets.on('connection', function (socket) {
    socket.emit('news', { hello: 'world' });
    socket.on('my other event', function (data) {
        console.log(data);
    });
});

而下面為前端。

<script src="/socket.io/socket.io.js"></script>
<script>
    var socket = io.connect('http://localhost:8080');
    socket.on('news', function (data) {
        console.log(data);
        socket.emit('my other event', { my: 'data' });
    });
</script>

這樣就可以完成推播功能。但這邊有個地方有注意一下,我們都沒有設定要用什麼傳輸方式,那它要用那種傳輸方式呢 ? polling ? websocket ?

socket.io 自動會根據瀏覽器支援到什麼程度來決定使用什麼。

很方便吧 ~ 事實上 socket.io 就等同於 net 開發者都很熟悉的signaR

Network Socket VS Unix Socket

在網路上尋找 socket 文章時,常常會看到 socket 有分為network socketunix socket,那這兩個有什麼不同呢 ?

上面一章節有說到 socket 本身是建立在 tcp/ip 網路層級的應用層與傳輸層中間,所以說它本身一開始是定位於網路溝通使用,也就是我們這邊說的network socket,但後來發現在 socket 的概念上,建立一個 IPC 通信,可以使我們電腦內的溝通更有效率,這時的產生出來的東東,就是我們所謂的unix socket

unix socket不像network socket一樣,它不需要經過網路協定,tcp 連線、握手等步驟,而只是將應用層的資料,從一個 process 傳輸到另一個 process上,它於其它 ipc 機制相比來說,算是目前最常使用的 ipc 機制 (會有另一篇來比較 ipc 機制)。

在 nodejs 中,我們也可以建立 unix socket 程式碼如下,這裡的程式碼基本上與 network socket 相似,不同點在於我們 listen 不是個網址,而是個檔案,而你可以將這個檔案想成為socket 節點

const net = require('net');
const server = net.createServer((c) => {
  // 'connection' listener
  console.log('client connected');
  c.on('end', () => {
    console.log('client disconnected');
  });
  c.write('hello\r\n');
  c.pipe(c);
});
server.on('error', (err) => {
  throw err;
});
server.listen("/tmp/echo.sock", () => {
  console.log('server bound');
});

然後我們就可以使用以下指令,建立一個連線到/tmp/echo.sock的 client 端 socket 。

nc -U /tmp/echo.sock

然後 client 端這邊應該會收到從 server 端的 socket 傳送回來的訊息 :

hello

然後 server 端應該也會輸出以下訊息 :

client connected

參考資料

 
about 2 months ago

本篇文章中,我們想要知道以下兩件事情 :

  1. 為什麼要使用它呢 ?
  2. 什麼是策略模式呢 ?

為什麼要使用策略模式呢 ?

我們簡單的寫一下,一個多需要用不同方法的登入方法,它可以選擇使用googlefacebook的方法,來進行登入。

var user = {
    login: function (type) {
        if (type == "google") {
            doGoogleLoginSomething();
            console.log("google login process");
        } else if (type == "facebook") {
            doFbLoginSomething();
            console.log("facebook login process");
        } else {
            doSomething();
            console.log("custom login process");
        }
    }
}


user.login("google");

那上面這段程式碼中,有那些缺點呢 ?

首先第一個,它包含了很多的 if else 判斷,這樣反而增加了該函數的邏輯分支。

第二個為該函數缺泛彈性,如果你想增加twitter的登入,那就必須修改這函數的內部實作,這樣違反了開放封閉原則

開放封閉原則 : 白話文就是當你增加新功能時,盡量不修改原有的程式碼。

好處 : 較好維護較好測試可重複使用

所以說,當碰到這種情況時,就可以使用策略模式囉 ~

策略模式簡單的來說,就是為了處理以下的情況 :

當使用者有相同的行為,但不同的場景時,有不同的方法。

例如 : 使用者想要進行登入,但我們可以使用googlefacebook等不同的方法來登入。

在白話文一點就是 : 當你有大量的if elseswitch就可以使用策略模式了。

策略模式是什麼 ?

接下來我們就要使用策略模式來修改上面的程式碼,但在開始前,我們要先知道策略模式是什麼。

簡單的說,它的定義如下 :

定義一系列的演算法,把它們一個個封裝起來,並且可以相互替換。

以我們上面登入的範例來看,login裡面的每一種登入方法就是一種演算法,但他都丟在裡面,所以我們策略模式就是要將他,一個一個封裝起來,並且可以相互的替換。

如下面的程式碼,我們將每一個登入的演算法都封裝起來,然後在需要那個的時後,就使用那個。

var user = {
    login: function (stragtegy) {
        stragtegy();
    }
}


var loginStrategy = {
    fb : function(){
        doFbLoginSomething();
    },
    google : function(){
        doGoogleLoginSomething();
    },
    custom : function(){
        doSomething();
    }
}

user.login(loginStrategy.fb);

首先我們先來看看可維護性,上面的程式碼中,我們假設要新增一個twitter,那你只需要在loginStrategy裡新增一個twitter的策略,這樣就不會動到主題的login函數,以防止,當你修改了login會影響到其它登入方法的問題。

再來來看看可測試性,我們只需要mock你要測試的策略,然後在將他丟到login裡面,進行測試,非常的簡單,如果是沒用策略模式的程式碼,我還真的要動腦想一下著麼寫單元測試了。

結論

上面我們簡單的用登入系統的例子,來說明策略模式,在 node 中,有一個叫passport js的東西,就是使用策略模式,來實作登入系統的實例,筆上之前就有寫過一篇關於 passport 的文章,可參考參考。

Passport.js 之 Hello 你好嗎 ~

參考資料

 
about 2 months ago

本篇文章中,我們想要知道以下的重點 :

  1. passport 是啥鬼 ?
  2. 要如何使用它呢 ?
  3. 要如何使用一個 passport 的登入系統呢 ?

passport 是啥 ?

passport.js是 node 中的一段登入驗證中間層(middleware),也就是說可以讓你簡單的使用 google 登入使用 fb 登入,它的架構就是所謂的策略模式,接下來我們來實際上看看他是如何使用的。

passort.js 活著的目的就是為了驗證 request

要使用 passport 來進行驗證,需要設定三個東西 :

  • 驗證策略 (Authentication strategies)
  • 應用程式的中間件 (Application middleware)
  • Sessions (可選擇)

驗證策略的建立

上面我們有提到 passport 本身就是使用策略模式的實作,而它的定義就是 :

定義一系列的演算法,把它們一個個封裝起來,並且可以相互替換。

所以在這邊,我們需要定義驗證的策略(演算法),例如使用 facebook 登入驗證、google 登入驗證或自訂的驗證策略。

而我們這裡直接看官網的自訂驗證策略localStrategy,下面的程式碼中,我們會定義一個localStrategy,它準備用來驗證我們的request

LocalStrategy的兩個參數為optionsverify,我們option需要先定義要用來驗證的欄位usernamepassowrd,然後verify就是驗證規則,就是下面那個function裡面的東東。

var users = {
    zack: {
        username: 'zack',
        password: '1234',
        id: 1,
    },
    node: {
        username: 'node',
        password: '5678',
        id: 2,
    },
}

// LacalStrategy(options,verify)
var localStrategy = new LocalStrategy({
    usernameField: 'username',
    passwordField: 'password',
},
    function (username, password, done) {
        user = users[username];

        if (user == null) {
            return done(null, false, { message: 'Invalid user' });
        };

        if (user.password !== password) {
            return done(null, false, { message: 'Invalid password' });
        };

        done(null, user);
    }
)

那這邊有個問題 ~ 那就是奇怪,為什麼他沒有驗證 username 或 password 這兩個欄位是否合法呢 ?

因為LocalStrategy已經幫我們處理好了,我們直接來看一下它的原始碼 :

Strategy.prototype.authenticate = function(req, options) {
  options = options || {};
  var username = lookup(req.body, this._usernameField) || lookup(req.query, this._usernameField);
  var password = lookup(req.body, this._passwordField) || lookup(req.query, this._passwordField);
  
  if (!username || !password) {
    return this.fail({ message: options.badRequestMessage || 'Missing credentials' }, 400);
  }
  
  var self = this;
  
  function verified(err, user, info) {
    if (err) { return self.error(err); }
    if (!user) { return self.fail(info); }
    self.success(user, info);
  }
  
  try {
    if (self._passReqToCallback) {
      this._verify(req, username, password, verified);
    } else {
      this._verify(username, password, verified);
    }
  } catch (ex) {
    return self.error(ex);
  }
};

上面這一段是當我們執行了下面這段時程式碼時,就會執行的東東,從上面程式碼中我們可以知道,它會先去req.body中,尋找我們定義的兩個欄位usernamepassowrd,然後檢查看看他是否合法,當一切都 ok 時,我們就會執行上面有提到的verify函數,來進行驗證。

Passport.authenticate('local', { session: false })

中間件的設定

接下來我們將要在route中,增加 passport 這個中間件 (middleware),我們這邊選擇使用 express 來當我們的 web framework。

我們在使用時需要先選擇我們要使用的策略,我們直接用上面所建立的localStrategy,如果你有建立其它的例如facebookgoogle的策略也都可以使用。

// 註冊策略
Passport.use('local', localStrategy);

var app = Express();
app.use(BodyParser.urlencoded({ extended: false }));
app.use(BodyParser.json());
app.use(Passport.initialize());

其中Passport.use('local', localStrategy);這行就是將我們剛剛建立的策略註冊到 passport 中,我們直接看他的原始碼,會更了解它在做啥 :

嗯他非常的簡單,就是將我們的註冊的策略丟到一個物件中。

Authenticator.prototype.use = function(name, strategy) {
  if (!strategy) {
    strategy = name;
    name = strategy.name;
  }
  if (!name) { throw new Error('Authentication strategies must have a name'); }
  
  this._strategies[name] = strategy;
  return this;
};

然後呢我們就要在 route 上加 passport 中間件,這樣的話,我們每一個進來到這個 route 的 request 都會被 passport 我們指定的策略進行驗證。

app.post(
    '/login',
    Passport.authenticate('local', { session: false }),
    function (req, res) {
        res.send('User ID ' + req.user.id.toString());
    }
);

Session的設定

我們在驗證完畢後應該是會取得到某個使用者,像我們上面範例中的這行 :

user = users[username];

當然,這只是範例,正常情況下應該是去 db 或其它地方取得使用者,但我們這裡就一切從簡。

接下來我們兩個問題 ~

session 和 cookie 是啥 ?

雖然我還算理解,但是這邊還是簡單的說一下。

首先這兩個都是個儲放機制。

再來 session 是只能在 server 進行維持,每當 client 在連接 server 時,會由 server 產生成一個唯一的 sessionId,並用它來連接 server 內的 session 存放空間,而通常來說 sessionId,也同時會保存在客戶端的 cookie 中,每次 client 在訪問 server 時都會用它來存取 session 資料。

而 cookie 則是在客戶端的儲放機制,它是由瀏覽器來維持,但注意,它可以在 client 端與 server 端進行修改,為什麼會有 cookie 呢 ? 主要就是因為 http 是無狀態的協議,每一次讀取頁面時,都是獨立的狀態,所以就需要使用 cookie 來連結前後文。

對了 cookie 還有一點要記,那就是每一次的請求,cookie 都會一起被發送到 server 端喔。

我們懂了以下的基本知道後,再來問個問題。

我們每一次登入,就要取資料庫驗證和取得一次使用者嗎 ????

答案是否定的。

正常不太會這樣處理,假設我們是用 fb 登入,那不就變成,每一次使用者到這頁面時,畫面都會掉到 fb 要你登入,然後在跳回來原本頁面,這樣太浪費時間了。

所以說,passport 它會做以下兩件事情 :

  • 將使用者資訊存放在 server 的 session 中。
  • 然後會在使用者的瀏覽器設定 cookie。

那我們在 passport 要如何使用呢 ? 首先關於第一點,passport 提供了serializeUser讓我們將使用者資訊存放置 server 的 session中。

passport.serializeUser(function(user, done) {
  done(null, user.id);
});

然後關於第二點,每一次進行請求時,passport 都會將傳進來的 cookie 中某個存放該session資訊的欄位,取得到我們剛剛存的user.id,然後在使用它,來取得完整的user資訊,並將它存放到req.user之中。

passport.deserializeUser(function(id, done) {
  User.findById(id, function(err, user) {
    done(err, user);
  });
});

實作一個登入系統

我們這邊來實作個登入系統,使用者只要有登入後,接下來的 route 都可以從 session 中取得到該名使用者的資訊。

所有的程式碼

1. app.js 基本的註冊

var Passport = require('passport');
var LocalStrategy = require('passport-local').Strategy;
var Express = require('express');
var BodyParser = require('body-parser');
var session = require('express-session');
var cookieParser = require('cookie-parser');

var app = Express();
app.use(BodyParser.urlencoded({ extended: false }));
app.use(BodyParser.json());
app.use(cookieParser());

app.use(session({
    secret: "test",
    resave: false,
    saveUninitialized: false,
}))
app.use(Passport.initialize());
app.use(Passport.session()); // 一定要在 initialize 之後

2. 驗證策略

var users = {
    mark: {
        username: 'mark',
        password: '1234',
        id: 1,
    },
    node: {
        username: 'node',
        password: '5678',
        id: 2,
    },
}

var localStrategy = new LocalStrategy({
    usernameField: 'username',
    passwordField: 'password',
},
    function (username, password, done) {
        user = users[username];

        if (user == null) {
            return done(null, false, { message: 'Invalid user' });
        };

        if (user.password !== password) {
            return done(null, false, { message: 'Invalid password' });
        };

        done(null, user);
    }
)

Passport.use('local', localStrategy);

3. 建立登入的route

下面為我們登入的 route 建立。

app.post(
    '/login',
    Passport.authenticate('local',{session: true}),
    function (req, res) {
        res.header("Access-Control-Allow-Origin", "*");
        res.header("Access-Control-Allow-Headers", "X-Requested-With");
        res.send('User ID ' + req.user.id.toString());
    }
);

那 passport 是那裡置至 session 和 cookie 呢 ?

答案是在這裡 :

passport.serializeUser(function(user, done) {
  done(null, user.id);
});

然後我們來呼叫這個 route,然後你到 chrome 的 application 看,你會發現,他有存放個 cookie 。

4. 建立取得使用者的 route

然後接下來,我們執行http://127.0.0.1:3000/getInfo後,這段程式碼app.use(Passport.session());就會將我們從前端傳回來的 cookie,進行分析,並和 session 進行比對,然後就會將使用者資料存放到req.user裡囉

app.get('/getInfo',function(req,res){
    const user = req.user;
    res.send(user);
})

注意點 : deserializeUser 無法被呼叫到

有一點要注意一下,如果你發現你的deserializeUser老是無法被呼叫到,那問題是在下面這段 :

app.use(session({
    secret: "test",
    resave: false,
    saveUninitialized: false,
}))

有些人會寫成下面這樣,cookie: { secure: true }這個參數需要配合https才能使用。

你的好朋友 stackoverflow

app.use(session({
    secret: 'goodjob secret',
    resave: false, // don't save session if unmodified
    saveUninitialized: false,
    cookie: { secure: true },
}));

參考資料

 
2 months ago

上一章節中我們有提到rabbitmq,它是用來建立中介式架構broker,但這種架構有什麼問題呢 ? 那就是分散式架構的頭號公敵單點失效(single point of failure)。

所以後來就有人提出使用對等式架構來解決這個問題,這個架構就是會將broker給移除掉,每一個用戶端同時也是伺服器端,像比特幣這種應用就是用該結構來處理。

但相對的,它也有缺點,那就是要建置起來較為複雜,用在大規模的網路上,管理較難、安全性較低。

使用 ZEROMQ 進行對等式架構 (peer-to-peer)實作

zeromq它是一套網路通訊函式庫,記得他不是一個伺服器,而是一個lib,它在socket api之上做了一層封裝,將網路、通訊、進程等抽象化為統一的 API 接口,它和 socket 的區別是 :

  • socket : 點對對的關係(一對一)
  • zeromq : 多點對多點的關係(多對多)

那 zeromq 有什麼特點呢 ? 它有以下四個特點 :

  • 去中心化 (無 broker)
  • 強調訊息收發模式
  • 以統一的接口支持多種底層通信方式
  • 異步
  • 速度飛快 (請參考這篇比較)

不過有一點要注意一下,zeromq 它不是一個獨立的伺服器進程 (rabbitmq 是),所以嚴格來說它不是 mq ,它沒有 mq 這種在中間解耦合的能力,事實上他的名命也說了 zero mq 。

zeromq 主要提供三種類型的通訊模式分別如下 :

REQ (request) / REP (reply) 模式

這模式就是傳統的一個 reuest 配一個 response 的模式,非常的簡單。

下面這段程式碼是發送請求(request)的程式碼。

var zmq =  require('zeromq');
var requester = zmq.socket('req');

requester.on('message', function (reply) {
    console.log(`Received reply: ${reply.toString()}`)
})

console.log('Send msg');
requester.send('Hello Mark');


requester.connect('tcp://localhost:5555');

process.on('SIGINT', function () {
    requester.close();
})

然後下面這段程式碼為收到請求後,進行回傳的程式碼,這也可以理解成一個 server,它會一直等待 request 的 loop,然後針對每次的請求都進行回覆(Reply)。

var zmq = require('zeromq');

var responder = zmq.socket('rep');

responder.on('message', function (request) {
    console.log(`Received request : ${request.toString()}`);

    setTimeout(function () {
       responder.send("Ok ~ I Received your msg"); 
    },1000);
})

responder.bind('tcp://127.0.0.1:5555', function (err) {
    if(err){
        console.log(err);
    }else{
        console.log('Listening on 5555');
    }
})

process.on('SIGINT', function () {
    responder.close();
    
})

不過這邊有二點要注意,當你將 server 進行重啟時,client 不會自動的重新連上 server ,如果想要建立一個高可靠性的 server 請參考官網該篇文章,它說明的很詳細囉 ~

reliable-request-reply

而另外一點就是,不論先開啟 client 或 server 都沒關係,在傳統觀念上 server 就是要先開,然後 client 才連的上,但在這裡,它們的關係是節點對節點,也就是說沒有主或從的關係,只有誰發誰送的問題。

Pub / Sub 模式

它基本上是一種很常見的設計模式,像我們在使用jquery時的事件機制就很常看到它,如下 :

$(".test").on('click', function(){
    /do something...
})

上面的程式碼中,當頁面獨發了click事件後,就會發佈(pub)一個訊息,給有訂閱(sub)的使用者說,我獨發了 click 了喔,然後使用者在來處理獨發後的事情。

這種模式的優點就在於解耦合,發佈者無須預先知道訊息的接受者,則也使得這種模式很適合用在變化多端的分佈式架構中。

我們簡單的用一句話來說明 zeromq 的 pub/sub 模式,就是下面這句 :

當訊息透過 pub socket 傳送後,便會擴播至所有已連線的 sub socket

這種類型的模式,很適合用來處理股價報價,每個 subscriber 都會去和 publisher 訂閱事件,當有新個報價時,就會通知所有有訂閱報價的 subscriber。

接下來我們來開使實作程式碼。

首先我們下面這段程式碼是用來建立 zeromq 的publisher,也就是會將訊息從這邊發送出去給已連線的subscriber

// pub.js

var zmq = require('zeromq');
var pubSocket = zmq.socket('pub');

pubSocket.bindSync('tcp://127.0.0.1:3000');
console.log('Publisher bound to port 3000');

setInterval(function(){
    pubSocket.send(['mar',new Date()]);
},1000);

而下面這段程式碼就是subscriber,它用來訂閱訊息來源,然後會使用on這監聽器,來收得 pub 過來的訊息。

// sub.js
var zmq = require('zeromq');

var subSocket = zmq.socket('sub');
var port = "3000";

subSocket.connect(`tcp://127.0.0.1:${port}`);
subSocket.subscribe('mark');
console.log(`Subscriber connected to port ${port}`);

subSocket.on('message', function(topic, message){
    console.log(topic.toString());
    console.log(message.toString());
})

我們可以看你心情來決定要先開啟publishersubscriber,zeromq 它有提供一個機制,他會自動重新連線,也就是說,當然二個都開啟後,如果將publisher關掉在重啟,你的subscriber還是可以繼續收到資料。

然後我們來執行程式碼看看。我們會開啟一個publisher和二個subscriber

node pub.js
node sub.js
node sub.js

然後我們應該是會看到如下的結果,兩個subscriber每隔十秒鐘會收到一次從publisher來的資料。

mark
Thu Jul 20 2017 17:18:58 GMT+0800 (CST)

mark
Thu Jul 20 2017 17:18:59 GMT+0800 (CST)

Push / Pull

這種模式又被稱為管道(pipe)模式,它是單向的,從 push 單向推送到 pull 端,這種模式和上面的pub/sub最模式最大的差別在於 :

push 傳送的一堆資料,會被平均分散至多個 pull 端,就像是 load balance的機制一樣。

以下的程式碼為 pull 端的建立。

// pull.js 

var zmq = require('zeromq');
var pullSocket = zmq.socket('pull');

pullSocket.connect('tcp://127.0.0.1:3000');
console.log('Worker connected to port 3000');

pullSocket.on('message',function(msg){
    console.log(msg.toString());
})

而下面的程式碼為 push 端的建立。

// push.js
var zmq = require('zeromq');
var sockPush = zmq.socket('push');

sockPush.bindSync('tcp://127.0.0.1:3000');
console.log('Producer bound to port 3000');

var i =0;
setInterval(function(){
    sockPush.send(`mark wake up ~ : ${i}`);
    i++;
},1000);

然我們開始執行。

node push.js
node pull.js
node pull.js

這時你會看到下面的結果顯示出,每一個 push 出去的資料都會平分給另外兩個 pull 端。

mark wake up ~ : 10
mark wake up ~ : 11
mark wake up ~ : 12
mark wake up ~ : 14
mark wake up ~ : 16
mark wake up ~ : 18
mark wake up ~ : 20
mark wake up ~ : 22
mark wake up ~ : 24
mark wake up ~ : 13
mark wake up ~ : 15
mark wake up ~ : 17
mark wake up ~ : 19
mark wake up ~ : 21
mark wake up ~ : 23

這種模式事實上很像我們之前所談到的負載平衡,他們的概念的確是一樣的沒錯,這種模式也代表我們可以將一個複雜的任務平均分配下去,當各 pull 端完成時,在全部一起收集起來使用。

接下來我們再來建置一個分散式的雜湊碼破解器 ~

建立一個分散式的雜湊碼破解器

這個應用主要是可以根據一組字母表做出各種排列組合,藉此對輸入的雜湊碼(MD5、SHA1等)來進行破解,這個架構就是一個典型的平行管線。

這個爆力破解的過程如下

首先我們會先建立一個push端,他們將我們指定的字串,進行各種排列組合,例如abc,會產生abcacbbac等……,然後使用串流來讀取出來,並且 push 到每一個 pull端。

我們下面的程式碼中alphabet代表這我們要進行的排序組合,然後不可能英文 26 個字母全部排列,會出人命的,所以我們會用maxLength來進行限制,我們該值為 4 的意思代表只從 26 個字母內選取出 4 個字來進行排列組合。

也因為上面 4 個字的限制,我們測試時輸入的單字要只有 4 個字母。

//ventilator.js
var zmq = require('zeromq');
var variationsStream = require('variations-stream');
var alphabet = 'abcdefghijklmnopqrstuvwxyz';
var batchSize = 10000;
var maxLength = 4;
var searchHash = process.argv[3];

var ventilator = zmq.socket('push');
ventilator.bindSync('tcp://127.0.0.1:5000');

var batch = [];
variationsStream(alphabet, maxLength)
    .on('data', function (combination) {
        console.log(combination);
        batch.push(combination);
        if (batch.length === batchSize) {
            var msg = {
                searchHash: searchHash,
                variations: batch,
            }
            
            ventilator.send(JSON.stringify(msg));
            batch = [];
        }
    }).on('end', function () {
        var msg = {
            searchHash: searchHash,
            variations: batch,
        }
        ventilator.send(JSON.stringify(msg));
    });

接下來在 pull 端收到從 push 端來的字串後,我們會將該字串轉換成sha1 hash碼,然後我們在將該碼與輸入碼(我們要破解的碼)進行比對,最後當比對到時相同的東西時,我們就會將結果 push 到另一個收集結果的 pull 端 (就是toSink所連結的地方)

// worker.js
var zmq = require('zeromq');
var crypto = require('crypto');
var fromVentilator = zmq.socket('pull');
var toSink = zmq.socket('push');

fromVentilator.connect('tcp://127.0.0.1:5000');
toSink.connect('tcp://127.0.0.1:5001');
console.log('Worker connect to 5001');

fromVentilator.on('message',function (buffer) {
    var msg = JSON.parse(buffer);
    var variations = msg.variations;
    variations.forEach(function(word) {
        console.log(`Processing: ${word}`);            
        var shasum = crypto.createHash('sha1');
        shasum.update(word);
        var digest = shasum.digest('hex');
        if(digest === msg.searchHash){
            console.log(`Found! => ${word}`);
            toSink.send(`Found! ${digest} => ${word}`);
        }
    });
})

其中下面這段,是指將我們從 26 個字母中產生任選出 4 個所產生出的排列組合的單字,進行sha1 hash加密,產生出 hash 碼。

    var shasum = crypto.createHash('sha1');
    shasum.update(word);
    var digest = shasum.digest('hex');

最後,當我們從 worker 那收到破解後的結果,就進行輸入。

var zmq = require('zeromq');
var sink = zmq.socket('pull');
sink.bindSync('tcp://127.0.0.1:5001');

sink.on('message',function (buffer) {
    console.log(`Message from worker: ${buffer.toString()}`);
})

我們來執行看看,我們要先開啟兩個 worker 和一個 sink。

node worker.js
node worker.js
node sink.js

然後我們在開啟ventilator.js,用來開始啟產生單字的排列組合,其中f1b5a91d4d6ad523f2610114591c007e75d15084是指marksha1 hash碼。

node ventilator.js f1b5a91d4d6ad523f2610114591c007e75d15084

然後當破解完,你可以看到 sink 那的輸出結果。

Message from worker: Found! f1b5a91d4d6ad523f2610114591c007e75d15084 => mark

結論

本篇文章中,我們說明了如何使用zeromq進行對等式架構建置,並且還了它的三種模式 :

  • REQ / REP
  • PUB / SUB
  • PUSH / PULL

這三種模式是 zeromq 中的基本,它們還有更多的變化類型,但都只是這三個的組合型,如果想了解更多,官網的資料已經夠多囉,請慢慢自已研究吧 ~

參考資料

 
2 months ago

在前幾篇文章中,我們說明了如何將系統進行擴展,而接下來呢,我們將要說明如何使用訊息佇列來進行整合,事實上之前的每篇文章中都要提到一個名稱IPC通信,其中裡面就包含了訊息佇列 (message queue)

訊息佇列基本上是用來行程間溝通或是同行程內不同執行序溝通,他提供了異步的溝通協定,這個意思就是指當你傳送一堆訊息給 A 時,A 可以不用即時的來處理這些訊息,這也代表這訊息可堆積再處理,白話文就是 :

訊息接受者如果爆了,我訊息發送者還是可以一直發送訊息,等你好了,你還是可以取得完整的訊息。

我們可以想想http協定他是一個同步協定,這也代表你傳送一個request必須等待伺服器發送response

至於我們為什麼要用message queue請參考下面這篇文章,他真的已經寫的很完整了。

使用訊息佇列的十個理由---簡中

然後我們先簡單的說明一下訊息系統的基礎。

訊息系統架構

基本上分為以下兩種 :

對等式 (peer-to-peer)

在對等式的架構下,每一個節點都直接將訊息傳送給接受者,這種方法基本上會比較複雜,因為他還要決定各自結點的的通訊協定,但還是有一些優點 :

  • 避免單點故障。
  • 和中介者模式比較來少了中間一層,速度應該是比較快。
  • 彈性較高。

以下為對等式架構的圖示 :

zeroMQ 他可以幫助我們建立對等式架構

中介者模式 (message broker)

中介者模式就是所有的節點,都會連結到某個broker,一切都由broke來處理,每個節點不需要知道,我和誰溝通,只需要知道要傳送的訊息內容即可。但缺點就是上面對等式的優點。

以下為中介者架構的圖示 :

RabbitMQ 就是專門用來建立這個架構的東東。

接下來的文章中,我們將要先來實作一些rabbitmq

RabbitMQ

在上面的章節中,我們應該有說到,分佈式架構除了對等式架構外,還有一個是中介者架構,中介者的主要作用就是讓訊息接受者與傳送者之間完全的解偶,而rabbitmq就是一個支援AMQP (Advanced Message Queuing Protocol)協議的中間介者。

如下圖所示,它就是中間綠綠那個,我們稱他為中介者 broker

AMQP是什麼 ?

它是一種協議,AMQP 是一個提供統一訊息服務的應用層標準協議(osi第七層),也就是設定於其它應用軟體之間的通訊,像 http、https、ftp 等都是應用層協議。

根據該協議,客戶端與訊息中間件(broker)可傳送訊息,不受客戶端/中間件不同產品,不同開發語言的條件限制。

它有三個總要概念

佇列 (queue) :

這東東它是儲存訊息的架構,然後裡面的訊息它會被客戶端拿走。一個佇列可能會推多個客戶端取走訊息,這時處理的方式和我們之前說的負載平衡差不多。

佇列它還有以下三種特性 :

  • 可延續性 : 意即若 broker 重新啟動時,則佇列也自動重新建立。那裡面的訊息著麼辦呢 ? 只有被示為需保訊的訊息,才會存入磁碟,並於重啟時還原。
  • 專用性 : 一個佇列可綁定特定的訂閱著,若彼此連線關閉時,則該佇列會說掰掰。
  • 自動刪除 : 當有沒任何訂閱者連線時,便刪除佇列。
交換器 ( exchange ) :

他主要的功能是將訊息傳輸送到一個或多個佇列,這個東西就是放在producerqueue之間,架構大概長的如下圖 :

這邊我們會有一個問題,那就是為什麼需要這個 exchange 而不是直接producerquenu間溝通就好呢 ? 這點我們後面在來說明。

綁定器 ( bind ) :

這個就是上面的交換器與佇列之間的連結

以上的東西都會被中介者管理,它會建立一個抽象的通道,用於維護與中介者之間的通訊狀態。

使用 rabbitmq (mac)

首先我們要先安裝 rabbitmq :

brew update 
brew install rabbitmq

然後安裝好後,我們需要去系統的.zsrch.bashrc設置路徑,這樣我們才能在 terminal 上執行他的指令。

PATH=$PATH:/usr/local/sbin

然後我們就可以執行下列指令開啟rabbitmq server :

rabbitmq-server

如果看到下列的出輸結果,則代表開啟成功。

              RabbitMQ 3.6.9. Copyright (C) 2007-2016 Pivotal Software, Inc.
  ##  ##      Licensed under the MPL.  See http://www.rabbitmq.com/
  ##  ##
  ##########  Logs: /usr/local/var/log/rabbitmq/rabbit@localhost.log
  ######  ##        /usr/local/var/log/rabbitmq/rabbit@localhost-sasl.log
  ##########
              Starting broker...
 completed with 10 plugins.

如果要關啟請執行下列指令,而不要使用alt + c

rabbitmqctl stop

接下來我們就安裝 node 的 rabbitmq 的 lib ampqplib

npm install --save amqplib

使用rabiitmq實作一個 Producer => Queue 的架構

接下來我們從最簡單的使用開始,我們會簡單的實作如下圖的架構,共有三個實體producerqueueconsume,其中queue現階段就是我們的rabbitmq server

首先我們會先建立一個producer來負責產生訊息,從下面的程式碼我們可以知道,它需要先連線到我們的rabbitmq server,並且建立一個通道,然後,我們可以指定要將資料送到特定的queue,而我們 queue 取名為mark

其中,下面的程式碼中,比較需要注意的下面這行,這行是會判斷該 queue 是否存在,如果不存在則建立,並且durable : false代表的意思為當 queue 重開裡面的資料會消失

ch.assertQueue(quenu_name, {durable: false});
var amqp = require('amqplib/callback_api');

amqp.connect('amqp://127.0.0.1', function (err, conn) {
    console.log("connect amqp server !");

    conn.createChannel(function (err, ch) {
        var quenu_name = "mark";

        ch.assertQueue(quenu_name, {durable: false});

        ch.sendToQueue(quenu_name, new Buffer('Hello Mark'));
        console.log('Send a message');
    })
})

再來我們來實作consumer來當作我們訊息的接受者,一樣也是需要去連線到radditmq server和通道,然後會監聽這個通道所傳下來的訊息。

其中我們的 ch.consume有個參數是{noAck: true},代表這如果該channel不會使用act來進行確認,而如果是 false 它代表的意思為,如果中介者未收到 ack (確認),則訊息就會被保留在佇列裡以待再次處理。

var amqp = require('amqplib/callback_api');

amqp.connect('amqp://127.0.0.1', function (err, conn) {
    conn.createChannel(function (err, ch) {
        var queue_name = "mark";

        ch.assertQueue(queue_name, { durable: false });
        console.log("Waitting the meesages");
        ch.consume(queue_name, function (msg) {
            console.log(`Received the msg : ${msg.content.toString()}`);

        }, {noAck: true});
    })
})

最後我們來執行兩次producer,然後再打開consumer來接受資料,你會發現,他還是接受的得,而且二次傳的資料都有收到,這就是我們前面有提供的message quenue的優點 :

consumer 如果爆了,我 producer 還是一直可以發送資料,等你好了,你還是可以取得完整的資料。

這邊我們提出一個問題。

那 quenue 應該有最大值吧,不然一直沒有訊息流出,遲早會爆的吧 ?

嗯沒錯,所以radditmq它有提供兩個方法來設定每個queue的最大值 :

  • 使用 policy 來進行設定。
  • 使用參數來設定。

我們簡單的使用參數來限定,下面指令設定 quenue 最大為10 byte

ch.assertQueue(queue_name, { durable: false, maxLength: 10  });

那如果 queue 超過10 byte後會如何呢 ?

我們實際執行看看就知道,首先每 0.5 秒傳送一個數字給 queue 結果如下:

Send a message:1
Send a message:2
Send a message:3
Send a message:4
Send a message:5
Send a message:6
Send a message:7
Send a message:8
Send a message:9
Send a message:10
Send a message:11
Send a message:12
Send a message:13
Send a message:14
Send a message:15
Send a message:16
Send a message:17
Send a message:18
Send a message:19
Send a message:20

然後我們 consumer 接收的結果如下 :

Received the msg : 11
Received the msg : 12
Received the msg : 13
Received the msg : 13
Received the msg : 14
Received the msg : 15
Received the msg : 16
Received the msg : 17
Received the msg : 18
Received the msg : 19
Received the msg : 20

從上面的結果可知,因為每個單字為 1 byte,所以當 queue 儲放了前十個數字後,就超過 10 byte了,然後它將先進來的就先把他刪除,所以結果才能如上面所示,只收到 11 以後的數字。

使用rabiitmq實作一個 Producer => Exchange => Queue 的架構

實際上在 rabbitmq 中,我們的 producer 完全不會直接傳送訊息給 queue,producer 完全不知道會傳給那個 queue ,而是透過 exchange 來進行處理,而這邊就準備回答我們上面問的一個問題。

為什麼要有 exchange 呢 ?

我在 quora 發現也有人有相同的問題 (Why does RabbitMQ have both exchanges and queues?)。

然後我發現有一個老兄的說明很傳神,我簡單的說明一下。

想像一下你在 app store ,當你進入店內後,有一位服務員問你說 "你需要什麼呢 ? " ,然我回答說 "我要尋找耳機",然後該名服務員就將你引導到處理耳機的 queue ,接下來下一客人進來服務員就問 "你需要什麼呢 ?" ,然後該名客人回答 "我要修理我的電腦",然後服務員就將它引導到處理修改電腦的 queue 去處理。

所以根據那位老兄的說明,你可以發現服務員就是我們的Exchange,如果沒有了它,那就代表這每一位進來店裡的老兄,只能自已去尋找所需要的 queue ,而且還可能找錯,那是不是很浪費時間呢 ?

在理解了exchange功用後,我們就開始來它的實作。

我們先來畫張圖,你應該會更容易理解,我們等一下做的東西,其中我們先實作 error 那條線,其它的線事實上大同小異。

首先我們來建立producer,下面的程式碼中,可以看到我們會建立一個exchange而不在是queue,並且我們發送訊息的對像也改為exchange

還有我們在發送訊息時,有指定routing_key,這也代表我們有指定發送到的綁定 routing key 為error的 queue 。

var amqp = require('amqplib/callback_api');

amqp.connect('amqp://127.0.0.1', function (err, conn) {
    console.log("connect amqp server !");

    conn.createChannel(function (err, ch) {
        var exchange_name = "logs";
        var msg = "Hello mark";
        var routing_key = "error";

        ch.assertExchange(exchange_name, 'direct', { durable:false });
        ch.publish(exchange_name, routing_key, new Buffer(msg));
        console.log(`send msg: ${msg}`);
    })
})

其中下面這段程式碼中的direct代表這個 exchange 的種類,我們必須給定這個 exchange 的類型,用來決定要如何將訊息從 exchange 發送出。

ch.assertExchange(exchange_name, 'direct', { durable:false });

它主要有四種類型 :

  • direct : 會需要設定一個叫routing key的參數綁定 queue,然後在發送訊息時指定routing key,exchange 就會將該訊息傳送到指定的 queue (上面範例所使用的)。
  • topic : 大至上和 direct 相同,但這裡的routing key可以用匹配的方式。
  • fanout : 此類型的路由規則最簡單,就是收到訊息後,將它發送到所有綁定的佇列中。
  • header : 它的路由規則是由header來判斷,如果要做一些複雜的路由規則,那就用他。

接下來我們來看看consumer的程式碼,如下,我們在綁定 queue 時有指定 routing_key ,就代表這個 consume 只能收到指定的 queue 的訊息。

var amqp = require('amqplib/callback_api');

amqp.connect('amqp://127.0.0.1', function (err, conn) {
    conn.createChannel(function (err, ch) {
        var exchange_name = "logs";
        ch.assertExchange(exchange_name, 'direct', { durable: false });

        ch.assertQueue('', { exclusive: true }, function (err, q) {
            console.log('Waiting for messages');

            var routing_key = 'error';
            ch.bindQueue(q.queue, exchange_name, routing_key);
            ch.consume(q.queue, function (msg) {
                console.log(`Received msg:${msg.content.toString()}`);
                console.log(`routing key is:${msg.fields.routingKey.toString()}`);
            }, { noAck: true });
        })
    })
})

然後我們就來執行看看,結果如下 :

// producer
connect amqp server !
send msg: Hello mark

// consumer
Waiting for messages
Received msg:Hello mark
routing key is:error

結論

本篇文章中,我們先簡單的說明了message queue的概念,它是我們跨進程與執行緒的溝通工具之一,並且它是屬於異步的架構。

接下來我們也說明了訊息系統架構,它基本上可分兩類,一種為對等式架構,而另一種為中介者架構,前者可以使用zeromq來建立,而後者可使用rabbitmq來建立。

最後呢,我們就開始使用rabbitmq來進行實作,本文中只簡單的介紹一些基本的使用方式。

參考資料

 
2 months ago

在上一篇文章中,我們使用cluster來建立多process的應用,這個方法是我們上一篇所提到X軸擴展的複制的方法之一。

而這一篇文章,我們一樣是要來討論X軸擴展的複制的另一種方法 :

反向代理器

這種擴展的方法為,在不同的 port 或不同的機器上,我們會啟動多個應用程式,然後使用反向代理器來存取這些機器,用來分散流量。

他不會像 cluster 上有一個master process然後將工作分配給多個worker,而是有更多個獨立的程式執行在同一個機器不同 port上或是分散在相同的網路中的不同機器上,然後會以反向代理器為入口,由他處理請求並與後端的伺服器做處理,然後在由他回傳給客戶端。

下圖為該結構的圖示 :

那他這樣做有什麼優點呢 ? 事實上他就是 proxy 的用法,也就是說 :

他可以保護伺服器

反向代理器可以和我們上一章所說的cluster一起使用,例如單一機器使用 cluster 進行垂直擴展,再使用反向代理器來做水平性擴展。

本篇文章中我們將使用最常用來做反向代理器的Nginx

Nginx 做反向代理器,並配置負載平衡

nginx是一個網頁伺服器,它的設計架構和 nodejs 非常的相似,都是單一執行緒架構,並且還有豐富的模組庫和第三方工具可以使用,非常的方便啊。

這邊我們將要使用nginx來作為反向代理器,並且進行負載平衡的功能,它要做的工作就是 :

我們有多台伺服器,然後請求進來要將請求分給其它台伺服器。

首先我們先安裝 nginx

// ubuntu
apt-get install nginx

// mac
brew install nginx

然後我們簡單的建立一個 server,它每一次收到請求時,都會回傳這個工作是那個port來進行處理。

// app.js

const http = require('http');

http.createServer(function (req, res) {
    console.log("master:" + process.pid);

    res.writeHead(200);
    res.write("port:" + this._connectionKey.toString());
    res.end();

}).listen(process.env.PORT || process.argv[2] || 8080, function () {
    console.log('started:' + process.pid);
});

接下來我們直接使用上一篇有說到的forever來開啟四個應用程式,並且每一個都給予指定的port

forever start app.js 8081 
forever start app.js 8082
forever start app.js 8083 
forever start app.js 8084 

然後我們可以執行forever list來看看已啟動的應用程式,這邊每一個實例都是一個 process

info:    Forever processes running
data:        uid  command                                        script                forever pid   id logfile     uptime
data:    [0] P8D5 /Users/mark/.nvm/versions/node/v6.2.1/bin/node server_nochid.js 8081 39585   46636    /Users/mark/.forever/P8D5.log 0:2:23:56.342
data:    [1] MKn0 /Users/mark/.nvm/versions/node/v6.2.1/bin/node server_nochid.js 8082 39618   46637    /Users/mark/.forever/MKn0.log 0:2:23:56.342
data:    [2] wAAK /Users/mark/.nvm/versions/node/v6.2.1/bin/node server_nochid.js 8083 39637   46638    /Users/mark/.forever/wAAK.log 0:2:23:56.339
data:    [3] pcFf /Users/mark/.nvm/versions/node/v6.2.1/bin/node server_nochid.js 8084 39648   46639    /Users/mark/.forever/pcFf.log 0:2:23:56.335

接下來我們就將 nginx 反向代理器設置負載平衡

首先我們先進到某個位置找到nginx.conf,然後套用如下設定,由於不同的系統平台位置有異,所以要自已去找找囉。

worker_processes  1;


events {
    worker_connections  1024;
}

http {
    upstream myproject {
        server 127.0.0.1:8081;
        server 127.0.0.1:8082;
        server 127.0.0.1:8083;
        server 127.0.0.1:8084;
    }

    server {
        listen 80;

        location / {
            proxy_pass http://myproject;
        }
    }
}

我們根據上面的設定,來理解一下他裡面的參數意義。

  • worker_process : 用來指定 nginx 要開啟的子進程的數量,建議根據 cpu 有幾個就開幾個。
  • events : 該模組用來指定 nginx 的工作模式。
  • events.worker_connections : 用來指定每個進程的最大請求數,默認為1024
  • http : 該模組為核心,它負責 http 伺服器的所有配置。
  • http.upstream : 該模組用來處理負載均衡的配置,像我們上面的設定就是將該http的連線經過這個 nginx,然後負載均衡到我們設定的 8081、8082、8083、8084 這四個 server 上。
  • http.server : 它用來定義一個主機。
  • http.server.listen : 定義該主機的 port。
  • http.server.location : 該模組主要用來處理定位的,基本上反向代理負載均衡位置等都要在這處理。

設定好後,我們來確任一下 conf 有沒有設定錯誤,我們執行sudo nginx -t,然後如果正確的話會輸出下列結果 :

nginx: the configuration file /usr/local/etc/nginx/nginx.conf syntax is ok
nginx: configuration file /usr/local/etc/nginx/nginx.conf test is successful

確定成功後,我們還要在執行以下指令,來重新讀取 conf 檔。

sudo nginx -s reload

但有點要注意,如果你之前有將 nginx 停止或關閉記得要在執行sudo nginx來開啟它,不然會出現下面的錯誤訊息。

nginx: [error] invalid PID number "" in "/usr/local/var/run/nginx.pid"

接下來我們做以下的實驗 :

發送請求到http://127.0.0.1後,如果有設置好,目前預設,應該是會每次的請求都會在不同的 port 。

我們直接發個五次請求。

curl -G http://127.0.0.1

然後你會看到回傳的結果如下,下面結果就證實了我們的負載平衡反向代理有設置成功,你每次發送127.0.0.1反向代理器會自動的從四個伺服器(8081、8082、8083、8084)中,選出一個來處理。

port:6::::8081%
port:6::::8082%
port:6::::8083%
port:6::::8084%
port:6::::8081%

但這裡我們就還要在思考一個問題,負載平衡他是如何決定要用那個呢 ?

那要如何決定那個工作分配給誰呢 ?

在 nginx 中,負載平衡模組總共有提供四個方法給使用者。

輪詢 ( round robin )

nginx 預設的分配方法,假設你有個用戶叫 mark ,而以有三個 port 分別為8081、8082、8083,然後輪詢就會你配置的順序來分配請求,如下 :

request 1 => port 8081
request 2 => port 8082
request 3 => port 8083
request 4 => port 8081

基於權重 weight

接下來是權重分配,事實上,上面的輪詢也算是這種類型,只是他weight默認為1,所以這也代表,這四個分別處理1(weight)/4(weight加總) = 25%的資料量。

我們將 nginx conf 修改成下列這樣來測試看看。

upstream myproject {
    server 127.0.0.1:8081 weight=1;  // 10%
        server 127.0.0.1:8082 weight=2;  // 20%
    server 127.0.0.1:8083 weight=3;  // 30%
    server 127.0.0.1:8084 weight=4;  // 40%
}

結果如下統計顯示。

84
83
82
84
81
83
84
82
83
84
--------
84 => 4次
83 => 3次
82 => 2次
81 => 1次

這種方式的優點就在於,你可以將效能較好的伺服器權重設定高一些,讓他可以處理更多的請求。

基於 ip_hash

這種方式會根據每次請求的 ip的 hash 結果作分配,也就是說,同樣的 ip 會固定到同樣的伺服器來進行處理。

upstream myproject {
 ip_hash;
    server 127.0.0.1:8081 ;
        server 127.0.0.1:8082 ;
    server 127.0.0.1:8083 ;
    server 127.0.0.1:8084 ;
}

執行結果,因為我都是本機打,所以當然都會是一樣的。

81
81
81
81
81

這種方法是用來處理session共享的問題,在分布式架構中,有一個問題一定會被問到,那就是 :

要如何處理 session ?

因為在分布式架構中,有多台的伺服器,而我們也知道 session 是存在某台伺服器上,如果我們第一次請求在 port 81 上,而第二次是在 port 82 上,如果兩次都有修改到 session,那就會不同步了。

目前大至上有以下幾種的解法。

  • 不使用session,換用cookie : 簡單,但缺點在於如果用戶禁 cookie 了,你就不用玩了。
  • session 存在數據庫 : 資料庫的I/O就會加重,而且如果資料庫也是分散式的,還要進行 session 資料表的同步。
  • session 存在 redis 中 : 這種方法好像目前都比較推。
  • 將相同 ip 的請求導致同一台伺服器 : 這就是我們現在所說明的功能,這種方法也算不錯用,但有一個前提假設,那就是nginx 是最前端的伺服器,保證得到正確的 ip

結論

本篇文章中說明了以下幾個重點。

  • 為什麼要使用反向代理器與負載平衡。
  • 使用 nginx 建立反向代理器與負載平衡。
  • nginx 所提供的負載平衡分配的方法。

這些都是可以幫助我們處理前一篇所說的X軸擴展中的複制,而他又和上一篇cluster不同點在於cluster是屬於同一台機器的垂直擴展,而本篇所說明的則屬於多台機器的水平擴展,而至於接下來的文章,我們還會繼續討論X軸這個主軸

參考資料

 
2 months ago

本篇文章中將要說明,要如何的擴展 node 應用,從上一篇文章中我們知道, node 它很適合高 I/O的任務,而不適合高 cpu 的任務,最主要的原因在於它的架構,它是單執行緒架構,但是無論單體的伺服器能力在強大,單一執行緒的效能一定會有界限,因此我們必須將應用程式擴展運用。

根據The Art of Scalabiltiy的內容來知道,在擴展時,可以用下列三個維度來描述可擴展性。這也是被稱為擴展立方(scale cube)的東東。

  • X 軸 : 複制
  • Y 軸 : 以服務/功能分解
  • Z 軸 : 以資料來分解

基本上Y軸擴展的方法是屬於微服務(Microservices)的範圍所以本篇也不詳細說明,而Z 軸則屬於資料庫方法所以也不加以說明。

我們本篇將要說明X軸 : 複制,它的白話文概念如下 :

將應用程式加以複制 N 個,這也代表每個實體只須處理 N 分之一的工作量。

傳統的系統可以利用多執行緒,來完整使用整台機器的效能,但 node 則否,因為它是單一執行緒,並且在 64 位元下有1.7GB的限制,接下來我們將介紹 node 擴展的基本機制 cluster

cluster

cluster是在 node 中的內建模組,他讓我們可以建立一個 cluster,可通過父進程來管理一堆子進程,在 cluster 中父進程被稱為master process,而子進程則被稱為worker process

每個傳送的連線都會先到master process然後會在將工作分配到worker process中。

我們根據上一篇的程式碼來進行修改。下面程式碼中,首先請先看if(cluster.isMaster)裡面,當執行時,會使用cluster fork根據 cpu 的數量來新增 process,然後每次fork時都會執行else裡面的程式。

const http = require('http');
const child_process = require('child_process');
const cluster = require('cluster');
const numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
    console.log("master process:" + process.pid);
    console.log("cpu num: " + numCPUs.toString());
    for (var i = 0; i < numCPUs; i++) {
        cluster.fork();
    }
} else {
    http.createServer(function (req, res) {
        console.log("process run:" + process.pid);
        res.writeHead(200);
        res.write(fib(40).toString());
        res.end();

    }).listen(8000, function () {
        console.log('started');
        console.log("process:" + process.pid);
    });
}

function fib(n) {
    return n > 1 ? fib(n - 1) + fib(n - 2) : 1;
}

而我們的執行輸出的結果可以看到,我們的master process的 pid 為95199,其餘四個 worker process的 pid 如下。接下來我們每次打這隻 api 時,會直接從這 4 個 process 中選一個出來執行。

master process:95199
cpu num: 4
started
process:95202
started
process:95200
started
process:95201
started
process:95203

那 node 他是如何決定要用那個 process 呢 ?

自版本0.11.2時變導入了一個循環式負載平衡演算法,它的基本概念就是輪流平均的分配所有可用伺服器的負載。

那我們要著麼樣相互溝通了呢 ?

這個事實上在上一章節有提,process間的相互溝通主要使用IPC的方法,而在 cluster 中因為每個 worker process 的產生都是使用 child_process.fork()來產生,所以相對的他也有提供message來讓我們進行溝通。

那我們為什麼不直接用child_process呢 ?

答案是方便,多進程的運行,我們同時還需要考慮到進程通信子進程管理負載均衡等問題,雖然child_process可以自已寫程式來處理,但cluster就已經幫我們處理好了,為何不直接拿來用呢 ? 對吧。

如果有一個 process 掛掉了會如何 ?

在某些時後,如果某個 process 掛掉了,會如何呢 ? 當然不會著麼樣,只要有處理的話。

cluster當然有考慮到這點,這種功能事實上在可擴展性上很重要,我們簡單的寫段程式碼,讓某個 process 來個隨機掛點,如下程式碼,大約每幾秒鐘就會 error 一次。

http.createServer(function (req, res) {
   ....

}).listen(8000, function () {
    setTimeout(function () {
        throw new Error('Ooops');
    }, Math.ceil(Math.random() * 3) * 10000);
});

然後會監聽clusterexit,該事件代表如果任何一個 worker 離開該master process則會觸發。當我們發生事件時,會先判斷是否錯誤,如果是的話,則在fork()一個worker

雖然掛掉的 worker 可能還在重新建立,但是不會影響到我們應用程式的使用。

if (cluster.isMaster) {

    for (var i = 0; i < numCPUs; i++) {
        var worker = cluster.fork();
        cluster.on('exit', function (worker, code) {
            if (code != 0 && !worker.suicide) {
                console.log('Worker crashed. Starting a new worker');
                cluster.fork();
            }
        })
    }
} 

我想更新應該程式但不想停機

在實務上,某些大型的應該程式是 24 X 7 的在跑,就算是更新也不能停機,所以要著麼解決呢 ? 可行的解決方案是實作 :

零停機時間的重啟

比較白話文的來說明實作過程就是 :

一次只重新啟動一個 worker ,其餘的繼續工作

我們實作的方法參考Miario Casciaro 的 nodejs設計模式一日,首先我們會在SIGUSR2中設置監聽器,當接受到 SIGUSR2 信號時會一個一個將 worker 重新啟動。

其中我們有使用 unix 信號,它也是一樣 IPC 的方法,它是一種異歲的通知機制,主要用來和某個 process 說一個事情已經被發生。

 if (cluster.isMaster) {
    console.log("master process:" + process.pid);
    console.log("cpu num: " + numCPUs.toString());

    process.on('SIGUSR2', function () {
        console.log("Received SIGUSR2 from system");
        console.log("Restarting workers");
        var workers = Object.keys(cluster.workers);
        
        function restartWorker(i){
            if ( i >= workers.length) return;
            var worker = cluster.workers[workers[i]];
            console.log('Stopping worker:' + worker.process.pid);
            worker.disconnect();

            worker.on('exit', function () {
                if(!worker.exitedAfterDisconnect) return;

                var newWorker = cluster.fork();
                newWorker.on('listening',function () {
                    restartWorker(i+1);
                })
            })

        }
        restartWorker(0);
    })
    
}

為了要模擬這種狀態,我們需要使用下面指令,來 kill 掉我們master process,然後當執行這行時,就會執行process.on('SIGUSR2')裡面的指令開啟重新的一個一個啟動 worker。

kill -SIGUSR2 <PID>

結果如下。

Restarting workers
Stopping worker:14239
started
process:14249
Stopping worker:14240
started
process:14255
Stopping worker:14241
started
process:14256
Stopping worker:14242
started
process:14257

不過除了上面自已寫以外,當然還有其它的東西可以完成這項工作。

那就是 forever

這套工具最主要的功用是持繼的保持後台的運作

就算你的程式發生錯誤,他也會自動的幫你重新啟動,而且就像我們上面的說的,要更新應用程式時,他也會保持系統的持繼運作。

他的用法很簡單。

npm install -g forever

然後在執行下面指令,這樣就完成了。

forever start xxxx.js

結論

嚴格來說本篇文章大部份都針對* X 軸 : 複制的方法來進行說明,本篇中所提到的 cluster,就是用來複制的方法之一,但這只是之一,在傳統上也有一些技巧更常被使用到的,那就是在不同 port 或不同機器上啟動應用程式的多個實例,然後在使用一個反向代理器來處理,下一篇文章中我們將會繼續對於X軸的擴展 : 複制的方法進行討論。

參考資料

 
2 months ago

這篇文章中,我們希望學習到 :

在開發nodejs時,如果遇到cpu密集型的任務時,要如何處理 ?

首先我們先來複習一下nodejs的機制一下。

我們都知道nodejs是屬於單一執行序架構,在其它的語言裡,每當有一個請求進來時,它們都會產生一個執行緒,但nodejs則否,他是用一個執行緒就來處理所有的請求,而他的背後就是有個事件機制設計才能做到這種方法。請參考這篇

但為什麼要設計成用單一執行序架構呢?

這邊我們要先來說說I/O操作。

I/O 問題

I/O就是電腦中資料與記憶體、硬碟或網路的輸入和輸出,他基本上是電腦作業裡最慢的事物,I/O操作基本上對 cpu 而言通常負擔很小,但是問題就在於它很耗時

傳統的阻塞I/O設計方式如下 :

data = getData();

print(data);

我們假設getData是要去讀取一個檔案,而這時會等到getData執行完後,就資料傳送給data時我們才可以使用。

那假設我們這個getData要讀很久,那這樣的話其它的請求著麼辦 ?

傳統的作法就會像下面這張圖一樣,系統會分別的開啟不同的執行緒來進行處理,如此一來,當有某個執行緒因I/O操作而阻塞時,就不會影響到其它的請求。

這種作法的缺點就在於 :

開啟執行緒的成本不低,它會消耗記憶體而且引發環境切換

node他著麼處理呢 ?

他使用單一執行緒機制,而他的執行緒中有一個機制被稱為事件機制,簡單的說事件機制可以將所有的請求收集起來,並且將需要長時間處理的工作丟出去工作給其它人做(I/O),然後繼續接收新的請求,就如同下圖一樣,這樣的優點就在於,他可以接受更多的請求,,而不會因為一個長時間的I/O,其它東西就都卡住不能動。

但他也是有缺點的 :

它無法充分利用多核cpu資源

當 Event loop 遇到 CPU 密集型任務會發生什麼事 ?

上面有提到單一執行緒機制有一個缺點,那就是無法統分利用cpu資源,這是什麼意思呢 ?

傳統的方式,每個請求分配一個執行緒,他都可以得到一個不同於自已的 cpu,在這種情況下多執行緒可以大大的提高資源使用效率。

而這也代表的單執行緒他就只能占用一個 cpu ,並且如果某個任務是很吃 cpu 的工作時,這執行緒就會被那個任務占用,導致其它的任務、請求都無法執行。

我們下面簡單的寫一段程式碼來看看會發生什麼事情。

下面這段程式碼裡,我們將簡單的建立一個server,它一收到請求,就會開始計算費波南西數列,這種運算基本上就是一個很耗 CPU 的工作。

const http = require('http');

http.createServer(function (req, res) {
    console.log("master:" + process.pid);

    res.writeHead(200);
    res.write(fib(46).toString());
    res.end();

}).listen(8000, function () {
    console.log('started');
});

function fib(n) {
    return n > 1 ? fib(n - 1) + fib(n - 2) : 1;
}

然後當我們啟動這個 server 後,你會注意到,第一個請求發送以後,你會在 console 看到下面的輸出 :

master:68375

也就是打印出這個process的 pid ,但它會還沒回傳值給第一個請求,然後這時如果你在發送一個請求,你會注意到它沒有打印出 master:68375這段資訊。

為什麼呢 ? 這就是我們上面說的node屬於單一執行緒機制,他就只能占用一個 cpu 並且因為第一個請求的運算還在執行,導致其它的請求都會無法執行,只有等到第一個請求結束後,才會繼續執行。

注意 process 進程thread 執行緒是兩個不一樣的東西

我們這邊簡單的說明一下process進程thread執行緒的關係, 首先在傳統的系統中進程是個容器,而執行緒就是容器中的工作單位

進程就是我們在 window 系統下,打開工作管員裡processes打開的一個一個就是它了,而且你打開每個chrome頁面他都是一個進程,而進程間的通訊則使用IPC方法。

執行緒是包含在進程內的工作單位,在同一個進程裡,所有的執行緒都共享系統資源,但他們同時也都有自已的stackcontext,而且可以共享變數。

那要如何解決呢 ?

開一個新的 process 來處理

在 javascript 中我們可以使用一個叫Web Worker的東西來處理,可以看一下筆者年輕時寫的這篇文章HTML5之走在平行時空的Web Worker

而在 node 中我們則時用child_process,這個模組可以幫助我們建立child process中來說來就是子進程,另我當我們使用這模組中的fork來建立時,它同時會提供IPC通道讓我們可以使用訊息來進行process與process的溝通

接下來我們就是要將執行費波南西數列的運算,丟到另一個子進程中來處理,這樣我們的請求也就可以同時的處理了。

下面為我們修改後的程式碼,我們會使用child_process.fork('./subset.js')來建立子進程,並且我們會使用send方法就資料丟到子進程中,然後在用on('message')來監聽回傳結果。

這種寫法實際執行測試後,你會發生每當你發一個請求時,都會打印出master:68375,這也代表我們的執行緒不會在塞住了,而且你在實際丟兩個請求來測試有用子進程的執行速度,你會發現快了兩倍。

const child_process = require('child_process');

http.createServer(function (req, res) {
    console.log("master:" + process.pid);
    const child = child_process.fork('./subprocess.js');
    child.send({ value: 45 });

    child.on('message', function (m) {
        res.writeHead(200);
        res.write(m.result.toString());
        res.end();
    })
}).listen(8000, function () {
    console.log('started');
});
// subprocess.js

function fibo(n){
    return n>1 ? fibo(n-1) + fibo(n-2) : 2;
}

process.on('message', function (message) {
    console.log("child:" + process.pid)
    process.send({result: fibo(message.value)});
})

但是呢 ? 上面這種寫法還是有個缺點,那就是代表每一個請求都會多開一個子進程,這樣也代表這請求一多就會開了一堆子進程,這樣是很浪費資源的,所以接下來我們會修改一下增加一個 pool 來管理這些子進程,好處在於可以節省資源,而另一個好處可以阻斷服務攻擊 Dos

這個我們就留到下一篇cluster時在來說明囉。

參考資料