1% Licensed under the Apache License, Version 2.0 (the "License"); you may not
2% use this file except in compliance with the License. You may obtain a copy of
3% the License at
4%
5%   http://www.apache.org/licenses/LICENSE-2.0
6%
7% Unless required by applicable law or agreed to in writing, software
8% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10% License for the specific language governing permissions and limitations under
11% the License.
12
13-module(couch_file_write_guard).
14-behaviour(gen_server).
15
16-include("couch_db.hrl").
17
% This module is a lot like a supervisor, but its purpose isn't to keep files
% open but rather to prevent duplicate couch_file writers from being opened on
% the same file, so that multiple processes cannot perform uncoordinated
% writes.
% Without this module, it's possible that a not fully killed process tree
% might be recreated and start writing while some dying child processes
% are still writing to the file. It's also possible for a race
% condition bug to do the same.
% We don't use OTP supervisors as they scale poorly for lots of child
% starts/stops (linear scans for child specs).
28
29-export([add/2, remove/1, disable_for_testing/0]).
30-export([init/1, handle_call/3, sup_start_link/0]).
31-export([handle_cast/2, code_change/3, handle_info/2, terminate/2]).
32
% Asks the locally registered couch_file_write_guard server to record Pid as
% the writer for Filepath. Blocks without a timeout and returns whatever the
% server replies.
add(Filepath, Pid) ->
    Request = {add, Filepath, Pid},
    gen_server:call(couch_file_write_guard, Request, infinity).
35
36
% Asks the locally registered couch_file_write_guard server to drop the entry
% held for Pid. Blocks without a timeout and returns the server's reply.
remove(Pid) ->
    Request = {remove, Pid},
    gen_server:call(couch_file_write_guard, Request, infinity).
39
40
% Sends the disable_for_testing request to the locally registered
% couch_file_write_guard server and waits (no timeout) for its reply.
disable_for_testing() ->
    Request = disable_for_testing,
    gen_server:call(couch_file_write_guard, Request, infinity).
43
44
% Starts the guard as a linked gen_server registered locally under the name
% couch_file_write_guard, with an empty argument list and default options.
sup_start_link() ->
    ServerName = {local, couch_file_write_guard},
    CallbackModule = couch_file_write_guard,
    gen_server:start_link(ServerName, CallbackModule, [], []).
47
48
% gen_server init callback. Creates the two private, named set tables used by
% the guard (one keyed by file path, one keyed by writer pid) and starts with
% the state `true`.
init([]) ->
    TableOptions = [set, private, named_table],
    lists:foreach(
        fun(TableName) -> ets:new(TableName, TableOptions) end,
        [couch_files_by_name, couch_files_by_pid]),
    {ok, true}.
53
54
% Tries to claim Filepath for Pid. Returns `ok` when the path was free, in
% which case Pid is monitored and indexed in both tables; returns
% `already_added_to_file_write_guard` when the path already has an entry.
ets_add(Filepath, Pid) ->
    Claimed = ets:insert_new(couch_files_by_name, {Filepath, Pid}),
    ets_add_index(Claimed, Filepath, Pid).

% Second half of ets_add/2: only a freshly claimed path gets a monitor and a
% by-pid entry.
ets_add_index(true, Filepath, Pid) ->
    MonitorRef = erlang:monitor(process, Pid),
    true = ets:insert_new(couch_files_by_pid, {Pid, Filepath, MonitorRef}),
    ok;
ets_add_index(false, _Filepath, _Pid) ->
    already_added_to_file_write_guard.
64
65
% Drops the bookkeeping for Pid and returns a ready-made gen_server reply
% tuple. When Pid is known, its monitor is removed (flushing any pending
% 'DOWN') and both table entries are deleted, replying `ok`; otherwise the
% reply is `removing_unadded_file`. The state in the tuple is always `true`.
ets_remove(Pid) ->
    Outcome =
        case ets:lookup(couch_files_by_pid, Pid) of
            [{Pid, Filepath, MonitorRef}] ->
                true = demonitor(MonitorRef, [flush]),
                true = ets:delete(couch_files_by_name, Filepath),
                true = ets:delete(couch_files_by_pid, Pid),
                ok;
            _ ->
                removing_unadded_file
        end,
    {reply, Outcome, true}.
76
77
% gen_server terminate callback: kills every guarded writer process, then
% blocks until a 'DOWN' message has been received for each of them.
% Always returns `ok`.
terminate(_Reason, _Srv) ->
    % Snapshot the writers once so the kill pass and the wait pass agree.
    Writers = [Pid || {_Filepath, Pid} <- ets:tab2list(couch_files_by_name)],
    lists:foreach(fun(Pid) -> exit(Pid, kill) end, Writers),
    % Every slot except Pid must be a wildcard. Reusing the already-bound
    % _Reason here would only accept a 'DOWN' whose exit reason equals the
    % reason this callback was invoked with; a killed writer reports a
    % different reason, so the receive would block forever.
    lists:foreach(
        fun(Pid) ->
            receive {'DOWN', _, _, Pid, _} -> ok end
        end,
        Writers),
    ok.
85
86
% gen_server call handler. The state is a boolean: `true` while guarding is
% active, `false` once disable_for_testing has been received.
%
% {add, Filepath, Pid}  - registers Pid as the writer for Filepath. Replies
%                         `ok`, or `already_added_to_file_write_guard` when a
%                         live writer already holds the path. An entry whose
%                         pid is no longer alive is replaced by the new one.
% {remove, Pid}         - drops the entry for Pid; replies `ok` or
%                         `removing_unadded_file`.
% disable_for_testing   - switches the state to `false`; from then on add and
%                         remove reply `ok` without touching the tables.
handle_call({add, Filepath, Pid}, _From, true) ->
    case ets_add(Filepath, Pid) of
    ok ->
        {reply, ok, true};
    already_added_to_file_write_guard ->
        [{Filepath, ExistingPid}] = ets:lookup(couch_files_by_name, Filepath),
        case is_process_alive(ExistingPid) of
        true ->
            % Argument order follows the format string: the rejected new
            % writer first, then the file, then the writer already holding it.
            ?LOG_ERROR("Unable to add new writer pid: `~p` for `~s` as there"
                       " is already an active writer process pid: `~p`",
                       [Pid, Filepath, ExistingPid]),
            {reply, already_added_to_file_write_guard, true};
        false ->
            % The recorded writer is dead but its 'DOWN' has not been handled
            % yet: clear the stale entry and register the new writer.
            {reply, ok, true} = ets_remove(ExistingPid),
            ok = ets_add(Filepath, Pid),
            {reply, ok, true}
        end
    end;
handle_call({add, _Filepath, _Pid}, _From, false) ->
    % no-op for testing
    {reply, ok, false};
handle_call({remove, Pid}, _From, true) ->
    % ets_remove/1 already returns a complete {reply, Result, true} tuple.
    ets_remove(Pid);
handle_call({remove, _Pid}, _From, false) ->
    % no-op for testing
    {reply, ok, false};
handle_call(disable_for_testing, _From, _State) ->
    {reply, ok, false}.
115
116
% gen_server cast handler. This server defines no casts, so any cast makes
% the process exit with {unknown_cast_message, Msg}.
handle_cast(Msg, _State) ->
    ExitReason = {unknown_cast_message, Msg},
    exit(ExitReason).
119
120
% gen_server code_change callback: no state migration is needed, so the
% current state is handed back unchanged.
code_change(_PreviousVsn, CurrentState, _ExtraArgs) ->
    {ok, CurrentState}.
123
124
% gen_server info handler for monitor 'DOWN' messages.
% In the `false` (testing) state the message is ignored. In the `true` state
% a 'DOWN' whose pid and monitor reference match a tracked writer removes
% that writer from both tables; any other 'DOWN' is logged and makes the
% server exit with reason `shutdown`.
handle_info({'DOWN', _Ref, _Type, _Pid, _Why}, false) ->
    % no-op for testing
    {noreply, false};
handle_info({'DOWN', Ref, _Type, DeadPid, Why}, true) ->
    Tracked = ets:lookup(couch_files_by_pid, DeadPid),
    case Tracked of
    [{DeadPid, Path, Ref}] ->
        true = ets:delete(couch_files_by_name, Path),
        true = ets:delete(couch_files_by_pid, DeadPid),
        {noreply, true};
    _ ->
        ?LOG_ERROR("Unexpected down message in couch_file_write_guard: ~p",
                [Why]),
        exit(shutdown)
    end.
139
140