-module(menelaus_web_alerts_srv).

-include("ns_common.hrl").
-include("ns_stats.hrl").

-include_lib("eunit/include/eunit.hrl").
%% needed to mock ns_config in tests
-include("ns_config.hrl").

-behaviour(gen_server).
-define(SERVER, ?MODULE).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

-export([alert_keys/0]).

%% @doc Holds client state for any alerts that need to be shown in the
%% browser. menelaus_web piggybacks on this server as a transport until a
%% long-polling transport is available, so it is effectively single-user
%% until then. Checks for alerts run every ?SAMPLE_RATE milliseconds.

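%% queue          - alerts not yet consumed by the UI
%% history        - recently raised alerts, used to suppress duplicates
%% opaque         - per-check state (last stat values, disk alert timestamps)
%% checker_pid    - pid of the currently running checker process, if any
%% change_counter - version cookie handed out by fetch_alerts/0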
-record(state, {
          queue = [],
          history = [],
          opaque = dict:new(),
          checker_pid,
          change_counter = 0
         }).

%% Amount of time to wait between state checks (ms)
-define(SAMPLE_RATE, 3000).

%% Amount of time between sending users the same alert (s)
-define(ALERT_TIMEOUT, 60 * 10).

%% Amount of time to wait before repeating the same disk usage alert (s)
-define(DISK_USAGE_TIMEOUT, 60 * 60 * 12).

-export([start_link/0, stop/0, local_alert/2, global_alert/2,
         fetch_alerts/0, consume_alerts/1]).


%% short description for a specific error; used in email subject
short_description(ip) ->
    "IP address changed";
short_description(ep_oom_errors) ->
    "hard out of memory error";
short_description(ep_item_commit_failed) ->
    "write commit failure";
short_description(overhead) ->
    "metadata overhead warning";
short_description(disk) ->
    "approaching full disk warning";
short_description(Other) ->
    %% this case is needed for tests to work
    couch_util:to_list(Other).

%% Error constants
errors(ip) ->
    "IP address seems to have changed. Unable to listen on ~p.";
errors(ep_oom_errors) ->
    "Hard Out Of Memory Error. Bucket \"~s\" on node ~s is full. All memory allocated to this bucket is used for metadata.";
errors(ep_item_commit_failed) ->
    "Write Commit Failure. Disk write failed for item in Bucket \"~s\" on node ~s.";
errors(overhead) ->
    "Metadata overhead warning. Over ~p% of RAM allocated to bucket \"~s\" on node \"~s\" is taken up by keys and metadata.";
errors(disk) ->
    "Approaching full disk warning. Usage of disk \"~s\" on node \"~s\" is around ~p%.".

%% ------------------------------------------------------------------
%% API Function Definitions
%% ------------------------------------------------------------------

start_link() ->
    gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).


%% @doc Send alert to all connected nodes
-spec global_alert(any(), binary() | string()) -> ok.
global_alert(Type, Msg) ->
    ale:info(?USER_LOGGER, to_str(Msg)),
    [rpc:cast(Node, ?MODULE, local_alert, [{Type, node()}, Msg])
     || Node <- [node() | nodes()]],
    ok.
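
%% Example (illustrative values): broadcast a disk alert from this node:
%%   global_alert(disk, fmt_to_bin(errors(disk), ["/data", "ns_1@10.1.2.3", 91]))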


%% @doc Show to user on running node only
-spec local_alert({any(), node()}, binary()) -> ok | ignored.
local_alert(Key, Val) ->
    gen_server:call(?MODULE, {add_alert, Key, Val}).


%% @doc fetch the current list of alerts (as binaries) together with a
%% version cookie that can later be passed to consume_alerts/1
-spec fetch_alerts() -> {list(binary()), binary()}.
fetch_alerts() ->
    diag_handler:diagnosing_timeouts(
      fun () ->
              gen_server:call(?MODULE, fetch_alert)
      end).
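
%% @doc Clear the alert queue, but only if the given version cookie still
%% matches the server's change counter (i.e. no new alerts have arrived
%% since the corresponding fetch_alerts/0 call). Returns true when the
%% cookie matches and the queue is cleared, false otherwise.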
-spec consume_alerts(binary()) -> boolean().
consume_alerts(VersionCookie) ->
    gen_server:call(?MODULE, {consume_alerts, VersionCookie}).


stop() ->
    gen_server:cast(?MODULE, stop).

%% ------------------------------------------------------------------
%% gen_server Function Definitions
%% ------------------------------------------------------------------

init([]) ->
    start_timer(),
    {ok, #state{}}.


handle_call({consume_alerts, PassedCounter}, _From, #state{change_counter = Counter}=State) ->
    NewState = case (catch list_to_integer(binary_to_list(PassedCounter))) of
                   Counter ->
                       %% match
                       State#state{queue=[],
                                   change_counter=Counter+1};
                   _ ->
                       State
               end,
    {reply, NewState =/= State, NewState};

handle_call(fetch_alert, _From, #state{queue=Msgs, change_counter=Counter}=State) ->
    Alerts = [Msg || {_Key, Msg, _Time} <- Msgs],
    {reply, {lists:reverse(Alerts), list_to_binary(integer_to_list(Counter))}, State};

handle_call({add_alert, Key, Val}, _, #state{queue=Msgs, history=Hist, change_counter=Counter}=State) ->
    case lists:keyfind(Key, 1, Hist) of
        false ->
            MsgTuple = {Key, Val, misc:now_int()},
            maybe_send_out_email_alert(Key, Val),
            {reply, ok, State#state{history=[MsgTuple | Hist],
                                    queue=[MsgTuple | lists:keydelete(Key, 1, Msgs)],
                                    change_counter=Counter+1}};
        _ ->
            {reply, ignored, State}
    end.


handle_cast(stop, State) ->
    {stop, normal, State};

handle_cast(_Msg, State) ->
    {noreply, State}.

-spec is_checker_active(undefined | pid()) -> true | false.
is_checker_active(undefined) -> false;
is_checker_active(Pid) ->
    erlang:is_process_alive(Pid).

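%% check_alerts ticks arrive from the timer started in init/1. If a
%% previously spawned checker is still alive this tick is skipped;
%% otherwise any queued ticks are flushed and a new checker process is
%% spawned so the (potentially slow) checks run outside the gen_server
%% loop.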
handle_info(check_alerts, #state{checker_pid = Pid} = State) ->
    case is_checker_active(Pid) of
        true ->
            {noreply, State};
        _ ->
            case misc:flush(check_alerts) of
                0 -> ok;
                N ->
                    ?log_warning("Eaten ~p previously unconsumed check_alerts~n", [N])
            end,
            Self = self(),
            CheckerPid = erlang:spawn_link(fun () ->
                                                   NewOpaque = do_handle_check_alerts_info(State),
                                                   Self ! {merge_opaque_from_checker, NewOpaque}
                                           end),
            {noreply, State#state{checker_pid = CheckerPid}}
    end;

handle_info({merge_opaque_from_checker, NewOpaque},
            #state{history=History} = State) ->
    {noreply, State#state{opaque = NewOpaque,
                          history = expire_history(History),
                          checker_pid = undefined}};

handle_info(_Info, State) ->
    {noreply, State}.

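%% Runs inside the spawned checker process: gathers the latest per-minute
%% stat sample for each bucket active on this node and feeds them, along
%% with the alert history, into check_alerts/3; the result becomes the new
%% opaque state.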
do_handle_check_alerts_info(#state{history=Hist, opaque=Opaque}) ->
    BucketNames = ordsets:intersection(lists:sort(ns_memcached:active_buckets()),
                                       lists:sort(ns_bucket:node_bucket_names(node()))),
    RawPairs = [{Name, stats_reader:latest(minute, node(), Name, 1)} || Name <- BucketNames],
    Stats = [{Name, OrdDict}
             || {Name, {ok, [#stat_entry{values = OrdDict}|_]}} <- RawPairs],
    check_alerts(Opaque, Hist, Stats).

terminate(_Reason, _State) ->
    ok.


code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%% ------------------------------------------------------------------
%% Internal Function Definitions
%% ------------------------------------------------------------------

%% @doc Remind myself to check the alert status
start_timer() ->
    timer2:send_interval(?SAMPLE_RATE, check_alerts).


%% @doc checks run locally on each node for server-specific problems; any
%% resulting alerts are broadcast to clients connected to any node
global_checks() ->
    [oom, ip, write_fail, overhead, disk].
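%% (oom and write_fail correspond to the ep_oom_errors and
%% ep_item_commit_failed stats respectively; see check/4 below)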

%% @doc fires off various checks
check_alerts(Opaque, Hist, Stats) ->
    Fun = fun(X, Dict) -> check(X, Dict, Hist, Stats) end,
    lists:foldl(Fun, Opaque, global_checks()).


%% @doc check that this node can still listen on its configured (non
%% localhost) host name; if not, the external IP address has likely changed
-spec check(atom(), dict(), list(), [{atom(),number()}]) -> dict().
check(ip, Opaque, _History, _Stats) ->
    {_Name, Host} = misc:node_name_host(node()),
    case can_listen(Host) of
        false ->
            global_alert(ip, fmt_to_bin(errors(ip), [node()]));
        true ->
            ok
    end,
    Opaque;

%% @doc check the capacity of the drives used for db and log files
check(disk, Opaque, _History, _Stats) ->

    Mounts = ns_disksup:get_disk_data(),

    UsedPre = [ns_storage_conf:this_node_dbdir(),
               ns_storage_conf:this_node_ixdir(),
               ns_storage_conf:this_node_logdir()],
    UsedFiles = [X || {ok, X} <- UsedPre],

    UsedMountsTmp =
        [begin {ok, RealFile} = misc:realpath(File, "/"),
               {ok, Mnt} = ns_storage_conf:extract_disk_stats_for_path(Mounts, RealFile),
               Mnt
         end || File <- UsedFiles],
    UsedMounts = sets:to_list(sets:from_list(UsedMountsTmp)),
    {value, Config} = ns_config:search(alert_limits),
    MaxDiskUsed = proplists:get_value(max_disk_used, Config),
    OverDisks = [ {Disk, Used}
                  || {Disk, _Cap, Used} <- UsedMounts, Used > MaxDiskUsed],

    Fun = fun({Disk, Used}, Acc) ->
                  Key = list_to_atom("disk_check_" ++ Disk),
                  case hit_rate_limit(Key, Acc) of
                      false ->
                          {_Sname, Host} = misc:node_name_host(node()),
                          Err = fmt_to_bin(errors(disk), [Disk, Host, Used]),
                          global_alert(disk, Err),
                          dict:store(Key, misc:now_int(), Acc);
                      true ->
                          Acc
                  end
          end,

    lists:foldl(Fun, Opaque, OverDisks);

%% @doc check how much overhead there is compared to data
check(overhead, Opaque, _History, Stats) ->
    [case over_threshold(fetch_bucket_stat(Stats, Bucket, ep_meta_data_memory),
                         fetch_bucket_stat(Stats, Bucket, ep_max_size)) of
         {true, X} ->
             {_Sname, Host} = misc:node_name_host(node()),
             Err = fmt_to_bin(errors(overhead), [erlang:trunc(X), Bucket, Host]),
             global_alert({overhead, Bucket}, Err);
         false  ->
             ok
     end || Bucket <- ns_memcached:active_buckets()],
    Opaque;

%% @doc check for write failures inside ep engine
check(write_fail, Opaque, _History, Stats) ->
    check_stat_increased(Stats, ep_item_commit_failed, Opaque);

%% @doc check for any oom errors in any bucket
check(oom, Opaque, _History, Stats) ->
    check_stat_increased(Stats, ep_oom_errors, Opaque).


%% @doc true if an alert for this key was raised less than
%% ?DISK_USAGE_TIMEOUT seconds ago, in which case we should not alert again yet
-spec hit_rate_limit(atom(), dict()) -> true | false.
hit_rate_limit(Key, Dict) ->
    case dict:find(Key, Dict) of
        error ->
            false;
        {ok, Value} ->
            Value + ?DISK_USAGE_TIMEOUT > misc:now_int()
    end.


%% @doc calculate percentage of overhead and if it is over threshold
-spec over_threshold(integer(), integer()) -> false | {true, float()}.
over_threshold(_Ep, 0) ->
    false;
over_threshold(EpErrs, Max) ->
    {value, Config} = ns_config:search(alert_limits),
    MaxOverheadPerc = proplists:get_value(max_overhead_perc, Config),
    Perc = (EpErrs / Max) * 100,
    case Perc > MaxOverheadPerc of
        true -> {true, Perc};
        false  -> false
    end.
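
%% For example (illustrative numbers, assuming max_overhead_perc is 50):
%% over_threshold(600, 1000) yields {true, 60.0}, while
%% over_threshold(400, 1000) yields false.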


%% @doc Check if the value of any statistic has increased since
%% last check
check_stat_increased(Stats, StatName, Opaque) ->
    New = fetch_buckets_stat(Stats, StatName),
    case dict:is_key(StatName, Opaque) of
        false ->
            dict:store(StatName, New, Opaque);
        true ->
            Old = dict:fetch(StatName, Opaque),
            case stat_increased(New, Old) of
                [] ->
                    ok;
                Buckets ->
                    {_Sname, Host} = misc:node_name_host(node()),
                    [global_alert({StatName, Bucket}, fmt_to_bin(errors(StatName), [Bucket, Host]))
                     || Bucket <- Buckets]
            end,
            dict:store(StatName, New, Opaque)
    end.


%% @doc check that I can listen on the current host
-spec can_listen(string()) -> boolean().
can_listen(Host) ->
    case inet:getaddr(Host, inet) of
        {error, Err} ->
            ?log_error("Cannot listen due to ~p from inet:getaddr~n", [Err]),
            false;
        {ok, IpAddr} ->
            case gen_udp:open(0, [inet, {ip, IpAddr}]) of
                {error, ListErr} ->
                    ?log_error("gen_udp:open(~p) failed due to ~p", [IpAddr, ListErr]),
                    false;
                {ok, Socket} ->
                    gen_udp:close(Socket),
                    true
            end
    end.


%% @doc list of buckets whose measured stat has increased
-spec stat_increased(dict(), dict()) -> list().
stat_increased(New, Old) ->
    [Bucket || {Bucket, Val} <- dict:to_list(New), increased(Bucket, Val, Old)].


%% @doc fetch a list of a stat for all buckets
fetch_buckets_stat(Stats, StatName) ->
    dict:from_list(
      [{Bucket, fetch_bucket_stat(Stats, Bucket, StatName)}
       || {Bucket, _OrdDict} <- Stats]
     ).


%% @doc fetch latest value of stat for particular bucket
fetch_bucket_stat(Stats, Bucket, StatName) ->
    OrdDict = case orddict:find(Bucket, Stats) of
                  {ok, KV} ->
                      KV;
                  _ ->
                      []
              end,
    case orddict:find(StatName, OrdDict) of
        {ok, V} -> V;
        _ -> 0
    end.


%% @doc The server keeps a history of recent messages to avoid sending the
%% same message repeatedly; drop entries older than ?ALERT_TIMEOUT
-spec expire_history(list()) -> list().
expire_history(Hist) ->
    Now = misc:now_int(),
    [ {Key, Msg, Time} ||
        {Key, Msg, Time} <- Hist, Now - Time < ?ALERT_TIMEOUT ].


%% @doc Lookup old value and test for increase
-spec increased(string(), integer(), dict()) -> true | false.
increased(Key, Val, Dict) ->
    case dict:find(Key, Dict) of
        error ->
            false;
        {ok, Prev} ->
            Val > Prev
    end.


%% Format the error message into a binary
fmt_to_bin(Str, Args) ->
    list_to_binary(lists:flatten(io_lib:format(Str, Args))).


-spec to_str(binary() | string()) -> string().
to_str(Msg) when is_binary(Msg) ->
    binary_to_list(Msg);
to_str(Msg) ->
    Msg.

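%% Alert keys are either plain atoms (e.g. ip, disk) or tagged with a
%% bucket name, e.g. {ep_oom_errors, "default"}; only the atom part is
%% used when looking up the short description for the email subject.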
extract_alert_key({Key, _Bucket}) ->
    Key;
extract_alert_key(Key) ->
    Key.

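%% Only the node that raised the alert sends the email; global_alert/2
%% casts to every node, so without this check each node would mail the
%% same alert.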
maybe_send_out_email_alert({Key0, Node}, Message) ->
    case Node =:= node() of
        true ->
            Key = extract_alert_key(Key0),

            {value, Config} = ns_config:search(email_alerts),
            case proplists:get_bool(enabled, Config) of
                true ->
                    Description = short_description(Key),
                    ns_mail:send_alert_async(Key, Description, Message, Config);
                false ->
                    ok
            end;
        false ->
            ok
    end.

alert_keys() ->
    [ip, disk, overhead, ep_oom_errors, ep_item_commit_failed].

%% Can't currently test the alert timeouts, as that would require mocking
%% calls to the archiver

run_basic_test_do() ->
    ?assertEqual(ok, ?MODULE:local_alert({foo, node()}, <<"bar">>)),
    ?assertMatch({[<<"bar">>], _}, ?MODULE:fetch_alerts()),
    {[<<"bar">>], Opaque1} = ?MODULE:fetch_alerts(),
    ?assertMatch({true, {[], _}}, {?MODULE:consume_alerts(Opaque1), ?MODULE:fetch_alerts()}),

    ?assertEqual(ok, ?MODULE:local_alert({bar, node()}, <<"bar">>)),
    ?assertEqual(ignored, ?MODULE:local_alert({bar, node()}, <<"bar">>)),
    {[<<"bar">>], Opaque2} = ?MODULE:fetch_alerts(),
    true = (Opaque1 =/= Opaque2),
    ?assertEqual(false, ?MODULE:consume_alerts(Opaque1)),
    ?assertEqual(true, ?MODULE:consume_alerts(Opaque2)),
    ?assertMatch({[], _}, ?MODULE:fetch_alerts()),

    ?assertEqual(ok, ?MODULE:global_alert(fu, <<"bar">>)),
    ?assertEqual(ok, ?MODULE:global_alert(fu, <<"bar">>)),
    ?assertMatch({[<<"bar">>], _}, ?MODULE:fetch_alerts()).

basic_test() ->
    {ok, Pid} = ?MODULE:start_link(),

    %% return empty alerts configuration so that no attempts to send anything
    %% are performed
    ns_config:test_setup([{email_alerts, []}]),

    try
        run_basic_test_do()
    after
        erlang:unlink(Pid),
        exit(Pid, shutdown),
        misc:wait_for_process(Pid, infinity)
    end.