xref: /5.5.2/ns_server/src/doc_replicator.erl (revision d1858ba3)
1%% @author Couchbase <info@couchbase.com>
2%% @copyright 2014-2018 Couchbase, Inc.
3%%
4%% Licensed under the Apache License, Version 2.0 (the "License");
5%% you may not use this file except in compliance with the License.
6%% You may obtain a copy of the License at
7%%
8%%      http://www.apache.org/licenses/LICENSE-2.0
9%%
10%% Unless required by applicable law or agreed to in writing, software
11%% distributed under the License is distributed on an "AS IS" BASIS,
12%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13%% See the License for the specific language governing permissions and
14%% limitations under the License.
15%%
16%% @doc process responsible for pushing document changes to other nodes
17%%
18
19-module(doc_replicator).
20
21-include("ns_common.hrl").
22-include("pipes.hrl").
23
24-export([start_link/3]).
25
%% Start the replicator as a proc_lib process linked to the caller.
%%
%% Name is the name the new process registers itself under, GetNodes is a
%% zero-arity fun returning the nodes to replicate with, and StorageFrontend
%% is the name the storage process is registered under on those nodes.
%% Returns what the new process acknowledges through proc_lib:init_ack/1,
%% which start_loop/3 sends as {ok, Pid}.
start_link(Name, GetNodes, StorageFrontend) ->
    EntryPoint = fun start_loop/3,
    EntryArgs = [Name, GetNodes, StorageFrontend],
    proc_lib:start_link(erlang, apply, [EntryPoint, EntryArgs]).
29
%% Entry point of the replicator process (started by start_link/3).
%%
%% Registers the process as Name and acknowledges start-up to the parent
%% before blocking in replicated_storage:wait_for_startup/0. It then:
%%  * spawns a linked helper that subscribes to node up/down events and,
%%    on every nodeup, sends `replicate_newnodes_docs` to the pid returned
%%    by wait_for_startup/0;
%%  * sends `replicate_newnodes_docs` to StorageFrontend on every node
%%    returned by GetNodes(), asking those nodes to send their documents
%%    to us;
%%  * enters loop/3 with an empty list of remote nodes.
start_loop(Name, GetNodes, StorageFrontend) ->
    Self = self(),
    erlang:register(Name, Self),
    proc_lib:init_ack({ok, Self}),
    LocalStoragePid = replicated_storage:wait_for_startup(),

    %% Whenever a node connects (nodeup), force a replicate event.
    erlang:spawn_link(
      fun () ->
              ok = net_kernel:monitor_nodes(true),
              nodeup_monitoring_loop(LocalStoragePid)
      end),

    %% Ask every currently known node to push its documents to us.
    lists:foreach(
      fun (Node) ->
              {StorageFrontend, Node} ! replicate_newnodes_docs
      end, GetNodes()),

    loop(GetNodes, StorageFrontend, []).
47
%% Main loop of the replicator process.
%%
%% RemoteNodes is the list of nodes that individual changes are pushed to.
%% It starts empty, is replaced by GetNodes() on every
%% replicate_newnodes_docs request, and shrinks when a monitored remote
%% storage process goes down. Each iteration takes the oldest message in
%% the mailbox, whatever it is, and hands it to handle_loop_message/4,
%% which returns the next RemoteNodes list. Unknown messages terminate the
%% process.
loop(GetNodes, StorageFrontend, RemoteNodes) ->
    Message = receive Any -> Any end,
    NextRemoteNodes =
        handle_loop_message(Message, GetNodes, StorageFrontend, RemoteNodes),
    loop(GetNodes, StorageFrontend, NextRemoteNodes).

%% Push one changed document to every node currently replicated to.
handle_loop_message({replicate_change, Id, Doc}, _GetNodes, StorageFrontend,
                    RemoteNodes) ->
    lists:foreach(
      fun (RemoteNode) ->
              replicate_change_to_node(StorageFrontend, RemoteNode, Id, Doc)
      end, RemoteNodes),
    RemoteNodes;
%% Nodes in GetNodes() that are not yet in RemoteNodes are monitored and
%% sent everything that Producer yields. The new RemoteNodes is the full
%% GetNodes() result.
handle_loop_message({replicate_newnodes_docs, Producer}, GetNodes,
                    StorageFrontend, RemoteNodes) ->
    AllNodes = GetNodes(),
    ?log_debug("doing replicate_newnodes_docs"),
    case AllNodes -- RemoteNodes of
        [] ->
            ok;
        NewNodes ->
            [monitor(process, {StorageFrontend, N}) || N <- NewNodes],
            pipes:foreach(
              Producer,
              fun (Chunk) ->
                      lists:foreach(
                        fun (N) ->
                                replicate_changes_to_node(
                                  StorageFrontend, N, Chunk)
                        end, NewNodes)
              end)
    end,
    AllNodes;
%% A remote node asks us to acknowledge that earlier messages were processed.
handle_loop_message({sync_token, From}, _GetNodes, _StorageFrontend,
                    RemoteNodes) ->
    ?log_debug("Received sync_token from ~p", [From]),
    gen_server:reply(From, ok),
    RemoteNodes;
%% A local gen_server:call for sync_to_me. It is answered later by a linked
%% helper process so that this loop does not block.
handle_loop_message({'$gen_call', From, {sync_to_me, NodesWanted, Timeout}},
                    _GetNodes, StorageFrontend, RemoteNodes) ->
    ?log_debug("Received sync_to_me with timeout = ~p, nodes = ~p",
               [Timeout, NodesWanted]),
    proc_lib:spawn_link(
      fun () ->
              handle_sync_to_me(From, StorageFrontend, NodesWanted, Timeout)
      end),
    RemoteNodes;
%% A monitored {StorageFrontend, Node} process went away: stop pushing to it.
handle_loop_message({'DOWN', _Ref, _Type, {Server, RemoteNode}, Error},
                    _GetNodes, _StorageFrontend, RemoteNodes) ->
    ?log_warning("Remote server node ~p process down: ~p",
                 [{Server, RemoteNode}, Error]),
    RemoteNodes -- [RemoteNode];
%% Anything else is a protocol violation: crash.
handle_loop_message(Msg, _GetNodes, _StorageFrontend, _RemoteNodes) ->
    ?log_error("Got unexpected message: ~p", [Msg]),
    exit({unexpected_message, Msg}).
105
%% Send a chunk of documents produced by the replicated-storage pipe to
%% StorageFrontend on Node. Always returns ok.
%%
%% Two chunk shapes are accepted:
%%  * {batch, Docs} - the whole list is compressed with misc:compress/1 and
%%    sent as a single {replicated_batch, CompressedBatch} cast. An empty
%%    batch is a no-op. Previously it matched no clause and crashed the
%%    replicator with function_clause.
%%  * a plain list of {Id, Doc} pairs - each pair is sent on its own via
%%    replicate_change_to_node/4.
replicate_changes_to_node(_StorageFrontend, _Node, {batch, []}) ->
    %% Nothing to send; don't ship an empty compressed batch.
    ok;
replicate_changes_to_node(StorageFrontend, Node, {batch, [_ | _] = Docs}) ->
    CompressedBatch = misc:compress(Docs),
    ?log_debug("Sending batch of size ~p to ~p", [size(CompressedBatch), Node]),
    gen_server:cast({StorageFrontend, Node}, {replicated_batch, CompressedBatch});
replicate_changes_to_node(StorageFrontend, Node, Docs) when is_list(Docs) ->
    lists:foreach(
      fun ({Id, Doc}) ->
              replicate_change_to_node(StorageFrontend, Node, Id, Doc)
      end, Docs).
116
%% Cast a single document update, {replicated_update, Doc}, to
%% StorageFrontend on Node. Id is only used in the debug log line, where it
%% is wrapped by ns_config_log:tag_user_data/1 (presumably to mark it as
%% user data for log redaction - confirm).
replicate_change_to_node(StorageFrontend, Node, Id, Doc) ->
    Target = {StorageFrontend, Node},
    ?log_debug("Sending ~p to ~p", [ns_config_log:tag_user_data(Id), Node]),
    gen_server:cast(Target, {replicated_update, Doc}).
120
%% Loop of the helper process spawned by start_loop/3 after it has called
%% net_kernel:monitor_nodes(true). Every {nodeup, _} message triggers a
%% `replicate_newnodes_docs` message to LocalStoragePid; every other
%% message (e.g. nodedown) is discarded. Never returns.
nodeup_monitoring_loop(LocalStoragePid) ->
    receive
        {nodeup, _Node} ->
            ?log_debug("got nodeup event. Considering ddocs replication"),
            LocalStoragePid ! replicate_newnodes_docs,
            nodeup_monitoring_loop(LocalStoragePid);
        _Ignored ->
            nodeup_monitoring_loop(LocalStoragePid)
    end.
130
%% Body of the helper process that loop/3 spawns (linked) for a
%% sync_to_me call.
%%
%% Issues a sync_token call to StorageFrontend on each of Nodes through
%% async:map/2, then answers the original caller From with ok if every
%% node replied ok, or with {error, Failed} otherwise. Failed is the list
%% of {Node, Result} pairs whose result was not ok.
%%
%% gen_server:call/3 signals timeout, noproc and nodedown by exiting
%% rather than returning. Those exits are turned into {error, Reason} in
%% sync_token_call/3 so that one slow or missing node is reported to the
%% caller in Failed. Otherwise the exit would escape this linked helper,
%% assuming async:map/2 re-raises worker exceptions (confirm), and the
%% caller would get no reply.
handle_sync_to_me(From, StorageFrontend, Nodes, Timeout) ->
    Results = async:map(
                fun (Node) ->
                        sync_token_call(StorageFrontend, Node, Timeout)
                end, Nodes),
    case [{Node, Result} || {Node, Result} <- lists:zip(Nodes, Results),
                            Result =/= ok] of
        [] ->
            gen_server:reply(From, ok);
        Failed ->
            gen_server:reply(From, {error, Failed})
    end.

%% Send one sync_token call to StorageFrontend on Node, converting a call
%% exit (timeout, noproc, nodedown, ...) into {error, Reason}.
sync_token_call(StorageFrontend, Node, Timeout) ->
    try
        gen_server:call({StorageFrontend, Node}, sync_token, Timeout)
    catch
        exit:Reason ->
            {error, Reason}
    end.
145