1% Licensed under the Apache License, Version 2.0 (the "License"); you may not
2% use this file except in compliance with the License. You may obtain a copy of
3% the License at
4%
5%   http://www.apache.org/licenses/LICENSE-2.0
6%
7% Unless required by applicable law or agreed to in writing, software
8% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10% License for the specific language governing permissions and limitations under
11% the License.
12
13-module(couch_compaction_daemon).
14-behaviour(gen_server).
15
16% public API
17-export([start_link/0, config_change/3]).
18
19% gen_server callbacks
20-export([init/1, handle_call/3, handle_info/2, handle_cast/2]).
21-export([code_change/3, terminate/2]).
22
23-include("couch_db.hrl").
24
% Name of the ETS table that stores the parsed compaction configuration,
% keyed by database name (binary).
-define(CONFIG_ETS, couch_compaction_daemon_config).
% NOTE(review): not referenced anywhere in the visible part of this module.
-define(DISK_CHECK_PERIOD, 1).          % minutes
% Regular expression (as an iolist) for a "key = value" pair with optional
% surrounding whitespace.
% NOTE(review): not referenced anywhere in the visible part of this module.
-define(KV_RE,
    [$^, "\\s*", "([^=]+?)", "\\s*", $=, "\\s*", "([^=]+?)", "\\s*", $$]).
% Regular expression (as an iolist) for a "from - to" pair separated by a dash.
% NOTE(review): not referenced anywhere in the visible part of this module.
-define(PERIOD_RE,
    [$^, "([^-]+?)", "\\s*", $-, "\\s*", "([^-]+?)", $$]).
31
% gen_server state of the daemon.
-record(state, {
    % pid of the linked process running compact_loop/1 (spawned in init/1)
    loop_pid
}).

% Parsed compaction settings for one database (or for `_default`).
-record(config, {
    % integer threshold parsed from `db_fragmentation`; nil means no threshold
    db_frag = nil,
    % integer threshold parsed from `view_fragmentation`; nil means no threshold
    view_frag = nil,
    % #period{} built from `from` / `to`; nil means compaction is always allowed
    period = nil,
    % set from `strict_window`; when true a running compaction gets a timeout
    % (see compact_time_left/1) after which it is canceled
    cancel = false,
    % set from `parallel_view_compaction`; when true the views are compacted
    % in a separate process while the database compaction is running
    parallel_view_compact = false
}).

% Time window in which compaction is allowed. Both fields hold
% {Hour, Minute} tuples as produced by parse_time/1.
-record(period, {
    from = nil,
    to = nil
}).
48
49
% Starts the compaction daemon as a gen_server linked to the caller and
% registered locally under the module name.
start_link() ->
    ServerName = {local, ?MODULE},
    gen_server:start_link(ServerName, ?MODULE, [], []).
52
53
% gen_server callback. Creates the configuration table, subscribes to
% configuration changes, loads the current configuration and spawns the
% linked compaction loop process.
init(_Args) ->
    % Trap exits so that the death of the loop process reaches handle_info/2.
    process_flag(trap_exit, true),
    TableOpts = [named_table, set, protected],
    ?CONFIG_ETS = ets:new(?CONFIG_ETS, TableOpts),
    ok = couch_config:register(fun ?MODULE:config_change/3),
    load_config(),
    Daemon = self(),
    LoopPid = spawn_link(fun() -> compact_loop(Daemon) end),
    {ok, #state{loop_pid = LoopPid}}.
62
63
% Configuration change callback registered with couch_config in init/1.
%
% Changes in the "compactions" section are forwarded to the daemon as a
% `config_update` cast; DbName is the changed key and NewValue its new value
% (handle_cast/2 treats the atom `deleted` as a removal).
%
% Changes in any other section are irrelevant to this daemon and are
% explicitly ignored, instead of raising a function_clause error in the
% process that delivers the notification.
%
% Always returns ok.
config_change("compactions", DbName, NewValue) ->
    ok = gen_server:cast(?MODULE, {config_update, DbName, NewValue});
config_change(_Section, _Key, _Value) ->
    ok.
66
67
% gen_server callback handling `config_update` casts sent by config_change/3.
%
% A `deleted` value removes the entry for the database from the table.
handle_cast({config_update, DbName, deleted}, State) ->
    Key = ?l2b(DbName),
    true = ets:delete(?CONFIG_ETS, Key),
    {noreply, State};

% Any other value is parsed and, if valid, stored. When the table was empty
% before the insert, the compaction loop is notified with `have_config`
% (it blocks waiting for that message while there is no configuration).
% Invalid configurations are dropped; parse_config/2 logs them.
handle_cast({config_update, DbName, Config}, #state{loop_pid = Loop} = State) ->
    case parse_config(DbName, Config) of
    error ->
        ok;
    {ok, NewConfig} ->
        TableWasEmpty = (0 =:= ets:info(?CONFIG_ETS, size)),
        true = ets:insert(?CONFIG_ETS, {?l2b(DbName), NewConfig}),
        if
        TableWasEmpty ->
            Loop ! {self(), have_config};
        true ->
            ok
        end
    end,
    {noreply, State}.
87
88
% gen_server callback. The daemon has no synchronous API, so any call stops
% the server with reason {unexpected_call, Msg}.
handle_call(Msg, _From, State) ->
    StopReason = {unexpected_call, Msg},
    {stop, StopReason, State}.
91
92
% gen_server callback. Only the exit signal of the compaction loop process is
% handled: the daemon stops with reason {compaction_loop_died, Reason}.
handle_info({'EXIT', LoopPid, Reason}, #state{loop_pid = LoopPid} = State) ->
    StopReason = {compaction_loop_died, Reason},
    {stop, StopReason, State}.
95
96
% gen_server callback. Deletes the configuration table on shutdown.
terminate(_Reason, _State) ->
    Table = ?CONFIG_ETS,
    true = ets:delete(Table).
99
100
% gen_server callback. The state needs no conversion between code versions,
% so it is returned unchanged.
code_change(_FromVersion, CurrentState, _Extra) ->
    {ok, CurrentState}.
103
104
% Main loop of the compaction process (never returns).
%
% Each iteration walks all databases and, for every database that has a
% configuration (its own or `_default`) and is inside its allowed period,
% tries to compact it. The walk stops early as soon as the configuration
% table is empty.
%
% Afterwards the loop either blocks until Parent sends `have_config` (when
% there is no configuration at all) or sleeps for the number of seconds
% given by the `check_interval` setting of the `compaction_daemon` section
% (default "60").
compact_loop(Parent) ->
    NoConfig = fun() -> ets:info(?CONFIG_ETS, size) =:= 0 end,
    VisitDb = fun(DbName, Acc) ->
        case NoConfig() of
        true ->
            {stop, Acc};
        false ->
            case get_db_config(DbName) of
            {ok, Config} ->
                check_period(Config) andalso maybe_compact_db(DbName, Config);
            nil ->
                ok
            end,
            {ok, Acc}
        end
    end,
    {ok, _} = couch_server:all_databases(VisitDb, ok),
    case NoConfig() of
    false ->
        Interval = couch_config:get(
            "compaction_daemon", "check_interval", "60"),
        ok = timer:sleep(list_to_integer(Interval) * 1000);
    true ->
        receive {Parent, have_config} -> ok end
    end,
    compact_loop(Parent).
135
136
% Compacts the database DbName, and its view groups, when the configuration
% allows it.
%
% If the database cannot be opened nothing is done. If can_db_compact/2
% rejects the database, only the views are considered. Otherwise the database
% compaction is started and this function blocks until the compactor process
% terminates or the time returned by compact_time_left/1 runs out, in which
% case the compaction is canceled.
%
% Views are compacted either in a separate linked process running in parallel
% with the database compaction (`parallel_view_compact = true`) or, after a
% successful database compaction, sequentially in the calling process.
maybe_compact_db(DbName, Config) ->
    case (catch couch_db:open_int(DbName, [])) of
    {ok, Db} ->
        % Collect the design document names while the database is open.
        DDocNames = db_ddoc_names(Db),
        case can_db_compact(Config, Db) of
        true ->
            {ok, DbCompactPid} = couch_db:start_compact(Db),
            % `infinity` unless a strict window is configured.
            TimeLeft = compact_time_left(Config),
            case Config#config.parallel_view_compact of
            true ->
                ViewsCompactPid = spawn_link(fun() ->
                    maybe_compact_views(DbName, DDocNames, Config)
                end),
                ViewsMonRef = erlang:monitor(process, ViewsCompactPid);
            false ->
                ViewsMonRef = nil
            end,
            DbMonRef = erlang:monitor(process, DbCompactPid),
            receive
            % Compactor finished successfully.
            {'DOWN', DbMonRef, process, _, normal} ->
                couch_db:close(Db),
                case Config#config.parallel_view_compact of
                true ->
                    % Views are already handled by the parallel process.
                    ok;
                false ->
                    maybe_compact_views(DbName, DDocNames, Config)
                end;
            % Compactor terminated abnormally: log it, views are skipped.
            {'DOWN', DbMonRef, process, _, Reason} ->
                couch_db:close(Db),
                ?LOG_ERROR("Compaction daemon - an error ocurred while"
                    " compacting the database `~s`: ~p", [DbName, Reason])
            after TimeLeft ->
                ?LOG_INFO("Compaction daemon - canceling compaction for database"
                    " `~s` because it's exceeding the allowed period.",
                    [DbName]),
                % Drop the monitor and flush a 'DOWN' message that may already
                % be in the mailbox before canceling.
                erlang:demonitor(DbMonRef, [flush]),
                ok = couch_db:cancel_compact(Db),
                couch_db:close(Db)
            end,
            % When views are compacted in parallel, wait for that process to
            % terminate (for whatever reason) before returning.
            case ViewsMonRef of
            nil ->
                ok;
            _ ->
                receive
                {'DOWN', ViewsMonRef, process, _, _Reason} ->
                    ok
                end
            end;
        false ->
            couch_db:close(Db),
            maybe_compact_views(DbName, DDocNames, Config)
        end;
    _ ->
        ok
    end.
192
193
% Tries to compact, one after the other, the view groups named in the list.
% Processing stops as soon as the current time falls outside the allowed
% period or a view compaction is canceled because of a timeout.
% Always returns ok.
maybe_compact_views(_DbName, [], _Config) ->
    ok;
maybe_compact_views(DbName, [DDocName | Rest], Config) ->
    InPeriod = check_period(Config),
    case InPeriod andalso maybe_compact_view(DbName, DDocName, Config) of
    ok ->
        maybe_compact_views(DbName, Rest, Config);
    timeout ->
        ok;
    false ->
        ok
    end.
208
209
% Returns the names (ids without the "_design/" prefix) of all non-deleted
% design documents of Db, enumerating only the "_design/" key range.
db_ddoc_names(Db) ->
    Collect = fun
        (#doc_info{id = <<"_design/", _/binary>>, deleted = true}, _, Names) ->
            {ok, Names};
        (#doc_info{id = <<"_design/", Name/binary>>}, _, Names) ->
            {ok, [Name | Names]};
        (_, _, Names) ->
            {stop, Names}
    end,
    KeyRange = [{start_key, <<"_design/">>}, {end_key_gt, <<"_design0">>}],
    {ok, _, DDocNames} = couch_db:enum_docs(Db, Collect, [], KeyRange),
    DDocNames.
221
222
% Compacts the view group GroupId of database DbName if can_view_compact/4
% allows it, blocking until the compactor process terminates.
%
% Returns `timeout` when the compaction had to be canceled because the time
% given by compact_time_left/1 ran out, and `ok` in every other case
% (compacted, skipped, compactor failure or group info not available).
maybe_compact_view(DbName, GroupId, Config) ->
    DDocId = <<"_design/", GroupId/binary>>,
    case (catch couch_view:get_group_info(DbName, DDocId)) of
    {ok, GroupInfo} ->
        case can_view_compact(Config, DbName, GroupId, GroupInfo) of
        false ->
            ok;
        true ->
            {ok, CompactPid} = couch_view_compactor:start_compact(DbName, GroupId),
            TimeLeft = compact_time_left(Config),
            MonRef = erlang:monitor(process, CompactPid),
            receive
            {'DOWN', MonRef, process, CompactPid, ExitReason} ->
                case ExitReason of
                normal ->
                    ok;
                _ ->
                    ?LOG_ERROR("Compaction daemon - an error ocurred while compacting"
                        " the view group `~s` from database `~s`: ~p",
                        [GroupId, DbName, ExitReason]),
                    ok
                end
            after TimeLeft ->
                ?LOG_INFO("Compaction daemon - canceling the compaction for the "
                    "view group `~s` of the database `~s` because it's exceeding"
                    " the allowed period.", [GroupId, DbName]),
                erlang:demonitor(MonRef, [flush]),
                ok = couch_view_compactor:cancel_compact(DbName, GroupId),
                timeout
            end
        end;
    Error ->
        ?LOG_ERROR("Error opening view group `~s` from database `~s`: ~p",
            [GroupId, DbName, Error]),
        ok
    end.
256
257
% Returns how long a compaction may run, as a timeout for `receive ... after`.
%
% Returns `infinity` when compactions are never canceled (`cancel = false`)
% or when no period is configured. Otherwise returns the number of
% milliseconds from the current local time (minute resolution) until the
% next occurrence of the period's `to` time. When `to` is equal to or
% earlier than the current time, it is taken to be on the following day.
%
% The difference is computed on minutes since midnight so that the minute
% part keeps its sign: at 10:50 with `to = {11, 10}` the result is 20
% minutes. Adding the absolute minute difference to the hour difference
% would give 100 minutes.
compact_time_left(#config{cancel = false}) ->
    infinity;
compact_time_left(#config{period = nil}) ->
    infinity;
compact_time_left(#config{period = #period{to = {ToH, ToM}}}) ->
    {H, M, _} = time(),
    MinutesLeft = (ToH * 60 + ToM) - (H * 60 + M),
    case MinutesLeft > 0 of
    true ->
        MinutesLeft * 60 * 1000;
    false ->
        % `to` is not later than now today: count until `to` tomorrow.
        (MinutesLeft + 24 * 60) * 60 * 1000
    end.
270
271
% Looks up the compaction configuration for DbName, falling back to the
% `_default` entry. Returns {ok, Config}, or nil when neither exists.
get_db_config(DbName) ->
    case ets:lookup(?CONFIG_ETS, DbName) of
    [{DbName, DbConfig}] ->
        {ok, DbConfig};
    [] ->
        DefaultKey = <<"_default">>,
        case ets:lookup(?CONFIG_ETS, DefaultKey) of
        [{DefaultKey, DefaultConfig}] ->
            {ok, DefaultConfig};
        [] ->
            nil
        end
    end.
284
285
% Decides whether the database Db should be compacted now. Returns true only
% if the current time is inside the allowed period, the fragmentation
% reaches the configured threshold and the free space reported for the
% database directory is not below the estimated space needed.
can_db_compact(#config{db_frag = Threshold} = Config, Db) ->
    case check_period(Config) of
    true ->
        {ok, DbInfo} = couch_db:get_db_info(Db),
        {Frag, SpaceRequired} = frag(DbInfo),
        ?LOG_DEBUG("Fragmentation for database `~s` is ~p%, estimated space for"
           " compaction is ~p bytes.", [Db#db.name, Frag, SpaceRequired]),
        case check_frag(Threshold, Frag) of
        true ->
            DbDir = couch_config:get("couchdb", "database_dir"),
            Free = free_space(DbDir),
            case Free < SpaceRequired of
            true ->
                ?LOG_INFO("Compaction daemon - skipping database `~s` "
                    "compaction: the estimated necessary disk space is about ~p"
                    " bytes but the currently available disk space is ~p bytes.",
                   [Db#db.name, SpaceRequired, Free]),
                false;
            false ->
                true
            end;
        false ->
            false
        end;
    false ->
        false
    end.
312
% Decides whether a view group should be compacted now. Returns true only if
% the current time is inside the allowed period, the group's updater is not
% running, the fragmentation reaches the configured threshold and the free
% space reported for the view index directory is not below the estimated
% space needed.
can_view_compact(Config, DbName, GroupId, GroupInfo) ->
    case check_period(Config) of
    true ->
        case couch_util:get_value(updater_running, GroupInfo) of
        false ->
            {Frag, SpaceRequired} = frag(GroupInfo),
            ?LOG_DEBUG("Fragmentation for view group `~s` (database `~s`) is "
                "~p%, estimated space for compaction is ~p bytes.",
                [GroupId, DbName, Frag, SpaceRequired]),
            case check_frag(Config#config.view_frag, Frag) of
            true ->
                IndexDir = couch_config:get("couchdb", "view_index_dir"),
                Free = free_space(IndexDir),
                case Free < SpaceRequired of
                true ->
                    ?LOG_INFO("Compaction daemon - skipping view group `~s` "
                        "compaction (database `~s`): the estimated necessary "
                        "disk space is about ~p bytes but the currently available"
                        " disk space is ~p bytes.",
                        [GroupId, DbName, SpaceRequired, Free]),
                    false;
                false ->
                    true
                end;
            false ->
                false
            end;
        true ->
            false
        end;
    false ->
        false
    end.
345
346
% Tells whether the current local time (minute resolution) is inside the
% configured period. Without a period compaction is always allowed. A period
% whose `from` is not earlier than its `to` is treated as spanning midnight.
check_period(#config{period = nil}) ->
    true;
check_period(#config{period = #period{from = From, to = To}}) ->
    {Hour, Minute, _} = erlang:time(),
    Now = {Hour, Minute},
    NotBeforeFrom = (Now >= From),
    BeforeTo = (Now < To),
    case From < To of
    true ->
        NotBeforeFrom andalso BeforeTo;
    false ->
        NotBeforeFrom orelse BeforeTo
    end.
357
358
% Tells whether the fragmentation Frag reaches Threshold. A nil threshold
% means no threshold is configured, which always passes.
check_frag(nil, _Frag) ->
    true;
check_frag(Threshold, Frag) when Frag >= Threshold ->
    true;
check_frag(_Threshold, _Frag) ->
    false.
363
364
% Computes {Fragmentation, SpaceRequired} from an info property list that
% has `disk_size` and `data_size` entries.
%
% Fragmentation is a percentage. Files smaller than the `min_file_size`
% setting of the `compaction_daemon` section (default "131072") are reported
% as 0% fragmented. SpaceRequired is the estimated number of bytes needed to
% compact; the file size itself is used when no estimate from the data size
% is made.
frag(Props) ->
    FileSize = couch_util:get_value(disk_size, Props),
    MinSize = couch_config:get("compaction_daemon", "min_file_size", "131072"),
    case FileSize < list_to_integer(MinSize) of
    false ->
        frag_from_sizes(FileSize, couch_util:get_value(data_size, Props));
    true ->
        {0, FileSize}
    end.

% Fragmentation for a file of FileSize bytes holding DataSize bytes of data.
% A `null` data size is reported as 100% fragmented, a data size of 0 as 0%.
frag_from_sizes(FileSize, null) ->
    {100, FileSize};
frag_from_sizes(FileSize, 0) ->
    {0, FileSize};
frag_from_sizes(FileSize, DataSize) ->
    Unused = FileSize - DataSize,
    {round(Unused / FileSize * 100), space_required(DataSize)}.
383
% Rough and pessimistic estimate of the disk space needed to compact a
% database or view index: twice the size of its data, rounded to an integer.
space_required(DataSize) ->
    Estimate = 2.0 * DataSize,
    round(Estimate).
388
389
% Reads every entry of the "compactions" configuration section and stores
% the ones that parse successfully in the configuration table, keyed by the
% database name as a binary. Invalid entries are skipped (parse_config/2
% logs them).
load_config() ->
    Entries = couch_config:get("compactions"),
    Store = fun({DbName, ConfigString}) ->
        case parse_config(DbName, ConfigString) of
        error ->
            ok;
        {ok, Config} ->
            true = ets:insert(?CONFIG_ETS, {?l2b(DbName), Config})
        end
    end,
    lists:foreach(Store, Entries).
401
% Parses the compaction configuration string of a database.
% Returns {ok, #config{}} on success. Otherwise logs an error naming the
% database and returns `error`.
parse_config(DbName, ConfigString) ->
    Parsed = (catch do_parse_config(ConfigString)),
    case Parsed of
    {ok, _Conf} ->
        Parsed;
    incomplete_period ->
        ?LOG_ERROR("Incomplete period ('to' or 'from' missing) in the compaction"
            " configuration for database `~s`", [DbName]),
        error;
    _ ->
        ?LOG_ERROR("Invalid compaction configuration for database "
            "`~s`: `~s`", [DbName, ConfigString]),
        error
    end.
415
% Turns a configuration string into a #config{} record.
% Returns {ok, Config}, or `incomplete_period` when only one of `from` and
% `to` was given. Anything malformed raises an exception, which the caller
% catches.
do_parse_config(ConfigString) ->
    {ok, ConfProps} = couch_util:parse_term(ConfigString),
    {ok, Conf} = config_record(ConfProps, #config{}),
    case Conf#config.period of
    nil ->
        {ok, Conf};
    #period{from = nil} ->
        incomplete_period;
    #period{to = nil} ->
        incomplete_period;
    #period{} ->
        {ok, Conf}
    end.
427
% Folds a list of {Key, Value} configuration properties into a #config{}
% record and returns {ok, Config}.
%
% Recognized keys: db_fragmentation and view_fragmentation (strings such as
% "70%"), from and to (strings such as "23:00"), strict_window and
% parallel_view_compaction (booleans). Any other key or value shape raises
% an exception.
config_record([], Config) ->
    {ok, Config};

config_record([{db_fragmentation, V} | Rest], Config) ->
    [Digits] = string:tokens(V, "%"),
    Threshold = list_to_integer(Digits),
    config_record(Rest, Config#config{db_frag = Threshold});

config_record([{view_fragmentation, V} | Rest], Config) ->
    [Digits] = string:tokens(V, "%"),
    Threshold = list_to_integer(Digits),
    config_record(Rest, Config#config{view_frag = Threshold});

config_record([{from, V} | Rest], #config{period = Period0} = Config) ->
    Time = parse_time(V),
    % Start from an empty period when none has been created yet.
    Base = case Period0 of
    nil ->
        #period{};
    #period{} ->
        Period0
    end,
    config_record(Rest, Config#config{period = Base#period{from = Time}});

config_record([{to, V} | Rest], #config{period = Period0} = Config) ->
    Time = parse_time(V),
    % Start from an empty period when none has been created yet.
    Base = case Period0 of
    nil ->
        #period{};
    #period{} ->
        Period0
    end,
    config_record(Rest, Config#config{period = Base#period{to = Time}});

config_record([{strict_window, Strict} | Rest], Config)
        when is_boolean(Strict) ->
    config_record(Rest, Config#config{cancel = Strict});

config_record([{parallel_view_compaction, Parallel} | Rest], Config)
        when is_boolean(Parallel) ->
    config_record(Rest, Config#config{parallel_view_compact = Parallel}).
470
471
% Parses a "HH:MM" string into an {Hour, Minute} tuple of integers.
% Raises an exception when the string does not consist of exactly two
% colon-separated integers.
parse_time(String) ->
    [HourStr, MinuteStr] = string:tokens(String, ":"),
    Hour = list_to_integer(HourStr),
    Minute = list_to_integer(MinuteStr),
    {Hour, Minute}.
475
476
% Returns the free space, in bytes, reported by disksup for the mount point
% that contains Path, or `undefined` when no mount point matches. Mount
% points are tried from the deepest path to the shallowest so that the most
% specific one is found first.
free_space(Path) ->
    Depth = fun(MountPath) -> length(filename:split(MountPath)) end,
    DeepestFirst = fun({PathA, _, _}, {PathB, _, _}) ->
        Depth(PathA) > Depth(PathB)
    end,
    DiskData = lists:sort(DeepestFirst, disksup:get_disk_data()),
    free_space_rec(abs_path(Path), DiskData).
484
% Walks the list of {MountPoint, Total, Usage} entries and returns the free
% space, in bytes, of the first entry whose normalized mount point is a
% prefix of Path. Returns `undefined` when none matches.
% Presumably Total is in KiB and Usage is a percentage, as reported by
% disksup - verify against the disksup documentation.
free_space_rec(_Path, []) ->
    undefined;
free_space_rec(Path, [{MountPoint0, Total, Usage} | Rest]) ->
    MountPoint = abs_path(MountPoint0),
    case lists:prefix(MountPoint, Path) of
    true ->
        Used = Total * (Usage / 100),
        trunc(Total - Used) * 1024;
    false ->
        free_space_rec(Path, Rest)
    end.
495
% Returns the absolute form of Path0, always terminated by a "/". The
% trailing separator keeps a mount point such as "/var/" from matching an
% unrelated path such as "/variable/" in prefix comparisons.
abs_path(Path0) ->
    Abs = filename:absname(Path0),
    EndsWithSlash = (lists:last(Abs) =:= $/),
    if
    EndsWithSlash ->
        Abs;
    true ->
        Abs ++ "/"
    end.
504