over 1 year ago

在前幾篇文章中,我們說明了如何將系統進行擴展,而接下來呢,我們將要說明如何使用訊息佇列來進行整合,事實上之前的每篇文章中都要提到一個名稱IPC通信,其中裡面就包含了訊息佇列 (message queue)

訊息佇列基本上是用來行程間溝通或是同行程內不同執行序溝通,他提供了異步的溝通協定,這個意思就是指當你傳送一堆訊息給 A 時,A 可以不用即時的來處理這些訊息,這也代表這訊息可堆積再處理,白話文就是 :

訊息接受者如果爆了,我訊息發送者還是可以一直發送訊息,等你好了,你還是可以取得完整的訊息。

我們可以想想http協定他是一個同步協定,這也代表你傳送一個request必須等待伺服器發送response

至於我們為什麼要用message queue請參考下面這篇文章,他真的已經寫的很完整了。

使用訊息佇列的十個理由---簡中

然後我們先簡單的說明一下訊息系統的基礎。

訊息系統架構

基本上分為以下兩種 :

對等式 (peer-to-peer)

在對等式的架構下,每一個節點都直接將訊息傳送給接受者,這種方法基本上會比較複雜,因為他還要決定各自結點的的通訊協定,但還是有一些優點 :

  • 避免單點故障。
  • 和中介者模式比較來少了中間一層,速度應該是比較快。
  • 彈性較高。

以下為對等式架構的圖示 :

zeroMQ 他可以幫助我們建立對等式架構

中介者模式 (message broker)

中介者模式就是所有的節點,都會連結到某個broker,一切都由broke來處理,每個節點不需要知道,我和誰溝通,只需要知道要傳送的訊息內容即可。但缺點就是上面對等式的優點。

以下為中介者架構的圖示 :

RabbitMQ 就是專門用來建立這個架構的東東。

接下來的文章中,我們將要先來實作一些rabbitmq

RabbitMQ

在上面的章節中,我們應該有說到,分佈式架構除了對等式架構外,還有一個是中介者架構,中介者的主要作用就是讓訊息接受者與傳送者之間完全的解偶,而rabbitmq就是一個支援AMQP (Advanced Message Queuing Protocol)協議的中間介者。

如下圖所示,它就是中間綠綠那個,我們稱他為中介者 broker

AMQP是什麼 ?

它是一種協議,AMQP 是一個提供統一訊息服務的應用層標準協議(osi第七層),也就是設定於其它應用軟體之間的通訊,像 http、https、ftp 等都是應用層協議。

根據該協議,客戶端與訊息中間件(broker)可傳送訊息,不受客戶端/中間件不同產品,不同開發語言的條件限制。

它有三個總要概念

佇列 (queue) :

這東東它是儲存訊息的架構,然後裡面的訊息它會被客戶端拿走。一個佇列可能會推多個客戶端取走訊息,這時處理的方式和我們之前說的負載平衡差不多。

佇列它還有以下三種特性 :

  • 可延續性 : 意即若 broker 重新啟動時,則佇列也自動重新建立。那裡面的訊息著麼辦呢 ? 只有被示為需保訊的訊息,才會存入磁碟,並於重啟時還原。
  • 專用性 : 一個佇列可綁定特定的訂閱著,若彼此連線關閉時,則該佇列會說掰掰。
  • 自動刪除 : 當有沒任何訂閱者連線時,便刪除佇列。
交換器 ( exchange ) :

他主要的功能是將訊息傳輸送到一個或多個佇列,這個東西就是放在producerqueue之間,架構大概長的如下圖 :

這邊我們會有一個問題,那就是為什麼需要這個 exchange 而不是直接producerquenu間溝通就好呢 ? 這點我們後面在來說明。

綁定器 ( bind ) :

這個就是上面的交換器與佇列之間的連結

以上的東西都會被中介者管理,它會建立一個抽象的通道,用於維護與中介者之間的通訊狀態。

使用 rabbitmq (mac)

首先我們要先安裝 rabbitmq :

brew update 
brew install rabbitmq

然後安裝好後,我們需要去系統的.zsrch.bashrc設置路徑,這樣我們才能在 terminal 上執行他的指令。

PATH=$PATH:/usr/local/sbin

然後我們就可以執行下列指令開啟rabbitmq server :

rabbitmq-server

如果看到下列的出輸結果,則代表開啟成功。

              RabbitMQ 3.6.9. Copyright (C) 2007-2016 Pivotal Software, Inc.
  ##  ##      Licensed under the MPL.  See http://www.rabbitmq.com/
  ##  ##
  ##########  Logs: /usr/local/var/log/rabbitmq/rabbit@localhost.log
  ######  ##        /usr/local/var/log/rabbitmq/rabbit@localhost-sasl.log
  ##########
              Starting broker...
 completed with 10 plugins.

如果要關啟請執行下列指令,而不要使用alt + c

rabbitmqctl stop

接下來我們就安裝 node 的 rabbitmq 的 lib ampqplib

npm install --save amqplib

使用rabiitmq實作一個 Producer => Queue 的架構

接下來我們從最簡單的使用開始,我們會簡單的實作如下圖的架構,共有三個實體producerqueueconsume,其中queue現階段就是我們的rabbitmq server

首先我們會先建立一個producer來負責產生訊息,從下面的程式碼我們可以知道,它需要先連線到我們的rabbitmq server,並且建立一個通道,然後,我們可以指定要將資料送到特定的queue,而我們 queue 取名為mark

其中,下面的程式碼中,比較需要注意的下面這行,這行是會判斷該 queue 是否存在,如果不存在則建立,並且durable : false代表的意思為當 queue 重開裡面的資料會消失

ch.assertQueue(quenu_name, {durable: false});
var amqp = require('amqplib/callback_api');

amqp.connect('amqp://127.0.0.1', function (err, conn) {
    console.log("connect amqp server !");

    conn.createChannel(function (err, ch) {
        var quenu_name = "mark";

        ch.assertQueue(quenu_name, {durable: false});

        ch.sendToQueue(quenu_name, new Buffer('Hello Mark'));
        console.log('Send a message');
    })
})

再來我們來實作consumer來當作我們訊息的接受者,一樣也是需要去連線到radditmq server和通道,然後會監聽這個通道所傳下來的訊息。

其中我們的 ch.consume有個參數是{noAck: true},代表這如果該channel不會使用act來進行確認,而如果是 false 它代表的意思為,如果中介者未收到 ack (確認),則訊息就會被保留在佇列裡以待再次處理。

var amqp = require('amqplib/callback_api');

amqp.connect('amqp://127.0.0.1', function (err, conn) {
    conn.createChannel(function (err, ch) {
        var queue_name = "mark";

        ch.assertQueue(queue_name, { durable: false });
        console.log("Waitting the meesages");
        ch.consume(queue_name, function (msg) {
            console.log(`Received the msg : ${msg.content.toString()}`);

        }, {noAck: true});
    })
})

最後我們來執行兩次producer,然後再打開consumer來接受資料,你會發現,他還是接受的得,而且二次傳的資料都有收到,這就是我們前面有提供的message quenue的優點 :

consumer 如果爆了,我 producer 還是一直可以發送資料,等你好了,你還是可以取得完整的資料。

這邊我們提出一個問題。

那 quenue 應該有最大值吧,不然一直沒有訊息流出,遲早會爆的吧 ?

嗯沒錯,所以radditmq它有提供兩個方法來設定每個queue的最大值 :

  • 使用 policy 來進行設定。
  • 使用參數來設定。

我們簡單的使用參數來限定,下面指令設定 quenue 最大為10 byte

ch.assertQueue(queue_name, { durable: false, maxLength: 10  });

那如果 queue 超過10 byte後會如何呢 ?

我們實際執行看看就知道,首先每 0.5 秒傳送一個數字給 queue 結果如下:

Send a message:1
Send a message:2
Send a message:3
Send a message:4
Send a message:5
Send a message:6
Send a message:7
Send a message:8
Send a message:9
Send a message:10
Send a message:11
Send a message:12
Send a message:13
Send a message:14
Send a message:15
Send a message:16
Send a message:17
Send a message:18
Send a message:19
Send a message:20

然後我們 consumer 接收的結果如下 :

Received the msg : 11
Received the msg : 12
Received the msg : 13
Received the msg : 13
Received the msg : 14
Received the msg : 15
Received the msg : 16
Received the msg : 17
Received the msg : 18
Received the msg : 19
Received the msg : 20

從上面的結果可知,因為每個單字為 1 byte,所以當 queue 儲放了前十個數字後,就超過 10 byte了,然後它將先進來的就先把他刪除,所以結果才能如上面所示,只收到 11 以後的數字。

使用rabiitmq實作一個 Producer => Exchange => Queue 的架構

實際上在 rabbitmq 中,我們的 producer 完全不會直接傳送訊息給 queue,producer 完全不知道會傳給那個 queue ,而是透過 exchange 來進行處理,而這邊就準備回答我們上面問的一個問題。

為什麼要有 exchange 呢 ?

我在 quora 發現也有人有相同的問題 (Why does RabbitMQ have both exchanges and queues?)。

然後我發現有一個老兄的說明很傳神,我簡單的說明一下。

想像一下你在 app store ,當你進入店內後,有一位服務員問你說 "你需要什麼呢 ? " ,然我回答說 "我要尋找耳機",然後該名服務員就將你引導到處理耳機的 queue ,接下來下一客人進來服務員就問 "你需要什麼呢 ?" ,然後該名客人回答 "我要修理我的電腦",然後服務員就將它引導到處理修改電腦的 queue 去處理。

所以根據那位老兄的說明,你可以發現服務員就是我們的Exchange,如果沒有了它,那就代表這每一位進來店裡的老兄,只能自已去尋找所需要的 queue ,而且還可能找錯,那是不是很浪費時間呢 ?

在理解了exchange功用後,我們就開始來它的實作。

我們先來畫張圖,你應該會更容易理解,我們等一下做的東西,其中我們先實作 error 那條線,其它的線事實上大同小異。

首先我們來建立producer,下面的程式碼中,可以看到我們會建立一個exchange而不在是queue,並且我們發送訊息的對像也改為exchange

還有我們在發送訊息時,有指定routing_key,這也代表我們有指定發送到的綁定 routing key 為error的 queue 。

var amqp = require('amqplib/callback_api');

amqp.connect('amqp://127.0.0.1', function (err, conn) {
    console.log("connect amqp server !");

    conn.createChannel(function (err, ch) {
        var exchange_name = "logs";
        var msg = "Hello mark";
        var routing_key = "error";

        ch.assertExchange(exchange_name, 'direct', { durable:false });
        ch.publish(exchange_name, routing_key, new Buffer(msg));
        console.log(`send msg: ${msg}`);
    })
})

其中下面這段程式碼中的direct代表這個 exchange 的種類,我們必須給定這個 exchange 的類型,用來決定要如何將訊息從 exchange 發送出。

ch.assertExchange(exchange_name, 'direct', { durable:false });

它主要有四種類型 :

  • direct : 會需要設定一個叫routing key的參數綁定 queue,然後在發送訊息時指定routing key,exchange 就會將該訊息傳送到指定的 queue (上面範例所使用的)。
  • topic : 大至上和 direct 相同,但這裡的routing key可以用匹配的方式。
  • fanout : 此類型的路由規則最簡單,就是收到訊息後,將它發送到所有綁定的佇列中。
  • header : 它的路由規則是由header來判斷,如果要做一些複雜的路由規則,那就用他。

接下來我們來看看consumer的程式碼,如下,我們在綁定 queue 時有指定 routing_key ,就代表這個 consume 只能收到指定的 queue 的訊息。

var amqp = require('amqplib/callback_api');

amqp.connect('amqp://127.0.0.1', function (err, conn) {
    conn.createChannel(function (err, ch) {
        var exchange_name = "logs";
        ch.assertExchange(exchange_name, 'direct', { durable: false });

        ch.assertQueue('', { exclusive: true }, function (err, q) {
            console.log('Waiting for messages');

            var routing_key = 'error';
            ch.bindQueue(q.queue, exchange_name, routing_key);
            ch.consume(q.queue, function (msg) {
                console.log(`Received msg:${msg.content.toString()}`);
                console.log(`routing key is:${msg.fields.routingKey.toString()}`);
            }, { noAck: true });
        })
    })
})

然後我們就來執行看看,結果如下 :

// producer
connect amqp server !
send msg: Hello mark

// consumer
Waiting for messages
Received msg:Hello mark
routing key is:error

結論

本篇文章中,我們先簡單的說明了message queue的概念,它是我們跨進程與執行緒的溝通工具之一,並且它是屬於異步的架構。

接下來我們也說明了訊息系統架構,它基本上可分兩類,一種為對等式架構,而另一種為中介者架構,前者可以使用zeromq來建立,而後者可使用rabbitmq來建立。

最後呢,我們就開始使用rabbitmq來進行實作,本文中只簡單的介紹一些基本的使用方式。

參考資料

← Node之可擴展性 --- Nginx反向代理建立 Node之可擴展性 --- 訊息佇列 Message queue (ZeroMQ) →
 
comments powered by Disqus