12 days ago

Container 是什麼 ?

Laravel Container 是什麼呢 ? 我們先來理解 Container 容器 是什麼。

容器抽象一點概念是指用來裝東西的載體,向菜籃也算個容器,而在 Laravel 中所代表的意思就是指 :

裡面裝了一堆可以用的服務載體,就叫 Container。

像我們每當要執行 Laravel 時,都會先執行下面這段程式碼,其中 $app 就是我們的 Container,然後接下來會使用 Container 來實體化一些物件,例如 $kernel。

<?php
public/index.php

$app = require_once __DIR__.'/../bootstrap/app.php';

/*
|--------------------------------------------------------------------------
| Run The Application
|--------------------------------------------------------------------------
|
| Once we have the application, we can handle the incoming request
| through the kernel, and send the associated response back to
| the client's browser allowing them to enjoy the creative
| and wonderful application we have prepared for them.
|
*/

$kernel = $app->make(Illuminate\Contracts\Http\Kernel::class);

$response = $kernel->handle(
    $request = Illuminate\Http\Request::capture()
);

$response->send();

$kernel->terminate($request, $response);

為什麼要使用 Container ?

上面我們理解 Container 是做什麼用以後,接下來我們要來想想一件事情。

為什麼 Laravel 要使用 Container 呢,為什麼上面的要實體化 $knernel 時,不使用 new Knernel() 這種實體化的方式呢 ?

因為它想解決依賴與耦合。

這就是 Conainter 想解決的事情。

(高)依賴與耦合

高依賴與耦合 : 程式碼中綁死了某個模組,如下面程式碼綁死了 Log Service。

假設有一段程式碼如下 :

<?php

class Log
{
    public function send(log): void
    {
      $awsLogService = new AWSLogService();
      $awsLogService->send(log);
    }  
}

class AWSLogService
{
    public function send(log): void
    {
       ....
    }
}

但假設今天我們要將 Log 改傳到 GCP ( Google 雲端 ),那我們程式碼要修改成如下 :

<?php

class Log
{
    public function send(log): void
    {
      //$awsLogService = new AWSLogService();
      //$awsLogService->send(log);
      
      $gcpLogService = new GCPLogService();
      $gcpLogService->send(log);
    }  
}

class GCPLogService
{
    public function send(log): void
    {
       ....
    }
}

// 使用

$log = new Log();
$log->send('log.....');

從上面程式碼中,我們可以注意到我們沒當要換個服務時,都需要修改程式碼,並且這裡還有一個缺點,你要如何做單元測試 ? 程式碼裡面完全的綁死了 AWSLogService 或是 GCPLogService,沒有地方可以給我們進行替換,沒辦法替換就代表我們在做測試時,只能真的將資料丟到 AWS 或 GCP。

(低) 依賴與耦合

然後由於有上面說的缺點,因此會將程式碼改成如下。基本上就是將 LogService 改成由使用這個物件時來決定是用選擇 AWS 還是 GCP,並且這兩個 service 都實作同一個 ILogService 的 interface。

<?php

class Log
{
    private ILogService $logService;
  
    public function __construct(ILogService $logService)
    {
      $this->logService = $logService;
    }

    public function send(log): void
    {
      $this->logService->send(log);
    }  
}

class GCPLogService implements ILogService
{
    public function send(log): void
    {
       ....
    }
}

class AWSLogService implements ILogService
{
    public function send(log): void
    {
       ....
    }
}

interface ILogService 
{
    public function send();
}

// 使用
$log = new Log(new AWSLogServcie());
$log->send('log......');

好接下來在拉回主題。

為什麼要使用 Laravel Container ?

上面我們的範例程式碼最後要執行時,會如下 :

<?php

$log = new Log(new AWSLogServcie());
$log->send('log......');

這樣事實上沒什麼問題。

但是如果這一段程式碼有很多地方使用怎麼辦 ? 有沒有可能系統中統一都要使用 AWS 的,但是其中一個地方忘了改,而不小心使用到 GCP ? 嗯這是有可能發生的。

還有另一個問題,這一段程式碼本身就依賴了Log這個類別,這樣事實上還是沒有解決依賴的問題。

因此 Laravel 建立了 Container,並且會在開啟服務時,先行註冊好,例如下面偽代碼。只要在這個 conatiner 內部的 class 都會根據它註冊好的東西來進行處理。

<?php

$containter = require('Container');

// 它會在這一段先將 ILogService 綁定好,如果 construct 中有使用到它的,將會將它實體化為 // AWSLogServcie。 
$containter->bind(ILogService, AWSLogServcie::class);

// 實體化 Log 類別。
$log = $container->make(Log::class);

$log->send('log....');
那有兩個類別,它們內部有使用相同抽像類別,但這時它們實際上要使用不同的類別要怎麼處理呢 ?

Laravel 官網有給個範例如下,Photo 與 Video 都有使用到 Filesystem 這個抽象類別,但它們實際上要使用不一樣的類別,則可以使用如下的方法來進行指定。

<?php

$this->app->when(PhotoController::class)
          ->needs(Filesystem::class)
          ->give(function () {
              return Storage::disk('local');
          });

$this->app->when(VideoController::class)
          ->needs(Filesystem::class)
          ->give(function () {
              return Storage::disk('s3');
          });

Contextual Bindings (上下文绑定)

Laravel 如何建立 Container ?

這裡我們就要開始來研究一下 Laravel Container 的原始碼。

首先最一開始是這裡,它會實體化一個 $app conatiner。

<?php

$app = new Illuminate\Foundation\Application(
    $_ENV['APP_BASE_PATH'] ?? dirname(__DIR__)
);

接下來我們來看一下 Illuminate\Foundation\Application 的程式碼。這裡可以知道 Application 繼承了 Container 這個類別。

<?php

class Application extends Container implements ApplicationContract, HttpKernelInterface
{
    public function __construct($basePath = null)
    {
        if ($basePath) {
            $this->setBasePath($basePath);
        }
        $this->registerBaseBindings();
        $this->registerBaseServiceProviders();
        $this->registerCoreContainerAliases();
    }

}

laravel5.7-container

然後 Container 類別中,有兩個方法是重點那就是bindmake

bind

建立抽象與實體的綁定表

bind 使用方式

基本上分為以下四種 :

<?php

// 1. 類別綁定 clouse
App::bind('UserRepository', function()
{
    return new AWSUserRepository;
});

// 2. 抽像類別綁定實際類別
App::bind('UserRepositoryInterface', 'DbUserRepository');

// 3. 實際類別綁定
APP::bind('UserRepository')

// 4. singleton 綁定
App::singleton('UserRepository', function()
{
    return new AWSUserRepository;
});
原始碼解析

laravel5.7-container-bind

<?php

/**
     * Register a binding with the container.
     *
     * @param  string  $abstract
     * @param  \Closure|string|null  $concrete
     * @param  bool  $shared
     * @return void
     */
    public function bind($abstract, $concrete = null, $shared = false)
    {
        $this->dropStaleInstances($abstract);
       
        // 例如這種 APP::bind('UserRepository') 的註冊,就會執行這一段。
        if (is_null($concrete)) {
            $concrete = $abstract;
        }
        
        // 如果是上面那種情況或是沒有 Closure,就直接產生一個 Closure。
        if (! $concrete instanceof Closure) {
            $concrete = $this->getClosure($abstract, $concrete);
        }

        // 綁定,就是用一個 HashTable 來建立綁定對應。
        $this->bindings[$abstract] = compact('concrete', 'shared');
        
        // 如果此類別已被 resolve 則進行 rebound。
        if ($this->resolved($abstract)) {
            $this->rebound($abstract);
        }
    }
    
        /**
     * Get the Closure to be used when building a type.
     *
     * @param  string  $abstract
     * @param  string  $concrete
     * @return \Closure
     */
    protected function getClosure($abstract, $concrete)
    {
        return function ($container, $parameters = []) use ($abstract, $concrete) {
            if ($abstract == $concrete) {
                return $container->build($concrete);
            }

            return $container->make($concrete, $parameters);
        };
    }

make

產生實際的實體物件

使用方法
<?php

$app->make('UserRepository');
原始碼解析

laravel5.7-container-make
laravel5.7-containier-resolve

<?php

 /**
     * Resolve the given type from the container.
     *
     * @param  string  $abstract
     * @param  array  $parameters
     * @return mixed
     */
    public function make($abstract, array $parameters = [])
    {
        return $this->resolve($abstract, $parameters);
    }


 /**
     * Resolve the given type from the container.
     *
     * @param  string  $abstract
     * @param  array  $parameters
     * @return mixed
     */
    protected function resolve($abstract, $parameters = [])
    {
        $abstract = $this->getAlias($abstract);

        $needsContextualBuild = ! empty($parameters) || ! is_null(
            $this->getContextualConcrete($abstract)
        );


        // 如果此抽象類別已經實體化了,且 construct 沒使用其它外部注入,則回傳此物件。
        if (isset($this->instances[$abstract]) && ! $needsContextualBuild)
        {
            return $this->instances[$abstract];
        }

        $this->with[] = $parameters;

        // 這個地方有兩種情況
        // 1. 從抽象類別的建構式取出有使用的類別,並回傳。
        // 2. 如果沒有,則從 bindings 中找出對應的實體類別。
        $concrete = $this->getConcrete($abstract);
        
        // isBuildable => true
        // 1. $concrete 與 $abstract 為相同 (也就直接使用類別來綁定)
        // 
        // isBuildable => false
        // 1. 直接使用介面。 
        // 2. $abstract 本身內部還有依賴的外部套件。
        if ($this->isBuildable($concrete, $abstract)) {
            $object = $this->build($concrete);
        } else {
            $object = $this->make($concrete);
        }
        
        // 不太懂
        foreach ($this->getExtenders($abstract) as $extender) {
            $object = $extender($object, $this);
        }

        // 註冊的類別如果被指定為 singleton 就要 cache 它。
        if ($this->isShared($abstract) && ! $needsContextualBuild) {
            $this->instances[$abstract] = $object;
        }

        $this->fireResolvingCallbacks($abstract, $object);

        // 記錄那個類別已經被 resolve
        $this->resolved[$abstract] = true;

        array_pop($this->with);

        return $object;
    }

    /**
     * Determine if the given concrete is buildable.
     *
     * @param  mixed   $concrete
     * @param  string  $abstract
     * @return bool
     */
    protected function isBuildable($concrete, $abstract)
    {
        return $concrete === $abstract || $concrete instanceof Closure;
    }

參考資料

 
21 days ago

在之前筆者的這篇文章中:

一個基於 AWS Elasticsearch 的用戶行為 log 系統建立

在們學習了如何使用 AWS 的相關工具來建立一個用戶行為的 LOG 分析系統。

但是這篇文章中所提到的架構有個問題。

這個版本有什麼問題呢 ?

那就是在某些情況下它會一直噴以下錯誤 :

ServiceUnavailableException: Slow down.

那為什麼會一直噴 Slow down 呢 ?

會發生這個的原因在於,我們有採到 aws firehose 的限制,如下:
Amazon Kinesis Data Firehose 有以下限制。

如果将 Direct PUT 配置为数据源,每个 Kinesis Data Firehose 传输流 都会受以下限制的约束:

* 对于 美国东部(弗吉尼亚北部)、美国西部(俄勒冈) 和 欧洲(爱尔兰):5,000 条记录/秒;2,000 个事务/秒;5 MB/秒。
* 对于 欧洲 (巴黎)、亚太地区(孟买)、美国东部(俄亥俄州)、欧洲(法兰克福)、南美洲(圣保罗)、亚太区域(首尔)、欧洲 (伦敦)、亚太区域(东京)、美国西部(加利福尼亚北部)、亚太区域(新加坡)、亚太区域(悉尼) 和 加拿大 (中部):1000 条记录/秒;1000 个事务/秒;1 MB/秒。


! 注意
当 Kinesis Data Streams 配置为数据源时,此限制不适用,Kinesis Data Firehose 可无限扩展和缩小。

來源 : 官網

加強版

原本的版本如下圖。

然後我們會將它修改成如下圖,就是在資料源與 firehose 之間多增加了 data stream。

使用 AWS data stream 有以下幾個好處 :

  • 可以自由的調整傳輸限制。(這樣就可以解決上述的問題)
  • 未來如果有其它單位想要接受這個資料源,那只要請對方接上這個 data stream,它就可以受到資料了。

AWS Kinesis Data Stream 申請

事實上就只有兩個東西要填寫Stream NameShard Number

其中這裡簡單的說一下 Shard 概念。

Stream Shard (碎片)

在 AWS kinesis data stream 中有個 shard 的概念,它就是指 stream 的子集合。

每條 stream 都是由 1 至 n 個 shard 所組合成,這樣有幾個好處 :

  • 在傳輸資料給 stream 時,可以將傳輸量平均的分散給不同 shard,這樣可以避免觸碰到每個 shard 的傳輸限制。
  • 你可以指定那一些類型的資料傳輸到 A Shard,那些類型的資料傳輸到 B Shard,這樣有助於你放便管理資料流。

Shard 的限制

上面有提到每個 stream 都有傳輸限制,這裡我們就來看一下它的限制有那些。

以下從 Aws 官網擷取 :

  • 單一碎片每秒可擷取多達 1 MiB 的資料 (包括分割區索引鍵) 或每秒寫入 1,000 筆記錄。同樣地,如果您將串流擴展到 5,000 個碎片,串流每秒即可擷取多達 5 GiB 或每秒 500 萬筆記錄。若您需要更多的擷取容量,可以使用 AWS Management Console 或 UpdateShardCount API 輕鬆擴展串流中的碎片數目。
  • GetRecords 每次呼叫可從單一碎片擷取最多 10 MiB 的資料,每次呼叫最多 10,000 筆記錄。每呼叫一次 GetRecords 即計為一筆讀取交易。
  • 每個碎片每秒可支援最多 5 筆讀取交易。每筆讀取交易可提供多達 10,000 筆記錄,每筆交易的上限為 10 MiB。
  • 每個碎片透過 GetRecords 每秒可支援最多 2 MiB 的總資料讀取速率。如果呼叫 GetRecords 傳回 10 MiB,在接下來的 5 秒內發出的後續呼叫將擲回例外狀況。
如何將資料傳輸到指定的 Shard

下面為一段 nodejs 寫入資料到 stream 的範例碼,其中注意到PartitionKey這個東東,它就是可以幫助你指定到想要的 Shard。

'use strict';

const AWS = require('aws-sdk');
const streamName = process.env['AWS_KINESIS_STREAM'];
const uuidv1 = require('uuid/v1');

const kinesis = new AWS.Kinesis({region: process.env['AWS_KINESIS_REGION']});

module.exports = {
  putRecord: (packet) => {
    return new Promise((resolve, reject) => {
    // 多加換行符號是因為這樣才能在 aws athena 進行解析

      const recordParams = {
        Data: JSON.stringify(packet) + '\n',
        StreamName: streamName,
        PartitionKey: uuidv1()
      };

      kinesis.putRecord(recordParams, (err) => {
        if (err) {
          reject(err);
        }
        resolve();
      });
    });
  }
};

PartitionKey基本上就是用來讓 AWS kinesis 來決定你要去那一個 Shard。

假設你的文件 A 傳輸時 PartitionKey 設為 GroupA 這個文字,那它就會跑到某個 Shard A 去,如果這時再傳輸個文件 B 並且 PartitionKey 也設定為 GroupA,那這一份文件也會傳輸到 Shard A。

所以當你想將同一類型的文件,都傳輸到同一個 Shard 時,記得將 PartitionKey 設為相同。

但如果是想將它平均分散到每一個 Shard 呢 ?

事實上有兩個方法,首先第一種方法就是每一丟資料時,先去抓這個 stream 看它有幾個 shards,然後再根據它的數量,來隨機產生個數字,例如有 4 個 shards 那你每次丟資料時,就從 1 ~ 4 隨機產生一個數字,然後再將它設到 PartitionKey 中,那這樣基本上就會平均分配。

而另一種方法就是每一次的 PartitionKey 都使用 uid 來設定,這樣也可以將他平均的進行分配。

不過我是比較建議用第二種,因為第一種每一次都要去 AWS 那抓取 stream 裡的 shards 大小,這樣太耗時間了。

參考資料

 
4 months ago

我相信有使用過 Elasticsearch 的人都應該是會被他的日期時區的問題搞到很火。

在開始搞前先說說我的簡單需求:

馬克大希望可以使用 ISO 標準來進行範圍搜尋,例如2017-07-16T19:20:30

這時通常時間的儲法會有兩種選擇:

  • Timestamp
  • ISO 標準

咱們先來看看 Timestamp 的儲法與查找

下面為範例程式碼(nodejs),其中 putRecord 我就不寫了,因為只是範例,反正就是透過 aws kinesis 來將資料丟到 aws elasticsearch 上。

其中 test 為我們要丟到 elasticsearch 的資料,這裡我們要注意的 created_at 我們將會丟 timestamp 的進去。

const streamName = 'mark-stream';

const test = {
  name: 'Mark III',
  age: 40,
  created_at: Date.now() // timestamp 1533634630945 ,
};

putRecord(streamName, test, (err, res) => {
  console.log(err);
});

Elasticsearch 查找

然後我們直接下來找找剛剛新增的那一筆。

curl 127.0.0.1/_search?pretty
{
        "_index" : "api",
        "_type" : "log",
        "_id" : "2139103",
        "_score" : 1.0,
        "_source" : {
          "name" : "Mark III",
          "age" : 40,
          "created_at" : 1533634726145 (2018年08月07日17點38分46秒),
        }
      }

那接下來我們在來根據時間區間來進行搜尋會如何呢??

POST _search
{
    "query": {
        "range" : {
            "created_at" : {
                "gt" : "2018-08-07T17:00:22" ,
                "lt" : "2018-08-07T18:00:22",
            }
        }
    },
    "sort": [
    {
      "created_at": "asc"
    }
  ]
}

找不到 !!!!

然後我們如果改成如下的 query,就找的到了……

POST api/_search
{
    "query": {
        "range" : {
            "created_at" : {
                "gt" : "2018-08-07T09:00:22" ,
                "lt" : "2018-08-07T10:00:22"
            }
        }
    },
    "sort": [
    {
      "created_at": "asc"
    }
  ]
}

為什麼儲 timestamp 的搜尋要將時間減 8 小時呢 ??

先來看看 timestamp 的意思為啥 ?

timestamp 是指格林威治時間1970年01月01日00时00分00秒到現在的總秒數。

那格林威治時間離咱(台灣)這裡多遠 ?

  • 8 個小時

所以說 1533634726145 實際上儲放是指2018年08月07日9點38分46秒而不是2018年08月07日17點38分46秒,所以假設我們不給時間區域而直接下 query 就會發生找不到的情況,下面為我們有給時間區域的下法,這樣就找的到了。

這種下法的意思就是,我們要找這段時間的資料,並且我們的時間區域為+8小時。

POST api/_search
{
    "query": {
        "range" : {
            "created_at" : {
                "gt" : "2018-08-07T17:00:22",
                "lt" : "2018-08-07T18:00:22",
                "time_zone": "+08:00"
            }
        }
    },
    "sort": [
    {
      "created_at": "asc"
    }
  ]
}

那在 kibana 的 discover 要如何下 query ?

在 kibana 如果執行下面的 lucene query 的話,會找到不到。

created_at: [2018-08-07T17:00:22 TO 2018-08-07T18:00:22]

一樣要和他說明你現在在什麼時區才可以找到。

created_at: [2018-08-07T17:00:22+08:00 TO 2018-08-07T18:00:22+08:00]

再來看看 ISO 標準的儲法

const streamName = 'mark-stream';

const test = {
  name: 'Mark III',
  age: 40,
  created_at: "2018-08-07T17:00:22Z" //這個時間已經有先加 8 個小時了,
};

putRecord(streamName, test, (err, res) => {
  console.log(err);
});

先單純的看 Elasticsearch 查找有沒有問題 ~

然後我們一樣先用 es 的 search 來找找。

POST _search
{
    "query": {
        "range" : {
            "created_at" : {
                "gt" : "2018-08-07T16:30:22" ,
                "lt" : "2018-08-07T17:30:22",
            }
        }
    },
    "sort": [
    {
      "created_at": "asc"
    }
  ]
}

然後發現 找得到 !!!

主要原因是我們直接是儲放固定的 ISO 時間,而不像是上面 timestamp 它會幫你先轉一下成 ISO 然後你在查找,它在幫你轉時,會轉成 +0 的時間,所以才會找不到。

再來看看 kibana 內的顯示與查找有沒有問題 ~

首先 kibana 內顯示會有問題 !

首先你只會看到下面這個資料,注意我們上面儲的是2018-08-07T17:00:22Z,WTF 為啥kibana 顯示變成2018-08-08T01:00:22.22

問題在於 kibana 認為 Elasticsearch 裡所儲放的時間區段為+0,所以到了 kibana 預設會判斷你的 browser 設定那個時區,然後咱們這是台灣所以會自動的轉換成:

2018-08-07T17:00:22Z + 8 h = 2018-08-08T01:00:22.22

name:Mark III 
age:40 
created_at: 2018-08-08T01:00:22.22 
_id:49585877623136865488831537954762517193201839360268304386.0 _type:log 
_index:api 
_score:1
搜尋沒問題 !
created_at: [2018-08-07T16:30:22 TO 2018-08-07T17:30:22]
那要如何顯示的問題呢 ?

建立在 elasticsearch 時,儲 ISO 時和他說時間區段,如下,注意多了+08:00

const streamName = 'mark-stream';

const test = {
  name: 'Mark III',
  age: 40,
  created_at: "2018-08-07T17:30:22+08:00" //這個時間已經有先加 8 個小時了,
};

putRecord(streamName, test, (err, res) => {
  console.log(err);
});

然後不論是在 elasticsearch 或 kibana 搜尋時,時間都要多加時間區段:

created_at: [2018-08-07T16:30:22+08:00 TO 2018-08-07T17:30:22+08:00]
POST _search
{
    "query": {
        "range" : {
            "created_at" : {
                "gt" : "2018-08-07T16:30:22+08:00" ,
                "lt" : "2018-08-07T17:30:22+08:00",
            }
        }
    },
    "sort": [
    {
      "created_at": "asc"
    }
  ]
}

結論

基本上如何一開始就選擇儲 timestamp 那後來只要在查找時,標示你現在是在那個時區,那就都可以搜尋到。但是如果一開始就要儲 ISO 標準時,就要用上面的儲法,這樣在 es 與 kibana 才能查找到你需要的資料。

順到說一下,我一開始選擇時會選擇 ISO 標準有以下幾個原因:

  1. 服務沒有多時區的問題。
  2. 希望 es 中儲放 iso 標準,有助於直接尋找時,可以很容易知道他的時間點,不需要在透過 kibana 或其它工具來轉換顯示。
  3. 因為同時間還有一份 log 會儲放到 s3,如果是儲 timestamp 我們使用 athena 查找後的顯示,也很難看,而且如果是將檔案抓下來用 grep,就更麻煩了。
 
5 months ago

在筆者的一個基於 AWS Elasticsearch 的用戶行為 log 系統建立中,我們建立了一個使用者行為分析的 log 系統,文章中有提到,我們會將比較舊的 log 放置到 S3 中,所以本篇文章我們將要學習的主題為:

如何時用 AWS Athena 來尋找 S3 中的資料

另外本篇另一個外傳也要順到說一下,這外傳的主題為:

使用 AWS Kinesis 丟 json 資料到 S3 ,你會總是只能 query 到一行資料 !

接下來下面為本篇章節:

  • AWS Athena 的簡單介紹
  • 使用 AWS Athena 將 S3 的檔案建立成類似 SQL 的 Table
  • 使用 AWS Athena 來進行 query (日期區間、指定欄位、大小數量)
  • 坑 ! 使用 AWS Kinesis 丟 json 資料到 S3 ,你會總是只能 query 到一行資料 !

AWS Athena 的簡單介紹

簡單的白話文說一下 AWS Athena 是啥:

它可以讓我們使用 SQL 來去 S3 搜尋你要的資料

它的收錢機制是,你下的 SQL 去掃描了多少資料,來決定你要付多少 $$ (所以 query 請下準確,不要讓它去掃多於的資料)

它似乎有索引的機制,但很貴 !

接下來我們將簡單的說明操作方式。

使用 AWS Athena 將 S3 的檔案建立成類似 SQL 的 DB 與 Table

第一步: 建立 DB

在 query 欄位裡面輸入以下的 sql。

CREATE DATABASE markDB

執行結果如下,然後你就會看到左邊有個 markDB 的選項。

第二步: 建立 Table

首先我們 S3 的測試檔案內容如下:

{"name":"Mark","age":20,"created_at":"2018-07-03T15:08:43.626Z","fans":[{"name":"Ian"},{"name":"Jack"}]}
{"name":"Mark II","age":30,"created_at":"2018-07-03T15:09:13.416Z","fans":[{"name":"Mary"},{"name":"John"}]}
{"name":"Mark III","age":40,"created_at":"2018-07-03T15:09:56.975Z","fans":[{"name":"Mary"},{"name":"Baba"}]}
格式
{
  name: <string>,
  age: <integer>,
  created_at: <string> (iso 日期格式)
  fans: [
    name: <string>
  ]
}

然後我們要先進入到 Athena 頁面裡,根據上述的檔案內容,建立一個 Table。其中比較需要注意的為ROW FORMAT SERDE,這個就是我們的檔案解析器,假設你的檔案是存 csv 格式的,那這裡就需要更換。

CREATE EXTERNAL TABLE IF NOT EXISTS markdb.user (
  `name` string,
  `age` int,
  `created_at` string,
  `fans` array<struct<name:string>> 
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = '1'
) LOCATION 's3://marklin-s3/api2018/'
TBLPROPERTIES ('has_encrypted_data'='false');

然後建立好後,你可以直接下達下面的 sql 來看看有沒有資料:

SELECT * FROM user

結果如下圖:

使用 AWS Athena 來進行 query (日期區間、指定欄位、大小數量)

接下來,我們來根據常用的情況,來看看在 Athena 要如何搜尋到你要的資料,事實上就只是下下 SQL 而以。

更多的操作符號請參閱此篇文章

馬克大想要從 S3 中尋找日期為 20170703-15:09:01 以後所建立的 user ,要如何下呢 ?

如下,這裡我說明一下,為什麼我這裡用from_iso8601_timestamp,主要的原因在於我的日期格式是存 iso 格式,也就是長這樣"2018-07-03T15:09:56.975Z",在 Athena 中,如果你要使用 iso 格式來當你的格式,那有以下兩個地方要注意:

  • 在 create table 時你的欄位要設成string不能用Date or Timestamp
  • 在 query 時,你要用from_iso8601_timestamp來將 string 轉成 timestamp 來進行搜尋。
SELECT * FROM user 
WHERE from_iso8601_timestamp(created_at) > from_iso8601_timestamp('2018-07-03T15:09:00Z')

馬克大想要找 fans 中有 Ian 的 user 要如何下呢 ?

SELECT * FROM user 
CROSS JOIN UNNEST(fans) AS t(fan)
WHERE fan.name = 'Ian'

馬克大想要找 user 中 age 大於20與小於40歲的 user 資料,要如何下呢 ?

SELECT * FROM user 
WHERE age BETWEEN 25 AND 35 

坑 ! 使用 AWS Kinesis 丟 json 資料到 S3 ,你會總是只能 query 到一行資料 !

事情是這樣,我這裡都是使用以下的方式來將資料丟到 S3。

資料來源(json) -> AWS kinesis -> S3

然後呢我每一次在 AWS Athena 下 query (table 用上面的 script 建立)時,都會發現每一次下抓全部資料的 query 時都只有第一筆資 !

後來調查了一下,我發現 Kinesis 幫我送到 S3 時,產生的檔案如下:

{ name: 'Mark', age: 20 }{ name: 'Ian', age: 30 }

然後這份文件中,下面這段話有說到,每一個 json 記錄都只能單一行 !

Be sure your JSON record is on a single line

Athena doesn't currently support multi-line JSON records.

Fuc u ! 所以正確能解析的格式是應該要如下:

{ name: 'Mark', age: 20 }
{ name: 'Ian', age: 30 }

我最想罵的就是是 AWS kinesis 自已幫我儲的,明知 Athena 有這規則,卻不來個換行。

所以呢,最後為了要解決這個問題,我就只到在每一次丟的 json string 都加了一個\n。不要笑 ~ 連我在 Amazon 工作的同事聽到都直接噴飯,都說是 bug 吧。

JSON.stringify(data) + '\n'

喔對了,上面是在資料來源時加個換行,如果你覺得資料來源很多,懶的在這加,那就只能使用 AWS lambda 來進行處理,就有點像下面這流程:

資料來源 -> AWS kinesis -> AWS lambda(加換行) -> S3

有時我在懷疑,是不是 AWS lambda 的收錢策略呢?

參考資料

AWS Athena 用戶指南

 
5 months ago

上一篇文章『一個基於 AWS Elasticsearch 的用戶行為 log 系統建立』中我們說明了,如何使用 AWS Elasticsaerch 來建立收集 log 的系統,而 log 系統通常也有一種需求,那就是需要定期的清除舊的 log ,所以本篇文章的主題為:

要如何定期的清除 Elasticsearch 文件 ?

然後我們會分成以下幾個章節:

  • 最直覺式的定期刪除方法與缺點。
  • 為什麼大量文件的清除對 Elasticsearch 會很耗資源呢 ?
  • 大量文件清除方法 - 時間索引策略。

最直覺式的定期刪除方法與缺點

假設有以下的資料:

{
  data: 'Hi Mark ~',
  created_at: '20180101'
},
{
  data: 'HI Fuc u Mark',
  created_at: '20180201'
}

那我們要清除 1 月份的 log ,那我們最直覺的做法,應該會如下的操作:

  1. 搜尋所有 created_at 為 1 月的 doc。
  2. 再將所有搜尋出的 doc 給清除。

上面這方法在小量資料時,是沒問題的,問題是出在大量資料。

那為什麼大量資料刪除會有問題呢 ?

Elasticsearch 在進行刪除時,它是將 doc 給一個此 doc 已刪除的 tag ,然後再接下來的搜尋時,會自動將有 tag 的 doc 給過濾掉,這也代表在清除的當下資源沒有被釋放出來。

接下來 Elasticsearch 會在某個條件下,會執行segment merging的工作,這個時後它才會將實際上的文件清除,而且這個工作在大量資料下,會非常的消耗 cpu 與 i/o 的資源。

為什麼大量文件的清除對 Elasticsearch 會很耗資源呢 ?

要理解這個問題,我們就要從倒排索引開始說起,不熟可以去我這篇Elasticsearch 的 Document 建立原理看個兩三下。

倒排索引的建立

首先,每當一個 doc 建立時,它會先被丟到一個叫 memory buffer 的地方,等到一段時間 or buffer 滿了,系統會將它建立成一個segment,如下圖。

而這個 segment 就是我們的倒排索引集合,它也是在我們在進行搜尋時,會實際去尋找的地方。這裡有一個很重要的事情要說:

每一個 segment 內的倒排索引是不可以變的。

所以說如果你新增了第二個 doc ,它會在去新增一個 segment,那你在『某段時間』內,會有 2 個 segment,然後搜尋時,就是去每個 segment 中搜尋,然後將結果進行合併,得出結果。

多個 segment 會有什麼問題呢 ?

首先 segment 裡面是存放倒排索引的資訊,而這個東西,它是寫在硬碟中。所以如果每一次進行搜尋時,有 100 segment 個代表你要開啟 100 個檔案,如果越來越多,你的 i/o 與 file descriptor 遲早會出問題。

解決多個 segment 的方法 segment merging,但它很耗資源

所以 Elasticsearch 提出了一個機制,那就是segment merging

這個東東將是會定時的將小 segment 合成一個大的 segment,如下圖。

這項工作是非常的消耗資源的,如果你有 100 個小的 segment,你就要將開啟 100 條的 i/o 並且需要進行大量的運算。

而且如果你還大量刪除了 doc ,它還要去某個檔案中,抓取已刪除的檔案編號,然後在和原本的每一個 segment 進行比對,再組合成 1 個新的 segment。這想也知道會非常的耗 CPU 與 i/o 資源。

大量文件清除方法 - 時間索引策略

所以為了解決這個問題,我們將會使用時間索引策略來進行 doc 的刪除。

這策略概念就是每天(or 區間)產生一個 index,然後過時了再砍掉它

首先為了別讓人搞混,我先畫張圖,此索引非倒排索引。從下圖中我們可以知道一個 Elasticsearch 的 Index 是由不同節點的 shard 組合而成,然後每個 shard 裡面包含了 doc 與 segment。

所以說,我們可以知道每個 segment 都是包含在一個 index 中,那我們想想看下面這個問題 ?

假如我們直接刪除了 index 後,segment 會著麼樣呢 ?

答案就是 doc 與 segment 都一起消失,不需要在做那些 segment merging 啥的。

所以我們這裡的策略就是:

根據時間來建立 index (假設每天) ,然後每當要清除舊 log 時,我們就將指定的 index 給清除就好,這樣就不需要執行 segment merging 這種耗資源的工作了

時間索引策略 part1 - 建立 index template

首先我們要建立一個 index 的 template,如下,建立完成以後,我們接下來每次只要建立的索引名稱為api-*這種類型 (ex. api-2018-01-01) ,系統就會依照下面的範本來進行建立。

然後aliases就是別命,假設建立出來的索引為 api-2018-01-01,我們就可以使用 api 這個別命來操作它,所以這也代表我們每一次新增 doc 指令索引時,不用一直換啊換。

 await client.indices.putTemplate({
      name: 'api-template',
      template: 'api-*',
      body: {
        'settings': {
          'number_of_shards': 5,
          'number_of_replicas': 1
        },
        'aliases': {
          'api': {}
        },
        'mappings': {
          'log': logMapping
        }
      }
    });

時間索引策略 part2 - 排程每天建立一個新的索引,並將操作指向它

下面這個 api 就是會依據max_agemax_docs的條件,來決定是否建立索引,其中一個符合,那就會建立新的索引,並且將操作(新增 doc)指向這個新建立的索引。

下面這個範例,rollover 會自動依流水號來建立 index,當然也可以依據時間來建立,請參考官網這篇文章

POST api/_rollover
{
  "conditions": {
    "max_age":   "1d",
    "max_docs":  5
  }
}
{
  "old_index": "api-logs-1",
  "new_index": "api-logs-2",
  "rolled_over": true,
  "dry_run": false,
  "conditions": {
    "[max_docs: 5]": true,
    "[max_age: 7d]": false
  }
}

P.S 如果是使用 AWS kinesis 的話,這一步可以不用做,在設定它時,有個叫 Index rotation 的參數可以設定,它可以設定 hour、day、week、month,功能就和上面的一樣

時間索引策略 part3 - 定時的清除索引

如果你的索引名稱如api-2018-01-01這種類型的話,你可以依據它來選擇清除,而如果你的命名不是這樣的話,那你可以使用下面這個 api 來知道每一個索引的建立時間。

curl http://localhost:9200/_cat/indices\?h\=h,s,i,id,p,r,dc,dd,ss,creation.date.string

結果如下圖。

時間索引策略 part4 - 搜尋的操作

rollover api 上面有提到,會將所以的操作自動的轉向到新的索引,所以你如果要進行搜尋操作時,你可以執行下面的指令,這樣你所有的索引都可以尋找到。

curl http:127.0.0.1:9200/api-*/_search?pretty

參考資料

 
6 months ago

本篇文章中,我們要說明的主題為 :

如何使用 AWS Elasticsearch 來建立一個用戶行為 log 系統。

本篇文章中,我們將分成以下的主題:

  1. Log 系統的架構說明
  2. AWS 的工具申請 (Elasticsearch、Kinesis、S3)
  3. Log client 端的小實作

Log 系統的架構說明

V1

一個最簡單的 log 架構,應該會長的如下圖一樣,一個 log 來源與 log 接受端。

其中 log 接受端,有很多種選擇,你可以選擇來源端的本機,並且選擇將之儲放成文字檔,又或是儲放在某個資料庫中,各種儲放法都優有缺。

這裡我們選擇了使用Elasticsearch來當接受端,主要的理由如下:

  1. 可以進行快速的搜尋
  2. 可擴展性強

但相對的與文本儲放相比,那缺點就是空間一定比文本的大,因為文本可以壓縮,不過文本的搜尋速度可就 QQ 囉。

V2

那 V1 有什麼缺點呢 ? 假設我們 Elasticsearch 上天堂,或是要停機更新一下,那這些 log 會著麼樣呢 ? 當然就是消了囉,雖然你可能會覺得 log 消失一些沒啥差別,但如果剛好是出問題的地方,那你真的會罵髒話了。

所以這裡我們會增加一個Broker,架構圖如下,所有的資料來源都會先送到Broker來後在送到儲放點。

這裡我們選擇了AWS kinesis,它的優點如下:

  1. 擁有 Queue 的機制,也就是說如果資料儲放點上天堂在 24 小時以內,只要回復了,它會自動將這些 log 在丟過去。
  2. AWS Kinesis 可處理任何數量的串流資料,不用擔心它爆掉就對了。
  3. 可以設定 log 同步也備份到 S3。
!!! 2018-11-02 Updated

注意關於第二點,AWS Kinesis 可以處理任何數量的串流這句話,是有條件的,要使用 AWS Kinesis Data Streams 然後在接到 AWS kinesis firhose stream 才能擴展數量。

V3

那 V2 有啥缺點呢 ? 事實上已經沒啥太大的缺點,但是有個問題,因為我們資料來源端是儲放在 Elasticsearch ,而它的缺點就是,成本比較高,基本上 1 MB 的壓縮文檔 log ,轉換到 Elasticsarch 中大約會乘上 10 ~ 15 倍,所以除非公司錢很多,不然不會將太多的 log 儲到 Elasticsearch 中。

所以這裡我們的方案是,只在 Elasticsearch 中儲放約 1 個月的資料,然後超過一個月的資料都將儲放到 S3 中,有需要時在時用AWS Athena來查詢。

最後架構就長的如下圖:

AWS 的工具申請 (Elasticsearch、Kinesis、S3)

由於 Elasticsearch 與 S3 建立資訊,網路上都很多了,所以本篇就不多說囉。

AWS Elasticsearch

下圖為 aws elasticsearch 建立好的狀態,然後你只要用 curl 打它給的網址,有出現像下面的訊息輸出,那就建立成功囉。

AWS S3

就是點個 create bucket 那個鈕,然後一直按就好了,然後其它細節 google 一下就有囉。

AWS Kinesis

接下來 AWS Kinesis 的設定 google 比較難找到,所以來個比較詳細點兒的說明。

AWS Kinesis 有分為四種,其中我們要使用的為Amazon Kinesis Data Firehose

然後建立步驟如下。

1. 填寫它的 Delivery stream name

到時我們要丟 log 到 stream 時需要使用到他。

2. 選擇資料來源

它有兩個選項分別為 Direct PUT or other sources 與 Kinesis stream ,這裡我們選擇 Direct PUT or other sources,我們只要知道選了它,就可以使用 AWS SDK 的 PUT APIs 來將資料丟到 Kinesis 中。

3. 選擇是否要將資料進行加工後,再丟到儲放端

這裡可以讓我們決定,是不是要將 kinesis 中的資料,經過『加工』後,再丟到儲放端,加工的選擇有 AWS Lambda 或是 AWS Glue ,這裡我們先不需要處理,所以都選Disable

4. 選擇資料儲放端 - AWS Elasticsearch

然後選擇我們要把資料丟到 Amzaon Elasticsearch Service。

5. 設定 AWS Elasticsearch 的目的

這裡我們一個一個來看要填啥。

  • Domain: 就是選擇你建立好的 AWS ES 的 domain,正常來說你點那個選項鈕應該都會跑出你剛剛建立的 AWS ES。
  • Index: 選擇你要將資料丟到 ES 的那個索引,如果那個索引不在則會自動新建一個。這裡我們建立一開始先預先建好,如果沒有,它會依據你丟的 doc 來建立一個索引,而這索引可能有很多你用不到的東西。
  • Index rotation: 這個地方你可以設定是否給你的索引設定時間簽,假設你上面設的索引為api,那如果選擇一天,那生出來的索引會長成api-2018-01-01這樣,然後你到第二天時再丟個 doc 索引會長成api-2018-01-2,這裡關係到索引策略的問題,如果只是簡單試用,就不用設這個 (會另開一篇來討論索引策略)。
  • Type: 就是設定 ES Index 中 type 選擇。
  • Retry duration: 就是重試時間。

6. 設定 S3 備份

這就是 kinesis 方便的地方,他可以自動的幫我們將 log 備份到 S3,而你可以選擇全部備份或是失敗的 log 才記錄。

7. 設定 AWS kinesis 的執行區間

AWS kinesis 並不是一收到資料就直接將它丟到儲放端,它有兩個條件。

  1. Queue 的 buffer 大小 (1 - 100 MB)。
  2. 幾秒鐘一次 (60 - 900 sec)。

所以記好,這個系統架構,並不是丟了一個 log 指令後,馬上就會在 Elasticsearch 看到 ! 最快也要一分鐘後。

8. 設定 S3 是否要壓縮與加密 (END)

這個地方就是決定 S3 備份要不要壓縮與加密,這裡會不會影響到AWS Athena查詢,需待查。

Log client 端的實作

要使用 AWS SDK APIs 要先在我們的家目錄中的~/.aws/credentials設定一個檔案,內容如下:

[default]
aws_access_key_id=your access key
aws_secret_access_key=your secret access key

然後我們就可來進行實做,咱們使用nodejs來將 log 丟到AWS kinesis中。

下面的程式碼就是將 log 丟到 AWS kinesis 中,就是如此的簡單。這裡有兩個東西要注意一下,首先是region記得要選擇你所建立 kineses 所在的區域 ; 另一個就是streamName,這個記得要改成你所建立的 stream 名稱。

const AWS = require('aws-sdk');
const firehose = new AWS.Firehose({region: 'ap-northeast-1'});

function putRecord (dStreamName, data, callback) {
  var recordParams = {
    Record: {
      Data: JSON.stringify(data)
    },
    DeliveryStreamName: dStreamName
  };

  firehose.putRecord(recordParams, callback);
}

const streamName = 'mark-api-stream';
const time = (new Date()).toISOString();
const log = {
  data: `HI Mark ${time}`
};
putRecord(streamName, log, (err, res) => {
  console.log(err);
  console.log(res);
});

最後送完後等一分鐘在去 Elasticsearch 與 S3 應該就會有資料了。

curl 'your aws elasticsearch ul'/{index}/_search?pretty

參考資料

 
8 months ago

在這一篇文章中,我們將要理解兩個問題 :

  1. 在新增一個 document 時,會建立 json 實體與索引,那這兩個東東會存放到那兒去 ?
  2. 而在建立索引時,它又存放了什麼東東 ?

在開始前,我們先簡單的複習一下 Elasticsearch 的基本觀念。

Elasticsearch ( ES ) 的前提觀念概要

Elasticsearch 是一種分散的搜尋引擎,它也有和關聯式資料庫相似的結構,如下圖。

所以假設我們要新增一筆 document 應該是會長的像下面這樣。

POST /markcorp/employee (/(index)/(type))

上面這行的語意就是新增一筆 document 到 markcorp (index) 的 employee 類別
(type)

{
  id: 123
  name: ‘Mark’,
  age: 18
}

然後當我們要去 ES 尋找這筆資料時,就可以使用它提供的 Restful API 來直接尋找:

GET 127.0.0.1:9200/markcorp/employee/123

在有了簡單的基本概念後接下來就可以來尋找我們這篇文章的問題。

新增一個 document 時資料會存放到那 ??

像我們上面已經建立好了 document ,那實際上在 ES 中它是存放在那呢 ?? 雖然我們上面說它是對應到 RDBMS 的概念,但實際存放的地方不是存放在 markcorp 這個資料庫下的 employee 表下。

嚴格來說它是存放在 markcorp 這個 index 裡面,並且它的類型是 employee 。

那 index 裡的實體 document 又是放在那裡呢 ??

答案是在shards裡,咱們可以執行下面的指定看到 index 底下有那些 shards

curl 127.0.0.1:9200/markcorp/_search_shards

那 shards 是什麼,不如我們先來說說為什麼要有 shards ,一個 index 有可能會存放大量的 document ,這時所有的 document 都存放在同一個地方,一定會產生硬體貧頸,所以為了解決這個問題 ES 提供了方法可以將 index 分散成塊,而這個塊就被稱為『 shards 』。

所以簡單的說 index 是由多個 shard 所組成,然後 document 會實際存放在 shards 中。

然後不同的 shards 可能會存放在不同的節點上,而這裡指的節點就是指不同實體,你也可以先想簡單一點就是機器。

上面這張圖就是 ES 的 Cluster 基本架構,每當有文檔要建立時,它會依據 index 有那些分片,然後來將它丟到『某一個』分片中,當然它有辦法指定到分片中,不過這不是該篇文章要討論的主題。

我相信有人看到上面那張圖時會想說,如果其中一個節點上天堂了,那不就代表那個節點的 document 都會找不到嗎 ? 沒錯 ~ 不過你想想你都想得到了,那開發 ES 的人會想不到嗎 ??

ES 的解法就是每一個節點除了存放你上面看到的分片,它事實上還多存放了其它節點的備份分片,如下圖,假設我們有三個節點,然後每個節點上面有一個分片和另一個分片的備份,而當節點 2 上天堂時,我們在節點 1 還有分片 2 的備份,所以還是可以找的到分片 2 的 document 。

那它除了將 document 實體建立起來,還有建立什麼東西嗎 ??

有的,那就是建立一些索引(此索引不是上面說的 index ),來幫助我們更快速的搜尋到它。

而要理解它存了啥,那就要來理解理解 ES 的倒排索引,如下圖,它的方向就是 ES 要搜尋時跑的方向,它會先去 term index 中尋找某個東西,然後可以指到 term dictionary ,接下來在從 dictionary 可以找到指定的 posting list ,最後 posting list 裡面就列了,你要的 document 編號。

接下來我們將從 posting list 開始來說起,以下為範例 documents 。

{
  id: 1,
  name: ‘Mark',
  age: 18
},
{
  id: 2,
  name: ‘Ack',
  age: 28
},{
  id: 3,
  name: ‘Ad'
  age: 17
}
{
  id: 4,
  name: ‘Ban'
  age: 28
}

Posting List

上面的 documents 會被轉換下面的列表,所以如果 client 要搜尋 age 為 28 歲的,馬上就能找到對應的 2 與 4 號 document。

name
Term Posting List
Mark 1
Ack 2
Ad 3
Ban 4
age
Term Posting List
18 1
17 3
28 [2,4]

Term Dictionary

上面搜尋的方法看試可以,但假設有成千上萬個 term 呢 ? 例如你 name 的 term 有好幾千萬筆,所以為了解決這個問題 ES 會將所有的 term 進行排序,這樣就可以使用二分搜尋法來達到 O(logn) 的時間複雜度囉。( 二分搜尋可參考哥的這篇文章 傳送門)

所以 name 這個會變成下面像下面這張表一樣,有排序過的 term。

Term Dictionary Posting List
Ack 2
Ad 3
Ban 4
Mark 1

Term Index

上面這些東西如果數量小時,放在記憶體內還行,但問題是如果 term 很多,導致 term dictionary 非常大的話,放在記憶體內會出事情的,所以 ES 的解法就是Term Index

Term Index 本身就是像個樹,它會根據上面的 term dictionary 產生出樹如下圖:

Term Index 基本上是存放在記憶體中,每當進行搜尋時會先去這裡尋找到對應的 index ,然後再根據它去硬碟中尋找到對應的 term dictionary 最後就可以成功的找到指定的 document 囉。

結論

最後簡單的總結一下本篇文章所提的兩個問題的結論。

1 . 在新增一個 document 時,會建立 json 實體與索引,那這兩個東東會存放到那兒去 ?

Ans: 會存放到某一個 shard 中,而 shard 又存放在每個節點裡面。

2 . 而在建立索引時,它又存放了什麼東東 ?

Ans: 會建立三個東西分別為 Posting List 、Dictionary 與 Term Index ,其中前兩者是存放在硬碟中,而最後的 index 是存放在記憶體中。

參考資料

 
8 months ago

本篇文章中,我們將要很快速的學習以下幾個重點:

  1. elasticsearch 的基本觀念。
  2. 使用 docker 建立 elastisearch 服務。
  3. 新增 document。
  4. 取得 document。
  5. 修改 document。
  6. 搜尋 document。

elasticsearch 的基本觀念

Elasticserach 是一種分散式的搜尋引擎,它非常適合用來處理大量資料的搜尋與分析,像 github 就是拿他來搜尋它們所有的程式碼,而且它也提供了豐富的 restful api 來給我們進行操作。

Elasticserach 有這和關聯式資料庫相似的結構,如下圖。

所以假設我們要新增一筆在 markcorp 某一位員工的文檔會長的如下:

index: markcorp
type: employee

{
  id: 123
  name: ‘Mark’,
  age: 18
}

然後當我們要去 ES 尋找這筆資料時,就可以使用它提供的 Restful API 來直接尋找:

GET 127.0.0.1:9200/markcorp/employee/123

使用 docker 建立 elastisearch 服務

接下來的教學可以直接用這個專案來直接執行:

git clone https://github.com/h091237557/docker-composer-tools.git
cd elasticsearch/
docker-compose up

下面為官網所直接使用的docker compose的檔案。(官網傳送門)

version: '2.2'
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:6.2.3
    container_name: elasticsearch
    environment:
      - cluster.name=docker-cluster
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - esdata1:/usr/share/elasticsearch/data
    ports:
      - 9200:9200
    networks:
      - esnet
  elasticsearch2:
    image: docker.elastic.co/elasticsearch/elasticsearch:6.2.3
    container_name: elasticsearch2
    environment:
      - cluster.name=docker-cluster
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - "discovery.zen.ping.unicast.hosts=elasticsearch"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - esdata2:/usr/share/elasticsearch/data
    networks:
      - esnet

volumes:
  esdata1:
    driver: local
  esdata2:
    driver: local

networks:
  esnet:

以下有幾個配置要注意一下。

environment

  • cluster.name : 這個就是設定這個 ES cluster 的名稱,所有在相同機器上且命名相同 cluster.name 的都將在相同的 cluster 裡。
  • bootstrap.memory_lock : 這個設定 true 是為了要防止 swapping 抓到 ES 的 memory 來用,導致節點不穩而脫離 cluster。官網
  • ES_JAVA_OPTS: -Xms512m -Xmx512m 代表設定 ES 的最大與最小的 heap 空間為 512 mb。
  • discovery.zen.ping.unicast.hosts: 這是為了讓此節點知道去連結 elasticsearch ( docker 節點 )。

ulimits

這個參數就是可以設定 docker 容器的 ulimits 參數,其中官網這裡會設定 memlock,事實上我還在研究它。不過主要事實和上面的 bootstrap.memory_lock 的原因有關,待調查。

ulimits:
      memlock:
        soft: -1
        hard: -1

確保 Elasticsearch 有成功執行

請指定執行下面的執令,然後該會看到如下的資訊。

curl 127.0.0.1:9200

---
{
  "name" : "OcaPXYM",
  "cluster_name" : "docker-cluster",
  "cluster_uuid" : "Cg2ogE6ETbOhSEh0E8m-3w",
  "version" : {
    "number" : "6.2.3",
    "build_hash" : "c59ff00",
    "build_date" : "2018-03-13T10:06:29.741383Z",
    "build_snapshot" : false,
    "lucene_version" : "7.2.1",
    "minimum_wire_compatibility_version" : "5.6.0",
    "minimum_index_compatibility_version" : "5.0.0"
  },
  "tagline" : "You Know, for Search"
}

新增與取得文檔

我們試這新增一筆 markcorp 的一筆員工資料看看,上面有提到 ES 提供了 restful api 給我們操作,所以我們只要準備好員工資料的 json 檔。

{
  name: 'Mark',
  age: 18,
  habit: 'Cut someone' 
}

然後使用 curl 執行下面的指令就可以新增一筆資料到裡面囉。

curl -X POST -H "Content-Type: application/json" -d @./post.json 127.0.0.1:9200/markcorp/employee

執行完的訊息
{"_index":"markcorp","_type":"employee","_id":"Mmbls2IBnSbSo4fQfVml","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":0,"_primary_term":1}%

上面的指令重點有兩個,第一個就是post在 restful api 中就代表這新增的意思,然後第二個重點就是下面這段 uri ,它說明了這筆資料要新增的地點,markcorp 就是 index 而 employee 就是 type 的意思。

127.0.0.1:9200/markcorp/employee

然後我們就可以使用下面的 restful api 來取得該筆資料,其中 employee 後面的那個英文就是文檔 id。

curl 127.0.0.1:9200/markcorp/employee/Mmbls2IBnSbSo4fQfVml?pretty

執行完結果
{
  "_index" : "markcorp",
  "_type" : "employee",
  "_id" : "Mmbls2IBnSbSo4fQfVml",
  "_version" : 1,
  "found" : true,
  "_source" : {
    "name" : "Mark",
    "age" : 18,
    "habit" : "cut someone"
  }
}

更新文檔

更新文檔的方法也是相同的,使用 put 方法,然後在指定要更新誰就可以囉。

127.0.0.1:9200/markcorp/employee/Mmbls2IBnSbSo4fQfVml
curl -X PUT -H "Content-Type: application/json" -d @./update.json 127.0.0.1:9200/markcorp/employee/Mmbls2IBnSbSo4fQfVml?pretty

{
  "_index" : "markcorp",
  "_type" : "employee",
  "_id" : "Mmbls2IBnSbSo4fQfVml",
  "_version" : 3,
  "result" : "updated",
  "_shards" : {
    "total" : 2,
    "successful" : 2,
    "failed" : 0
  },
  "_seq_no" : 2,
  "_primary_term" : 1
}

搜尋文檔

假設我們現在有兩筆資料。

{
  name: "Mark",
  age: 18,
  habit: "cut someone"
},
{
  name: "Ian",
  age: 18,
  habit: "hack someone"

}

然後我們現在要搜尋興趣為cut的員工,就執行下面的指令。

curl 127.0.0.1:9200/markcorp/employee/_search?q=habit:'cut'

---
執行結果

{
    "took": 146,
    "timed_out": false,
    "_shards": {
        "total": 5,
        "successful": 5,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": 1,
        "max_score": 0.2876821,
        "hits": [
            {
                "_index": "markcorp",
                "_type": "employee",
                "_id": "YGYhtGIBnSbSo4fQe2Lh",
                "_score": 0.2876821,
                "_source": {
                    "name": "Mark",
                    "age": 18,
                    "habit": "cut someone"
                }
            }
        ]
    }
}

上面的搜尋是最最基本的搜尋,elasticserach 他提供了非常強大的分析與搜尋工具,將留到下一篇文章中來說明。

 
9 months ago

本篇文章中我們將會學習到以下幾個重點

  1. 什麼是 Prometheus 呢 ?
  2. 要如何監控 node http server 呢 ?
  3. 我想從 Prometheus 監控自訂的資訊,要如何做呢 ?

什麼是 Prometheus 呢 ?

在我們平常開發完系統時,我們常常會有個需求,那就是要如何監控我們的系統呢 ?
以確保它 cpu 往上衝時,我們會知道呢。

當然我們可以很簡單的寫個小程式,定期的去呼叫系統取他的 cpu,這是很淺的東東 ~ 那如果是還要一個 api 的請求次數呢 ? 或是平均的某個 api 的請求次數或圖表呢 ? 這時如果還要自幹一個,那就太麻煩囉,所以這時我們就可以使用Prometheus ~

Prometheus 官網上面寫了下面這段話 :

Power your metrics and alerting with a leading open-source monitoring solution.

這句話就是 Prometheus 存在的目的。

Prometheus 的架構

太細節的不說囉 ~ 這裡大概列出這個架構的三個重點:

  1. Prometheus 是用 pull 去取得目標資訊,下面的 pull metrics 就是這個意思,而這裡你只先去記一點,如果你有個 http server ,然後你要用 Prometheus 去監控 server ,那 Prometheus 就會去 xxxx_host/metrics 取得資訊。
  2. PromQL 是 Prometheus 所提供的查詢語言,利用它可以快速的找到我們想要的資訊 (大概)。
  3. AlertManager 是一個警告系統,你只要配置好 Prometheus 在某個東東到了報警線時,就自動發送警告到 AlertManager 然後它會使用某些方法通知你,例如 email or slack。

安裝 Prometheus

請直接到官網直接下載下來。

https://prometheus.io/download/

接下來在解壓縮

tar xvfz prometheus-*.tar.gz
cd prometheus-*

然後進到解壓縮後的資料夾後,執行以下指令,就可以開啟 Prometheus 。

./prometheus

要如何監控 node http server 呢 ?

再開始前我們先去 prometheus server 的資料夾下修改一下 prometheus.yml 這檔案,基本上我們只要先調整 scrape_configs 裡的 scrape_configs,設定 prometheus server 要去監控的目標,如下我們去監控localhost:3000

# my global config
global:
  scrape_interval:     5s # Set the scrape interval to every 15 seconds. Default is every 1 minute.
  evaluation_interval: 5s # Evaluate rules every 15 seconds. The default is every 1 minute.
  # scrape_timeout is set to the global default (10s).

# Alertmanager configuration
alerting:
  alertmanagers:
  - static_configs:
    - targets:
      # - alertmanager:9093

# Load rules once and periodically evaluate them according to the global 'evaluation_interval'.
rule_files:
  # - "first_rules.yml"
  # - "second_rules.yml"

# A scrape configuration containing exactly one endpoint to scrape:
# Here it's Prometheus itself.
scrape_configs:
  # The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.
  - job_name: 'test'

    static_configs:
      - targets: ['localhost:3000']

然後我們就可以簡單的寫一個 nodejs 的 http server 。

const http = require('http')
const port = 3000

const requestHandler = (request, response) => {
    response.end('Hello Node.js Server!')
}

const server = http.createServer(requestHandler)

server.listen(port, (err) => {
    if (err) {
        return console.log('something bad happened', err)
    }

    console.log(`server is listening on ${port}`)
})

接下來,我們需要建立一個 api endpoint 的/metrics

const http = require('http');
const port = 3000;

const requestHandler = (request, response) => {
  if (request.url === '/metrics') {

  }
  response.end('Hello Node.js Server!');
};

const server = http.createServer(requestHandler);

server.listen(port, (err) => {
  if (err) {
    return console.log('something bad happened', err);
  }

  console.log(`server is listening on ${port}`);
});

然後上面有說過 Prometheus 會去監控的目標抓取資訊,而他抓的地方就是/metrics,然後這時我們裡面就要回傳資訊回去。

這裡我們會使用prom-client套件,這個套件是一個 Prometheus client ,它會幫我們抓取他自訂的資料,並且將資料已 Prometheus 可以接受的格式回傳回去。

npm install prom-client

然後再將 endpoint 修改成如下。

const http = require('http');
const port = 3000;
const client = require('prom-client');
const collectDefaultMetrics = client.collectDefaultMetrics;
collectDefaultMetrics();

const requestHandler = (request, response) => {
  if (request.url === '/metrics') {
    response.end(client.register.metrics());
  }
  response.end('Hello Node.js Server!');
};

const server = http.createServer(requestHandler);

server.listen(port, (err) => {
  if (err) {
    return console.log('something bad happened', err);
  }

  console.log(`server is listening on ${port}`);
});

那要如何驗證有沒有資料呢 ? 你只要去 chrome 然後打http://localhost:3000/metrics然後你看到下面的資訊,就代表你有在產生資料囉,下面這些是prom-client自已會去抓 process 的一些相關資訊,如果要自訂的資訊請看下一章結 ~

# HELP process_cpu_user_seconds_total Total user CPU time spent in seconds.
# TYPE process_cpu_user_seconds_total counter
process_cpu_user_seconds_total 0.015771 1520243222641

# HELP process_cpu_system_seconds_total Total system CPU time spent in seconds.
# TYPE process_cpu_system_seconds_total counter
process_cpu_system_seconds_total 0.000973 1520243222641

# HELP process_cpu_seconds_total Total user and system CPU time spent in seconds.
# TYPE process_cpu_seconds_total counter
process_cpu_seconds_total 0.016744000000000002 1520243222641

# HELP process_start_time_seconds Start time of the process since unix epoch in seconds.
# TYPE process_start_time_seconds gauge
process_start_time_seconds 1520243212

# HELP process_resident_memory_bytes Resident memory size in bytes.
# TYPE process_resident_memory_bytes gauge
process_resident_memory_bytes 26382336 1520243222641

# HELP nodejs_eventloop_lag_seconds Lag of event loop in seconds.
# TYPE nodejs_eventloop_lag_seconds gauge
nodejs_eventloop_lag_seconds 0.00048715 1520243222642

# HELP nodejs_active_handles_total Number of active handles.
# TYPE nodejs_active_handles_total gauge
nodejs_active_handles_total 3 1520243222641

# HELP nodejs_active_requests_total Number of active requests.
# TYPE nodejs_active_requests_total gauge
nodejs_active_requests_total 0 1520243222641

最後我們先開啟 Prometheus ,然後連線到http://localhost:9090/,最後到 status 裡面的 target 裡面,你如果看到你的 endpoint 的 state 是up就代表 Prometheus 有成功的去那抓資料囉。

我想從 Prometheus 監控自訂的資訊,要如何做呢 ?

定義好符合 Prometheus 的時序資料格式 metric

假設我們想要自訂個內容,然後給 Prometheus server 抓取,第一步我們要先定義好『資料模式』,你把他想成 server 與 client 的協定就好囉,然後他長的如下:

<metric name>{<label name>=<label value>, ...}

注意 ! metric name 只能用 _ 來分割名詞 Ex. chat_room_count

假設我們要定義一個『ID 1 的聊天室用戶人數』那他的定義應該會長下面這樣:

chatRoomCount{ chat_id=“1”} 100(人數)

決定時序資料的類型

在 Prometheus 中有提供四種時序資料的類型

Counter

這種類型用於『累積值』,例如 Prometheus 內建提供的 http 請求數或錯誤量,它的類型就是 Counter 。

http_response_total{method="GET",endpoint="/api/peoples"} 10
http_response_total{method="GET",endpoint="/api/peoples"} 20
http_response_total{method="GET",endpoint="/api/peoples"} 30
Gauge

這種類型用於『常規值』,例如 cpu 使用率或記憶體使用率就是此類型。

memory_usage_bytes{host=“server-01"} 50
memory_usage_bytes{host=“server-01"} 100
memory_usage_bytes{host=“server-01"} 80
Histogram

主要用於一段時間範圍內對資料的採集,並且可針對內容進行分組。

{小於100毫秒=5次,小於500毫秒=1次,小於100毫秒=2次}
Summary

與 Histogram 相同且支持百分比與跟蹤的結果。

比較詳細的類型說明請參考下篇文章,寫的很詳細的。
傳送門

實作『用戶使用』人數的自訂內容

假設我們希望 Prometheus 可以去指定的聊天室 Server,抓取使用人數的資訊,那我們要如何實作呢 ?

根據上面的教學我們要先定義好資料格式與資料類型。

//資料格式 => 代表聊天室`1`的使用人數當下有多少人.
chatRoomCount{ chat_id=“1”} 100(人數)
//資料類型 => 會選擇 Gauge 而不選 Counter 是因為聊天室的人數是會上下變動,而不是只增加或減少。
Gauge

接下來我們就來實作一下,首先做出自訂資料的格式定義與類型。

// 定義自訂 metric 的格式與類型
// 格式: chatRoomCount{ chat_id=“1”} 100(人數)
// 類型: Guage
const guage = new client.Gauge({
  name: 'chatRoomCount',
  help: 'The metric provide the count of chatroom`s people',
  labelNames: ['chat_id']
});

然後下面就是所有的程式碼,主要的重點就是定義格式,然後讓 Prometheus 從/metrics這個 api 取得資料前,先將 count 資訊更新到 metric 裡面。

const http = require('http');
const port = 3000;
const client = require('prom-client');
let count = 0;
const guage = new client.Gauge({
  name: 'chatRoomCount',
  help: 'The metric provide the count of chatroom`s people',
  labelNames: ['chat_id']
});

const requestHandler = (request, response) => {
  if (request.url === '/metrics') {
    // 更新 metric
    guage.set({
      chat_id: '1'
    }, count);
    response.end(client.register.metrics());
  }
  if (request.url === '/add') {
    count++;
    response.end(`now ~ count:${count}`);
  }
  if (request.url === '/leave') {
    count--;
    response.end(`now ~ count:${count}`);
  }

  response.end('Hello Node.js Server!');
};

const server = http.createServer(requestHandler);

server.listen(port, (err) => {
  if (err) {
    return console.log('something bad happened', err);
  }
  console.log(`server is listening on ${port}`);
});

那我們要如何確認有產生資料呢 ? 你先加幾個用戶幾去,然後再去打/metrics就可以看到結果囉。

http://localhost:3000/add // 加一人
http://localhost:3000/add // 加一人

http://localhost:3000/metrics

結果如下。

# HELP chatRoomCount The metric provide the count of chatroom`s people
# TYPE chatRoomCount gauge
chatRoomCount{chat_id="1"} 2

最後你在去http://localhost:9090你就可以看到那個 tab 中會多增加了chatRoomCount的標籤,然後點進去選 graph 你就可以看到你的圖表了。

參考資料

 
10 months ago

本文中我們將會知道兩件事件

為什麼要使用命令模式呢 ? 
什麼是命令模式呢?

為什麼要使用命令模式呢 ?

我們先來想想,假設我們要做一個簡單的計算機的功能,然後他有提供以下方法:

然後實際上執行大概會長這樣 :

add(5) => current = 5
sub(3) => current = 2
mul(3) => current = 6
div(3) => current = 2

這樣我們大概會寫個最簡單的程式碼,大概會長成下面這樣:

class Calculator {
    constructor(){
        this.current = 0;
    }
    add(value){
        this.current += value;
    }
    sub(value){
        this.current -= value;
    }
    mul(value){
        this.current *= value;
    }
    div(value){
        this.current /= value;
    }
    getCurrent(){
        return this.current;
    }
}

const client = new Calculator();
client.add(5);
client.sub(3);
client.mul(3);
client.div(3);

console.log(client.getCurrent());

這有啥問題呢 ?

如果我們這時要增加一個undo的功能呢 ? 上面的程式碼的結構就無法做這種功能了,因為它的緊偶合了。

緊耦合白話文就是你們(模組和類別)關係太好囉 ~ 要修理 A 的話 B 也要先打一頓才行。

而我們上面範例關係太好的兩位可以定義為『行為請求者』與『行為實現者』,行為請求就是指我們外面指接client.add(5),而行為實現者則為add方法裡面的實作。

也因為上面這種狀況,所以我們無法做undo功能,如果我們想要奇耙一點在這案例做排程或是記錄請求日誌的話,也都很難實現。

所以解法就是 :

將『行為請求者』與『行為實現者』的解耦合,也就是所謂的『命令模式』。

什麼是命令模式呢?

就是將『行為請求者』與『行為實現者』分開模式。

下圖中,我們會在請求者與實現者的中間增加一個東西,叫作呼叫者,你也可以稱為Invoker

這張圖的概念你可以簡單的想成,你去一間餐館食飯,然後你就是『請求者』,負責接受點菜的服務生就是『呼叫者』,而最後實際做飯的就是『實現者』。

那為什麼這叫命令模式呢 ? 因為我們會將所有的請求,都封成一個『命令 command 』物件,接下來的服務生,會將這此命令寫在紙上,然後再由他來決定什麼時後要丟給廚師,而客戶如果要取消命令時,也都會由服務生這裡來經手。

程式碼實作

接下來我們要將上面的程式碼來進行修改,首先我們會多增加上面那張圖中的Invoker類別,記好他就是服務生,用來叫廚師做飯的。

這段程式碼中,execute就是用來實際叫廚師做飯的方法,而undo就是用來執行取消這命令的方法。

class Invoker {
    constructor() {
        this.commands = [];
        this.current = 0;
    }

    execute(command) {
        this.commands.push(command);
        this.current = command.execute(this.current);

        console.log(`Execute command : ${command.name} , and result : ${this.current}`);
    }
    undo() {
        const command = this.commands.pop();
        this.current = command.undo(this.current);

        console.log(`Execute undo and result : ${this.current}`);
    }
    getCurrent() {
        return this.current;
    }
}

然後下面是我們每一個命令,這裡每一個都是廚師,然後裡面都有定義好這個命令實際要做的事情與取消時要做的事情,由於我是用 JS 這種語言來撰寫範例,所以沒有個抽象類別或 介面,不然每一個命令應該都會繼承一個叫 Command 的類別或介面。

class AddCommand {
    constructor(value) {
        this.value = value;
        this.name = "Add";
    }

    execute(current) {
        return current + this.value;
    }

    undo(current) {
        return current - this.value;
    }
}

class SubCommand {
    constructor(value) {
        this.value = value;
        this.name = "Sub";
    }

    execute(current) {
        return current - this.value;
    }

    undo(current) {
        return current + this.value;
    }
}

class MulCommand {
    constructor(value) {
        this.value = value;
        this.name = "Mul";
    }

    execute(current) {
        return current * this.value;
    }

    undo(current) {
        return current / this.value;
    }
}
class DivCommand {
    constructor(value) {
        this.value = value;
        this.name = "Div";
    }

    execute(current) {
        return current / this.value;
    }

    undo(current) {
        return current * this.value;
    }
}

最後執行時,我們會麻煩 invoker (服務生),叫實際執行者 (廚師) 進行工作,並且服務生那裡都有記住我們要點的菜,如果臨時想取消,就很簡單囉。

const invoker = new Invoker();
invoker.execute(new AddCommand(5)); // current => 5

invoker.execute(new SubCommand(3)); // current => 2

invoker.execute(new MulCommand(3)); // current => 6

invoker.execute(new DivCommand(3)); // current => 2


invoker.undo(); // current => 6


console.log(invoker.getCurrent());

結論

今天我們學習了『命令模式』,它主要的功能與目的如下 :

就是將『行為請求者』與『行為實現者』分開的模式,為了更彈性操作命令。

你只要記得餐廳的概念就可以理解命令模式的實作了。

再來我們談談它的優缺點。

它最主要的優點是,可讓我們將『行為要求者』與『行為執行者』分開,使得我們可以做更多的運用,例如取消、寫日誌、交易事務 (就是要麻所有命令都執行要麻不要執行)。

但缺點呢 ?
不能否認程式碼的複雜度增加與變長,這也代表,不是所有類似這種命令的功能都需要用到這種模式,在設計一個系統時最怕『過度設計』,所以如果你們確定你的系統是需要『對命令進行特殊的動作時(ex: undo)』時,才需要使用到這種模式。

像我剛剛的計算機範例,如果不需要undo,那用最一開始的範例就夠了 ~~

參考資料