1% -*- Mode: Erlang; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2
3% Licensed under the Apache License, Version 2.0 (the "License"); you may not
4% use this file except in compliance with the License. You may obtain a copy of
5% the License at
6%
7%   http://www.apache.org/licenses/LICENSE-2.0
8%
9% Unless required by applicable law or agreed to in writing, software
10% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12% License for the specific language governing permissions and limitations under
13% the License.
14
15-module(couch_set_view_util).
16
17-export([expand_dups/2, expand_dups/3, partitions_map/2]).
18-export([build_bitmask/1, decode_bitmask/1]).
19-export([make_btree_purge_fun/1]).
20-export([get_ddoc_ids_with_sig/2]).
21-export([open_raw_read_fd/1, close_raw_read_fd/1]).
22-export([compute_indexed_bitmap/1, cleanup_group/1]).
23-export([missing_changes_count/2]).
24-export([is_initial_build/1]).
25-export([new_sort_file_path/2, delete_sort_files/2]).
26-export([parse_values/1, parse_reductions/1, parse_view_id_keys/1]).
27-export([split_set_db_name/1]).
28-export([group_to_header_bin/1, header_bin_sig/1, header_bin_to_term/1]).
29-export([get_part_seq/2, has_part_seq/2, find_part_seq/2]).
30-export([check_primary_key_size/5, check_primary_value_size/5]).
31-export([refresh_viewgroup_header/1]).
32-export([shutdown_cleaner/2, shutdown_wait/1]).
33-export([try_read_line/1]).
34-export([send_group_header/2, receive_group_header/3]).
35-export([remove_group_views/2, update_group_views/3]).
36-export([send_group_info/2]).
37-export([filter_seqs/2]).
38-export([log_port_error/3]).
39-export([fix_partitions/2]).
40
41
42-include("couch_db.hrl").
43-include_lib("couch_set_view/include/couch_set_view.hrl").
44
45
% Splits a concatenation of length-prefixed values into a list of binaries.
% Every value is framed as a 24-bit length followed by that many bytes.
% Values are returned in the order in which they appear in the input; input
% that does not follow this framing fails with `function_clause`.
parse_values(<<>>) ->
    [];
parse_values(<<Size:24, Value:Size/binary, Tail/binary>>) ->
    [Value | parse_values(Tail)].
53
54
% Reads `Remaining` keys from the binary, each framed as a 16-bit length
% followed by that many bytes, and returns {Keys, UnparsedTail}.
% Keys are prepended to the accumulator and never reversed, so they come out
% in the opposite order to the one they have in the binary.
parse_len_keys(Remaining, <<KeySize:16, Key:KeySize/binary, Tail/binary>>, Keys)
        when Remaining =/= 0 ->
    parse_len_keys(Remaining - 1, Tail, [Key | Keys]);
parse_len_keys(0, Tail, Keys) ->
    {Keys, Tail}.
59
60
% Decodes a binary made of consecutive records of the form
%   ViewId (8 bits), NumKeys (16 bits), NumKeys length-prefixed keys
% into a list of {ViewId, Keys} pairs, in the order the records appear.
% The keys of each pair are in whatever order parse_len_keys/3 returns them.
parse_view_id_keys(Bin) ->
    parse_view_id_keys(Bin, []).

parse_view_id_keys(<<>>, Acc) ->
    lists:reverse(Acc);
parse_view_id_keys(<<ViewId:8, KeyCount:16, KeysAndTail/binary>>, Acc) ->
    {Keys, Tail} = parse_len_keys(KeyCount, KeysAndTail, []),
    parse_view_id_keys(Tail, [{ViewId, Keys} | Acc]).
66
67
% Decodes a sequence of reductions, each stored as a 16-bit size followed by
% that many bytes. The reductions are returned in input order.
parse_reductions(Bin) ->
    parse_reductions(Bin, []).

parse_reductions(<<>>, Acc) ->
    lists:reverse(Acc);
parse_reductions(<<RedSize:16, Reduction:RedSize/binary, Tail/binary>>, Acc) ->
    parse_reductions(Tail, [Reduction | Acc]).
72
73
% Expands every KV whose value holds several length-prefixed values into one
% KV per value, each carrying the original 16-bit partition id prefix.
% Expanded entries are prepended to Acc and the whole accumulator is reversed
% once at the end. As a consequence the entries produced from a single KV
% appear in reverse order of their values, while KVs keep their relative
% order.
expand_dups(KVs, Acc) ->
    Prepend = fun(KV, Acc1) ->
        {BinKeyDocId, <<PartId:16, ValuesBin/binary>>} = KV,
        PerValue = lists:map(
            fun(Val) -> {BinKeyDocId, <<PartId:16, Val/binary>>} end,
            parse_values(ValuesBin)),
        PerValue ++ Acc1
    end,
    lists:reverse(lists:foldl(Prepend, Acc, KVs)).
81
82
% Same expansion as expand_dups/2, but KVs whose partition bit is not set in
% Abitmask are dropped. The entries produced from a single KV come out in
% reverse order of their values because the accumulator is reversed as a
% whole at the end.
expand_dups(KVs, Abitmask, Acc) ->
    Step = fun(KV, Acc1) ->
        {BinKeyDocId, <<PartId:16, ValuesBin/binary>>} = KV,
        InMask = ((1 bsl PartId) band Abitmask) =/= 0,
        if
        InMask ->
            PerValue = [{BinKeyDocId, <<PartId:16, Val/binary>>}
                        || Val <- parse_values(ValuesBin)],
            PerValue ++ Acc1;
        true ->
            Acc1
        end
    end,
    lists:reverse(lists:foldl(Step, Acc, KVs)).
97
98
% Sets, in BitMap, the bit of every partition referenced by the given KVs.
% The value of each KV is a binary whose first 16 bits are the partition id;
% the remainder of the value is ignored. (The spec previously described the
% values as tuples, which did not match the binary pattern used below.)
-spec partitions_map([{term(), binary()}], bitmask()) -> bitmask().
partitions_map([], BitMap) ->
    BitMap;
partitions_map([{_Key, <<PartitionId:16, _Val/binary>>} | RestKvs], BitMap) ->
    partitions_map(RestKvs, BitMap bor (1 bsl PartitionId)).
104
105
% Builds a bitmask with one bit set per partition id in the list.
-spec build_bitmask([partition_id()]) -> bitmask().
build_bitmask(ActiveList) ->
    build_bitmask(ActiveList, 0).

% Adds the bits of the listed partition ids to an existing bitmask. Ids that
% are not non-negative integers are rejected with `function_clause`.
-spec build_bitmask([partition_id()], bitmask()) -> bitmask().
build_bitmask(PartIds, Acc0) ->
    SetBit = fun(PartId, Acc) when is_integer(PartId), PartId >= 0 ->
        Acc bor (1 bsl PartId)
    end,
    lists:foldl(SetBit, Acc0, PartIds).
115
116
% Returns, in ascending order, the ids of all partitions whose bit is set in
% the bitmask.
-spec decode_bitmask(bitmask()) -> ordsets:ordset(partition_id()).
decode_bitmask(Bitmask) ->
    decode_bitmask(Bitmask, 0).

% Walks the bitmask from the least significant bit upwards; PartId is the id
% that corresponds to the current lowest bit.
-spec decode_bitmask(bitmask(), partition_id()) -> [partition_id()].
decode_bitmask(0, _PartId) ->
    [];
decode_bitmask(Bitmask, PartId) when Bitmask band 1 =:= 1 ->
    [PartId | decode_bitmask(Bitmask bsr 1, PartId + 1)];
decode_bitmask(Bitmask, PartId) ->
    decode_bitmask(Bitmask bsr 1, PartId + 1).
131
132
% Builds the callback handed to couch_btree:guided_purge/3 for a group whose
% cleanup bitmask is not empty. Before each branch node is examined the
% mailbox is polled (without blocking) for a `stop` message; when one is
% found the purge is stopped and the count accumulated so far is returned.
-spec make_btree_purge_fun(#set_view_group{}) -> set_view_btree_purge_fun().
make_btree_purge_fun(Group) when ?set_cbitmask(Group) =/= 0 ->
    Cbitmask = ?set_cbitmask(Group),
    fun(value, Value, {go, _Acc} = State) ->
            btree_purge_fun(value, Value, State, Cbitmask);
        (branch, Value, {go, Acc} = State) ->
            receive
            stop ->
                {stop, {stop, Acc}}
            after 0 ->
                btree_purge_fun(branch, Value, State, Cbitmask)
            end
    end.
145
% Decides what to do with one btree item during a guided purge.
%
% For a value, the first 16 bits hold the partition id: the item is purged
% (and counted) when that partition's bit is set in Cbitmask, kept otherwise.
%
% For a branch, the reduction starts with a 40-bit item count followed by a
% partition bitmap: the subtree is kept when it shares no partition with
% Cbitmask, purged entirely (adding its count) when all of its partitions are
% in Cbitmask, and partially purged otherwise.
btree_purge_fun(value, {_K, <<PartId:16, _/binary>>}, {go, Acc}, Cbitmask) ->
    case Cbitmask band (1 bsl PartId) of
    0 ->
        {keep, {go, Acc}};
    _BitSet ->
        {purge, {go, Acc + 1}}
    end;
btree_purge_fun(branch, Red, {go, Acc}, Cbitmask) ->
    <<Count:40, Bitmap:?MAX_NUM_PARTITIONS, _Reds/binary>> = Red,
    Overlap = Bitmap band Cbitmask,
    if
    Overlap =:= 0 ->
        {keep, {go, Acc}};
    Overlap =:= Bitmap ->
        {purge, {go, Acc + Count}};
    true ->
        {partial_purge, {go, Acc}}
    end.
164
165
% Returns the ids of all design documents registered for SetName with the
% same signature as Group, as found in the name-to-signature ets table of
% the group's category. When the table has no such entry the group's own
% name is returned as the only element.
-spec get_ddoc_ids_with_sig(binary(), #set_view_group{}) -> [binary()].
get_ddoc_ids_with_sig(SetName, Group) ->
    #set_view_group{
        sig = Sig,
        name = FirstDDocId,
        category = Category,
        mod = Mod
    } = Group,
    Table = Mod:name_to_sig_ets(Category),
    Rows = ets:match_object(Table, {SetName, {'$1', Sig}}),
    case [DDocId || {_SetName, {DDocId, _Sig}} <- Rows] of
    [] ->
        % ets just got updated because view group died
        [FirstDDocId];
    DDocIds ->
        DDocIds
    end.
182
183
% Opens the group's index file in raw binary read mode and stores the
% descriptor in the process dictionary under {FilePid, fast_fd_read}.
% Failure to open is only logged; the function returns `ok` either way.
-spec open_raw_read_fd(#set_view_group{}) -> 'ok'.
open_raw_read_fd(Group) ->
    #set_view_group{
        fd = FilePid,
        filepath = FileName,
        set_name = SetName,
        type = Type,
        name = DDocId
    } = Group,
    case file2:open(FileName, [read, raw, binary]) of
    {error, Reason} ->
        ?LOG_INFO("Warning, could not open raw fd for fast reads for "
            "~s view group `~s`, set `~s`: ~s",
            [Type, DDocId, SetName, file:format_error(Reason)]);
    {ok, RawReadFd} ->
        erlang:put({FilePid, fast_fd_read}, RawReadFd)
    end,
    ok.
203
204
% Removes the raw read descriptor stored by open_raw_read_fd/1 from the
% process dictionary and closes it. Does nothing when none is stored.
-spec close_raw_read_fd(#set_view_group{}) -> 'ok'.
close_raw_read_fd(#set_view_group{fd = FilePid}) ->
    Stored = erlang:erase({FilePid, fast_fd_read}),
    if
    Stored =:= undefined ->
        ok;
    true ->
        ok = file:close(Stored)
    end.
213
214
% Computes the bitmap of partitions that have data in the group's indexes.
-spec compute_indexed_bitmap(#set_view_group{}) -> bitmap().
compute_indexed_bitmap(#set_view_group{id_btree = IdBtree, views = Views, mod = Mod}) ->
    compute_indexed_bitmap(Mod, IdBtree, Views).

% ORs the partition bitmap taken from the id btree's full reduction (which
% follows a 40-bit count) with the bitmap reported by Mod for each view.
compute_indexed_bitmap(Mod, IdBtree, Views) ->
    {ok, <<_Count:40, IdBitmap:?MAX_NUM_PARTITIONS>>} = couch_btree:full_reduce(IdBtree),
    ViewBitmaps = [Mod:view_bitmap(View#set_view.indexer) || View <- Views],
    lists:foldl(fun erlang:'bor'/2, IdBitmap, ViewBitmaps).
226
227
% Purges from the group's indexes all data of the partitions marked in the
% cleanup bitmask. Returns the updated group and the number of purged items.
% Nothing is done when the cleanup bitmask is empty. Groups of type
% mapreduce_view delegate to that module; any other module goes through the
% generic cleanup_group/2.
-spec cleanup_group(#set_view_group{}) -> {'ok', #set_view_group{}, non_neg_integer()}.
cleanup_group(Group) when ?set_cbitmask(Group) == 0 ->
    {ok, Group, 0};
cleanup_group(#set_view_group{mod = mapreduce_view} = Group) ->
    mapreduce_view:cleanup_view_group(Group);
cleanup_group(#set_view_group{mod = Mod} = Group) ->
    cleanup_group(Mod, Group).
239
240
% Generic cleanup: purges the id btree with a guided purge, asks Mod to clean
% the views for the partitions in the cleanup bitmask, then rebuilds the
% header states and flushes the index file. A raw read descriptor is kept
% open only while the btrees are being purged.
cleanup_group(Mod, Group) ->
    #set_view_group{
        index_header = OldHeader,
        id_btree = OldIdBtree,
        views = OldViews
    } = Group,
    PurgeFun = make_btree_purge_fun(Group),
    Cbitmask = ?set_cbitmask(Group),
    ok = couch_set_view_util:open_raw_read_fd(Group),
    {ok, IdBtree, {_Go, IdPurged}} =
        couch_btree:guided_purge(OldIdBtree, PurgeFun, {go, 0}),
    PartsToClean = couch_set_view_util:decode_bitmask(Cbitmask),
    {ViewsPurged, Views} = Mod:clean_views(OldViews, PartsToClean),
    PurgedTotal = IdPurged + ViewsPurged,
    ok = couch_set_view_util:close_raw_read_fd(Group),
    % XXX vmx 2014-07-30: Currently the IndexedBitmap is always 0. This works
    % for now, but should be investigated later on
    IndexedBitmap = compute_indexed_bitmap(Mod, IdBtree, Views),
    Header = OldHeader#set_view_index_header{
        % Only partitions that still have indexed data remain to be cleaned.
        cbitmask = Cbitmask band IndexedBitmap,
        id_btree_state = couch_btree:get_state(IdBtree),
        view_states = lists:map(
            fun(View) -> Mod:get_state(View#set_view.indexer) end, Views)
    },
    CleanGroup = Group#set_view_group{
        id_btree = IdBtree,
        views = Views,
        index_header = Header
    },
    ok = couch_file:flush(Group#set_view_group.fd),
    {ok, CleanGroup, PurgedTotal}.
269
270
% Sums, over all partitions in CurSeqs, how far each sequence number is ahead
% of the one recorded for the same partition in NewSeqs. A partition absent
% from NewSeqs counts as sequence 0; partitions that are not ahead contribute
% nothing.
-spec missing_changes_count(partition_seqs(), partition_seqs()) -> non_neg_integer().
missing_changes_count(CurSeqs, NewSeqs) ->
    AddBehind = fun({Part, CurSeq}, Missing) ->
        Behind = CurSeq - couch_util:get_value(Part, NewSeqs, 0),
        Missing + erlang:max(Behind, 0)
    end,
    lists:foldl(AddBehind, 0, CurSeqs).
286
287
% Tells whether the group has never indexed anything: all indexable and
% unindexable sequence numbers are 0 and every partition version is still
% the initial [{0, 0}].
-spec is_initial_build(#set_view_group{}) -> boolean().
is_initial_build(Group) ->
    AllSeqsZero = fun(Seqs) ->
        lists:all(fun({_PartId, Seq}) -> Seq == 0 end, Seqs)
    end,
    % If there are no persisted items, the initial index build will
    % create an empty index with partition versions updated. Hence use the
    % partition versions as well to determine whether it is an initial index
    % build or not.
    IsInitialVersion = fun({_PartId, PartitionVersion}) ->
        PartitionVersion =:= [{0, 0}]
    end,
    AllSeqsZero(?set_seqs(Group))
        andalso AllSeqsZero(?set_unindexable_seqs(Group))
        andalso lists:all(IsInitialVersion, ?set_partition_versions(Group)).
300
301
% Returns a fresh, uniquely named file path under RootDir for a temporary
% sort file. Updater files end in ".sort", compactor files in ".compact".
-spec new_sort_file_path(string(), 'updater' | 'compactor') -> string().
new_sort_file_path(RootDir, Owner) when Owner =:= updater; Owner =:= compactor ->
    Extension = case Owner of
    updater ->
        ".sort";
    compactor ->
        ".compact"
    end,
    do_new_sort_file_path(RootDir, Extension).

% Builds "<RootDir>/<uuid><Type>" and makes sure its parent directory exists.
do_new_sort_file_path(RootDir, Type) ->
    FileName = ?b2l(couch_uuids:new()) ++ Type,
    Path = filename:join(RootDir, FileName),
    ok = file2:ensure_dir(Path),
    Path.
313
314
% Deletes temporary files directly under RootDir: every file for `all`, only
% "*.sort" files for `updater`, only "*.compact" files for `compactor`.
-spec delete_sort_files(string(), 'all' | 'updater' | 'compactor') -> 'ok'.
delete_sort_files(RootDir, Which)
        when Which =:= all; Which =:= updater; Which =:= compactor ->
    Suffix = case Which of
    all ->
        "";
    updater ->
        ".sort";
    compactor ->
        ".compact"
    end,
    do_delete_sort_files(RootDir, Suffix).

% Logs and deletes each file matching "<RootDir>/*<Suffix>". The result of
% each deletion is ignored.
do_delete_sort_files(RootDir, Suffix) ->
    Pattern = filename:join(RootDir, "*" ++ Suffix),
    Doomed = filelib:wildcard(Pattern),
    _ = [begin
             ?LOG_INFO("Deleting temporary file ~s", [Path]),
             file2:delete(Path)
         end || Path <- Doomed],
    ok.
331
332
% Splits a database name of the form "<set name>/<partition>" at its last
% slash. The partition part is either "master" or a non-negative integer.
% Returns `error` when there is no slash, when the slash is the first or the
% last character, or when the partition part is neither of the above.
-spec split_set_db_name(string() | binary()) ->
                               {'ok', SetName::binary(), Partition::master} |
                               {'ok', SetName::binary(), Partition::non_neg_integer()} |
                               'error'.
split_set_db_name(DbName) when is_binary(DbName) ->
    split_set_db_name(?b2l(DbName));
split_set_db_name(DbName) ->
    Len = length(DbName),
    case string:rchr(DbName, $/) of
    Pos when (Pos > 0), (Pos < Len) ->
        {SetName, [$/ | Partition]} = lists:split(Pos - 1, DbName),
        case parse_partition_name(Partition) of
        {ok, PartId} ->
            {ok, ?l2b(SetName), PartId};
        error ->
            error
        end;
    _ ->
        error
    end.

% Converts the partition part of a database name into `master` or a
% non-negative integer. Only the `badarg` raised by list_to_integer/1 for
% non-numeric text is handled; anything list_to_integer/1 accepts and that
% is >= 0 is treated as a valid partition id.
parse_partition_name("master") ->
    {ok, master};
parse_partition_name(Partition) ->
    try list_to_integer(Partition) of
    Id when Id >= 0 ->
        {ok, Id};
    _Negative ->
        error
    catch
    error:badarg ->
        error
    end.
358
359
% Serializes the group's index header. The result is the group signature
% followed by the compressed header body. The body lays out, in this order:
% version, number of partitions, the active/passive/cleanup bitmasks, the
% partition seqs, the id btree state, the view states, the replica flag, the
% replicas on transfer, the pending transition, the unindexable seqs and the
% partition versions. Every list is preceded by its element count.
-spec group_to_header_bin(#set_view_group{}) -> binary().
group_to_header_bin(#set_view_group{index_header = Header, sig = Sig}) ->
    #set_view_index_header{
        version = Version,
        num_partitions = NumParts,
        abitmask = Abitmask,
        pbitmask = Pbitmask,
        cbitmask = Cbitmask,
        seqs = Seqs,
        id_btree_state = IdBtreeState,
        view_states = ViewStates,
        has_replica = HasReplica,
        replicas_on_transfer = RepsOnTransfer,
        pending_transition = PendingTrans,
        unindexable_seqs = Unindexable,
        partition_versions = PartVersions
    } = Header,
    ViewStatesBin = list_to_binary(
        lists:map(fun view_state_to_bin/1, ViewStates)),
    FixedPart = <<
        Version:8,
        NumParts:16,
        Abitmask:?MAX_NUM_PARTITIONS,
        Pbitmask:?MAX_NUM_PARTITIONS,
        Cbitmask:?MAX_NUM_PARTITIONS
    >>,
    SeqsPart = <<(length(Seqs)):16, (seqs_to_bin(Seqs, <<>>))/binary>>,
    IdStatePart = view_state_to_bin(IdBtreeState),
    ViewsPart = <<(length(ViewStates)):8, ViewStatesBin/binary>>,
    ReplicaPart = bool_to_bin(HasReplica),
    TransferPart = <<(length(RepsOnTransfer)):16,
                     (partitions_to_bin(RepsOnTransfer, <<>>))/binary>>,
    PendingPart = pending_trans_to_bin(PendingTrans),
    UnindexablePart = <<(length(Unindexable)):16,
                        (seqs_to_bin(Unindexable, <<>>))/binary>>,
    VersionsPart = <<(length(PartVersions)):16,
                     (partition_versions_to_bin(PartVersions, <<>>))/binary>>,
    Base = list_to_binary([
        FixedPart, SeqsPart, IdStatePart, ViewsPart, ReplicaPart,
        TransferPart, PendingPart, UnindexablePart, VersionsPart
    ]),
    <<Sig/binary, (couch_compress:compress(Base))/binary>>.
398
399
% Extracts the group signature, i.e. the first 16 bytes of a serialized
% header. Binaries shorter than 16 bytes fail with `function_clause`.
-spec header_bin_sig(binary()) -> binary().
header_bin_sig(HeaderBin) when is_binary(HeaderBin), byte_size(HeaderBin) >= 16 ->
    % signature is a md5 digest, always 16 bytes
    binary:part(HeaderBin, 0, 16).
404
405
% Parses a serialized header (as produced by group_to_header_bin/1) back
% into a #set_view_index_header{} record. The leading 16 signature bytes are
% skipped and the rest is decompressed before parsing.
% A header stored with version 1 has no partition versions section: it is
% returned with version 2 and `partition_versions = nil`.
-spec header_bin_to_term(binary()) -> #set_view_index_header{}.
header_bin_to_term(HeaderBin) ->
    <<_Signature:16/binary, Compressed/binary>> = HeaderBin,
    <<
      StoredVersion:8,
      NumParts:16,
      Abitmask:?MAX_NUM_PARTITIONS,
      Pbitmask:?MAX_NUM_PARTITIONS,
      Cbitmask:?MAX_NUM_PARTITIONS,
      SeqCount:16,
      SeqsAndTail/binary
    >> = couch_compress:decompress(Compressed),
    {Seqs, AfterSeqs} = bin_to_seqs(SeqCount, SeqsAndTail, []),
    <<
      IdStateSize:16,
      IdStateBin:IdStateSize/binary,
      ViewStateCount:8,
      ViewStatesAndTail/binary
    >> = AfterSeqs,
    % An empty state is how `nil` is stored.
    IdBtreeState = case IdStateSize of
    0 ->
        nil;
    _ ->
        IdStateBin
    end,
    {ViewStates, AfterViewStates} =
        bin_to_view_states(ViewStateCount, ViewStatesAndTail, []),
    <<
      ReplicaFlag:8,
      TransferCount:16,
      TransferAndTail/binary
    >> = AfterViewStates,
    {ReplicasOnTransfer, AfterTransfer} =
        bin_to_partitions(TransferCount, TransferAndTail, []),
    {PendingTrans, AfterPending} = bin_to_pending_trans(AfterTransfer),
    <<
      UnindexableCount:16,
      UnindexableAndTail/binary
    >> = AfterPending,
    {Unindexable, AfterUnindexable} =
        bin_to_seqs(UnindexableCount, UnindexableAndTail, []),
    {Version, PartVersions} = case StoredVersion of
    1 ->
        ?LOG_INFO("Upgrading index header from Couchbase 2.x to 3.x", []),
        {2, nil};
    _ ->
        <<VersionsCount:16, VersionsBin/binary>> = AfterUnindexable,
        % The partition versions must consume the rest of the header.
        {Decoded, <<>>} = bin_to_partition_versions(
            VersionsCount, VersionsBin, []),
        {StoredVersion, Decoded}
    end,
    HasReplica = case ReplicaFlag of
    0 ->
        false;
    1 ->
        true
    end,
    #set_view_index_header{
        version = Version,
        num_partitions = NumParts,
        abitmask = Abitmask,
        pbitmask = Pbitmask,
        cbitmask = Cbitmask,
        seqs = Seqs,
        id_btree_state = IdBtreeState,
        view_states = ViewStates,
        has_replica = HasReplica,
        replicas_on_transfer = ReplicasOnTransfer,
        pending_transition = PendingTrans,
        unindexable_seqs = Unindexable,
        partition_versions = PartVersions
    }.
471
472
% Encodes a btree/view state as a 16-bit size followed by the state bytes.
% `nil` is encoded as size 0. A state that does not fit in the 16-bit size
% field is rejected by throwing {too_large_view_state, Size}.
view_state_to_bin(nil) ->
    <<0:16>>;
view_state_to_bin(BinState) when byte_size(BinState) < 16#10000 ->
    <<(byte_size(BinState)):16, BinState/binary>>;
view_state_to_bin(BinState) ->
    throw({too_large_view_state, byte_size(BinState)}).
483
484
% Encodes a boolean as a single byte: 0 for false, 1 for true.
bool_to_bin(false) ->
    <<0>>;
bool_to_bin(true) ->
    <<1>>.
489
490
% Appends to Acc0 the encoding of each {PartitionId, Seq} pair, in list
% order: 16 bits of partition id followed by 48 bits of sequence number.
seqs_to_bin(Seqs, Acc0) ->
    AppendPair = fun({PartId, Seq}, Acc) ->
        <<Acc/binary, PartId:16, Seq:48>>
    end,
    lists:foldl(AppendPair, Acc0, Seqs).
495
496
% Appends to Acc0 each partition id of the list as a 16-bit integer, in list
% order.
partitions_to_bin(Partitions, Acc0) ->
    AppendId = fun(PartId, Acc) ->
        <<Acc/binary, PartId:16>>
    end,
    lists:foldl(AppendId, Acc0, Partitions).
501
502
% Encodes a pending transition as three counted lists of 16-bit partition
% ids: active, passive, unindexable. `nil` is encoded as three empty lists.
pending_trans_to_bin(nil) ->
    <<0:16, 0:16, 0:16>>;
pending_trans_to_bin(#set_view_transition{} = Transition) ->
    #set_view_transition{
        active = Active,
        passive = Passive,
        unindexable = Unindexable
    } = Transition,
    Counted = fun(Parts) ->
        <<(length(Parts)):16, (partitions_to_bin(Parts, <<>>))/binary>>
    end,
    ActiveBin = Counted(Active),
    PassiveBin = Counted(Passive),
    UnindexableBin = Counted(Unindexable),
    <<ActiveBin/binary, PassiveBin/binary, UnindexableBin/binary>>.
509
510
% Appends to Acc0 the encoding of each {PartitionId, FailoverLog} pair:
% 16 bits of partition id, 16 bits of failover log length, then the log.
partition_versions_to_bin(PartVersions, Acc0) ->
    AppendVersion = fun({PartId, FailoverLog}, Acc) ->
        LogBin = failoverlog_to_bin(FailoverLog, <<>>),
        <<Acc/binary, PartId:16, (length(FailoverLog)):16, LogBin/binary>>
    end,
    lists:foldl(AppendVersion, Acc0, PartVersions).

% Appends to Acc0 each {Uuid, Seq} failover log entry as two 64-bit integers.
failoverlog_to_bin(FailoverLog, Acc0) ->
    AppendEntry = fun({Uuid, Seq}, Acc) ->
        <<Acc/binary, Uuid:64/integer, Seq:64>>
    end,
    lists:foldl(AppendEntry, Acc0, FailoverLog).
522
523
% Decodes the three counted partition lists written by
% pending_trans_to_bin/1 and returns {Transition, UnparsedTail}.
% When both the active and the passive lists are empty the transition is
% `nil`; in that case the unindexable list is required to be empty as well.
bin_to_pending_trans(<<ActiveCount:16, ActiveAndTail/binary>>) ->
    {Active, AfterActive} = bin_to_partitions(ActiveCount, ActiveAndTail, []),
    <<PassiveCount:16, PassiveAndTail/binary>> = AfterActive,
    {Passive, AfterPassive} = bin_to_partitions(PassiveCount, PassiveAndTail, []),
    <<UnindexableCount:16, UnindexableAndTail/binary>> = AfterPassive,
    {Unindexable, Tail} =
        bin_to_partitions(UnindexableCount, UnindexableAndTail, []),
    case {Active, Passive} of
    {[], []} ->
        0 = UnindexableCount,
        {nil, Tail};
    _ ->
        Transition = #set_view_transition{
            active = Active,
            passive = Passive,
            unindexable = Unindexable
        },
        {Transition, Tail}
    end.
542
543
% Reads `Count` {PartitionId, Seq} pairs (16 + 48 bits each) from the binary
% and returns them in input order together with the unparsed tail.
bin_to_seqs(Count, <<PartId:16, Seq:48, Tail/binary>>, Acc) when Count =/= 0 ->
    bin_to_seqs(Count - 1, Tail, [{PartId, Seq} | Acc]);
bin_to_seqs(0, Tail, Acc) ->
    {lists:reverse(Acc), Tail}.
548
549
% Reads `Count` view states, each stored as a 16-bit size followed by that
% many bytes, and returns them in input order together with the unparsed
% tail. A state of size 0 is decoded as `nil`.
bin_to_view_states(0, Tail, Acc) ->
    {lists:reverse(Acc), Tail};
bin_to_view_states(Count, <<0:16, Tail/binary>>, Acc) ->
    bin_to_view_states(Count - 1, Tail, [nil | Acc]);
bin_to_view_states(Count, <<Size:16, State:Size/binary, Tail/binary>>, Acc) ->
    bin_to_view_states(Count - 1, Tail, [State | Acc]).
559
560
% Reads `Count` 16-bit partition ids from the binary and returns them in
% input order together with the unparsed tail.
bin_to_partitions(Count, <<PartId:16, Tail/binary>>, Acc) when Count =/= 0 ->
    bin_to_partitions(Count - 1, Tail, [PartId | Acc]);
bin_to_partitions(0, Tail, Acc) ->
    {lists:reverse(Acc), Tail}.
565
566
% Reads `Count` partition versions from the binary. Each one is a 16-bit
% partition id, a 16-bit failover log length and the failover log itself.
% Returns the {PartitionId, FailoverLog} pairs in input order together with
% the unparsed tail.
bin_to_partition_versions(Count, <<PartId:16, LogLen:16, LogAndTail/binary>>, Acc)
        when Count =/= 0 ->
    {FailoverLog, Tail} = bin_to_failoverlog(LogLen, LogAndTail, []),
    bin_to_partition_versions(Count - 1, Tail, [{PartId, FailoverLog} | Acc]);
bin_to_partition_versions(0, Tail, Acc) ->
    {lists:reverse(Acc), Tail}.

% Reads `Count` failover log entries, each made of two 64-bit integers
% (uuid, sequence number), preserving their order.
bin_to_failoverlog(Count, <<Uuid:64/integer, Seq:64, Tail/binary>>, Acc)
        when Count =/= 0 ->
    bin_to_failoverlog(Count - 1, Tail, [{Uuid, Seq} | Acc]);
bin_to_failoverlog(0, Tail, Acc) ->
    {lists:reverse(Acc), Tail}.
578
579
% Returns the sequence number recorded for PartId in Seqs.
% Throws {missing_partition, PartId} when the partition is not listed.
-spec get_part_seq(partition_id(), partition_seqs()) -> update_seq().
get_part_seq(PartId, Seqs) ->
    Found = lists:keyfind(PartId, 1, Seqs),
    case Found of
    false ->
        throw({missing_partition, PartId});
    {PartId, Seq} ->
        Seq
    end.
588
589
% Tells whether Seqs holds an entry for PartId.
-spec has_part_seq(partition_id(), partition_seqs()) -> boolean().
has_part_seq(PartId, Seqs) ->
    Found = lists:keyfind(PartId, 1, Seqs),
    case Found of
    false ->
        false;
    {PartId, _Seq} ->
        true
    end.
598
599
% Looks up the sequence number recorded for PartId in Seqs, returning
% `not_found` instead of throwing when the partition is not listed.
-spec find_part_seq(partition_id(), partition_seqs()) ->
                           {'ok', update_seq()} | 'not_found'.
find_part_seq(PartId, Seqs) ->
    Found = lists:keyfind(PartId, 1, Seqs),
    case Found of
    false ->
        not_found;
    {PartId, Seq} ->
        {ok, Seq}
    end.
609
610
% Verifies that an encoded key (Bin) does not exceed Max bytes.
% Returns `ok` when it fits. Otherwise logs a map/reduce error naming the
% bucket, group type and design document, and throws {error, Message} where
% Message is a binary that includes the document id, up to the first 100
% characters of the key and the size of Bin.
-spec check_primary_key_size(binary(), pos_integer(), binary() | [number()],
        binary(), #set_view_group{}) -> ok.
check_primary_key_size(Bin, Max, Key, DocId, Group) when byte_size(Bin) > Max ->
    #set_view_group{set_name = SetName, name = DDocId, type = Type} = Group,
    KeyPrefix = printable_key_prefix(Key, 100),
    Error = iolist_to_binary(
        io_lib:format("key emitted for document `~s` is too long: ~s... (~p bytes)",
                      [DocId, KeyPrefix, byte_size(Bin)])),
    ?LOG_MAPREDUCE_ERROR("Bucket `~s`, ~s group `~s`, ~s",
                         [SetName, Type, DDocId, Error]),
    throw({error, Error});
check_primary_key_size(_Bin, _Max, _Key, _DocId, _Group) ->
    ok.

% Returns at most MaxChars leading characters of Key as a UTF-8 binary that
% is safe to print with the `~s` format directive.
% The prefix is cut on character (not byte) boundaries and re-encoded, so
% `~s` only ever sees bytes; passing the code point list directly would make
% io_lib:format/2 fail for characters above 255. When Key is not valid
% character data, whatever part could be decoded is used; when it cannot be
% treated as character data at all, its `~w` representation is used, so that
% reporting the oversized key never fails on the key's content.
printable_key_prefix(Key, MaxChars) ->
    Chars = try unicode:characters_to_list(Key) of
    Decoded when is_list(Decoded) ->
        Decoded;
    {error, Valid, _Invalid} ->
        Valid;
    {incomplete, Valid, _Tail} ->
        Valid
    catch
    error:badarg ->
        lists:flatten(io_lib:format("~w", [Key]))
    end,
    unicode:characters_to_binary(lists:sublist(Chars, MaxChars)).
624
625
% Verifies that an encoded value (Bin) does not exceed Max bytes.
% Returns `ok` when it fits. Otherwise logs a map/reduce error naming the
% bucket, group type and design document, and throws {error, Message} where
% Message is a binary that includes the key, the document id and the size of
% Bin.
-spec check_primary_value_size(binary(), pos_integer(), binary() | [number()],
        binary(), #set_view_group{}) -> ok.
check_primary_value_size(Bin, Max, Key, DocId, Group) when byte_size(Bin) > Max ->
    #set_view_group{set_name = SetName, name = DDocId, type = Type} = Group,
    ValueSize = byte_size(Bin),
    Text = io_lib:format("value emitted for key `~s`, document `~s`, is too big"
                         " (~p bytes)", [Key, DocId, ValueSize]),
    Error = iolist_to_binary(Text),
    ?LOG_MAPREDUCE_ERROR("Bucket `~s`, ~s group `~s`, ~s",
                         [SetName, Type, DDocId, Error]),
    throw({error, Error});
check_primary_value_size(_Bin, _Max, _Key, _DocId, _Group) ->
    ok.
638
639
% Re-reads the most recent header from the group's index file and returns
% the group updated with it: the new header and its position, the id btree
% and every view indexer are switched to the states stored in that header.
% The header must hold exactly one state per view of the group.
-spec refresh_viewgroup_header(#set_view_group{}) -> {'ok', #set_view_group{}}.
refresh_viewgroup_header(Group) ->
    #set_view_group{
        fd = Fd,
        mod = Mod,
        id_btree = OldIdBtree,
        views = OldViews
    } = Group,
    ok = couch_file:refresh_eof(Fd),
    {ok, HeaderBin, HeaderPos} = couch_file:read_header_bin(Fd),
    Header = couch_set_view_util:header_bin_to_term(HeaderBin),
    #set_view_index_header{
        id_btree_state = IdBtreeState,
        view_states = ViewStates
    } = Header,
    IdBtree = couch_btree:set_state(OldIdBtree, IdBtreeState),
    ApplyState = fun(State, SetView) ->
        Indexer = Mod:set_state(SetView#set_view.indexer, State),
        SetView#set_view{indexer = Indexer}
    end,
    Views = lists:zipwith(ApplyState, ViewStates, OldViews),
    {ok, Group#set_view_group{
        header_pos = HeaderPos,
        id_btree = IdBtree,
        views = Views,
        index_header = Header
    }}.
669
670
% Stops the cleaner process Pid and returns once it is gone.
-spec shutdown_cleaner(#set_view_group{}, pid()) -> 'ok'.
shutdown_cleaner(#set_view_group{mod = mapreduce_view}, Pid) ->
    % For mapreduce view, we have already sent native process stop message
    % Just wait for the process to die. This blocks until either an exit
    % signal or a monitor notification for Pid arrives.
    receive
    {'DOWN', _, process, Pid, _} ->
        ok;
    {'EXIT', Pid, _} ->
        ok
    end;
shutdown_cleaner(#set_view_group{}, Pid) ->
    couch_util:shutdown_sync(Pid).
687
688
% Tries to take one newline-terminated line off the front of Data.
% Returns {Line, Rest} (newline removed) when Data contains a newline, and
% {nil, Data} when it does not.
-spec try_read_line(binary()) -> {binary() | nil, binary()}.
try_read_line(Data) ->
    case binary:match(Data, <<"\n">>) of
    nomatch ->
        {nil, Data};
    {LineLen, 1} ->
        <<Line:LineLen/binary, $\n, Rest/binary>> = Data,
        {Line, Rest}
    end.
697
698
% Sends the serialized group header to an external process through Port.
% The payload is the header size in decimal, a newline, then the header
% bytes.
-spec send_group_header(#set_view_group{}, port()) -> 'ok'.
send_group_header(Group, Port) ->
    HeaderBin = couch_set_view_util:group_to_header_bin(Group),
    SizeLine = [integer_to_list(byte_size(HeaderBin)), $\n],
    true = port_command(Port, [SizeLine, HeaderBin]),
    ok.
706
707
% Reads a group header of Len bytes from the output of an external process.
% HeaderAcc holds the bytes received so far. Once it contains the header
% plus one separator byte, {ok, Header, Remaining} is returned, Remaining
% being everything after the separator. Until then, more data is awaited
% from Port. A zero exit status received while waiting is put back into the
% mailbox and waiting continues; any other port message ends the wait with
% {error, {Port, Message}, HeaderAcc}.
-spec receive_group_header(port(), integer(), binary()) ->
    {'ok', binary(), binary()} | {'error', term(), binary()}.
receive_group_header(Port, Len, HeaderAcc) ->
    case byte_size(HeaderAcc) of
    Sz when Sz >= Len + 1 ->
        % Remaining data excluding a \n character
        {HeaderBin, <<_Separator:8, Remaining/binary>>} =
            split_binary(HeaderAcc, Len),
        {ok, HeaderBin, Remaining};
    _ ->
        receive
        {Port, {exit_status, 0}} = ExitMsg ->
            self() ! ExitMsg,
            receive_group_header(Port, Len, HeaderAcc);
        {Port, {data, Data}} ->
            MoreAcc = iolist_to_binary([HeaderAcc, Data]),
            receive_group_header(Port, Len, MoreAcc);
        {Port, Others} ->
            {error, {Port, Others}, HeaderAcc}
        end
    end.
729
730
% Asks a process to stop and waits until it has terminated.
% This is similar to couch_util:shutdown_sync(Pid), but a `stop` message is
% sent instead of killing the process. The process is unlinked first; an
% 'EXIT' message from it that is already in the mailbox is discarded once
% the process is known to be down. `nil` is accepted and ignored.
-spec shutdown_wait(pid() | nil) -> 'ok'.
shutdown_wait(nil) ->
    ok;
shutdown_wait(Pid) ->
    Monitor = erlang:monitor(process, Pid),
    try
        unlink(Pid),
        Pid ! stop,
        receive
        {'DOWN', Monitor, _Type, _Object, _Info} ->
            ok
        end,
        receive
        {'EXIT', Pid, _Reason} ->
            ok
        after 0 ->
            ok
        end
    after
        erlang:demonitor(Monitor, [flush])
    end.
754
755
% Returns the group with an empty view list when its module is Type, and
% unchanged otherwise.
-spec remove_group_views(#set_view_group{}, atom()) -> #set_view_group{}.
remove_group_views(#set_view_group{mod = Type} = Group, Type) ->
    Group#set_view_group{views = []};
remove_group_views(#set_view_group{} = Group, _Type) ->
    Group.
764
765
% Returns the group with its views replaced by those of SrcGroup when its
% module is Type, and unchanged otherwise.
-spec update_group_views(#set_view_group{},
                         #set_view_group{}, atom()) -> #set_view_group{}.
update_group_views(#set_view_group{mod = Type} = Group, SrcGroup, Type) ->
    Group#set_view_group{views = SrcGroup#set_view_group.views};
update_group_views(#set_view_group{} = Group, _SrcGroup, _Type) ->
    Group.
775
776
% Describes the group to an external process through Port. Four
% newline-terminated lines are sent first (view type code, index file path,
% header position, number of views), followed by the info Mod produces for
% each view, one port command per view.
-spec send_group_info(#set_view_group{}, port()) -> 'ok'.
send_group_info(Group, Port) ->
    #set_view_group{
        views = Views,
        filepath = IndexFile,
        header_pos = HeaderPos,
        mod = Mod
    } = Group,
    ViewType = case Mod of
    spatial_view ->
        ?COUCHSTORE_VIEW_TYPE_SPATIAL;
    mapreduce_view ->
        ?COUCHSTORE_VIEW_TYPE_MAPREDUCE
    end,
    Fields = [
        integer_to_list(ViewType),
        IndexFile,
        integer_to_list(HeaderPos),
        integer_to_list(length(Views))
    ],
    Preamble = [[Field, $\n] || Field <- Fields],
    true = port_command(Port, Preamble),
    SendViewInfo = fun(#set_view{indexer = View}) ->
        true = port_command(Port, Mod:view_info(View))
    end,
    lists:foreach(SendViewInfo, Views).
803
804
% Keeps only the {PartitionId, Seq} entries whose partition id is listed in
% SortedParts, preserving the order of Seqs.
-spec filter_seqs(ordsets:ordset(partition_id()), partition_seqs()) ->
                                        partition_seqs().
filter_seqs(SortedParts, Seqs) ->
    IsWanted = fun({PartId, _Seq}) ->
        lists:member(PartId, SortedParts)
    end,
    [PartSeq || PartSeq <- Seqs, IsWanted(PartSeq)].
811
% Logs an error message received from a port and returns the message with
% its category prefix removed. Messages prefixed with "MAPREDUCE " or
% "SPATIAL " go to the map/reduce error log; messages prefixed with
% "GENERIC " and messages without a known prefix go to the regular error
% log. The message is appended to ErrorArgs as the last format argument.
-spec log_port_error(binary(), string(), [any()]) -> binary().
log_port_error(PortMsg, ErrorMsg, ErrorArgs) ->
    {Target, Msg} = case PortMsg of
    <<"MAPREDUCE ", Text/binary>> ->
        {mapreduce_log, Text};
    <<"SPATIAL ", Text/binary>> ->
        % As of now log errors from spatial views into mapreduce_errors.log
        {mapreduce_log, Text};
    <<"GENERIC ", Text/binary>> ->
        {error_log, Text};
    Text ->
        {error_log, Text}
    end,
    case Target of
    mapreduce_log ->
        ?LOG_MAPREDUCE_ERROR(ErrorMsg, ErrorArgs ++ [Msg]);
    error_log ->
        ?LOG_ERROR(ErrorMsg, ErrorArgs ++ [Msg])
    end,
    Msg.
826
% Returns the group's partition seqs and partition versions, extended with
% an entry for every partition of PartList that has no sequence number yet.
% Added partitions start at sequence 0 with partition version [{0, 0}].
-spec fix_partitions(#set_view_group{}, ordsets:ordset(partition_id())) ->
    {partition_seqs(), partition_versions()}.
fix_partitions(Group, PartList) ->
    Initial = {?set_seqs(Group), ?set_partition_versions(Group)},
    AddIfMissing = fun(PartId, {Seqs, PartVersions} = Known) ->
        case has_part_seq(PartId, Seqs) of
        false ->
            % Since we are treating this vbucket as a fresh partition, we are resetting
            % the older partition version and seq number information.
            FreshSeqs = lists:ukeymerge(1, [{PartId, 0}], Seqs),
            FreshVersions = lists:ukeymerge(1, [{PartId, [{0, 0}]}], PartVersions),
            {FreshSeqs, FreshVersions};
        true ->
            Known
        end
    end,
    lists:foldl(AddIfMissing, Initial, PartList).
844
845