elib/ernie_server.erl in mojombo-ernie-0.3.1 vs elib/ernie_server.erl in mojombo-ernie-0.3.2

- old
+ new

@@ -9,10 +9,14 @@ terminate/2, code_change/3]). -record(state, {lsock = undefined, pending = queue:new()}). +-record(request, {sock = undefined, + info = undefined, + action = undefined}). + %%==================================================================== %% API %%==================================================================== start_link(Args) -> @@ -62,34 +66,21 @@ %% {noreply, State, Timeout} | %% {stop, Reason, State} %% Description: Handling cast messages %%-------------------------------------------------------------------- handle_cast({process, Sock}, State) -> - case gen_tcp:recv(Sock, 0) of - {ok, BinaryTerm} -> - logger:debug("Got binary term: ~p~n", [BinaryTerm]), - Term = binary_to_term(BinaryTerm), - logger:info("Got term: ~p~n", [Term]), - case Term of - {call, '__admin__', Fun, Args} -> - State2 = process_admin(Sock, Fun, Args, State); - Any -> - State2 = process_normal(BinaryTerm, Sock, State) - end, - {noreply, State2}; - {error, closed} -> - ok = gen_tcp:close(Sock), - {noreply, State} - end; + Request = #request{sock = Sock}, + State2 = receive_term(Request, State), + {noreply, State2}; handle_cast({asset_freed}, State) -> case queue:is_empty(State#state.pending) of false -> case asset_pool:lease() of {ok, Asset} -> - {{value, {pending, BinaryTerm, Sock}}, Pending2} = queue:out(State#state.pending), + {{value, Request}, Pending2} = queue:out(State#state.pending), % io:format("d", []), - spawn(fun() -> process_now(BinaryTerm, Sock, Asset) end), + spawn(fun() -> process_now(Request, Asset) end), {noreply, State#state{pending = Pending2}}; empty -> % io:format(".", []), {noreply, State} end; @@ -133,43 +124,93 @@ process_admin(Sock, reload_handlers, _Args, State) -> asset_pool:reload_assets(), gen_tcp:send(Sock, term_to_binary({reply, <<"Handlers reloaded.">>})), ok = gen_tcp:close(Sock), State; -process_admin(Sock, Fun, _Args, State) -> +process_admin(Sock, _Fun, _Args, State) -> gen_tcp:send(Sock, term_to_binary({reply, <<"Admin function not supported.">>})), ok = gen_tcp:close(Sock), State. -process_normal(BinaryTerm, Sock, State) -> +receive_term(Request, State) -> + Sock = Request#request.sock, + case gen_tcp:recv(Sock, 0) of + {ok, BinaryTerm} -> + logger:debug("Got binary term: ~p~n", [BinaryTerm]), + Term = binary_to_term(BinaryTerm), + logger:info("Got term: ~p~n", [Term]), + case Term of + {call, '__admin__', Fun, Args} -> + process_admin(Sock, Fun, Args, State); + {info, _Command, _Args} -> + Request2 = Request#request{info = BinaryTerm}, + receive_term(Request2, State); + _Any -> + Request2 = Request#request{action = BinaryTerm}, + process_request(Request2, State) + end; + {error, closed} -> + ok = gen_tcp:close(Sock), + State + end. + +process_request(Request, State) -> + ActionTerm = binary_to_term(Request#request.action), + case ActionTerm of + {cast, _Mod, _Fun, _Args} -> + Sock = Request#request.sock, + gen_tcp:send(Sock, term_to_binary({noreply})), + ok = gen_tcp:close(Sock), + logger:debug("Closing cast.~n", []); + _Any -> + ok + end, case queue:is_empty(State#state.pending) of false -> - Pending2 = queue:in({pending, BinaryTerm, Sock}, State#state.pending), + Pending2 = queue:in(Request, State#state.pending), % io:format("Q", []), State#state{pending = Pending2}; true -> - try_process_now(BinaryTerm, Sock, State) + try_process_now(Request, State) end. -try_process_now(BinaryTerm, Sock, State) -> +try_process_now(Request, State) -> case asset_pool:lease() of {ok, Asset} -> % io:format("i", []), - spawn(fun() -> process_now(BinaryTerm, Sock, Asset) end), + spawn(fun() -> process_now(Request, Asset) end), State; empty -> % io:format("q", []), - Pending2 = queue:in({pending, BinaryTerm, Sock}, State#state.pending), + Pending2 = queue:in(Request, State#state.pending), State#state{pending = Pending2} end. -process_now(BinaryTerm, Sock, Asset) -> - % io:format(".", []), - % error_logger:info_msg("From Internet: ~p~n", [BinaryTerm]), - {asset, Port, Token} = Asset, - logger:debug("Asset: ~p ~p~n", [Port, Token]), - {ok, Data} = port_wrapper:rpc(Port, BinaryTerm), - % error_logger:info_msg("From Port: ~p~n", [Data]), - asset_pool:return(Asset), - ernie_server:asset_freed(), - gen_tcp:send(Sock, Data), - ok = gen_tcp:close(Sock). +process_now(Request, Asset) -> + try unsafe_process_now(Request, Asset) of + _AnyResponse -> ok + catch + _AnyError -> ok + after + asset_pool:return(Asset), + ernie_server:asset_freed(), + gen_tcp:close(Request#request.sock) + end. + +unsafe_process_now(Request, Asset) -> + BinaryTerm = Request#request.action, + Term = binary_to_term(BinaryTerm), + case Term of + {call, Mod, Fun, Args} -> + logger:debug("Calling ~p:~p(~p)~n", [Mod, Fun, Args]), + Sock = Request#request.sock, + {asset, Port, Token} = Asset, + logger:debug("Asset: ~p ~p~n", [Port, Token]), + {ok, Data} = port_wrapper:rpc(Port, BinaryTerm), + gen_tcp:send(Sock, Data), + ok = gen_tcp:close(Sock); + {cast, Mod, Fun, Args} -> + logger:debug("Casting ~p:~p(~p)~n", [Mod, Fun, Args]), + {asset, Port, Token} = Asset, + logger:debug("Asset: ~p ~p~n", [Port, Token]), + {ok, _Data} = port_wrapper:rpc(Port, BinaryTerm) + end. \ No newline at end of file