Erlangのメッセージングでバッファを使う
Erlang Angerの3章、3.3.2あたりで過負荷についての対処法として、キューのバッファを作るのが効果的という話が載っていたので、実際にどのくらい差が出るのか簡単に計測してみた話です。
問題設定:
この二つの設定に対して、1,000〜500,000メッセージをbenchサーバーから並列で送信します。
PCの環境:
バッファなし
echoサーバー
handle_castコールバックに処理を追加しました。
受け取るリクエストの中身の送り主FromSender
にMsg
を返信します。
handle_cast({echo, Msg, FromSender}, State) -> FromSender ! Msg, {noreply, State}.
benchサーバー
以下、benchサーバーです。
start_bench(Num) -> start_bench(Num, node()). start_bench(Num, Node) -> Start = erlang:system_time(millisecond), FinPid = self(), Pid = spawn_link(fun() -> loop(Num, #state{fin_pid=FinPid,start=Start}) end), register(bench_server, Pid), send_echo(Num, Node), receive finish -> ok end. loop(Num, #state{start=Start,count=Cnt,timeout_cnt=TCnt,fin_pid=Fin}) when Num =:= Cnt + TCnt -> print_result(Start, erlang:system_time(millisecond), Cnt, TCnt), Fin ! finish; loop(Num, State = #state{count=Cnt, timeout_cnt=TCnt}) -> receive _Msg -> loop(Num, State#state{count=Cnt+1}) after 3000 -> loop(Num, State#state{timeout_cnt=TCnt+1}) end. send_echo(0, _, _) -> ok; send_echo(N, ToNode) -> spawn(gen_server, cast, [{echo_server, ToNode}, {echo, <<"echo message">>, {bench_server, node()}}]), send_echo(N-1, ToNode). print_result(Start, End, Num, TNum) -> Diff = End-Start, io:format("~p\t~p\t~p\t~f~n",[TNum, Num, Diff, Num*1000/Diff]).
start_bench(Num, Node)
でNode
宛にNum
数のメッセージを並列して送信します。
loop関数がbench_serverの実態で、Num
数の返事がechoサーバーから返ってきたら終了します。
また、3秒立っても何もレスポンスが来ない場合はtimeoutカウントをインクリメントさせてます。
(この3秒の数値に大した意味はないです。3秒待っても返事がない = 十分遅い かなという程度です。)
print_result関数では計測結果をprintします。 左から順に、タイムアウト数、送信完了数、実行時間、リクエスト/秒です。
benchスクリプト
#!/usr/bin/env escript %% -*- erlang -*- %%! -smp enable -name bench@127.0.0.1 -setcookie queuebuf +K true +P 1000000 main([NodeStr]) -> chdir(), load(), Node = list_to_atom(NodeStr), io:format("start bench at ~p~n~n", [node()]), io:format("timeout\tsent\ttime(msec)\trate(req/sec)~n"), lists:foreach(fun(N) -> bench_server:start_bench(N, Node) end, [1000,10000,50000,100000,200000,300000,500000]). %% 実行の都度コンパイルする load() -> case filelib:ensure_dir("./ebin") of ok -> ok; _ -> file:make_dir("./ebin") end, compile:file("./bench_server.erl", [{outdir, "./ebin"}]), code:add_patha("./ebin"). %% 一応どこからでも実行できるようディレクトリを移動する chdir() -> {ok, Cwd} = file:get_cwd(), D = iolist_to_binary([Cwd, "/", ?FILE]), c:cd(filename:dirname(D)).
実行する際に引数にechoサーバーのnodeを指定します。
計測
%% echoサーバーが bacon@127.0.0.1 だった場合 $ ./bench/bench.erl bacon@127.0.0.1 /home/tkyshm/erlang/src/bench/queuebuf/bench start bench at 'bench@127.0.0.1' timeout sent time(msec) rate(req/sec) 0 1000 41 24390.243902 0 10000 48 208333.333333 0 50000 281 177935.943060 0 100000 456 219298.245614 0 200000 1070 186915.887850 0 300000 7956 37707.390649 0 500000 14602 34241.884673
上記、計測結果です。
300,000メッセージから急激なパフォーマンス低下を見せています (今回は結果だけ載せてます。原因については別の機会にでも)。
次に、bufferを組み込んだパターンを試していきます。
バッファあり
バッファありのケースでは、次のようなコンセプトで実装していきます。
- buffにある程度蓄積してからまとめて送信する
- echoサーバー、benchサーバーはバッファプロセスにメッセージを送信
- バッファプロセスがechoサーバー、benchサーバーにメッセージを送信
- バッファプロセスはバッファサイズが閾値を超えたときに送信
- バッファプロセスは一定周期でバッファをフラッシュ(送信)
ここのバッファプロセスは外のノードと通信するためのルーターのような役割を持ちます。
バッファプロセス buff_router
-module(buff_router). -behaviour(gen_server). ... 省略 .... % non_neg_integer()はerlang:system_time(second)が入るけど使ってなかった -type message() :: [{node(), {non_neg_integer(), [term()]}}]. -record(state, { msg_buff = [] :: [message()] }). start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). %% MsgsがListの場合 = 複数のMsgと決めつけてる. 結構無理やり. route(ToNode, Pname, Msgs) when is_list(Msgs) -> lists:foreach(fun(Msg) -> route(ToNode, Pname, Msg) end, Msgs); route(ToNode, Pname, Msg) -> gen_server:cast(?SERVER, {enqueue, ToNode, Pname, Msg}). init([]) -> {ok, #state{}, ?TIMEOUT}. handle_call(_Request, _From, State) -> Reply = ok, {reply, Reply, State}. %% sendはbuff_router => Pnameの名前付きプロセスにMsgsを送信する %% 受け取りての想定はechoサーバーなのでechoサーバのhandle_infoで処理する handle_cast({send, Msgs}, State) -> lists:foreach(fun({Pname, Msg}) -> Pname ! Msg end, Msgs), {noreply, State, ?TIMEOUT}; %% echoサーバー or benchサーバー => buff_router の流れ handle_cast({enqueue, ToNode, Pname, Msg}, State = #state{msg_buff=Buff}) when is_atom(ToNode) -> NewBuff = buffer_message(ToNode, {Pname, Msg}, Buff), {noreply, State#state{msg_buff=NewBuff}, ?TIMEOUT}. %% タイムアウトしたときメッセージflushする handle_info(timeout, State = #state{msg_buff=Buff}) -> flush_buffer(Buff), {noreply, State#state{msg_buff=[]}, ?TIMEOUT}. %% msg_buffにメッセージを詰める(もっとキレイにかけそう...) buffer_message(ToNode, Msg, Buff) -> case lists:keysearch(ToNode, 1, Buff) of {value, {ToNode, {_, Msgs}}} -> case length(Msgs) >= ?BUFF_MAXSIZE of false -> lists:keyreplace(ToNode, 1, Buff, {ToNode, {erlang:system_time(second), [Msg|Msgs]}}); true -> % メッセージをToNodeへ送信 gen_server:cast({?SERVER, ToNode}, {send, [Msg|Msgs]}), lists:keydelete(ToNode, 1, Buff) end; false -> [{ToNode, {erlang:system_time(second), [Msg]}}|Buff] end. flush_buffer(Buff) -> lists:foreach(fun flush_message/1, Buff). flush_message({ToNode, {_Time, Msgs}}) -> % メッセージをToNodeへ送信 gen_server:cast({?SERVER, ToNode}, {send, Msgs}).
route関数はechoサーバーまたはbenchサーバーが利用するAPIです。
APIは宛先ノード名(ToNode
)、プロセス名(Pname
)、メッセージ(Msg
)の3つを引数に取ります。
ToNode
(ノード名)をキーにしてmsg_buff
に追加していきます。buffのサイズが閾値?BUFF_MAXSIZE
を超えていた場合はメッセージを宛先ノードへ送信し、msg_buff
から削除します。
また、閾値を超えないとbuffに残り続けるので、とりあえず何も考えずに?TIMEOUT
で定期的にflushして送信させてます。
外側のノードへメッセージを送信する場合は gen_server:cast({?SERVER, Node}, Msgs)
を呼んでいます。
sendリクエストの処理では同ノード内のPname
プロセス宛にメッセージを送信します。
echoサーバーの修正
handle_cast({echo, FromNode, SenderName, Msg}, State) -> buff_router:route(FromNode, SenderName, Msg), {noreply, State}.
buff_routerを使用するように修正し、FromNodeをメッセージの中に含めています。 buff_routerがどのノードに送信すればいいかを判断させるために追加されています。
benchサーバーの修正
send_echo(0, _) -> ok; send_echo(N, ToNode) -> spawn(buff_router, route, [ToNode, echo_server, {echo, node(), bench_server, <<"echo message">>}]), send_echo(N-1, ToNode).
こちらも、buff_routerを使うように修正しています。
benchスクリプトの修正
buff_router:start_link(),
上記をbench_server:start_bench/2
の前に呼びます。
計測
$ ./bench/bench.erl bacon@127.0.0.1 /home/tkyshm/erlang/src/bench/queuebuf/bench start bench at 'bench@127.0.0.1' timeout sent time(msec) rate(req/sec) 0 1000 213 4694.835681 0 10000 239 41841.004184 0 50000 354 141242.937853 0 100000 430 232558.139535 0 200000 644 310559.006211 0 300000 902 332594.235033 0 500000 1370 364963.503650
先程の300,000と500,000を比較すると大分改善されていそうです。
timeout sent time(msec) rate(req/sec) ------------------------------------------------ # バッファなし 0 300000 7956 37707.390649 0 500000 14602 34241.884673 ------------------------------------------------ # バッファあり 0 300000 902 332594.235033 0 500000 1370 364963.503650
多分改善されました。
これより先の細かい調査とかはしてないので、今回はこの結果でお終いです。
※ 記録としてgithubにもあげておきました。
erlang-bench/queuebuf at master · tkyshm/erlang-bench · GitHub
感想
Erlangのノード間のメッセージパッシングはTCP通信になるため、単純に通信回数を減らすことで速度は上がるのかなと漠然と思ってました。 ただ、挙動としては普通にメールボックスに溜まりすぎたのが原因になっていそうな雰囲気だったけれど、まだそこまでの詳細は確認してないです (http://erlang.org/doc/man/erlang.html#process_flag-2のmessage_queue_dataで書かれているGCのコストかもしれない)。 どうやって調べればいいかも含めて、時間があるときにやろうかなと思ってます。
今回みたいなバッファに貯めたメッセージをドカンと送ってしまう、みたいなやり方で実装してみたものの、バッファサイズやタイムアウト(そもそもこのタイムアウトがイケてなさそう)などの設定値含めて検証が必要とも感じました。思っただけで、やってません。
また、このベンチサーバーたまに失敗します。原因は分かってますが直してません。
参考
Stuff Goes Bad: Erlang in Anger
GitHub - ferd/pobox: External buffer processes to protect against mailbox overflow in Erlang