21 days ago

首先我們先來看看最一開始時,要建立連線會那些事情,假設我們的 server 已經開啟 :

var io = require('socket.io').listen(8080);

io.sockets.on('connection', function (socket) {
    console.log("Hello xxxx client");
});

接下來我們要從前端開始追蹤它做了那些事情。

Client 端它做了什麼呢 ??

在最開始時,一定時前端會去進行連線,那我們來看看他在socket.io-client中什麼地方處理。

我們前端與 server 端連結的程式碼如下,從下面程式碼可知,我們執行io('xxxx')時,他就會去後端建立連線。

<script src="/socket.io/socket.io.js"></script>
<script>
  var socket = io('http://localhost');
  socket.on('connect', function(){});
</script>

然後我們來看看 socket.io-client 的這段程式碼長啥樣子,如下,但下面程式碼我們只要先注意newConnection裡面做的事情,因為我們是要建立新的連線。

lookup 原始碼

function lookup (uri, opts) {
  ....

  if (newConnection) {
    debug('ignoring socket cache for %s', source);
    io = Manager(source, opts);
  } else {
    if (!cache[id]) {
      debug('new io instance for %s', source);
      cache[id] = Manager(source, opts);
    }
    io = cache[id];
  }
  if (parsed.query && !opts.query) {
    opts.query = parsed.query;
  }
  return io.socket(parsed.path, opts);
}

下面這段程式碼為manager裡面的程式碼,大部份都是在進行屬性初使化,並且還有一些重連機制的設定,這邊我們直接來看最下面的this.open部份。

Manger 原始碼

function Manager (uri, opts) {
  if (!(this instanceof Manager)) return new Manager(uri, opts);
  if (uri && ('object' === typeof uri)) {
    opts = uri;
    uri = undefined;
  }
  opts = opts || {};

  ...
  
  if (this.autoConnect) this.open();
}

this.open 裡面的程式碼如下,這段就是socket.io-client建立連線的地方,但這邊要注意,我們實際建立連線的地方為eio(this.uri, this.opts)這段程式碼,所以我們接下來要去看engion.io-client的程式碼。

Manger.prototype.open 原始碼傳送門

Manager.prototype.open =
Manager.prototype.connect = function (fn, opts) {
  debug('readyState %s', this.readyState);
  if (~this.readyState.indexOf('open')) return this;

  debug('opening %s', this.uri);
  this.engine = eio(this.uri, this.opts);
  var socket = this.engine;
  var self = this;
  this.readyState = 'opening';
  this.skipReconnect = false;


 ....

  return this;
};

engine.io-client 實際建立連線的地方

這段程式碼是我們實際建立連線的地方,首先他會判斷我們的transport是什麼,是要用websocket還是polling,然後確定好後,就使用this.createTransport來建立實際上要用的transport,最後在將要使用的 transport 開啟,然後他裡面將會建立連線了。

Socket.prototype.open 原始碼傳送門

Socket.prototype.open = function () {
  var transport;
  if (this.rememberUpgrade && Socket.priorWebsocketSuccess && this.transports.indexOf('websocket') !== -1) {
    transport = 'websocket';
  } else if (0 === this.transports.length) {
    // Emit error on next tick so it can be listened to
    var self = this;
    setTimeout(function () {
      self.emit('error', 'No transports available');
    }, 0);
    return;
  } else {
    transport = this.transports[0];
  }
  this.readyState = 'opening';

  // Retry with the next transport if the transport is disabled (jsonp: false)
  try {
    transport = this.createTransport(transport);
  } catch (e) {
    this.transports.shift();
    this.open();
    return;
  }

  transport.open();
  this.setTransport(transport);
};

前端流程圖

前端這邊,我們最後補上一張流程圖,好讓各位官爺更好的追 code 。

後端接受到請求後,它做了啥 ??

每當我們 socket io server 啟動時,會將 engine io server attach 到 http server (srv) 上監聽request事件,下面程式碼的 srv 就是我們 attach 的 server。

attachServe 傳送門

Server.prototype.initEngine = function(srv, opts){
  // initialize engine
  debug('creating engine.io instance with opts %j', opts);
  this.eio = engine.attach(srv, opts);

  // attach static file serving
  if (this._serveClient) this.attachServe(srv);

  // Export http server
  this.httpServer = srv;

  // bind to engine events
  this.bind(this.eio);
};

當收到一個 http 請求時,會轉到 engine.io 下面這段程式碼中,然後會在handleRequest進行主要處理,它這裡只會簡單的檢查一下 req 這個參數,而這參數就是我們 http request 請求內容。

server.on 原始碼

server.on('request', function (req, res) {
    if (check(req)) {
      debug('intercepting request for path "%s"', path);
      if ('OPTIONS' === req.method && 'function' === typeof options.handlePreflightRequest) {
        options.handlePreflightRequest.call(server, req, res);
      } else {
        self.handleRequest(req, res);
      }
    } else {
      for (var i = 0, l = listeners.length; i < l; i++) {
        listeners[i].call(server, req, res);
      }
    }
  });

接下來,下面這段程式碼為handleRequest程式碼,主要用來處理這個請求。首先他會先使用verify進行檢查,看看這一次的請求是不是合法的,而它主要檢查兩個點,首先是transport的檢查,我們要先確定req._query.transport這個參數是否合法,因為我們是要用這參數來決定我們要用那種傳輸方式,而第二個檢查為sid checksid就是每個 client 的編號,在這邊會檢查該條 sid 是否存在以及如果存在是否合法。

檢查完這個 request 請求後,接下來就看看這個請求的 client 有沒有建立請起,如果沒有則執行handshake,有的話則執行onRequest

handleRequest 原始碼

Server.prototype.handleRequest = function (req, res) {
  debug('handling "%s" http request "%s"', req.method, req.url);
  this.prepare(req);
  req.res = res;

  var self = this;
  this.verify(req, false, function (err, success) {
    if (!success) {
      sendErrorMessage(req, res, err);
      return;
    }

    if (req._query.sid) {
      debug('setting new request for existing client');
      self.clients[req._query.sid].transport.onRequest(req);
    } else {
      self.handshake(req._query.transport, req);
    }
  });
};

接下來我們來看看,如果這個 client 還沒建立的流程,也就是要進行handshake
這個方法最主要的功能就是用來建立一條新的連線,然後最後會發送一個connection事件到socket.io

Server.prototype.handshake 原始碼

Server.prototype.handshake = function (transportName, req) {
  var self = this;
  this.generateId(req, function (err, id) {
    if (err) {
      sendErrorMessage(req, req.res, Server.errors.BAD_REQUEST);
      return;
    }
    debug('handshaking client "%s"', id);

    try {
      var transport = new transports[transportName](req);
      ……
    
    var socket = new Socket(id, self, transport, req);
     ……
   

    transport.onRequest(req);

    self.clients[id] = socket;
    self.clientsCount++;

    socket.once('close', function () {
      delete self.clients[id];
      self.clientsCount--;
    });

    self.emit('connection', socket);
  });
};

當我們engion.io已經建立好連線後,它會發送connection訊息到socket.io,然後它在下面這段程式碼,會收到這個事件,然後他這邊會建立一個 client,並且處理一些連線的事務。

bind 與 onconnection 原始碼

Server.prototype.bind = function(engine){
  this.engine = engine;
  this.engine.on('connection', this.onconnection.bind(this));
  return this;
};

Server.prototype.onconnection = function(conn){
  debug('incoming connection with id %s', conn.id);
  var client = new Client(this, conn);
  client.connect('/');
  return this;
};

然後我們來看看client.connection這段程式碼做的事情,這裡它主要會將這名client加入到nsp中也就是 io 的 namespace 裡,但這邊 socket 還沒有產生喔。

Client.prototype.connect 原始碼

Client.prototype.connect = function(name, query){
  debug('connecting to namespace %s', name);
  var nsp = this.server.nsps[name];
  if (!nsp) {
    this.packet({ type: parser.ERROR, nsp: name, data : 'Invalid namespace'});
    return;
  }

  if ('/' != name && !this.nsps['/']) {
    this.connectBuffer.push(name);
    return;
  }

  var self = this;
  var socket = nsp.add(this, query, function(){
    self.sockets[socket.id] = socket;
    self.nsps[nsp.name] = socket;

    if ('/' == nsp.name && self.connectBuffer.length > 0) {
      self.connectBuffer.forEach(self.connect, self);
      self.connectBuffer = [];
    }
  });
};

然後在add中,會將這個 client 產生一個 socket,然後將這條 socket 進行onconnect,並且在最後,會執行self.emit('connection', socket)這裡也就是我們在最上面,實際上觸發connection事件的地方。

Namespace.prototype.add 原始碼

Namespace.prototype.add = function(client, query, fn){
  debug('adding socket to nsp %s', this.name);
  var socket = new Socket(this, client, query);
  var self = this;
  this.run(socket, function(err){
    process.nextTick(function(){
      if ('open' == client.conn.readyState) {
        if (err) return socket.error(err.data || err.message);

        // track socket
        self.sockets[socket.id] = socket;

        // it's paramount that the internal `onconnect` logic
        // fires before user-set events to prevent state order
        // violations (such as a disconnection before the connection
        // logic is complete)
        socket.onconnect();
        if (fn) fn();

        // fire user-set events
        self.emit('connect', socket);
        self.emit('connection', socket);
      } else {
        debug('next called after client was closed - ignoring socket');
      }
    });
  });
  return socket;
};

我們最後來看一下onconnect實際上做了那些事情,這個方法是當我們 connect 確定建立起來後,會進行的動作,它會將這條 socket 連線加入到 nsp 裡的 connected 這個地方,這個屬性也可以讓我們知道,一個 nsp 中有那些 socket 在進行連線。

然後並且會將這條 socket ,加入到一個已自已 id 為名的房間,所以假設我們要追蹤某個 client ,也可以選擇加入到該名使用者為名的房間,這樣該名使用者收到的事件,我們也都可以收到,不過這只是變化用法,到不是這邊的重點。

最後他會執行this.packet就是會將這個訊息,傳送到 client 端。

onconnect 原始碼

Socket.prototype.onconnect = function(){
  debug('socket connected - writing packet');
  this.nsp.connected[this.id] = this;
  this.join(this.id);
  var skip = this.nsp.name === '/' && this.nsp.fns.length === 0;
  if (skip) {
    debug('packet already sent in initial handshake');
  } else {
    this.packet({ type: parser.CONNECT });
  }
};

後端的流程圖

最後,這邊將提供後端的流程圖,讓我們更容易的理解它的流程。

 
about 1 month ago

socket io 是 nodejs 所提供的套件,它主要可以做的事情就是推播功能

你想想,假設你要做個股票報價網站,然後當你後端收到新的股價時,你要如何的送到前端 ?
在傳統的 server 與 client 架構下,因為只能由 client 向 server 發出請求,而不能由 server 發送新的訊息到 client,所以當時的人們的解決方案就是輪詢,固名思意就是指定時的去 server 找資料。

但這種方案有缺點,你想想,你有可能去 server 抓 10 次資料,它有可能 10 次都有新的資料嗎 ? 不一定對吧 ? 所以最理想的方案一定是從 server 端有新資料就自動推送到 client 端。

websocket就是一個由 html 5 所發布的新協議,它就可以做到上面所需要的功能。

socket.io是啥 ? 它是會根據你的 client 所支援的功能(websocket、comet、長輪詢…)來決定你後端要如何的發送資料,更白話文的說,你不用管你的 client 有沒有支援 websocket,socket.io 一切都自動會處理好,你只要和我說啥時要送資訊到前端就對了。

Socket.io 的組成

請參考筆者的這篇文章。

Socketio 的架構

簡單 client 與 server 的溝通範例

server 端程式碼如下,這段程式碼當與 client 端建立一條 websocket 連線後,會直接對該條連線傳送個{hello: "world"}訊息。

var io = require('socket.io').listen(8080);

io.sockets.on('connection', function (socket) {
    socket.emit('news', { hello: 'world' });
    socket.on('my other event', function (data) {
        console.log(data);
    });
});

前端 :

<script src="/socket.io/socket.io.js"></script>
<script>
    var socket = io.connect('http://localhost:8080');
    socket.on('news', function (data) {
        console.log(data);
        socket.emit('my other event', { my: 'data' });
    });
</script>

這樣就可以完成簡單的推播功能囉。

Socket.io 的 Rooms 與 Namespaces

在 socket.io 有兩個很重要的概念roomsnamespaces,這邊你只需要記好一件事,那就是它們兩個存在的原因都是為了分組,把要傳送的訊息,送到你想要的群組中。

Namespaces

我們先看看namespaces,上面的範例中有沒有注意到,我們都是用io來進行所有的操作,假設我們要放送訊息到所有連線的 socket,那我們只要下達該指令 :

io.emit('news', 'Hi I am Mark')

接下來然後我們也可以使用 rooms 來將 io 裡面的 socket 分類到不同的房間中,所以當我們要傳送指定的訊息到某個房間中只要下達下面的指令 :

io.to('全家就是我家').emit('news', 'Hi I am Mark')

那這邊我想問問,有沒有辦法建立另一個 io 呢 ? 例如我想建立一個是專門處理股票的 io ,而另一個是專門處理期貨的 io ,這時我們就可以使用namespaces裡的of這方法來處理。

例如我們先來建立一個股價的 namespaces :

var stock_io = io.of('/stock');

然後我們就可以使用這個stock_io來進行我們上面提到的所有動作。

namespaces 你可以想成, 子 io ,它可以做所有 io 可以做的事情 (基本上)

Room

接下來我們來說說rooms,這東東的概念和namespace事實上很像,但記好,rooms 是在 namespaces 底下的

假設你有一個需求,有3個用戶在用你的股價報價系統,其中兩個是在看 1101 台泥的股價,而其中一個是在看 2330 台積電 的股價,這時你應該要注意,不能將 2330 的股價推到在看 1101 股價的使用者那。

socket.io 的 rooms 的功能就可以解決上面的需求,rooms的功能白話文就是你可以發送訊息到指定的房間,像 1101 就是一個房間,而 2330 就是另一個房間,所以假設有 1101 的訊息要推送,只要針對該 room 的用戶進行推送就夠了。

那要如何加入到房間呢 ?

如下 :

io.on('connection', function(socket){
  socket.join('1101');
});
那要如何傳送訊息到指定的房間呢 ?

如下園式碼,你就可以發送訊息到這個房間裡。注意這邊是用io來發送訊息,上面簡單的範例是用socket.emit來送是因為上面範例只需要發送訊息給那條 socket 連線就好,而這邊我們是要發送給1101 這個 room中的 socket,所以要用io來發送。

 io.to('1101').emit('xxxx股價')

這兩個的差別

最後終結一下這兩個的差別

你可以把 namespace 的功能想成可以建立多個子 io ,不同的子 io 可以處理自已的事情,也代表有自已的 rooms ,io1 與 io2 兩個如果都有 room 為 movie 的,也不會影響到什麼,因為它們是分屬不同的 io 了。

在 Socket.io 中使用 middleware

在平常我們 web 開發時,有時後會有下面這種需求 :

每一個 http 請求進來前,需要先檢查登入狀態

通常這種時後我們就會建立一個 middleware 來處理登入狀態,這邊要注意 middleware 不是用來專門處理登入的東東,它只是一個概念,它真正的定義如下 :

middleware 又稱中介層,用來處理所有進入或離開主體前的事務。

像我們每個 http 請求進到主體前,我們需要先確認它的狀態,又或者是進到主題前就先將 log 寫好,這些都是可以放置到middleware來處理,你只要記好,主體就只做主體要做的事情就好。

當然在 socket.io 中我們也是有這個需求,例如每當要建立 websocket 連線時,要預先處理的事情,我們都可以建立 middleware 來處理。

socket.io 提供use方法來讓我們建立 middleware ,範例程式碼如下 :

var srv = require('http').createServer();
var io = require('socket.io')(srv);
var run = 0;
io.use(function(socket, next){
  run++; // 0 -> 1
  next();
});

var socket = require('socket.io-client')();
socket.on('connect', function(){
  // run == 2 at this time
});

像上面的官方程式碼中,下面這段就是 midddleware 的使用方法,裡面的 use 就是一個 middleware 方法,在建立 websocket 前會先處理的事情。

io.use(function(socket, next){
  run++; // 0 -> 1
  next();
});

所以假設我們需要先處理log 事務快取事務時,我們程式碼就大概會長如下 :

io.use(logMiddleWare);
io.use(cacheMiddleWare);

function logMiddleWare(socket, next){
    寫 log ~~~
    next();
}

function cacheMiddleWare(socket, next){
    取得暫存資料.....
    next();
}

參考資料

 
about 1 month ago

socket.io 是 node js 的一個 framework,它可以幫助我們建立聊天室這種推播功能的系統,這篇文章我們不會說明它如何使用,而是要理解 socket.io 這個套件的架構組成。

socket.io 主要由以下幾個東東構成的 :

  • engine.io、engino.io-client
  • socket.io-parser
  • socket.io-adapter
  • socket.io-client
  • socket.io-protocol

接下來我們將一個一個說明它們是做啥用的,並且最後會在進行一個總結。

engine.io

engine.io是一個實際執行 socket.io 通訊層級的 libary,嚴格說起來,socket.io 的核心就是engine.io,所有的建立連線、傳輸資訊實際上都是由它來做,並且根據前端傳送回來的資訊,來決定使用什麼傳輸方式。

目前 engine.io 所提供的溝通方式有以下幾種 :

  • polling-jsonp
  • polling-xhr
  • pollin
  • websocket

上面有提到,socket.io 本身不提供連線功能,而是在 engine.io 才提供,所以事實上,如果你沒有一定要使用到 socket.io 的功能,而只是要連線到 http server 或是監聽 port 的話,只要用 engine.io 就夠了,這邊有個重點要記得 socket.io 是個 framework 而 engine.io 只是個 libary,只要分的出這兩個差別,你就可以自由的選你要的使用。

var engine = require('engine.io');
var server = engine.listen(80);

server.on('connection', function(socket){
  socket.send('utf 8 string');
  socket.send(new Buffer([0, 1, 2, 3, 4, 5])); // binary data
});

engino.io-client

engine.io-clientsocket.io-client的核心,所有關鍵的連線、選擇傳輸方式,都是在這裡面執行。

我們這裡來看看,它是從那決定要用那種的傳輸方式(websocket、polling)。

engine.io-client下面這段程式碼(程式碼傳送門)中 ,這段onOpen是在 socket 要與 server 進行連線時,會先執行的事件,其中,就是由this.probe(this.upgrades[i])這個方法來決定要用什麼方式(websocket、polling)來進行傳輸。

Socket.prototype.onOpen = function () {
  debug('socket open');
  this.readyState = 'open';
  Socket.priorWebsocketSuccess = 'websocket' === this.transport.name;
  this.emit('open');
  this.flush();

  // we check for `readyState` in case an `open`
  // listener already closed the socket
  if ('open' === this.readyState && this.upgrade && this.transport.pause) {
    debug('starting upgrade probes');
    for (var i = 0, l = this.upgrades.length; i < l; i++) {
      this.probe(this.upgrades[i]);
    }
  }
};

然後在後端 server ,也就是engine.io裡根據前端傳送回來的url參數的Transport來決定要用什麼來進行傳輸 :

url 範例 : 

/engine.io/default/?transport=polling

engine.io的下面這段程式碼裡,check用來驗證傳進來的參數是否合法,來決定這個transport參數是否合法,然後再來將 http 升級為 websocket 協義。

server.on('upgrade', function (req, socket, head) {
      if (check(req)) {
        self.handleUpgrade(req, socket, head);
      } else if (false !== options.destroyUpgrade) {
        // default node behavior is to disconnect when no handlers
        // but by adding a handler, we prevent that
        // and if no eio thing handles the upgrade
        // then the socket needs to die!
        setTimeout(function () {
          if (socket.writable && socket.bytesWritten <= 0) {
            return socket.end();
          }
        }, destroyUpgradeTimeout);
      }
    });

socket.io-parser

在 socket.io 的世界中,有一個東西叫做 packet,它是所有溝通的基礎包,事實上它也就是 socket.io 的協議,有一個東西叫socket.io-protocol(傳送門),它裡面有定議好,你這個 packet 要長什麼樣子。

下面為最簡單的 packet 包 :

{
    type: 2,
    data: [{
        word: "hello mark"
    },{
        word: "hello gg"
    }]
}

其中2所代表的為這個 packet 是要用來處理event事件,像要傳送到前端的事件也是用這個代號,這個數字會由 socket.io-protocol 裡定義好。

然後每當我們要傳送 packet 到前端時,都會先使用socket.io-parser`將 packet 給 encode ,而致於為什麼要 encode 後在傳呢 ? 主要原因,可能是希望儘可能的將要傳送出去的 packet 縮小已節省傳輸成本。

像官方所提供的socket.io-parser會將上面的 packet 包 encode 成如下數據 :

"2[{"word":"hello mark"},{"word":"hello gg"}]"

它的確比原本要傳輸的資料還少點兒東西,然後前端在再用相同的socket.io-parser來進行decode,變成原來的 packet 包。

這就是socket.io-parser所做的事情,當然我們也可以自訂一個 parser ,如果你有需要的話,例如你想使用 xml 來當傳輸格式,這時你就要自訂一個 parser 了,雖然覺得應該不會想用xml來傳。

socket.io-adapter

在理解這套件之前,我們先看看adapter這代表什麼意思呢 ? 我們直接用這個單字去 google 圖片一下,然後可以看到下圖的結果 :

嗯哼,就是電源轉接器,在插頭和我們的機器之間所需要的東西,在程式開發中,你有沒有遇過下面這種問題呢 ?

我和 A 要 api 來使用,但發覺它的 api 我需要調整過後才能使用。

所以通常你這時,應該中間會有一個東西,來呼叫這隻 api ,然後再裡面先整理一下,然後才傳出去給主要的方法來使用,對吧 ? 這時那中間的東東就是所謂的adapter它本身就是一種設計模式。

好回來到socket.io-adapter,那我們的插頭和機器各代表什麼呢 ? 先來說說機器,機器就是我們的 socket.io 那插頭呢 ? 嚴格來說是儲存空間,儲啥呢 ? 就是namespace、rooms、sids這些東東。

socket.io-adapter預設是存放在記憶體之內,所以如果你要用 redis 或 mq 之類的來做儲放,你就只要調整你的 adapter 就好,目前官方有提供socket.io-redis可使用,它也是一個 adapter。

基本上你要進行任何 socket.io 提供的 emit、broadcast、join room 這些功能,都一定會到這一層來做處理。

socket.io-client

這個東東就是讓我們在前端使用的東西,假設我們在後端 server 建立好喔,我們就可以如下面這樣,連建立連線 :

<script src="/socket.io/socket.io.js"></script>
<script>
  var socket = io('http://localhost');
  socket.on('connect', function(){});
  socket.on('event', function(data){});
  socket.on('disconnect', function(){});
</script>

然後如果你在後端要寫整合測試時,想要模擬前端,也可以如下使用 :

var socket = require('socket.io-client')('http://localhost');
socket.on('connect', function(){});
socket.on('event', function(data){});
socket.on('disconnect', function(){});

這邊還有一個重點,它與 socket.io 一樣核心都是engine.io-client並且也是由它來決定我們要用什麼傳輸方式(polling、websocket)。

socket.io-protocol

最後是socket.io-protocol,這個事實上上面有說明過了,我們在來複習一次。

它是個協定,它不是程式碼、套件或其它可以執行的東西,它是一個規定,它定議好socket.io 要如何的傳輸資料,它主要定義了以下二個主題 :

Parser API

Parser API上面有提到,它是用來將 packet 包進行 encode 與 decode 的東東,在 protocol 中,它實際定義一個 parser api 需要有那些東西。

下面為它的主要定義

  • Encoder.encode(Object: packet, Function: callback)
  • Decoder.add(Object:encoding)

Encoder就是用來進行 encode 的類別,然後它需要提供encode方法,並且有兩個參數分別為packet 和 callback

Decoder就是要將 packet 進行 decod 會類別,並且需要有個add方法,來進行處理。

像我們上面提到的socket.io-parser就是根據socket.io-protocol所定義的parser api實作出的程式碼。

var parser = require('socket.io-parser');
var encoder = new parser.Encoder();
var packet = {
  type: parser.EVENT,
  data: 'test-packet',
  id: 13
};
encoder.encode(packet, function(encodedPackets) {
  var decoder = new parser.Decoder();
  decoder.on('decoded', function(decodedPacket) {
    // decodedPacket.type == parser.EVENT
    // decodedPacket.data == 'test-packet'
    // decodedPacket.id == 13
  });

  for (var i = 0; i < encodedPackets.length; i++) {
    decoder.add(encodedPackets[i]);
  }
});

Packet

Packetsocket.io世界裡的溝通包包,像你如果要放送訊息或是進行 connect 時,它們都是傳送的都是packet

在 protocol 中,它定義好了,一個 packet 要長什麼樣子 :

{
    type: Number,
    data: [],
    id: Number,
}
type

就是用來決定,這個包是要做什麼事情,protocol 它定義了以下幾種類型 :

  • CONNECT(0)
  • DISCONNECT(1)
  • EVENT(2)
  • ACK(3)
  • ERROR(4)
  • BINARY_EVENT(5)
  • INARRY_ACK(6)
data

就是你這個包傳送的資訊,它常都是我們自已的,不過記好,它是要放成一個陣列。

id

用來識別這個包是誰的,需要時在設定。

總結

最後我們來使用下面這張圖,來總結一下 socket.io 這個 framework 的架構 :

上圖為 socket.io 的整體架構,我們最後來複習一次。

首先最主要的主體為engino.io,所有的連線、傳輸方式的核心都是他,然後當你想要與其它東西進行溝通時,它們統一的溝通元件packet都需要使用socket.io-parser來進行 encode 與 decode ,因為某些傳輸方式 websocket 只能用文字與二進位數據,還有這些定義都會寫在 socket.io-protocol裡面,最後如果想你要與儲存元件溝通,請建立一個 adapter 來處理。

參考資料

 
2 months ago

在上一篇文章中,我們說明了如何的設計像 line 的聊天群的架構設計,而這一篇我們要來說明聊天室的架構設計,這東西和上一篇有什麼差別 ?

通常聊天群是會由用戶提出申請,然後管理者來加入到該群裡,而聊天室則不相同,它是用戶可以自由自在的加入或退出,這也代表這,通常聊天群會限制人數,像 line 好像就限制 500 人,而聊天室則否,他通常不會限制人數。

那這也代表我們要面對什麼問題呢 ? 我目前想想主要有兩個 :

  1. 由於沒有限制人數,所以通常我們的架構要考慮擴展性。
  2. 訊息的即時性非常的要求,如果一個訊息傳輸慢了,會導致其它人無法理解上下文。

最簡單的聊天室架構 V-1

基本上和聊天群的架構相同,都是一個Business Server和一個Message Server,其中前者做的事情是為所有需要使用 http 協議的工作,更正確的說是 http 短連接的工作,如新增聊天室、登入、登出、註冊這類事情的,都屬於 Business Server ,而所有使用 websocket 協議的都是屬於 Message Server 的工作。

聊天室 V-2

上面的架構有沒有啥問題呢 ? 有的 ! 請想像一個情境 :

用戶 A 從 business server 登入後,然後再去 message server 建立連線,但問題是 message server 怎麼知道這條連線是用戶 A 呢 ?

在一般的 web 應用中,每當 client 連結 server 時,server 會產生唯一個 sessionId ,並用它來連結 server 內的存放空間,然後會將 sessionId 存放到 cookie 中,這樣每一次 client 進行請求時,server 都會去 cookie 中取得 sessionId 然後再去 session 取得資料。

從上面的說明可知 session 是存放在 server 中,所以上面的情境變成 business server 產生 session 但問題是 message server 沒有 session。

所以我們將架構修改成如下 :

我們會新增一個 redis 用來專門存放 session 當使用者登入後產生 session 資訊存放入 redis 中,接下來 client 要與 message server 建立連結時,由於它是使用 http 進行連線,所以它會有包含 cookie ,內含已加密過的 sessionId 進來,最後到 message server 時他會用這個 sessionId 再去 redis 取得使用者資訊。而每當登出時,business server 相同的也會更新 redis 裡面的資料。

V-2 版本大致上就是如此 ~

聊天室 V-3

上面的架構理論上基本應該可以運行,但根據我們的要求,用戶數有可能會激增,所以我們這邊的Message Server需要考慮擴展,會優先考慮擴展他主要原因在於,每一個 websocket 都代表這一條 tcp 連線,而由於他是持久連接,白話文就是它會一直佔著一定的資源,它並不像 http 用完一定時間就收,所以如果一個 message server 有 1 萬人在聊天室,就也代表這需要同時維持 1萬條 tcp 連線,會上西天的,所以說我們需要一個代理器來做 Load balance 可以幫助我們將流量分散,架構變成如下 :

其中 Proxy 可以做很多的事情,其中最重要的就是要幫助我們分散流量,它會將我們 http 的請求 (登入、登出) 導到 business server。 websocket 的連接,則導到其中一個message server

這時我們來模擬一下,用戶在聊天室時發送訊息的流程 :

  1. client 發送訊息 。
  2. proxy 收到請求,並將它導至 message server A 。
  3. 然後 message server A 將該訊息 broadcast 到聊天室的用戶 。

目前是建議 proxy 使用 nginx 來處理,他本身就提供了 load balance 的功能,而且它也可以幫助我們來防禦一些 ddos 攻擊。

這邊有一個重點要記得:

它建立websocket 通道是長成:

client ==== proxy ==== message server
而不是
client ==== message server

聊天室 V-4

嗯嗯 ~ 看上去都沒問題。

但這是有前提假設的,那個假設就是 :

那個聊天室的所有用戶,都在同一個 message server 裡 。

你想想如果有個用戶不在 message server A 裡,那 broadcast 時不就代表那個用戶不會收到訊息 ?

那這邊要如何解決呢 ? 我目前想到的二個方法就是 :

  1. 在 load balance 時,將同一聊天室的用戶都導到同一個 message server 裡 (QQ)。
  2. 建立一個 pub/sub 的redis 來處理。

我先說說第一個的方法,我們使用 load balance 根據聊天室來將 websocket 連線導至某台 message server ,也就是說同一個聊天室的用戶都放在相同的 message server 裡,所以當有用戶發訊息到聊天室時,那個特定的 message server 就會自動的 bordcast 出去。

而當用戶要加入某個聊天室,而想發出xxxx加入聊天室時,因為那個聊天室在那個 message server 所以只要往那個 message server 發送訊息。但這種方法的缺點就在於你要如何分配聊天室在那個 message server , 分配不好,大部份流量都往那個 message server 去,反而失去我們擴展 message server 的用因。

這方法我覺得還有一個問題,那就是假設我們其中一台 message server 上西天了,那不就代表該聊天室的用戶都無法進行連線了 ? 除非我們的 load balance 演算法有設計好,不然問題很大。

目前比較推第二個方法,就是架構個 pub/sub 功能的 redis ,如下圖 :

然後我們來說一下它的運作流程,我們先看看下圖,當用戶 A、B、C 加入某個叫 kkbox 的聊天室時,當在建立連線時 message server 會去 redis 進行 kkbox這個 channel 的訂閱 (subscribe)。

接下來當用戶 A 發送訊息時,會前往 redis 對應的 kkbox channel 進行訊息 pub ,然後存在用戶 B、C 的 message server 收到訂閱的敲門,就會將消息推送到用戶 B、C了。

聊天室 V-5

上圖為我們到目前的架構,基本上可以動,理論上應該運行的不錯,但實際上呢 ?
不知道 !,現在環境難以預測,所以可能現實環境會發生下面的事情 :

啊 ! proxy gg 了,所有的請求都不能進來了 !

由於我們 proxy 那裡是使用 nginx 來做 load balance ,所以如果我們那裡掛掉,那我們的還有服務事實上就說掰掰了,所以為了避免這種狀況,我們需要用到keepalive的功能。

keepalive是啥 ? 它主要的功用就是如下 :

在一個集群中,會隨時的檢查每一台機器的健康狀態,保證該集群可以服務。

所以說,我們可以在 proxy 建立一個集群,也代表這可能有多個 nginx,然後我們在搭配 使用 keepalive,這樣每當有一台 nginx 上西天後,keepalive 就會檢查到,然後會立即的轉換成另一台可以用的 nginx,這樣也確保了我們的整體服務不會因為一台 nginx 上西天就全死了。

聊天室 V-6 (在想想)

基本上,上面的架構的確可以運行,但是呢 ~ 我們有沒有辦法確定訊息的即時性是正確的,例如下面的情況 :

聊天室中有 A 然後他發了三句話 :

A(最早) : 肚子餓了吃啥 ?

B(第二) : 吃拉麵如何 ?

C(最後) : 還是吃飯 !

但有沒有可能在實際的聊天室看到的是變成這樣 ?

C(最後) : 還是吃飯 !

B(第二) : 吃拉麵如何 ?

A(最早) : 肚子餓了吃啥 ?

會出現這種狀況,最有可能的場景就是分散式的問題,因為每一個服務都是不同的機器上,而不同的機器上會有自已的本地時鐘。

像假設我們 message server A 在台灣、message server B 在日本,這時他們的本地時鐘就是不相同的,所以我們在判斷訊息的先後順序不能使用 server 端時鐘,同理 client 端的時鐘也不行。

我這邊簡單的整理一下,很難保證時間順序性的原因 :

時鐘不一致 :

就像我們上面說的範例,你放在不同的機器,會有不同的本地時鐘。

多用戶端 :

假設我們有兩個用戶 A、 B ,一個 server,然後 A 先發送訊息,接下來是 B 再發送訊息,但因為網路傳輸的問題,我們不能保證 server 先收到 A 。

多伺服器端 :

假設我們有一個用戶 A ,然後兩個 server A 與 B ,然後先發送訊息到 server A、在發送訊息到 server B ,但因為兩台機器時鐘不一定相同,所以可能會導致時間不正確。這邊你可以想成,假設有 load balance 時,第一次訊息它導至 A,而第二次訊息導至 B。

不好意思,請參考這篇文章 如何保证IM实时消息的“时序性”与“一致性”? ,不過這篇文章中有幾個解法我是有點疑問的,像它裡面有個在單點 server 上,生成有順序的 id ,但問題在於如果你進來的順序就已經亂掉了(網路問題),那你生成的這 id 事實上也沒什麼意思,除非你可以確保,你進到 server 時的順序是正確的,這 id 才能使用。

所以這方面的問題,改天在另生一篇文章出來寫寫,這邊就先降 ~

參考資料

由於本篇參考不少資料,所以以下只列出所參考資料出處。

 
2 months ago

本篇文章中,我們講要說明,如何開發一個簡單的聊天群系統,這個東東雖然我們很常見到,到和我們平常開發的一些 WEB 有很大的差別。

差別在那呢 ? 假設我們開一個todolist功能,事實上大部份的工作就是crud的事情,每當要新增一個 todo 時,只要發送 http 到後端新增資料到資料庫裡去,然後在回傳結果就好了,但聊天群這種,如果你每發送一個訊息都使用 http 那一定爆掉的。

像聊天群這樣類型的,我們稱為InstantMessaging IM中文為即時通訊,本篇文章我們將會說明要建立這種IM應用所需要的基本知識。

開始吧 ~

從 Web 到 IM 的通信過程轉變

在最開始時瀏覽器它沒有辦法直接連接到另一個瀏覽器的通信功能,也就是說你不能從 A client 直接傳送訊息到 B client 去,我們只能在它們的中間,建立一個 server ,來將 A 要傳送的訊息儲放起來,然後 B 在自已去 server 取得資料,如下圖 :

這種做不行嗎 ?

說實話,功能是有做出來 ~ 但浪費太多的資源,你想想,根據上面的說法,當 A 發送訊息到 server 後,你 B 要如何知道 server 有你的訊息 ? 記好 http 只能從client發送到server,不能反之,所以這也代表這你 B 只能定時的去 server 問問看,說有沒有我的資料啊 ~

很明顯的,你可能問了十次,只有一次才有你要的訊息,那其它九次,不就都浪費掉了,這也代表你的 IM 系統有 90 % 的效能在處理沒用的事情

當然中間處理的其它方法先不說,後來 html5 提出了一個應用層的協議websocket,來解決這事兒 ~

Hello WebSocket

這個協議可以幫助我們可以實現,從 server 端推送資料到 client 端,而且從建立的通道是持久連接,在 http 1.0 時你每發送一次請求,都需要做一次 tcp 握手,而 http 1.1 時,則可以多次使用一個 tcp,但在 websocket 你就只要建立一次,當完成握手後,就會產生一個全雙工的通道。

全雙工代表這,可以從任何一方傳送資料或接受資料。

它的 server 與 client 的運作圖如下 :

最簡單的聊天群架構

從上面的概念中我們知道,我們這邊主要需要使用的東西是 websocket,接下來我們可以來開始的規畫我們聊天群的架構。

我們先從最簡單的來看,如下圖 :

Business Server : 用來處理用戶需要用到 http 的操作,例如用戶登入、登出、加入聊天群、離開聊天群之類的,並且每當使用者要加入聊天群時,會發送訊息給 message server 然後它會在往 client 端推送說 ~ 某用戶加入聊天群囉 ~

Message Server : 用來發送訊息與接受訊息。

上面的架構很簡單,就是 client 對一個 server,但我們來想想會發生什麼問題 ? 首先假設我們這個系統有多少人,server 就需要建立幾條的 websocket,那我想問,一台 server 可以接受幾個 websocket 連線呢 ?

一台 server 可以有多少連線呢 ?

這個問題說實話很難回答,因為還要考慮 server 性能、程式碼撰寫等,不過我這邊會簡單的根據,來大概的算出可能的連線數。

要理解這個問題,我們需要先理解下面這個知識,

file hanle 的限制

在 unix 中每一個 tcp 連線都要占用一個file descriptor,而它有一定的限制數量,當使用完後,新的 tcp 連線到來就會發生錯誤以下的錯誤訊息 :

Socket/File: Can't open so many files。

那一個 process 我們可以開啟幾個檔案呢 ? 我們可以用以下的指令來看看 :

ulimit -n

像我這台 mac pro 的預設是 :

4864

而如果是AWS EC2那在最基本版本,什麼都沒動過的則為 :

1024

所以白話文的說,如果我沒有修改預設,我在這邊最多同時間,只能連線4864個 tcp 。
它當然可以改預設,不過每一個系統一定都是有上限的,記憶中有些系統的上線大約 1百萬 到2百萬之間,這邊到不確定。

簡單的總結一下,一個 server 可以有多少連線呢 ? 直說我很難回答正確的答案,但我只能說,如果你沒調整預設的 process 限制,那必定只能連這個系統的上限(大概)。

但是每個系統有設定限制,就代表它預設一定有它的理由,所以不確定修改是不是個好方法。

聊天群架構 V-2

上面的架構非常非常的簡單,但它有什麼問題呢 ? 我們想想以下這個場景 :

我們某個聊天群裡總共 4 人,其中 A、B、C 三人目前在線上,而 D 不在線上,那麼 ~
要著麼確保 D 上線時,可以看到其它三人所發送的訊息呢 ?

目前大概的想法為,增加兩個服務,其中第一個為使用 redies 建立的用來保存用戶狀態(上線與未上線)的服務,每當使用者登入或登出時,都會透過business server發送訊息來更新redies狀態。

然後每當有訊息準備要傳送時,message server會先到redies取得聊天群內的用戶資訊,然後如果在線上的,則進行傳送,不在線上的則將知存放到temp message server裡。

然後每當用戶上線時,business server會發個訊息到messge server去,然後去看看temp message server裡有沒有離線訊息,有的話就傳送出去。

聊天群架構 V-3

上面的架構看似沒問題,但是它事實上有個缺點,有沒有注意到,很多的邏輯運算都在message server裡面做,我原本是將它定義成只用來收發訊息,但是我現在很多判斷都在裡面做,雖然在量不大時,還沒什麼問題,但如果量很大的話message server會上天堂的。

所以我決定在增加一個服務,就取名為Logical Server,架構圖如下 :

它主要的工作就是要處理所有要送出與收到的邏輯運算,像我們剛剛上面說到,要更新使用者狀態也改成從這裡運作,而決定那些訊息要存放到離線 server 也是在這處理,而在每一次使用者登入時,要傳送的訊息也都是從這裡處理好,然後在去通知message server和他說,該送貨囉 ~~

聊天群架構 V-4

上面的架構中,我們基本上已經可以處理不少需求了,但我們在思考看看,有沒有那個地方可以改進,我們從一個角度來想,流量,我們仔細想一下那一個地方的流量是最大的 ? 嗯就是message server,假設我們一台message serverhold 不住要著麼辦呢 ?

這時就是針對message server進行擴充,架構會變成如下 :

上面新增加了兩個服務 :

Proxy : 用來決定要使用那台message serverclient進行連線,事實上就是 load balance 的功能。

Dispatch : 用來決定訊息要傳送到那一個message server

我們每一次要建立 websocket 連線時,都會需要先連到proxy由它來決定你要去那個message server建立 websocket 連線。

然後每當有訊息進來後,會到一個名為dispatch的服務,先將該訊息進行包裝,並且也會注明該訊息是從那個 message server 過來的,然後再傳送到 logical server 中,判斷那將該訊息送到那些用戶上,最後在回送到dispatch上,由它來決定要送到那個message server 上,最後再由它回傳給用戶上。

結論

在這篇文章中,我們所討論到的聊天群架構,事實上到 V-4 版本,已經可以處理大致上WEB!聊天群會用的功能,下一篇文章中我們將要來討論如果來設計聊天室

參考資料

 
2 months ago

在前面的幾篇有說到,不同的process間,我們可以使用 IPC 通信來進行溝通,但如果是不同電腦呢 ? 要如何溝通呢 ? 我們這時就可以使用 socket 來進行溝通。

在開始說明 socket 前,我們需要先準備一些基本知識,那就是常聽到的 tcp/ip

TCP/IP 通訊模型

tcp/ip 它是一種網路協定,它定義了點對點如何的傳輸,如何將資料封裝、定址、傳輸、路由以及在目的地如何接受,全部都加以標準化,它基本上可以分為四層應用層傳輸層網路互連層網路介面層,它常被視為簡化的七層 OSI 模型。

圖片來源:鳥哥

在了解 socket 前,我們需要了解應用層傳輸層的基本概念。

應用層

這個層級主要是定義 :

應用程式的溝通協定,也可以理解為不同應用程式如何協同工作。

在這個層級的協定,大部份都會使用到兩個傳輸協定tcpudp,至於何時使用 tcp 或 udp 取決於,該協定是否保證資料完整的傳送到另一端,這邊我們只要記得tcp可靠udp不可靠這兩件事情就夠了。

我們常用的 http 就是屬於這一層協定,smtp 也屬於這層,我們簡單的來說明一下 http 的概念。

HTTP (超文字傳輸協定)

它是一種應用層的傳輸協定,它主要定義了下面的事情 :

它是一個用戶端與伺服器端請求和應答的標準

通常 http 用戶端的發出一個請求,它會建立一個到伺服器端的 TCP 連線 。

傳輸層

這個層級主要是定義 :

定義點到點如何傳輸

其中tcp、udp就是這一層,我們簡單的來說明一下 tcp 的工作,就會知道這個層級主要是做啥事情。

TCP (傳輸控制協定)

它是根據傳輸層的定義,所完成的協定,這個協定宗旨在於 :

提供一個可靠(不會掉資料)的資料流傳送服務

那它用什麼方法來處理可靠的問題呢 ? 答案就是tcp三次對話

我們簡單的用下面例子來說明,假設 A 和 B 兩台電腦要傳輸資料了,這時就開始要準備建立 tcp 連線,但在連線之前需要有三次對話 :

A : 我想發資料給你(B),好嗎 ?

B : 喔好啊,我會發送一個同意連接要求同步(ack1)的給你喔 。

A : 好我收到了,然後我回發了一個同意要求同步(ack2)的給你喔。

經過以上三次對話,A 才能正式的傳送資料給 B。

只要是對可靠性要求的傳輸,都必須使用 tcp 協定。

那 Socket 是啥 ?

在簡單的理解完上面的網路概念後,我們就可以來理解,什麼是 socket 。

socket 是在應用層傳輸層之間的一個抽象層,它是一組接口,隱藏了底層的複雜操作,同時你也可以把他想成一個雙向的 endpoint ,可以給人連線或連線它人,而且由於它有綁定一個特定的 port 所以這也代表傳輸層它們那邊可以用它來定位應用程式。

你可以吧 socket 想成一個兩個節點的節點的溝通機制的概念,然後在由其它語言來實作這個概念。

而他的運作流程圖如下 :

Node 的 Socket 實作

在上面大概理解了 socket 機制後,我們簡單的使用 nodejs 來建立兩個 socket 節點,一個為 server ,另一個則為 client。

Server端

我們 server socket 主要是用來接受 socket client 的資料。

var net = require('net');

var HOST = '127.0.0.1';
var PORT = '61111';


net.createServer(function(sock){
    console.log('Server open !');

    sock.on('data',function(data){
        console.log('I receved data from client :' + data);
    });

    sock.on('close',function(){
        console.log('close');
    });
}).listen(PORT, HOST);

Client端

而我們的 client 端則為每一秒傳輸資料到 server 端去。

var net = require('net');

var HOST = '127.0.0.1';
var PORT = '61111';

var client = net.Socket();
client.connect(PORT, HOST, function(){
    console.log('client connected');

    setInterval(function(){
        client.write('I am Mark');
    },1000)
});

client.on('close', function(){
    console.log('close client');
});

Socket VS Websocket

不一樣的東西,前面有提到,socket 基本上是屬於傳輸層與應用層的抽象層,而 websocket 它基本是屬於應用層的協定了。

ws://example.com/wsapi
wss://secure.example.com/

上面先說結論,我們先簡單的來看一下什麼是 websocket 。

輪詢

在 web 開發時,我們有時後會遇到這樣的需求,就是當 server 端資料有變動時,client 端畫面會變動,很古老的做法是輪詢,也就是說定時的去 server 端問問看有沒有空的資料,但很明顯的,這浪費很多資源,有可能 10 次呼叫,只有一次才真的有新的資料。

Comet

再來出現的是comet它是一種推播技術,也就是 server 那可以更新資訊時傳送到 client 端,它有一種實現方式長時間輪詢 long-polling,它是 client 和 server 進行溝通後,它的連線會先留一段時間,等某段時間沒資料時,再發送請求到 server 端,
但他還是有缺點,那就是當 server 沒有資料時,那個連線還會繼續連接,會造成 server 資源浪費的。

Websocket

最後是websocket,它是 html5 規範發布的新協議,等同於應用層(ex. http),它的基本概念為 server 端與 client 端的建立是持久連接,使用這項協議後, server 端可以主動傳送資料給 client 端,而且它 tcp 握手只要一次,不像 http1.0 每次使用都需要 1 次的 tcp 握手,但 http1.1 時,則可以在一次的連接處理多個請求,但還是比不上 websocket 的一次。

這種東東很適合用來處理聊天室報價系統這類型的應用。

NodeJS 中的 Socket.io

socket.io是一個 nodejs 的套件,它做了什麼事情呢 ? 它將上面提的溝通方式全部的整合在一起,讓我們前端與後端可以處理推播功能,它有支援以下的傳輸方式 :

  • xhr-polling
  • xhr-multipart
  • htmlfile
  • websocket
  • flashsocket
  • jsonp-polling

上面對種類中有polling字樣就是我們上面所說的溝通方式,我們直接拿官網的程式碼來看看使用的方法,下面為 server 端的程式碼。

var io = require('socket.io').listen(8080);

io.sockets.on('connection', function (socket) {
    socket.emit('news', { hello: 'world' });
    socket.on('my other event', function (data) {
        console.log(data);
    });
});

而下面為前端。

<script src="/socket.io/socket.io.js"></script>
<script>
    var socket = io.connect('http://localhost:8080');
    socket.on('news', function (data) {
        console.log(data);
        socket.emit('my other event', { my: 'data' });
    });
</script>

這樣就可以完成推播功能。但這邊有個地方有注意一下,我們都沒有設定要用什麼傳輸方式,那它要用那種傳輸方式呢 ? polling ? websocket ?

socket.io 自動會根據瀏覽器支援到什麼程度來決定使用什麼。

很方便吧 ~ 事實上 socket.io 就等同於 net 開發者都很熟悉的signaR

Network Socket VS Unix Socket

在網路上尋找 socket 文章時,常常會看到 socket 有分為network socketunix socket,那這兩個有什麼不同呢 ?

上面一章節有說到 socket 本身是建立在 tcp/ip 網路層級的應用層與傳輸層中間,所以說它本身一開始是定位於網路溝通使用,也就是我們這邊說的network socket,但後來發現在 socket 的概念上,建立一個 IPC 通信,可以使我們電腦內的溝通更有效率,這時的產生出來的東東,就是我們所謂的unix socket

unix socket不像network socket一樣,它不需要經過網路協定,tcp 連線、握手等步驟,而只是將應用層的資料,從一個 process 傳輸到另一個 process上,它於其它 ipc 機制相比來說,算是目前最常使用的 ipc 機制 (會有另一篇來比較 ipc 機制)。

在 nodejs 中,我們也可以建立 unix socket 程式碼如下,這裡的程式碼基本上與 network socket 相似,不同點在於我們 listen 不是個網址,而是個檔案,而你可以將這個檔案想成為socket 節點

const net = require('net');
const server = net.createServer((c) => {
  // 'connection' listener
  console.log('client connected');
  c.on('end', () => {
    console.log('client disconnected');
  });
  c.write('hello\r\n');
  c.pipe(c);
});
server.on('error', (err) => {
  throw err;
});
server.listen("/tmp/echo.sock", () => {
  console.log('server bound');
});

然後我們就可以使用以下指令,建立一個連線到/tmp/echo.sock的 client 端 socket 。

nc -U /tmp/echo.sock

然後 client 端這邊應該會收到從 server 端的 socket 傳送回來的訊息 :

hello

然後 server 端應該也會輸出以下訊息 :

client connected

參考資料

 
4 months ago

本篇文章中,我們想要知道以下兩件事情 :

  1. 為什麼要使用它呢 ?
  2. 什麼是策略模式呢 ?

為什麼要使用策略模式呢 ?

我們簡單的寫一下,一個多需要用不同方法的登入方法,它可以選擇使用googlefacebook的方法,來進行登入。

var user = {
    login: function (type) {
        if (type == "google") {
            doGoogleLoginSomething();
            console.log("google login process");
        } else if (type == "facebook") {
            doFbLoginSomething();
            console.log("facebook login process");
        } else {
            doSomething();
            console.log("custom login process");
        }
    }
}


user.login("google");

那上面這段程式碼中,有那些缺點呢 ?

首先第一個,它包含了很多的 if else 判斷,這樣反而增加了該函數的邏輯分支。

第二個為該函數缺泛彈性,如果你想增加twitter的登入,那就必須修改這函數的內部實作,這樣違反了開放封閉原則

開放封閉原則 : 白話文就是當你增加新功能時,盡量不修改原有的程式碼。

好處 : 較好維護較好測試可重複使用

所以說,當碰到這種情況時,就可以使用策略模式囉 ~

策略模式簡單的來說,就是為了處理以下的情況 :

當使用者有相同的行為,但不同的場景時,有不同的方法。

例如 : 使用者想要進行登入,但我們可以使用googlefacebook等不同的方法來登入。

在白話文一點就是 : 當你有大量的if elseswitch就可以使用策略模式了。

策略模式是什麼 ?

接下來我們就要使用策略模式來修改上面的程式碼,但在開始前,我們要先知道策略模式是什麼。

簡單的說,它的定義如下 :

定義一系列的演算法,把它們一個個封裝起來,並且可以相互替換。

以我們上面登入的範例來看,login裡面的每一種登入方法就是一種演算法,但他都丟在裡面,所以我們策略模式就是要將他,一個一個封裝起來,並且可以相互的替換。

如下面的程式碼,我們將每一個登入的演算法都封裝起來,然後在需要那個的時後,就使用那個。

var user = {
    login: function (stragtegy) {
        stragtegy();
    }
}


var loginStrategy = {
    fb : function(){
        doFbLoginSomething();
    },
    google : function(){
        doGoogleLoginSomething();
    },
    custom : function(){
        doSomething();
    }
}

user.login(loginStrategy.fb);

首先我們先來看看可維護性,上面的程式碼中,我們假設要新增一個twitter,那你只需要在loginStrategy裡新增一個twitter的策略,這樣就不會動到主題的login函數,以防止,當你修改了login會影響到其它登入方法的問題。

再來來看看可測試性,我們只需要mock你要測試的策略,然後在將他丟到login裡面,進行測試,非常的簡單,如果是沒用策略模式的程式碼,我還真的要動腦想一下著麼寫單元測試了。

結論

上面我們簡單的用登入系統的例子,來說明策略模式,在 node 中,有一個叫passport js的東西,就是使用策略模式,來實作登入系統的實例,筆上之前就有寫過一篇關於 passport 的文章,可參考參考。

Passport.js 之 Hello 你好嗎 ~

參考資料

 
4 months ago

本篇文章中,我們想要知道以下的重點 :

  1. passport 是啥鬼 ?
  2. 要如何使用它呢 ?
  3. 要如何使用一個 passport 的登入系統呢 ?

passport 是啥 ?

passport.js是 node 中的一段登入驗證中間層(middleware),也就是說可以讓你簡單的使用 google 登入使用 fb 登入,它的架構就是所謂的策略模式,接下來我們來實際上看看他是如何使用的。

passort.js 活著的目的就是為了驗證 request

要使用 passport 來進行驗證,需要設定三個東西 :

  • 驗證策略 (Authentication strategies)
  • 應用程式的中間件 (Application middleware)
  • Sessions (可選擇)

驗證策略的建立

上面我們有提到 passport 本身就是使用策略模式的實作,而它的定義就是 :

定義一系列的演算法,把它們一個個封裝起來,並且可以相互替換。

所以在這邊,我們需要定義驗證的策略(演算法),例如使用 facebook 登入驗證、google 登入驗證或自訂的驗證策略。

而我們這裡直接看官網的自訂驗證策略localStrategy,下面的程式碼中,我們會定義一個localStrategy,它準備用來驗證我們的request

LocalStrategy的兩個參數為optionsverify,我們option需要先定義要用來驗證的欄位usernamepassowrd,然後verify就是驗證規則,就是下面那個function裡面的東東。

var users = {
    zack: {
        username: 'zack',
        password: '1234',
        id: 1,
    },
    node: {
        username: 'node',
        password: '5678',
        id: 2,
    },
}

// LacalStrategy(options,verify)
var localStrategy = new LocalStrategy({
    usernameField: 'username',
    passwordField: 'password',
},
    function (username, password, done) {
        user = users[username];

        if (user == null) {
            return done(null, false, { message: 'Invalid user' });
        };

        if (user.password !== password) {
            return done(null, false, { message: 'Invalid password' });
        };

        done(null, user);
    }
)

那這邊有個問題 ~ 那就是奇怪,為什麼他沒有驗證 username 或 password 這兩個欄位是否合法呢 ?

因為LocalStrategy已經幫我們處理好了,我們直接來看一下它的原始碼 :

Strategy.prototype.authenticate = function(req, options) {
  options = options || {};
  var username = lookup(req.body, this._usernameField) || lookup(req.query, this._usernameField);
  var password = lookup(req.body, this._passwordField) || lookup(req.query, this._passwordField);
  
  if (!username || !password) {
    return this.fail({ message: options.badRequestMessage || 'Missing credentials' }, 400);
  }
  
  var self = this;
  
  function verified(err, user, info) {
    if (err) { return self.error(err); }
    if (!user) { return self.fail(info); }
    self.success(user, info);
  }
  
  try {
    if (self._passReqToCallback) {
      this._verify(req, username, password, verified);
    } else {
      this._verify(username, password, verified);
    }
  } catch (ex) {
    return self.error(ex);
  }
};

上面這一段是當我們執行了下面這段時程式碼時,就會執行的東東,從上面程式碼中我們可以知道,它會先去req.body中,尋找我們定義的兩個欄位usernamepassowrd,然後檢查看看他是否合法,當一切都 ok 時,我們就會執行上面有提到的verify函數,來進行驗證。

Passport.authenticate('local', { session: false })

中間件的設定

接下來我們將要在route中,增加 passport 這個中間件 (middleware),我們這邊選擇使用 express 來當我們的 web framework。

我們在使用時需要先選擇我們要使用的策略,我們直接用上面所建立的localStrategy,如果你有建立其它的例如facebookgoogle的策略也都可以使用。

// 註冊策略
Passport.use('local', localStrategy);

var app = Express();
app.use(BodyParser.urlencoded({ extended: false }));
app.use(BodyParser.json());
app.use(Passport.initialize());

其中Passport.use('local', localStrategy);這行就是將我們剛剛建立的策略註冊到 passport 中,我們直接看他的原始碼,會更了解它在做啥 :

嗯他非常的簡單,就是將我們的註冊的策略丟到一個物件中。

Authenticator.prototype.use = function(name, strategy) {
  if (!strategy) {
    strategy = name;
    name = strategy.name;
  }
  if (!name) { throw new Error('Authentication strategies must have a name'); }
  
  this._strategies[name] = strategy;
  return this;
};

然後呢我們就要在 route 上加 passport 中間件,這樣的話,我們每一個進來到這個 route 的 request 都會被 passport 我們指定的策略進行驗證。

app.post(
    '/login',
    Passport.authenticate('local', { session: false }),
    function (req, res) {
        res.send('User ID ' + req.user.id.toString());
    }
);

Session的設定

我們在驗證完畢後應該是會取得到某個使用者,像我們上面範例中的這行 :

user = users[username];

當然,這只是範例,正常情況下應該是去 db 或其它地方取得使用者,但我們這裡就一切從簡。

接下來我們兩個問題 ~

session 和 cookie 是啥 ?

雖然我還算理解,但是這邊還是簡單的說一下。

首先這兩個都是個儲放機制。

再來 session 是只能在 server 進行維持,每當 client 在連接 server 時,會由 server 產生成一個唯一的 sessionId,並用它來連接 server 內的 session 存放空間,而通常來說 sessionId,也同時會保存在客戶端的 cookie 中,每次 client 在訪問 server 時都會用它來存取 session 資料。

而 cookie 則是在客戶端的儲放機制,它是由瀏覽器來維持,但注意,它可以在 client 端與 server 端進行修改,為什麼會有 cookie 呢 ? 主要就是因為 http 是無狀態的協議,每一次讀取頁面時,都是獨立的狀態,所以就需要使用 cookie 來連結前後文。

對了 cookie 還有一點要記,那就是每一次的請求,cookie 都會一起被發送到 server 端喔。

我們懂了以下的基本知道後,再來問個問題。

我們每一次登入,就要取資料庫驗證和取得一次使用者嗎 ????

答案是否定的。

正常不太會這樣處理,假設我們是用 fb 登入,那不就變成,每一次使用者到這頁面時,畫面都會掉到 fb 要你登入,然後在跳回來原本頁面,這樣太浪費時間了。

所以說,passport 它會做以下兩件事情 :

  • 將使用者資訊存放在 server 的 session 中。
  • 然後會在使用者的瀏覽器設定 cookie。

那我們在 passport 要如何使用呢 ? 首先關於第一點,passport 提供了serializeUser讓我們將使用者資訊存放置 server 的 session中。

passport.serializeUser(function(user, done) {
  done(null, user.id);
});

然後關於第二點,每一次進行請求時,passport 都會將傳進來的 cookie 中某個存放該session資訊的欄位,取得到我們剛剛存的user.id,然後在使用它,來取得完整的user資訊,並將它存放到req.user之中。

passport.deserializeUser(function(id, done) {
  User.findById(id, function(err, user) {
    done(err, user);
  });
});

實作一個登入系統

我們這邊來實作個登入系統,使用者只要有登入後,接下來的 route 都可以從 session 中取得到該名使用者的資訊。

所有的程式碼

1. app.js 基本的註冊

var Passport = require('passport');
var LocalStrategy = require('passport-local').Strategy;
var Express = require('express');
var BodyParser = require('body-parser');
var session = require('express-session');
var cookieParser = require('cookie-parser');

var app = Express();
app.use(BodyParser.urlencoded({ extended: false }));
app.use(BodyParser.json());
app.use(cookieParser());

app.use(session({
    secret: "test",
    resave: false,
    saveUninitialized: false,
}))
app.use(Passport.initialize());
app.use(Passport.session()); // 一定要在 initialize 之後

2. 驗證策略

var users = {
    mark: {
        username: 'mark',
        password: '1234',
        id: 1,
    },
    node: {
        username: 'node',
        password: '5678',
        id: 2,
    },
}

var localStrategy = new LocalStrategy({
    usernameField: 'username',
    passwordField: 'password',
},
    function (username, password, done) {
        user = users[username];

        if (user == null) {
            return done(null, false, { message: 'Invalid user' });
        };

        if (user.password !== password) {
            return done(null, false, { message: 'Invalid password' });
        };

        done(null, user);
    }
)

Passport.use('local', localStrategy);

3. 建立登入的route

下面為我們登入的 route 建立。

app.post(
    '/login',
    Passport.authenticate('local',{session: true}),
    function (req, res) {
        res.header("Access-Control-Allow-Origin", "*");
        res.header("Access-Control-Allow-Headers", "X-Requested-With");
        res.send('User ID ' + req.user.id.toString());
    }
);

那 passport 是那裡置至 session 和 cookie 呢 ?

答案是在這裡 :

passport.serializeUser(function(user, done) {
  done(null, user.id);
});

然後我們來呼叫這個 route,然後你到 chrome 的 application 看,你會發現,他有存放個 cookie 。

4. 建立取得使用者的 route

然後接下來,我們執行http://127.0.0.1:3000/getInfo後,這段程式碼app.use(Passport.session());就會將我們從前端傳回來的 cookie,進行分析,並和 session 進行比對,然後就會將使用者資料存放到req.user裡囉

app.get('/getInfo',function(req,res){
    const user = req.user;
    res.send(user);
})

注意點 : deserializeUser 無法被呼叫到

有一點要注意一下,如果你發現你的deserializeUser老是無法被呼叫到,那問題是在下面這段 :

app.use(session({
    secret: "test",
    resave: false,
    saveUninitialized: false,
}))

有些人會寫成下面這樣,cookie: { secure: true }這個參數需要配合https才能使用。

你的好朋友 stackoverflow

app.use(session({
    secret: 'goodjob secret',
    resave: false, // don't save session if unmodified
    saveUninitialized: false,
    cookie: { secure: true },
}));

參考資料

 
4 months ago

上一章節中我們有提到rabbitmq,它是用來建立中介式架構broker,但這種架構有什麼問題呢 ? 那就是分散式架構的頭號公敵單點失效(single point of failure)。

所以後來就有人提出使用對等式架構來解決這個問題,這個架構就是會將broker給移除掉,每一個用戶端同時也是伺服器端,像比特幣這種應用就是用該結構來處理。

但相對的,它也有缺點,那就是要建置起來較為複雜,用在大規模的網路上,管理較難、安全性較低。

使用 ZEROMQ 進行對等式架構 (peer-to-peer)實作

zeromq它是一套網路通訊函式庫,記得他不是一個伺服器,而是一個lib,它在socket api之上做了一層封裝,將網路、通訊、進程等抽象化為統一的 API 接口,它和 socket 的區別是 :

  • socket : 點對對的關係(一對一)
  • zeromq : 多點對多點的關係(多對多)

那 zeromq 有什麼特點呢 ? 它有以下四個特點 :

  • 去中心化 (無 broker)
  • 強調訊息收發模式
  • 以統一的接口支持多種底層通信方式
  • 異步
  • 速度飛快 (請參考這篇比較)

不過有一點要注意一下,zeromq 它不是一個獨立的伺服器進程 (rabbitmq 是),所以嚴格來說它不是 mq ,它沒有 mq 這種在中間解耦合的能力,事實上他的名命也說了 zero mq 。

zeromq 主要提供三種類型的通訊模式分別如下 :

REQ (request) / REP (reply) 模式

這模式就是傳統的一個 reuest 配一個 response 的模式,非常的簡單。

下面這段程式碼是發送請求(request)的程式碼。

var zmq =  require('zeromq');
var requester = zmq.socket('req');

requester.on('message', function (reply) {
    console.log(`Received reply: ${reply.toString()}`)
})

console.log('Send msg');
requester.send('Hello Mark');


requester.connect('tcp://localhost:5555');

process.on('SIGINT', function () {
    requester.close();
})

然後下面這段程式碼為收到請求後,進行回傳的程式碼,這也可以理解成一個 server,它會一直等待 request 的 loop,然後針對每次的請求都進行回覆(Reply)。

var zmq = require('zeromq');

var responder = zmq.socket('rep');

responder.on('message', function (request) {
    console.log(`Received request : ${request.toString()}`);

    setTimeout(function () {
       responder.send("Ok ~ I Received your msg"); 
    },1000);
})

responder.bind('tcp://127.0.0.1:5555', function (err) {
    if(err){
        console.log(err);
    }else{
        console.log('Listening on 5555');
    }
})

process.on('SIGINT', function () {
    responder.close();
    
})

不過這邊有二點要注意,當你將 server 進行重啟時,client 不會自動的重新連上 server ,如果想要建立一個高可靠性的 server 請參考官網該篇文章,它說明的很詳細囉 ~

reliable-request-reply

而另外一點就是,不論先開啟 client 或 server 都沒關係,在傳統觀念上 server 就是要先開,然後 client 才連的上,但在這裡,它們的關係是節點對節點,也就是說沒有主或從的關係,只有誰發誰送的問題。

Pub / Sub 模式

它基本上是一種很常見的設計模式,像我們在使用jquery時的事件機制就很常看到它,如下 :

$(".test").on('click', function(){
    /do something...
})

上面的程式碼中,當頁面獨發了click事件後,就會發佈(pub)一個訊息,給有訂閱(sub)的使用者說,我獨發了 click 了喔,然後使用者在來處理獨發後的事情。

這種模式的優點就在於解耦合,發佈者無須預先知道訊息的接受者,則也使得這種模式很適合用在變化多端的分佈式架構中。

我們簡單的用一句話來說明 zeromq 的 pub/sub 模式,就是下面這句 :

當訊息透過 pub socket 傳送後,便會擴播至所有已連線的 sub socket

這種類型的模式,很適合用來處理股價報價,每個 subscriber 都會去和 publisher 訂閱事件,當有新個報價時,就會通知所有有訂閱報價的 subscriber。

接下來我們來開使實作程式碼。

首先我們下面這段程式碼是用來建立 zeromq 的publisher,也就是會將訊息從這邊發送出去給已連線的subscriber

// pub.js

var zmq = require('zeromq');
var pubSocket = zmq.socket('pub');

pubSocket.bindSync('tcp://127.0.0.1:3000');
console.log('Publisher bound to port 3000');

setInterval(function(){
    pubSocket.send(['mar',new Date()]);
},1000);

而下面這段程式碼就是subscriber,它用來訂閱訊息來源,然後會使用on這監聽器,來收得 pub 過來的訊息。

// sub.js
var zmq = require('zeromq');

var subSocket = zmq.socket('sub');
var port = "3000";

subSocket.connect(`tcp://127.0.0.1:${port}`);
subSocket.subscribe('mark');
console.log(`Subscriber connected to port ${port}`);

subSocket.on('message', function(topic, message){
    console.log(topic.toString());
    console.log(message.toString());
})

我們可以看你心情來決定要先開啟publishersubscriber,zeromq 它有提供一個機制,他會自動重新連線,也就是說,當然二個都開啟後,如果將publisher關掉在重啟,你的subscriber還是可以繼續收到資料。

然後我們來執行程式碼看看。我們會開啟一個publisher和二個subscriber

node pub.js
node sub.js
node sub.js

然後我們應該是會看到如下的結果,兩個subscriber每隔十秒鐘會收到一次從publisher來的資料。

mark
Thu Jul 20 2017 17:18:58 GMT+0800 (CST)

mark
Thu Jul 20 2017 17:18:59 GMT+0800 (CST)

Push / Pull

這種模式又被稱為管道(pipe)模式,它是單向的,從 push 單向推送到 pull 端,這種模式和上面的pub/sub最模式最大的差別在於 :

push 傳送的一堆資料,會被平均分散至多個 pull 端,就像是 load balance的機制一樣。

以下的程式碼為 pull 端的建立。

// pull.js 

var zmq = require('zeromq');
var pullSocket = zmq.socket('pull');

pullSocket.connect('tcp://127.0.0.1:3000');
console.log('Worker connected to port 3000');

pullSocket.on('message',function(msg){
    console.log(msg.toString());
})

而下面的程式碼為 push 端的建立。

// push.js
var zmq = require('zeromq');
var sockPush = zmq.socket('push');

sockPush.bindSync('tcp://127.0.0.1:3000');
console.log('Producer bound to port 3000');

var i =0;
setInterval(function(){
    sockPush.send(`mark wake up ~ : ${i}`);
    i++;
},1000);

然我們開始執行。

node push.js
node pull.js
node pull.js

這時你會看到下面的結果顯示出,每一個 push 出去的資料都會平分給另外兩個 pull 端。

mark wake up ~ : 10
mark wake up ~ : 11
mark wake up ~ : 12
mark wake up ~ : 14
mark wake up ~ : 16
mark wake up ~ : 18
mark wake up ~ : 20
mark wake up ~ : 22
mark wake up ~ : 24
mark wake up ~ : 13
mark wake up ~ : 15
mark wake up ~ : 17
mark wake up ~ : 19
mark wake up ~ : 21
mark wake up ~ : 23

這種模式事實上很像我們之前所談到的負載平衡,他們的概念的確是一樣的沒錯,這種模式也代表我們可以將一個複雜的任務平均分配下去,當各 pull 端完成時,在全部一起收集起來使用。

接下來我們再來建置一個分散式的雜湊碼破解器 ~

建立一個分散式的雜湊碼破解器

這個應用主要是可以根據一組字母表做出各種排列組合,藉此對輸入的雜湊碼(MD5、SHA1等)來進行破解,這個架構就是一個典型的平行管線。

這個爆力破解的過程如下

首先我們會先建立一個push端,他們將我們指定的字串,進行各種排列組合,例如abc,會產生abcacbbac等……,然後使用串流來讀取出來,並且 push 到每一個 pull端。

我們下面的程式碼中alphabet代表這我們要進行的排序組合,然後不可能英文 26 個字母全部排列,會出人命的,所以我們會用maxLength來進行限制,我們該值為 4 的意思代表只從 26 個字母內選取出 4 個字來進行排列組合。

也因為上面 4 個字的限制,我們測試時輸入的單字要只有 4 個字母。

//ventilator.js
var zmq = require('zeromq');
var variationsStream = require('variations-stream');
var alphabet = 'abcdefghijklmnopqrstuvwxyz';
var batchSize = 10000;
var maxLength = 4;
var searchHash = process.argv[3];

var ventilator = zmq.socket('push');
ventilator.bindSync('tcp://127.0.0.1:5000');

var batch = [];
variationsStream(alphabet, maxLength)
    .on('data', function (combination) {
        console.log(combination);
        batch.push(combination);
        if (batch.length === batchSize) {
            var msg = {
                searchHash: searchHash,
                variations: batch,
            }
            
            ventilator.send(JSON.stringify(msg));
            batch = [];
        }
    }).on('end', function () {
        var msg = {
            searchHash: searchHash,
            variations: batch,
        }
        ventilator.send(JSON.stringify(msg));
    });

接下來在 pull 端收到從 push 端來的字串後,我們會將該字串轉換成sha1 hash碼,然後我們在將該碼與輸入碼(我們要破解的碼)進行比對,最後當比對到時相同的東西時,我們就會將結果 push 到另一個收集結果的 pull 端 (就是toSink所連結的地方)

// worker.js
var zmq = require('zeromq');
var crypto = require('crypto');
var fromVentilator = zmq.socket('pull');
var toSink = zmq.socket('push');

fromVentilator.connect('tcp://127.0.0.1:5000');
toSink.connect('tcp://127.0.0.1:5001');
console.log('Worker connect to 5001');

fromVentilator.on('message',function (buffer) {
    var msg = JSON.parse(buffer);
    var variations = msg.variations;
    variations.forEach(function(word) {
        console.log(`Processing: ${word}`);            
        var shasum = crypto.createHash('sha1');
        shasum.update(word);
        var digest = shasum.digest('hex');
        if(digest === msg.searchHash){
            console.log(`Found! => ${word}`);
            toSink.send(`Found! ${digest} => ${word}`);
        }
    });
})

其中下面這段,是指將我們從 26 個字母中產生任選出 4 個所產生出的排列組合的單字,進行sha1 hash加密,產生出 hash 碼。

    var shasum = crypto.createHash('sha1');
    shasum.update(word);
    var digest = shasum.digest('hex');

最後,當我們從 worker 那收到破解後的結果,就進行輸入。

var zmq = require('zeromq');
var sink = zmq.socket('pull');
sink.bindSync('tcp://127.0.0.1:5001');

sink.on('message',function (buffer) {
    console.log(`Message from worker: ${buffer.toString()}`);
})

我們來執行看看,我們要先開啟兩個 worker 和一個 sink。

node worker.js
node worker.js
node sink.js

然後我們在開啟ventilator.js,用來開始啟產生單字的排列組合,其中f1b5a91d4d6ad523f2610114591c007e75d15084是指marksha1 hash碼。

node ventilator.js f1b5a91d4d6ad523f2610114591c007e75d15084

然後當破解完,你可以看到 sink 那的輸出結果。

Message from worker: Found! f1b5a91d4d6ad523f2610114591c007e75d15084 => mark

結論

本篇文章中,我們說明了如何使用zeromq進行對等式架構建置,並且還了它的三種模式 :

  • REQ / REP
  • PUB / SUB
  • PUSH / PULL

這三種模式是 zeromq 中的基本,它們還有更多的變化類型,但都只是這三個的組合型,如果想了解更多,官網的資料已經夠多囉,請慢慢自已研究吧 ~

參考資料

 
4 months ago

在前幾篇文章中,我們說明了如何將系統進行擴展,而接下來呢,我們將要說明如何使用訊息佇列來進行整合,事實上之前的每篇文章中都要提到一個名稱IPC通信,其中裡面就包含了訊息佇列 (message queue)

訊息佇列基本上是用來行程間溝通或是同行程內不同執行序溝通,他提供了異步的溝通協定,這個意思就是指當你傳送一堆訊息給 A 時,A 可以不用即時的來處理這些訊息,這也代表這訊息可堆積再處理,白話文就是 :

訊息接受者如果爆了,我訊息發送者還是可以一直發送訊息,等你好了,你還是可以取得完整的訊息。

我們可以想想http協定他是一個同步協定,這也代表你傳送一個request必須等待伺服器發送response

至於我們為什麼要用message queue請參考下面這篇文章,他真的已經寫的很完整了。

使用訊息佇列的十個理由---簡中

然後我們先簡單的說明一下訊息系統的基礎。

訊息系統架構

基本上分為以下兩種 :

對等式 (peer-to-peer)

在對等式的架構下,每一個節點都直接將訊息傳送給接受者,這種方法基本上會比較複雜,因為他還要決定各自結點的的通訊協定,但還是有一些優點 :

  • 避免單點故障。
  • 和中介者模式比較來少了中間一層,速度應該是比較快。
  • 彈性較高。

以下為對等式架構的圖示 :

zeroMQ 他可以幫助我們建立對等式架構

中介者模式 (message broker)

中介者模式就是所有的節點,都會連結到某個broker,一切都由broke來處理,每個節點不需要知道,我和誰溝通,只需要知道要傳送的訊息內容即可。但缺點就是上面對等式的優點。

以下為中介者架構的圖示 :

RabbitMQ 就是專門用來建立這個架構的東東。

接下來的文章中,我們將要先來實作一些rabbitmq

RabbitMQ

在上面的章節中,我們應該有說到,分佈式架構除了對等式架構外,還有一個是中介者架構,中介者的主要作用就是讓訊息接受者與傳送者之間完全的解偶,而rabbitmq就是一個支援AMQP (Advanced Message Queuing Protocol)協議的中間介者。

如下圖所示,它就是中間綠綠那個,我們稱他為中介者 broker

AMQP是什麼 ?

它是一種協議,AMQP 是一個提供統一訊息服務的應用層標準協議(osi第七層),也就是設定於其它應用軟體之間的通訊,像 http、https、ftp 等都是應用層協議。

根據該協議,客戶端與訊息中間件(broker)可傳送訊息,不受客戶端/中間件不同產品,不同開發語言的條件限制。

它有三個總要概念

佇列 (queue) :

這東東它是儲存訊息的架構,然後裡面的訊息它會被客戶端拿走。一個佇列可能會推多個客戶端取走訊息,這時處理的方式和我們之前說的負載平衡差不多。

佇列它還有以下三種特性 :

  • 可延續性 : 意即若 broker 重新啟動時,則佇列也自動重新建立。那裡面的訊息著麼辦呢 ? 只有被示為需保訊的訊息,才會存入磁碟,並於重啟時還原。
  • 專用性 : 一個佇列可綁定特定的訂閱著,若彼此連線關閉時,則該佇列會說掰掰。
  • 自動刪除 : 當有沒任何訂閱者連線時,便刪除佇列。
交換器 ( exchange ) :

他主要的功能是將訊息傳輸送到一個或多個佇列,這個東西就是放在producerqueue之間,架構大概長的如下圖 :

這邊我們會有一個問題,那就是為什麼需要這個 exchange 而不是直接producerquenu間溝通就好呢 ? 這點我們後面在來說明。

綁定器 ( bind ) :

這個就是上面的交換器與佇列之間的連結

以上的東西都會被中介者管理,它會建立一個抽象的通道,用於維護與中介者之間的通訊狀態。

使用 rabbitmq (mac)

首先我們要先安裝 rabbitmq :

brew update 
brew install rabbitmq

然後安裝好後,我們需要去系統的.zsrch.bashrc設置路徑,這樣我們才能在 terminal 上執行他的指令。

PATH=$PATH:/usr/local/sbin

然後我們就可以執行下列指令開啟rabbitmq server :

rabbitmq-server

如果看到下列的出輸結果,則代表開啟成功。

              RabbitMQ 3.6.9. Copyright (C) 2007-2016 Pivotal Software, Inc.
  ##  ##      Licensed under the MPL.  See http://www.rabbitmq.com/
  ##  ##
  ##########  Logs: /usr/local/var/log/rabbitmq/rabbit@localhost.log
  ######  ##        /usr/local/var/log/rabbitmq/rabbit@localhost-sasl.log
  ##########
              Starting broker...
 completed with 10 plugins.

如果要關啟請執行下列指令,而不要使用alt + c

rabbitmqctl stop

接下來我們就安裝 node 的 rabbitmq 的 lib ampqplib

npm install --save amqplib

使用rabiitmq實作一個 Producer => Queue 的架構

接下來我們從最簡單的使用開始,我們會簡單的實作如下圖的架構,共有三個實體producerqueueconsume,其中queue現階段就是我們的rabbitmq server

首先我們會先建立一個producer來負責產生訊息,從下面的程式碼我們可以知道,它需要先連線到我們的rabbitmq server,並且建立一個通道,然後,我們可以指定要將資料送到特定的queue,而我們 queue 取名為mark

其中,下面的程式碼中,比較需要注意的下面這行,這行是會判斷該 queue 是否存在,如果不存在則建立,並且durable : false代表的意思為當 queue 重開裡面的資料會消失

ch.assertQueue(quenu_name, {durable: false});
var amqp = require('amqplib/callback_api');

amqp.connect('amqp://127.0.0.1', function (err, conn) {
    console.log("connect amqp server !");

    conn.createChannel(function (err, ch) {
        var quenu_name = "mark";

        ch.assertQueue(quenu_name, {durable: false});

        ch.sendToQueue(quenu_name, new Buffer('Hello Mark'));
        console.log('Send a message');
    })
})

再來我們來實作consumer來當作我們訊息的接受者,一樣也是需要去連線到radditmq server和通道,然後會監聽這個通道所傳下來的訊息。

其中我們的 ch.consume有個參數是{noAck: true},代表這如果該channel不會使用act來進行確認,而如果是 false 它代表的意思為,如果中介者未收到 ack (確認),則訊息就會被保留在佇列裡以待再次處理。

var amqp = require('amqplib/callback_api');

amqp.connect('amqp://127.0.0.1', function (err, conn) {
    conn.createChannel(function (err, ch) {
        var queue_name = "mark";

        ch.assertQueue(queue_name, { durable: false });
        console.log("Waitting the meesages");
        ch.consume(queue_name, function (msg) {
            console.log(`Received the msg : ${msg.content.toString()}`);

        }, {noAck: true});
    })
})

最後我們來執行兩次producer,然後再打開consumer來接受資料,你會發現,他還是接受的得,而且二次傳的資料都有收到,這就是我們前面有提供的message quenue的優點 :

consumer 如果爆了,我 producer 還是一直可以發送資料,等你好了,你還是可以取得完整的資料。

這邊我們提出一個問題。

那 quenue 應該有最大值吧,不然一直沒有訊息流出,遲早會爆的吧 ?

嗯沒錯,所以radditmq它有提供兩個方法來設定每個queue的最大值 :

  • 使用 policy 來進行設定。
  • 使用參數來設定。

我們簡單的使用參數來限定,下面指令設定 quenue 最大為10 byte

ch.assertQueue(queue_name, { durable: false, maxLength: 10  });

那如果 queue 超過10 byte後會如何呢 ?

我們實際執行看看就知道,首先每 0.5 秒傳送一個數字給 queue 結果如下:

Send a message:1
Send a message:2
Send a message:3
Send a message:4
Send a message:5
Send a message:6
Send a message:7
Send a message:8
Send a message:9
Send a message:10
Send a message:11
Send a message:12
Send a message:13
Send a message:14
Send a message:15
Send a message:16
Send a message:17
Send a message:18
Send a message:19
Send a message:20

然後我們 consumer 接收的結果如下 :

Received the msg : 11
Received the msg : 12
Received the msg : 13
Received the msg : 13
Received the msg : 14
Received the msg : 15
Received the msg : 16
Received the msg : 17
Received the msg : 18
Received the msg : 19
Received the msg : 20

從上面的結果可知,因為每個單字為 1 byte,所以當 queue 儲放了前十個數字後,就超過 10 byte了,然後它將先進來的就先把他刪除,所以結果才能如上面所示,只收到 11 以後的數字。

使用rabiitmq實作一個 Producer => Exchange => Queue 的架構

實際上在 rabbitmq 中,我們的 producer 完全不會直接傳送訊息給 queue,producer 完全不知道會傳給那個 queue ,而是透過 exchange 來進行處理,而這邊就準備回答我們上面問的一個問題。

為什麼要有 exchange 呢 ?

我在 quora 發現也有人有相同的問題 (Why does RabbitMQ have both exchanges and queues?)。

然後我發現有一個老兄的說明很傳神,我簡單的說明一下。

想像一下你在 app store ,當你進入店內後,有一位服務員問你說 "你需要什麼呢 ? " ,然我回答說 "我要尋找耳機",然後該名服務員就將你引導到處理耳機的 queue ,接下來下一客人進來服務員就問 "你需要什麼呢 ?" ,然後該名客人回答 "我要修理我的電腦",然後服務員就將它引導到處理修改電腦的 queue 去處理。

所以根據那位老兄的說明,你可以發現服務員就是我們的Exchange,如果沒有了它,那就代表這每一位進來店裡的老兄,只能自已去尋找所需要的 queue ,而且還可能找錯,那是不是很浪費時間呢 ?

在理解了exchange功用後,我們就開始來它的實作。

我們先來畫張圖,你應該會更容易理解,我們等一下做的東西,其中我們先實作 error 那條線,其它的線事實上大同小異。

首先我們來建立producer,下面的程式碼中,可以看到我們會建立一個exchange而不在是queue,並且我們發送訊息的對像也改為exchange

還有我們在發送訊息時,有指定routing_key,這也代表我們有指定發送到的綁定 routing key 為error的 queue 。

var amqp = require('amqplib/callback_api');

amqp.connect('amqp://127.0.0.1', function (err, conn) {
    console.log("connect amqp server !");

    conn.createChannel(function (err, ch) {
        var exchange_name = "logs";
        var msg = "Hello mark";
        var routing_key = "error";

        ch.assertExchange(exchange_name, 'direct', { durable:false });
        ch.publish(exchange_name, routing_key, new Buffer(msg));
        console.log(`send msg: ${msg}`);
    })
})

其中下面這段程式碼中的direct代表這個 exchange 的種類,我們必須給定這個 exchange 的類型,用來決定要如何將訊息從 exchange 發送出。

ch.assertExchange(exchange_name, 'direct', { durable:false });

它主要有四種類型 :

  • direct : 會需要設定一個叫routing key的參數綁定 queue,然後在發送訊息時指定routing key,exchange 就會將該訊息傳送到指定的 queue (上面範例所使用的)。
  • topic : 大至上和 direct 相同,但這裡的routing key可以用匹配的方式。
  • fanout : 此類型的路由規則最簡單,就是收到訊息後,將它發送到所有綁定的佇列中。
  • header : 它的路由規則是由header來判斷,如果要做一些複雜的路由規則,那就用他。

接下來我們來看看consumer的程式碼,如下,我們在綁定 queue 時有指定 routing_key ,就代表這個 consume 只能收到指定的 queue 的訊息。

var amqp = require('amqplib/callback_api');

amqp.connect('amqp://127.0.0.1', function (err, conn) {
    conn.createChannel(function (err, ch) {
        var exchange_name = "logs";
        ch.assertExchange(exchange_name, 'direct', { durable: false });

        ch.assertQueue('', { exclusive: true }, function (err, q) {
            console.log('Waiting for messages');

            var routing_key = 'error';
            ch.bindQueue(q.queue, exchange_name, routing_key);
            ch.consume(q.queue, function (msg) {
                console.log(`Received msg:${msg.content.toString()}`);
                console.log(`routing key is:${msg.fields.routingKey.toString()}`);
            }, { noAck: true });
        })
    })
})

然後我們就來執行看看,結果如下 :

// producer
connect amqp server !
send msg: Hello mark

// consumer
Waiting for messages
Received msg:Hello mark
routing key is:error

結論

本篇文章中,我們先簡單的說明了message queue的概念,它是我們跨進程與執行緒的溝通工具之一,並且它是屬於異步的架構。

接下來我們也說明了訊息系統架構,它基本上可分兩類,一種為對等式架構,而另一種為中介者架構,前者可以使用zeromq來建立,而後者可使用rabbitmq來建立。

最後呢,我們就開始使用rabbitmq來進行實作,本文中只簡單的介紹一些基本的使用方式。

參考資料