備忘録、はじめました。

作業したこと忘れないようにメモっておきます。

Erlangのメッセージングでバッファを使う

Erlang Angerの3章、3.3.2あたりで過負荷についての対処法として、キューのバッファを作るのが効果的という話が載っていたので、実際にどのくらい差が出るのか簡単に計測してみた話です。

問題設定:

buffer,erlang,message passing

この二つの設定に対して、1,000〜500,000メッセージをbenchサーバーから並列で送信します。

PCの環境:

バッファなし

echoサーバー

handle_castコールバックに処理を追加しました。 受け取るリクエストの中身の送り主FromSenderMsgを返信します。

handle_cast({echo, Msg, FromSender}, State) ->
    FromSender ! Msg,
    {noreply, State}.

benchサーバー

以下、benchサーバーです。

start_bench(Num) ->
    start_bench(Num, node()).

start_bench(Num, Node) ->

    Start = erlang:system_time(millisecond),

    FinPid = self(),
    Pid = spawn_link(fun() ->
        loop(Num, #state{fin_pid=FinPid,start=Start})
    end),

    register(bench_server, Pid),

    send_echo(Num, Node),

    receive
        finish -> ok
    end.

loop(Num, #state{start=Start,count=Cnt,timeout_cnt=TCnt,fin_pid=Fin}) when Num =:= Cnt + TCnt ->
    print_result(Start, erlang:system_time(millisecond), Cnt, TCnt),
    Fin ! finish;
loop(Num, State = #state{count=Cnt, timeout_cnt=TCnt}) ->
    receive
        _Msg ->
            loop(Num, State#state{count=Cnt+1})
    after 3000 ->
        loop(Num, State#state{timeout_cnt=TCnt+1})
    end.

send_echo(0, _, _) ->
    ok;
send_echo(N, ToNode) ->
    spawn(gen_server, cast, [{echo_server, ToNode}, {echo, <<"echo message">>, {bench_server, node()}}]),
    send_echo(N-1, ToNode).

print_result(Start, End, Num, TNum) ->
    Diff = End-Start,
    io:format("~p\t~p\t~p\t~f~n",[TNum, Num, Diff, Num*1000/Diff]).

start_bench(Num, Node)Node宛にNum数のメッセージを並列して送信します。 loop関数がbench_serverの実態で、Num数の返事がechoサーバーから返ってきたら終了します。 また、3秒立っても何もレスポンスが来ない場合はtimeoutカウントをインクリメントさせてます。 (この3秒の数値に大した意味はないです。3秒待っても返事がない = 十分遅い かなという程度です。)

print_result関数では計測結果をprintします。 左から順に、タイムアウト数、送信完了数、実行時間、リクエスト/秒です。

benchスクリプト

#!/usr/bin/env escript
%% -*- erlang -*-
%%! -smp enable -name bench@127.0.0.1 -setcookie queuebuf +K true +P 1000000
main([NodeStr]) ->
    chdir(),
    load(),

    Node = list_to_atom(NodeStr),

    io:format("start bench at ~p~n~n", [node()]),
    io:format("timeout\tsent\ttime(msec)\trate(req/sec)~n"),

    lists:foreach(fun(N) ->
        bench_server:start_bench(N, Node)
    end, [1000,10000,50000,100000,200000,300000,500000]).

%% 実行の都度コンパイルする
load() ->
    case filelib:ensure_dir("./ebin") of
        ok -> ok;
        _ -> file:make_dir("./ebin")
    end,
    compile:file("./bench_server.erl", [{outdir, "./ebin"}]),
    code:add_patha("./ebin").

%% 一応どこからでも実行できるようディレクトリを移動する
chdir() ->
    {ok, Cwd} = file:get_cwd(),
    D = iolist_to_binary([Cwd, "/", ?FILE]),
    c:cd(filename:dirname(D)).

実行する際に引数にechoサーバーのnodeを指定します。

計測

%% echoサーバーが bacon@127.0.0.1 だった場合
$ ./bench/bench.erl bacon@127.0.0.1
/home/tkyshm/erlang/src/bench/queuebuf/bench
start bench at 'bench@127.0.0.1'

timeout sent    time(msec)  rate(req/sec)
0   1000    41  24390.243902
0   10000   48  208333.333333
0   50000   281 177935.943060
0   100000  456 219298.245614
0   200000  1070    186915.887850
0   300000  7956    37707.390649
0   500000  14602   34241.884673

上記、計測結果です。

300,000メッセージから急激なパフォーマンス低下を見せています (今回は結果だけ載せてます。原因については別の機会にでも)。

次に、bufferを組み込んだパターンを試していきます。

バッファあり

バッファありのケースでは、次のようなコンセプトで実装していきます。

  • buffにある程度蓄積してからまとめて送信する
    • echoサーバー、benchサーバーはバッファプロセスにメッセージを送信
    • バッファプロセスがechoサーバー、benchサーバーにメッセージを送信
    • バッファプロセスはバッファサイズが閾値を超えたときに送信
    • バッファプロセスは一定周期でバッファをフラッシュ(送信)

ここのバッファプロセスは外のノードと通信するためのルーターのような役割を持ちます。

バッファプロセス buff_router

-module(buff_router).

-behaviour(gen_server).

... 省略 ....

% non_neg_integer()はerlang:system_time(second)が入るけど使ってなかった
-type message() :: [{node(), {non_neg_integer(), [term()]}}].

-record(state, {
    msg_buff = [] :: [message()]
}).

start_link() ->
    gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).

%% MsgsがListの場合 = 複数のMsgと決めつけてる. 結構無理やり.
route(ToNode, Pname, Msgs) when is_list(Msgs) ->
    lists:foreach(fun(Msg) -> route(ToNode, Pname, Msg) end, Msgs);
route(ToNode, Pname, Msg) ->
    gen_server:cast(?SERVER, {enqueue, ToNode, Pname, Msg}).

init([]) ->
    {ok, #state{}, ?TIMEOUT}.

handle_call(_Request, _From, State) ->
    Reply = ok,
    {reply, Reply, State}.

%% sendはbuff_router => Pnameの名前付きプロセスにMsgsを送信する
%% 受け取りての想定はechoサーバーなのでechoサーバのhandle_infoで処理する
handle_cast({send, Msgs}, State) ->
    lists:foreach(fun({Pname, Msg}) ->
        Pname ! Msg
    end, Msgs),
    {noreply, State, ?TIMEOUT};
%% echoサーバー or benchサーバー => buff_router の流れ
handle_cast({enqueue, ToNode, Pname, Msg}, State = #state{msg_buff=Buff}) when is_atom(ToNode) ->
    NewBuff = buffer_message(ToNode, {Pname, Msg}, Buff),
    {noreply, State#state{msg_buff=NewBuff}, ?TIMEOUT}.

%% タイムアウトしたときメッセージflushする
handle_info(timeout, State = #state{msg_buff=Buff}) ->
    flush_buffer(Buff),
    {noreply, State#state{msg_buff=[]}, ?TIMEOUT}.

%% msg_buffにメッセージを詰める(もっとキレイにかけそう...)
buffer_message(ToNode, Msg, Buff) ->
    case lists:keysearch(ToNode, 1, Buff) of
        {value, {ToNode, {_, Msgs}}} ->
            case length(Msgs) >= ?BUFF_MAXSIZE of
                false ->
                    lists:keyreplace(ToNode, 1, Buff, {ToNode, {erlang:system_time(second), [Msg|Msgs]}});
                true ->
                    % メッセージをToNodeへ送信
                    gen_server:cast({?SERVER, ToNode}, {send, [Msg|Msgs]}),
                    lists:keydelete(ToNode, 1, Buff)
            end;
        false ->
            [{ToNode, {erlang:system_time(second), [Msg]}}|Buff]
    end.

flush_buffer(Buff) -> lists:foreach(fun flush_message/1, Buff).

flush_message({ToNode, {_Time, Msgs}}) ->
    % メッセージをToNodeへ送信
    gen_server:cast({?SERVER, ToNode}, {send, Msgs}).

route関数はechoサーバーまたはbenchサーバーが利用するAPIです。 APIは宛先ノード名(ToNode)、プロセス名(Pname)、メッセージ(Msg)の3つを引数に取ります。 ToNode(ノード名)をキーにしてmsg_buffに追加していきます。buffのサイズが閾値?BUFF_MAXSIZEを超えていた場合はメッセージを宛先ノードへ送信し、msg_buffから削除します。 また、閾値を超えないとbuffに残り続けるので、とりあえず何も考えずに?TIMEOUTで定期的にflushして送信させてます。

外側のノードへメッセージを送信する場合は gen_server:cast({?SERVER, Node}, Msgs) を呼んでいます。 sendリクエストの処理では同ノード内のPnameプロセス宛にメッセージを送信します。

echoサーバーの修正

handle_cast({echo, FromNode, SenderName, Msg}, State) ->
    buff_router:route(FromNode, SenderName, Msg),
    {noreply, State}.

buff_routerを使用するように修正し、FromNodeをメッセージの中に含めています。 buff_routerがどのノードに送信すればいいかを判断させるために追加されています。

benchサーバーの修正

send_echo(0, _) ->
    ok;
send_echo(N, ToNode) ->
    spawn(buff_router, route, [ToNode, echo_server, {echo, node(), bench_server, <<"echo message">>}]),
    send_echo(N-1, ToNode).

こちらも、buff_routerを使うように修正しています。

benchスクリプトの修正

buff_router:start_link(), 

上記をbench_server:start_bench/2の前に呼びます。

計測

$ ./bench/bench.erl bacon@127.0.0.1
/home/tkyshm/erlang/src/bench/queuebuf/bench
start bench at 'bench@127.0.0.1'

timeout sent    time(msec)  rate(req/sec)
0   1000    213 4694.835681
0   10000   239 41841.004184
0   50000   354 141242.937853
0   100000  430 232558.139535
0   200000  644 310559.006211
0   300000  902 332594.235033
0   500000  1370    364963.503650

先程の300,000と500,000を比較すると大分改善されていそうです。

timeout  sent    time(msec)  rate(req/sec)
------------------------------------------------
# バッファなし
0   300000  7956    37707.390649
0   500000  14602   34241.884673
------------------------------------------------
# バッファあり
0   300000  902 332594.235033
0   500000  1370    364963.503650

多分改善されました。

これより先の細かい調査とかはしてないので、今回はこの結果でお終いです。

※ 記録としてgithubにもあげておきました。

erlang-bench/queuebuf at master · tkyshm/erlang-bench · GitHub

感想

Erlangのノード間のメッセージパッシングはTCP通信になるため、単純に通信回数を減らすことで速度は上がるのかなと漠然と思ってました。 ただ、挙動としては普通にメールボックスに溜まりすぎたのが原因になっていそうな雰囲気だったけれど、まだそこまでの詳細は確認してないです (http://erlang.org/doc/man/erlang.html#process_flag-2のmessage_queue_dataで書かれているGCのコストかもしれない)。 どうやって調べればいいかも含めて、時間があるときにやろうかなと思ってます。

今回みたいなバッファに貯めたメッセージをドカンと送ってしまう、みたいなやり方で実装してみたものの、バッファサイズやタイムアウト(そもそもこのタイムアウトがイケてなさそう)などの設定値含めて検証が必要とも感じました。思っただけで、やってません。

また、このベンチサーバーたまに失敗します。原因は分かってますが直してません。

参考

Stuff Goes Bad: Erlang in Anger

GitHub - ferd/pobox: External buffer processes to protect against mailbox overflow in Erlang

When a selective receive is taking too long