備忘録、はじめました。

作業したこと忘れないようにメモっておきます。

erlangでfluentdクライアントのOTPアプリケーションを実装した

はじめに

アプリケーションにログ、データを集めたりするのにしばしば利用される Fluentd というソフトウェアのクライアントをErlang/OTPで実装した話です。

本家のerlangのfluentdのクライアント の実装を眺めていて、 gen_eventで実装されていたため、過負荷や障害時などを考慮したい場合はひと工夫を入れる必要がありそうと思い、 今回本家と異なるビヘイビアで実装してみました。

何故gen_event使わなかったのか?

結論から言うと、別システムにデータを送信するようなケースでのgen_eventの利用は避けたかったからです。 何故避けたのかという説明をするために、簡単にgen_eventの話をします。

gen_eventは一つのイベントマネージャーと0個以上のイベントハンドラーで構成され、 マネージャーにメッセージを送信する(notify, sync_notify関数)と、 マネージャに登録したハンドラーの処理が走るという感じのビヘイビアーです。

f:id:tkyshm:20171216231642p:plain

gen_eventの便利なところは、流したいイベントは同じだけれど別目的でそれぞれ処理をしたいというのが簡単に実装できることが挙げられます。 例えば、ログのコンソール出力、ファイル書き込み、エラーのアラート発報を一つのgen_eventでまとめることができます。

ただ、gen_eventを扱う場合に気をつけなければいけない点があります。 まず、これらの複数ハンドラーは並列実行されない点です。そのため、一つのハンドラーの処理に時間がかかるとハンドラー全体に影響が出ます。 次に、1プロセスにメッセージが集中するので過負荷に弱いです。 事前に設計するイベントマネージャの流量が多いと分かっている場合、 そのマネージャーに処理が大幅に遅れるようなハンドラーを一つだけでも追加するというのは避けたほうが良いでしょう。

fluentdとのやり取りをそのままハンドラーに書き込むと、 レイテンシが大きくなったり、fluentdダウン時などの影響でシステム全体が遅くなる可能性があります。

なので、今回その点を注力したfluentdのクライアントアプリケーションを別ビヘイビアで実装することにしました。

実装

選択したbehaviorはgen_statemです。 gen_serverでも良かったのですが、以下の理由でgen_statemにしました。

  • gen_statemの練習
  • 状態を関数名として定義できるので、状態変数が少し減る
  • 再接続の処理が書きやすい

以下、概要図です。

f:id:tkyshm:20171217010638p:plain

efluentc_supがsimple_one_for_oneでefluentcのクライアントプロセスを固定数起動します(デフォルトは2)。 以下、efluentc_supのstart_linkです。

start_link() ->
    Num = application:get_env(efluentc, worker_size, ?DEFAULT_WORKER_SIZE),
    Ret = supervisor:start_link({local, ?SERVER}, ?MODULE, []),
    [supervisor:start_child(?SERVER, [Id]) || Id <- lists:seq(0, Num-1)],
    Ret.

gen_statemビヘイビアで実装しているefluentc_clientの状態遷移図は以下の感じになっています。

f:id:tkyshm:20171217012517p:plain

初期化関数(init/1)と各状態について、コードを交えつつ解説していきます。

init([Id]) ->
    Host = application:get_env(efluentc, host, localhost),
    Port = application:get_env(efluentc, port, 24224),
    BuffSize = application:get_env(efluentc, flush_buffer_size, ?DEFAULT_FLUSH_BUFFER_SIZE),
    Timeout = application:get_env(efluentc, flush_timeout, ?DEFAULT_FLUSH_TIMEOUT),
    State = #state{id = Id,
                   host = Host,
                   port = Port,
                   flush_buffer_size = BuffSize,
                   flush_timeout = Timeout},
    process_flag(trap_exit, true),
    {ok, closed, State, [{next_event, cast, connect}]}.

closed(cast, connect, State) ->
    case try_connect(State) of
        {ok, NewState, empty} ->
            {next_state, empty, NewState};
        {ok, NewState, buffered} ->
            {next_state, bufferd, NewState, flush_action()};
        {error, Reason} ->
            ?ERR_LOG("connect failed: ~p", [Reason]),
            {next_state, closed, State, reconnect_action(0)}
    end;

init/1からclosedの状態遷移するとき、init内では接続まではせずにfluentdの接続設定を状態変数(State)に持たせてclosedへ遷移します。 これは、init時に接続までしてしまうと、fluentdが死んでしまったときにプロセスの起動が出来ずに最終的にシステム全体がダウンしてしまうみたいなことを避けるためです。

初期状態にすぐにイベントが発火されるようnext_eventのアクションをオプションに持たせて返しています。 closedの状態でconnectのイベントが来たとき、try_connect関数でfluentdとの接続を試み、 成功した場合はempty状態かbuffered状態に遷移します。 emptyかbufferedかは状態変数のbufferが空かどうかで判断されます。 失敗した場合は、再接続のアクションに飛ばされます。

% @private
reconnect_action(N) ->
    [{{timeout, connect}, backoff(N), {reconnect, N}}].

reconnectアクションはtimeoutのアクションを使っています。 backoff(N) ミリ秒後にタイムアウトが発火されて、close({timeout, connect}, Req, State) へcallbackされることを期待しています。

closed({timeout, connect}, {reconnect, N}, State) ->
    case try_connect(State) of
        {ok, NewState, empty} ->
            {next_state, empty, NewState};
        {ok, NewState, buffered} ->
            {next_state, buffered, NewState, flush_action()};
        {error, Reason} ->
            ?ERR_LOG("connect failed: ~p", [Reason]),
            {next_state, closed, State, reconnect_action(N+1)}
    end;

callback先で再度接続を試みて、成功したらemptyかbuffered、失敗したらreconnect_actionでタイムアウトを待ちます。 この一連の接続処理の流れは接続が成功するまで永遠と繰り返されます。

また、closedの状態でもpostされるパターンがあるため、そのケアもします。 一定バイト数は状態変数のバッファーにもたせて、接続が開始されたと同時に順次送信するようにしてます。

closed(cast,
       {post, Tag, Data},
       #state{buffer = Buffs, buffer_caches = Caches, flush_buffer_size = FBS} = State)
  when FBS < byte_size(Buffs) ->
    Msg = encode_msg(Tag, Data),
    NewCaches = append_buffer_caches(Buffs, Caches),
    {next_state, closed, State#state{buffer = Msg, buffer_caches = NewCaches}};
closed(cast, {post, Tag, Data}, #state{buffer = Buffs} = State) ->
    Msg = encode_msg(Tag, Data),
    NewBuffs = <<Buffs/binary, Msg/binary>>,
    {next_state, closed, State#state{buffer = NewBuffs}}.

閾値は2つあります。 一度に送信するメッセージのバイト数(flush_buffer_size)と、その塊の数(buffer_cachesの長さ)です。 メッセージのサイズがflush_buffer_sizeを超えた場合、buffer_cachesのリストに追加され、 そのときにbuffer_cachesの長さが閾値を超えていたら、そのメッセージはロストします。

次に、emptyとbuffered状態のときの処理です。

empty(cast, {post, Tag, Data}, State = #state{buffer = Buffs, flush_timeout = Timeout}) ->
    Msg = encode_msg(Tag, Data),
    NewBuffs = <<Buffs/binary, Msg/binary>>,
    {next_state, buffered, State#state{buffer = NewBuffs}, flush_timeout_action(Timeout)};

empty状態はbufferにデータがからのときの状態のため、postされてbufferにデータを詰められたらbuffered状態に移行します。 bufferに詰められたデータはflush_buffered_sizeを超えたときか、flush_timeout後にfluentdへ送信されます。 fluentdへの送信は、buffered状態からのみ実行されます。

buffered(_Event, flush, State) ->
    case buffered_send(State) of
        {{error, timeout}, NextState, NewState} ->
            ?ERR_LOG("send timeout", []),
            {next_state, NextState, NewState};
        {{error, Reason}, NextState, NewState} ->
            ?ERR_LOG("send failed: ~p", [Reason]),
            {next_state, NextState, NewState, reconnect_action(0)};
        {ok, NextState, NewState} ->
            {next_state, NextState, NewState}
    end;
buffered(cast, {post, Tag, Data}, State = #state{flush_timeout = Timeout}) ->
    Msg = encode_msg(Tag, Data),
    case append_buffer(Msg, State) of
        {buffer_overflow, NewState} ->
            {next_state, buffered, NewState, flush_action()};
        {ok, NewState} ->
            {next_state, buffered, NewState, flush_timeout_action(Timeout)}
    end;

% @private
flush_action() ->
    [{next_event, cast, flush}].

% @private
flush_timeout_action(Timeout) ->
    [{state_timeout, Timeout, flush}].

flushのリクエストが来たときはfluentdへデータを送信、postリクエストが来た場合はbufferへデータを詰めていきます。 flush_buffer_sizeを超えた場合、あるいはflush_timeout後にデータをflushします。

以下、githubリポジトリです。 GitHub - tkyshm/efluentc: efluentc is the Client OTP application for Fluentd

使い方

使い方は、efluentc:post(Tag, Message) とするだけです。

> efluentc:post(<<"test.tag">>, <<"message">>).
ok

> efluentc:post('test.tag', <<"message">>).
ok

> efluentc:post('test.tag', "message").
ok

> efluentc:post('test.tag', #{<<"key">> => <<"value">>}).
ok

また、client数やflush_buffer_size、flush_timeoutなどの閾値を変更する場合は、次のようにconfigを書きます。

[
  {efluentc, [{worker_size, 10},      % 10 workers
              {fluesh_timeout, 500},  % 500msec
              {buffer_size, 1048576}, % 1MiB
              {host, localhost},
              {port, 24224}]}
]

感想

gen_statemビヘイビアの練習として、 外部と通信するようなインターフェースを作るは結構良い題材だったんじゃないかなと思いました。

今回作ったefluentcのアイディアが良いかどうかは置いておいて、 gen_statemは汎用性高くて色々と応用が効きそうで良い感じだなと思いました。

キューを入れて、送信できない場合の一時退避する領域を作ったけど、 思い返すとただ複雑にしただけかも知れないという思いもあったり。 送信できない場合にアラートを上げられるようにして、問題をすぐに発見できるような作りのほうが良かったかも知れないです。

あと、gen_eventに組み込みたくなったとしても、 各々handlerを実装してもらって、その中でefluentc:postをラップしてしまえば良さそうだと思う...。

参考URL

What is wrong with gen_event?

Erlang -- gen_event Behaviour