1%% @author Couchbase <info@couchbase.com>
2%% @copyright 2017-2018 Couchbase, Inc.
3%%
4%% Licensed under the Apache License, Version 2.0 (the "License");
5%% you may not use this file except in compliance with the License.
6%% You may obtain a copy of the License at
7%%
8%%      http://www.apache.org/licenses/LICENSE-2.0
9%%
10%% Unless required by applicable law or agreed to in writing, software
11%% distributed under the License is distributed on an "AS IS" BASIS,
12%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13%% See the License for the specific language governing permissions and
14%% limitations under the License.
15
16-module(replicated_storage).
17
18-behaviour(gen_server).
19
20-export([start_link/4, start_link_remote/5, wait_for_startup/0,
21         anounce_startup/1, sync_to_me/3]).
22
23-export([init/1, handle_call/3, handle_cast/2,
24         handle_info/2, terminate/2, code_change/3]).
25
%% Callbacks the child (storage) module passed to start_link/4 or
%% start_link_remote/5 must implement.
%% NOTE(review): this module additionally calls on_replicate_in/1,
%% on_replicate_out/1, handle_mass_update/3, handle_call/3, handle_cast/2 and
%% handle_info/2 on the child module; those are not declared below.

%% Builds the initial child state from the InitParams given to start_link.
-callback init(term()) -> term().
%% Runs after the starter has been acknowledged; returns the child state the
%% server loop starts with.
-callback init_after_ack(term()) -> term().
%% Returns the identifier of a document.
-callback get_id(term()) -> term().
%% Looks up a stored document by id in the child state; false when absent.
-callback find_doc(term(), term()) -> term() | false.
%% Looks up the stored revision for an id; false when absent. The result is
%% compared against get_revision/1 of replicated docs.
-callback find_doc_rev(term(), term()) -> term() | false.
%% Returns a producer over all stored docs (fed to pipes:compose/2); the
%% argument is the storage server pid.
-callback all_docs(pid()) -> term().
%% Returns a document's revision; destructured here as {Pos, Rand}.
-callback get_revision(term()) -> term().
%% Returns the document with its revision replaced by the given one.
-callback set_revision(term(), term()) -> term().
%% True when the document represents a deletion.
-callback is_deleted(term()) -> boolean().
%% Persists a list of documents, returning the new child state.
-callback save_docs([term()], term()) -> {ok, term()} | {error, term()}.
36
37-include("ns_common.hrl").
38-include("pipes.hrl").
39
%% Server state.
-record(state, {child_module :: atom(),  % callback module implementing this behaviour
                child_state :: term(),   % state owned and threaded by child_module
                replicator :: pid()      % receives replicate_change, sync_token and
                                         % replicate_newnodes_docs messages
               }).
44
%% Starts the replicated storage server on the local node, registered locally
%% as Name. Module is the child callback module, InitParams is handed to
%% Module:init/1, and Replicator is the process that change notifications are
%% sent to.
start_link(Name, Module, InitParams, Replicator) ->
    ServerArgs = [Module, InitParams, Replicator],
    gen_server:start_link({local, Name}, ?MODULE, ServerArgs, []).
48
%% Like start_link/4, but the server is started on Node through
%% misc:start_link/4 and misc:turn_into_gen_server, registered locally as Name
%% on that node.
start_link_remote(Node, Name, Module, InitParams, Replicator) ->
    ServerArgs = [Module, InitParams, Replicator],
    GenServerArgs = [{local, Name}, ?MODULE, ServerArgs, []],
    misc:start_link(Node, misc, turn_into_gen_server, GenServerArgs).
53
%% Blocks the caller until a storage process announces itself via
%% anounce_startup/1 and returns that process's pid.
%% Exits with the received reason if an 'EXIT' message arrives first (only
%% delivered when the caller traps exits), or with
%% replicated_storage_not_available after 10 seconds.
%% The misspelled tag `replicated_storege_pid` is part of the protocol shared
%% with anounce_startup/1 and must stay as is.
wait_for_startup() ->
    ?log_debug("Start waiting for startup"),
    receive
        {replicated_storege_pid, StoragePid} ->
            ?log_debug("Received replicated storage registration from ~p",
                       [StoragePid]),
            StoragePid;
        {'EXIT', From, Why} ->
            ?log_debug("Received exit from ~p with reason ~p", [From, Why]),
            exit(Why)
    after 10000 ->
            ?log_error("Waited 10000 ms for replicated storage pid to no avail. Crash."),
            exit(replicated_storage_not_available)
    end.
67
%% Tells the process waiting in wait_for_startup/0 (Pid) that the calling
%% process is the storage server. Returns the message that was sent.
anounce_startup(Pid) ->
    ?log_debug("Announce my startup to ~p", [Pid]),
    Announcement = {replicated_storege_pid, self()},
    Pid ! Announcement.
71
%% Asks the storage server Name to have Nodes synchronize with this node.
%% Timeout is forwarded to the replicator; the call itself waits indefinitely.
sync_to_me(Name, Nodes, Timeout) ->
    Request = {sync_to_me, Nodes, Timeout},
    gen_server:call(Name, Request, infinity).
74
%% gen_server init callback that never returns normally.
%%
%% Initializes the child module, explicitly acknowledges the starter with
%% proc_lib:init_ack/1 (unblocking the process waiting in start_link), runs
%% Module:init_after_ack/1 for setup that may happen after the starter has
%% been released, and finally hands the process over to the gen_server loop
%% with gen_server:enter_loop/3.
init([Module, InitParams, Replicator]) ->
    Self = self(),
    ChildState1 = Module:init(InitParams),
    %% Queue the initial "replicate all docs to new nodes" request; it is
    %% handled by handle_info/2 once the server loop is running.
    Self ! replicate_newnodes_docs,

    %% Ack before init_after_ack so the starter is not blocked by it.
    proc_lib:init_ack({ok, Self}),

    ChildState2 = Module:init_after_ack(ChildState1),
    gen_server:enter_loop(?MODULE, [],
                          #state{child_module = Module,
                                 child_state = ChildState2,
                                 replicator = Replicator}).
87
%% gen_server call handler.
%%
%% Requests handled here:
%%   {interactive_update, Doc}
%%       Stamps Doc with a new revision, saves it through the child module
%%       and forwards it to the replicator. Replies ok,
%%       {not_found, missing | deleted} when Doc is a deletion of a doc that is
%%       absent or already deleted, or the bare error term from
%%       Module:save_docs/2.
%%   {mass_update, Context}
%%       Gives Module:handle_mass_update/3 a pipes consumer that applies
%%       interactive_update to every doc it receives, collecting
%%       {Doc, Error} pairs for failed updates.
%%   sync_token
%%       Forwarded to the replicator together with From; this process does
%%       not reply itself (presumably the replicator replies via From).
%%   {sync_to_me, Nodes, Timeout}
%%       Delegated to a linked helper process that waits for the replicator
%%       and replies to From, so this server is not blocked meanwhile.
%% Any other request is delegated to Module:handle_call/3.
handle_call({interactive_update, Doc}, _From, State) ->
    handle_interactive_update(Doc, State);
handle_call({mass_update, Context}, From, #state{child_module = Module,
                                                 child_state = ChildState} = State) ->
    Updater =
        ?make_consumer(
           pipes:fold(
             ?producer(),
             fun (Doc, {Errors, St}) ->
                     {reply, RV, NewSt} =
                         handle_call({interactive_update, Doc}, From, St),
                     {case RV of
                          ok ->
                              Errors;
                          Error ->
                              [{Doc, Error} | Errors]
                      end, NewSt}
             end, {[], State})),
    %% NOTE(review): NewState is put directly into the reply tuple, so
    %% handle_mass_update/3 is expected to return a full #state{} (e.g. the
    %% one folded by Updater), not a child state — confirm in implementers.
    {RV1, NewState} =
        Module:handle_mass_update(Context, Updater, ChildState),
    {reply, RV1, NewState};
handle_call(sync_token, From, #state{replicator = Replicator} = State) ->
    ?log_debug("Received sync_token from ~p", [From]),
    Replicator ! {sync_token, From},
    {noreply, State};
handle_call({sync_to_me, Nodes, Timeout}, From,
            #state{replicator = Replicator} = State) ->
    ?log_debug("Received sync_to_me with timeout = ~p, nodes = ~p",
               [Timeout, Nodes]),
    proc_lib:spawn_link(
      fun () ->
              Res = gen_server:call(Replicator, {sync_to_me, Nodes, Timeout},
                                    infinity),
              ?log_debug("sync_to_me reply: ~p", [Res]),
              gen_server:reply(From, Res)
      end),
    {noreply, State};
handle_call(Msg, From, #state{child_module = Module, child_state = ChildState} = State) ->
    case Module:handle_call(Msg, From, ChildState) of
        {reply, Res, NewChildState} ->
            {reply, Res, State#state{child_state = NewChildState}};
        {noreply, NewChildState} ->
            {noreply, State#state{child_state = NewChildState}}
    end.

%% Saves Doc with a freshly computed revision and notifies the replicator.
%% Returns a gen_server reply tuple so handle_call/3 can return it directly.
handle_interactive_update(Doc, #state{child_module = Module,
                                      child_state = ChildState,
                                      replicator = Replicator} = State) ->
    {NewRev, FoundType} = new_revision(Doc, Module, ChildState),
    case Module:is_deleted(Doc) andalso FoundType =/= existent of
        true ->
            %% Deleting a doc that is missing or already deleted is refused.
            {reply, {not_found, FoundType}, State};
        false ->
            NewDoc = Module:set_revision(Doc, NewRev),
            ?log_debug("Writing interactively saved doc ~p",
                       [ns_config_log:sanitize(NewDoc, true)]),
            case Module:save_docs([NewDoc], ChildState) of
                {ok, NewChildState} ->
                    Replicator ! {replicate_change, Module:get_id(NewDoc),
                                  Module:on_replicate_out(NewDoc)},
                    {reply, ok, State#state{child_state = NewChildState}};
                {error, Error} ->
                    {reply, Error, State}
            end
    end.

%% Computes {NewRev, FoundType} for Doc. NewRev is {1, Rand} when
%% Module:find_doc/2 finds nothing under Doc's id (FoundType = missing),
%% otherwise {Pos + 1, Rand} where Pos is the stored doc's revision position
%% (FoundType = deleted | existent). Rand is 4 random bytes; presumably it
%% makes revisions with equal Pos from different writers compare
%% deterministically in should_be_written/3.
new_revision(Doc, Module, ChildState) ->
    %% Same value space as <<(crypto:rand_uniform(0, 16#100000000)):32>>,
    %% which relied on an API deprecated in OTP crypto.
    RandBin = crypto:strong_rand_bytes(4),
    case Module:find_doc(Module:get_id(Doc), ChildState) of
        false ->
            {{1, RandBin}, missing};
        ExistingDoc ->
            {Pos, _DiskRev} = Module:get_revision(ExistingDoc),
            FoundType = case Module:is_deleted(ExistingDoc) of
                            true -> deleted;
                            false -> existent
                        end,
            {{Pos + 1, RandBin}, FoundType}
    end.
168
%% gen_server cast handler.
%%
%%   {replicated_batch, CompressedBatch}
%%       A compressed, non-empty list of docs from another node. It is
%%       decompressed with misc:decompress/1 and applied without per-doc
%%       logging. An empty or non-list batch crashes the server (badmatch).
%%   {replicated_update, Doc}
%%       A single replicated doc, applied with per-doc logging.
%% Any other cast is delegated to Module:handle_cast/2, which must return
%% {noreply, NewChildState}.
handle_cast({replicated_batch, CompressedBatch}, State) ->
    ?log_debug("Applying replicated batch. Size: ~p",
               [byte_size(CompressedBatch)]),
    %% Assert a non-empty list by pattern rather than a boolean check.
    [_ | _] = Batch = misc:decompress(CompressedBatch),
    {noreply, handle_replication_update(Batch, false, State)};
handle_cast({replicated_update, Doc}, State) ->
    {noreply, handle_replication_update([Doc], true, State)};
handle_cast(Msg, #state{child_module = Module, child_state = ChildState} = State) ->
    {noreply, NewChildState} = Module:handle_cast(Msg, ChildState),
    {noreply, State#state{child_state = NewChildState}}.
180
%% gen_server info handler.
%%
%% replicate_newnodes_docs (queued by init/1): wraps the child module's
%% all-docs producer so every doc is passed through Module:on_replicate_out/1,
%% and hands the resulting producer to the replicator. Items come either as
%% {batch, Docs} or as a list of {Id, Doc} pairs; both shapes are kept.
%% Any other message is delegated to Module:handle_info/2, which must return
%% {noreply, NewChildState}.
handle_info(replicate_newnodes_docs, #state{child_module = Module,
                                            replicator = Replicator} = State) ->
    Export = fun (D) -> Module:on_replicate_out(D) end,
    Convert =
        fun ({batch, Docs}) ->
                {batch, [Export(D) || D <- Docs]};
            (Pairs) ->
                [{Id, Export(D)} || {Id, D} <- Pairs]
        end,
    Source = Module:all_docs(self()),
    Producer = pipes:compose(Source, pipes:map(Convert)),
    Replicator ! {replicate_newnodes_docs, Producer},
    {noreply, State};
handle_info(Msg, #state{child_module = Mod} = State) ->
    {noreply, ChildState1} = Mod:handle_info(Msg, State#state.child_state),
    {noreply, State#state{child_state = ChildState1}}.
199
%% gen_server terminate callback; nothing to clean up.
terminate(_Why, _St) -> ok.
202
%% Applies docs received from another node and returns the updated #state{}.
%%
%% Each doc is converted with Module:on_replicate_in/1 and kept only when
%% should_be_written/3 accepts it. If NeedLog is true, every kept doc is
%% logged. The kept docs are saved with a single Module:save_docs/2 call,
%% skipped when none remain. A save error crashes the server (badmatch).
handle_replication_update(Docs, NeedLog,
                          #state{child_module = Module,
                                 child_state = ChildState} = State) ->
    Accepted = [Converted || Doc <- Docs,
                             Converted <- [Module:on_replicate_in(Doc)],
                             should_be_written(Converted, Module, ChildState)],
    case NeedLog of
        true ->
            lists:foreach(
              fun (D) ->
                      ?log_debug("Writing replicated doc ~p",
                                 [ns_config_log:tag_user_data(D)])
              end, Accepted);
        false ->
            ok
    end,
    {ok, NewChildState} = save_replicated_docs(Accepted, Module, ChildState),
    State#state{child_state = NewChildState}.

%% Saves Docs through the child module; an empty list leaves the state as is.
save_replicated_docs([], _Module, ChildState) ->
    {ok, ChildState};
save_replicated_docs(Docs, Module, ChildState) ->
    Module:save_docs(Docs, ChildState).
226
%% gen_server code_change callback; the state is carried over unchanged.
code_change(_Vsn, St, _Ext) -> {ok, St}.
229
%% Decides whether a doc replicated from another node should be stored here.
%% It is accepted when nothing is stored under its id (find_doc_rev/2 returns
%% false) or when its revision is greater, in Erlang term order, than the
%% stored one.
should_be_written(Doc, Module, ChildState) ->
    Rev = Module:get_revision(Doc),
    Id = Module:get_id(Doc),
    case Module:find_doc_rev(Id, ChildState) of
        false -> true;
        DiskRev -> Rev > DiskRev
    end.
242