1%% @author Couchbase <info@couchbase.com>
2%% @copyright 2014-2018 Couchbase, Inc.
3%%
4%% Licensed under the Apache License, Version 2.0 (the "License");
5%% you may not use this file except in compliance with the License.
6%% You may obtain a copy of the License at
7%%
8%%      http://www.apache.org/licenses/LICENSE-2.0
9%%
10%% Unless required by applicable law or agreed to in writing, software
11%% distributed under the License is distributed on an "AS IS" BASIS,
12%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13%% See the License for the specific language governing permissions and
14%% limitations under the License.
15%%
16-module(json_rpc_connection).
17
18-behaviour(gen_server).
19
20-include("ns_common.hrl").
21
22-export([start_link/2,
23         perform_call/3, perform_call/4,
24         reannounce/1]).
25
26%% gen_server callbacks
27-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
28         terminate/2, code_change/3]).
29
%% Per-connection gen_server state.
%%   label            - label the connection was started with; included in
%%                      the json_rpc_events notifications
%%   counter          - id given to the next outgoing request (starts at 0,
%%                      incremented after every call)
%%   sock             - socket returned by the GetSocket fun in init/1
%%   id_to_caller_tid - private ETS table mapping a request id to the
%%                      gen_server From of the caller awaiting the response
-record(state, {label :: string(),
                counter :: non_neg_integer(),
                sock :: port(),
                id_to_caller_tid :: ets:tid()}).
34
%% Prefix prepended to a connection label to build the name under which
%% the connection process is registered (see label_to_name/1).
-define(PREFIX, "json_rpc_connection-").

%% NOTE(review): ?get_timeout is defined outside this file; 60000 is
%% presumably a default in milliseconds -- confirm. This macro is not
%% referenced in the visible part of this module.
-define(RPC_TIMEOUT, ?get_timeout(json_rpc_timeout, 60000)).
38
%% Resolves the destination of a request. A pid is returned unchanged; a
%% string label is turned into the atom ?PREFIX ++ Label, which is the name
%% init/1 registers the connection process under.
%% NOTE(review): list_to_atom/1 creates a new atom for every distinct label
%% and atoms are never garbage collected -- confirm that labels come from a
%% bounded, trusted set.
label_to_name(Target) when is_pid(Target) ->
    Target;
label_to_name(Target) when is_list(Target) ->
    RegisteredName = ?PREFIX ++ Target,
    erlang:list_to_atom(RegisteredName).
43
%% @doc Starts a linked connection process for Label. GetSocket is a
%% zero-arity fun that init/1 calls to obtain the socket to talk over.
start_link(Label, GetSocket) ->
    InitArg = {Label, GetSocket},
    NoOptions = [],
    gen_server:start_link(?MODULE, InitArg, NoOptions).
46
%% @doc Issues the call Name with argument EJsonArg over the connection
%% identified by Label (a pid or a string label) and waits up to Timeout
%% for the reply. The argument is handed to the server wrapped in a
%% zero-arity fun, which the server evaluates when it builds the request.
perform_call(Label, Name, EJsonArg, Timeout) ->
    Server = label_to_name(Label),
    Request = {call, Name, fun () -> EJsonArg end},
    gen_server:call(Server, Request, Timeout).

%% @doc Same as perform_call/4, but waits for the reply without a timeout.
perform_call(Label, Name, EJsonArg) ->
    NoTimeout = infinity,
    perform_call(Label, Name, EJsonArg, NoTimeout).
53
%% @doc Asynchronously asks the connection process Pid to emit a
%% needs_update notification for itself on json_rpc_events.
reannounce(Pid) when is_pid(Pid) ->
    Request = reannounce,
    gen_server:cast(Pid, Request).
56
%% @doc gen_server init callback. It never returns: after finishing the
%% setup it enters the gen_server loop directly via enter_loop/3.
%%
%% Steps, in order: acknowledge the start, obtain the socket from
%% GetSocket, take over the registered name for Label (killing any previous
%% holder), enable nodelay on the socket, create the private request-id
%% table, start the linked receiver process and announce the connection on
%% json_rpc_events.
init({Label, GetSocket}) ->
    Self = self(),
    %% Acknowledge first so that the caller of start_link/2 is not kept
    %% waiting while the rest of the setup runs.
    proc_lib:init_ack({ok, Self}),
    Sock = GetSocket(),

    RegName = label_to_name(Label),
    _ = displace_previous_instance(erlang:whereis(RegName)),
    true = erlang:register(RegName, Self),
    ok = inet:setopts(Sock, [{nodelay, true}]),
    CallerTable = ets:new(ets, [set, private]),
    ReceiverArgs = [Sock, Self, <<>>],
    _ = proc_lib:spawn_link(erlang, apply, [fun receiver_loop/3, ReceiverArgs]),
    ?log_debug("Observed revrpc connection: label ~p, handling process ~p",
               [Label, Self]),
    gen_event:notify(json_rpc_events, {started, Label, Self}),

    InitialState = #state{label = Label,
                          counter = 0,
                          sock = Sock,
                          id_to_caller_tid = CallerTable},
    gen_server:enter_loop(?MODULE, [], InitialState).

%% Terminates the process currently holding the connection's registered
%% name, if any, and waits until it is gone so the name can be re-registered.
displace_previous_instance(undefined) ->
    ok;
displace_previous_instance(OldPid) ->
    erlang:exit(OldPid, new_instance_created),
    misc:wait_for_process(OldPid, infinity).
82
%% @doc gen_server cast callback. `reannounce' publishes a needs_update
%% event for this connection on json_rpc_events; any other cast is treated
%% as a programming error and crashes the process with reason `unknown'.
handle_cast(reannounce, State = #state{label = Label}) ->
    Event = {needs_update, Label, self()},
    gen_event:notify(json_rpc_events, Event),
    {noreply, State};
handle_cast(_Unexpected, _State) ->
    error(unknown).
88
%% @doc gen_server info callback.
%%
%% `{chunk, Chunk}' carries one complete response line from the receiver
%% process. The response is decoded, matched to the waiting caller through
%% its "id" and the caller is answered with one of:
%%   {ok, Result}               - "error" is absent or null
%%   {error, method_not_found}  - unknown method or service
%%   {error, {rpc_error, Err}}  - any other "rpc: " error; the server then
%%                                stops with reason {error, rpc_error}
%%   {error, Err}               - any other error value
%% A response with an unknown id, or a successful one without a "result"
%% member, crashes the process on a failed match.
%%
%% `socket_closed' stops the server with reason shutdown. Everything else
%% is logged and ignored.
handle_info({chunk, Chunk}, State = #state{id_to_caller_tid = Callers}) ->
    {Props} = ejson:decode(Chunk),
    {_, ReqId} = lists:keyfind(<<"id">>, 1, Props),
    [{_, Caller}] = ets:lookup(Callers, ReqId),
    ets:delete(Callers, ReqId),
    ale:debug(?JSON_RPC_LOGGER, "got response: ~p", [Props]),
    {Next, Reply} =
        case lists:keyfind(<<"error">>, 1, Props) of
            false ->
                {continue, result_reply(Props)};
            {_, null} ->
                {continue, result_reply(Props)};
            {_, <<"rpc: can't find method ", _/binary>>} ->
                {continue, {error, method_not_found}};
            {_, <<"rpc: can't find service ", _/binary>>} ->
                {continue, {error, method_not_found}};
            {_, <<"rpc: ", _/binary>> = Fatal} ->
                ?log_error("Unexpected rpc error: ~p. Die.", [Fatal]),
                {halt, {error, {rpc_error, Fatal}}};
            {_, Other} ->
                {continue, {error, Other}}
        end,
    %% The caller is always answered, even when the server is about to stop.
    gen_server:reply(Caller, Reply),
    case Next of
        halt ->
            {stop, {error, rpc_error}, State};
        continue ->
            {noreply, State}
    end;
handle_info(socket_closed, State) ->
    ?log_debug("Socket closed"),
    {stop, shutdown, State};
handle_info(Unexpected, State) ->
    ?log_debug("Unknown msg: ~p", [Unexpected]),
    {noreply, State}.

%% Builds the success reply from the "result" member of a decoded response.
result_reply(Props) ->
    {_, Value} = lists:keyfind(<<"result">>, 1, Props),
    {ok, Value}.
134
%% @doc gen_server call callback for `{call, Name, EJsonArgThunk}'.
%%
%% Builds a JSON-RPC 2.0 request whose id is the current counter value,
%% writes it to the socket followed by a newline and records the caller
%% under that id. No reply is given here; the caller is answered by
%% handle_info/2 once the response with the same id arrives.
handle_call({call, Name, EJsonArgThunk}, From,
            State = #state{counter = ReqId,
                           id_to_caller_tid = Callers,
                           sock = Sock}) ->
    Arg = EJsonArgThunk(),
    Method = method_name(Name),
    Request = {[{jsonrpc, <<"2.0">>},
                {id, ReqId},
                {method, Method}
                | request_params(Arg)]},
    ale:debug(?JSON_RPC_LOGGER,
              "sending jsonrpc call:~p", [ns_config_log:sanitize(Request)]),
    ok = gen_tcp:send(Sock, [ejson:encode(Request) | <<"\n">>]),
    ets:insert(Callers, {ReqId, From}),
    {noreply, State#state{counter = ReqId + 1}}.

%% Method names given as strings are sent as binaries; anything else is
%% passed through untouched.
method_name(Name) when is_list(Name) ->
    list_to_binary(Name);
method_name(Name) ->
    Name.

%% An `undefined' argument means the request carries no params member.
request_params(undefined) ->
    [];
request_params(Arg) ->
    %% golang's jsonrpc only supports array of single arg
    [{params, [Arg]}].
163
%% @doc gen_server terminate callback; performs no cleanup of its own.
terminate(_StopReason, _FinalState) ->
    ok.
166
%% @doc gen_server code_change callback; the state is carried over as is.
code_change(_FromVsn, CurrentState, _UpgradeExtra) ->
    {ok, CurrentState}.
169
170
%% Body of the receiver process started from init/1. Reads whatever is
%% available on Sock, prepends the incomplete line left over from the
%% previous read (Acc) and lets receiver_handle_data/2 forward every
%% complete line to Parent. When the socket is closed, Parent is sent
%% `socket_closed' and the receiver exits normally. Any other recv error is
%% not handled here and crashes the receiver.
receiver_loop(Sock, Parent, Acc) ->
    case gen_tcp:recv(Sock, 0) of
        {ok, Bytes} ->
            Pending = prepend_leftover(Acc, Bytes),
            Leftover = receiver_handle_data(Parent, Pending),
            receiver_loop(Sock, Parent, Leftover);
        {error, closed} ->
            Parent ! socket_closed,
            erlang:exit(normal)
    end.

%% Joins the unterminated remainder of the previous read with new data;
%% with nothing left over the new data is used as received.
prepend_leftover(<<>>, Bytes) ->
    Bytes;
prepend_leftover(Leftover, Bytes) ->
    <<Leftover/binary, Bytes/binary>>.
187
%% Sends every newline-terminated line found in Data to Parent as
%% `{chunk, Line}' (without the newline), in order, and returns the
%% unterminated remainder, which is `<<>>' when Data ends with a newline.
receiver_handle_data(Parent, Data) ->
    case binary:match(Data, <<"\n">>) of
        nomatch ->
            %% No complete line left; hand the remainder back to the caller.
            Data;
        {Pos, 1} ->
            <<Line:Pos/binary, $\n, Tail/binary>> = Data,
            Parent ! {chunk, Line},
            receiver_handle_data(Parent, Tail)
    end.
199