1% Licensed under the Apache License, Version 2.0 (the "License"); you may not
2% use this file except in compliance with the License. You may obtain a copy of
3% the License at
4%
5%   http://www.apache.org/licenses/LICENSE-2.0
6%
7% Unless required by applicable law or agreed to in writing, software
8% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10% License for the specific language governing permissions and limitations under
11% the License.
12
13-module(couch_work_queue).
14-behaviour(gen_server).
15
16-include("couch_db.hrl").
17
18% public API
19-export([new/1, queue/2, dequeue/1, dequeue/2, close/1, item_count/1, size/1]).
20
21% gen_server callbacks
22-export([init/1, terminate/2]).
23-export([handle_call/3, handle_cast/2, code_change/3, handle_info/2]).
24
25-record(q, {
26    queue = queue:new(),
27    blocked = [],
28    max_size,
29    max_items,
30    items = 0,
31    size = 0,
32    work_waiters = [],
33    close_on_dequeue = false,
34    multi_workers = false
35}).
36
37
38new(Options) ->
39    gen_server:start_link(couch_work_queue, Options, []).
40
41
42queue(Wq, Item) when is_binary(Item) ->
43    gen_server:call(Wq, {queue, Item, byte_size(Item)}, infinity);
44queue(Wq, Item) ->
45    gen_server:call(Wq, {queue, Item, ?term_size(Item)}, infinity).
46
47
48dequeue(Wq) ->
49    dequeue(Wq, all).
50
51
52dequeue(Wq, MaxItems) ->
53    try
54        gen_server:call(Wq, {dequeue, MaxItems}, infinity)
55    catch
56        _:_ -> closed
57    end.
58
59
60item_count(Wq) ->
61    try
62        gen_server:call(Wq, item_count, infinity)
63    catch
64        _:_ -> closed
65    end.
66
67
68size(Wq) ->
69    try
70        gen_server:call(Wq, size, infinity)
71    catch
72        _:_ -> closed
73    end.
74
75
76close(Wq) ->
77    gen_server:cast(Wq, close).
78
79
80init(Options) ->
81    Q = #q{
82        max_size = couch_util:get_value(max_size, Options, nil),
83        max_items = couch_util:get_value(max_items, Options, nil),
84        multi_workers = couch_util:get_value(multi_workers, Options, false)
85    },
86    {ok, Q}.
87
88
89terminate(_Reason, #q{work_waiters=Workers}) ->
90    lists:foreach(fun({W, _}) -> gen_server:reply(W, closed) end, Workers).
91
92
93handle_call({queue, Item, Size}, From, #q{work_waiters = []} = Q0) ->
94    Q = Q0#q{size = Q0#q.size + Size, % increment_queue_size(Q0, Item),
95                items = Q0#q.items + 1,
96                queue = queue:in({Item, Size}, Q0#q.queue)},
97    case (Q#q.size >= Q#q.max_size) orelse
98            (Q#q.items >= Q#q.max_items) of
99    true ->
100        {noreply, Q#q{blocked = [From | Q#q.blocked]}};
101    false ->
102        {reply, ok, Q}
103    end;
104
105handle_call({queue, Item, Size}, _From, #q{work_waiters = [{W, _Max} | Rest]} = Q) ->
106    gen_server:reply(W, {ok, [Item], Size}),
107    {reply, ok, Q#q{work_waiters = Rest}};
108
109handle_call({dequeue, Max}, From, Q) ->
110    #q{work_waiters = Workers, multi_workers = Multi, items = Count} = Q,
111    case {Workers, Multi} of
112    {[_ | _], false} ->
113        exit("Only one caller allowed to wait for this work at a time");
114    {[_ | _], true} ->
115        {noreply, Q#q{work_waiters=Workers ++ [{From, Max}]}};
116    _ ->
117        case Count of
118        0 ->
119            {noreply, Q#q{work_waiters=Workers ++ [{From, Max}]}};
120        C when C > 0 ->
121            deliver_queue_items(Max, Q)
122        end
123    end;
124
125handle_call(item_count, _From, Q) ->
126    {reply, Q#q.items, Q};
127
128handle_call(size, _From, Q) ->
129    {reply, Q#q.size, Q}.
130
131
132deliver_queue_items(Max, Q) ->
133    #q{
134        queue = Queue,
135        items = Count,
136        size = Size,
137        close_on_dequeue = Close,
138        blocked = Blocked
139    } = Q,
140    case (Max =:= all) orelse (Max >= Count) of
141    false ->
142        {Items, Size2, Queue2, Blocked2} = dequeue_items(
143            Max, Size, Queue, Blocked, []),
144        Q2 = Q#q{
145            items = Count - Max, size = Size2, blocked = Blocked2, queue = Queue2
146        },
147        {reply, {ok, Items, Size - Size2}, Q2};
148    true ->
149        lists:foreach(fun(F) -> gen_server:reply(F, ok) end, Blocked),
150        Q2 = Q#q{items = 0, size = 0, blocked = [], queue = queue:new()},
151        Items = [Item || {Item, _} <- queue:to_list(Queue)],
152        case Close of
153        false ->
154            {reply, {ok, Items, Size}, Q2};
155        true ->
156            {stop, normal, {ok, Items, Size}, Q2}
157        end
158    end.
159
160
161dequeue_items(0, Size, Queue, Blocked, DequeuedAcc) ->
162    {lists:reverse(DequeuedAcc), Size, Queue, Blocked};
163
164dequeue_items(NumItems, Size, Queue, Blocked, DequeuedAcc) ->
165    {{value, {Item, ItemSize}}, Queue2} = queue:out(Queue),
166    case Blocked of
167    [] ->
168        Blocked2 = Blocked;
169    [From | Blocked2] ->
170        gen_server:reply(From, ok)
171    end,
172    dequeue_items(
173        NumItems - 1, Size - ItemSize, Queue2, Blocked2, [Item | DequeuedAcc]).
174
175
176handle_cast(close, #q{items = 0} = Q) ->
177    {stop, normal, Q};
178
179handle_cast(close, Q) ->
180    {noreply, Q#q{close_on_dequeue = true}}.
181
182
183code_change(_OldVsn, State, _Extra) ->
184    {ok, State}.
185
186handle_info(X, Q) ->
187    {stop, X, Q}.
188