% -*- Mode: Erlang; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
2
3% Licensed under the Apache License, Version 2.0 (the "License"); you may not
4% use this file except in compliance with the License. You may obtain a copy of
5% the License at
6%
7%   http://www.apache.org/licenses/LICENSE-2.0
8%
9% Unless required by applicable law or agreed to in writing, software
10% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12% License for the specific language governing permissions and limitations under
13% the License.
14
15-module(couch_set_view_updater).
16
17-export([update/6]).
% Exported for the MapReduce-specific code
19-export([new_sort_file_name/1]).
20
21-include("couch_db.hrl").
22-include_lib("couch_set_view/include/couch_set_view.hrl").
23-include_lib("couch_dcp/include/couch_dcp.hrl").
24
25-define(MAP_QUEUE_SIZE, 256 * 1024).
26-define(WRITE_QUEUE_SIZE, 512 * 1024).
% The size of the accumulator that emitted key-values are buffered in before
% they actually get queued. The bigger the value, the lower the lock
% contention, but the higher the possible memory consumption.
30-define(QUEUE_ACC_BATCH_SIZE, 256 * 1024).
31
32% incremental updates
33-define(INC_MAX_TMP_FILE_SIZE, 31457280).
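% A write batch is flushed once the accumulated key-values reach at least
% ?MIN_BATCH_SIZE_PER_VIEW bytes per view (see should_flush_writes/1). For
% example, a group with 4 views flushes at roughly 4 * 64 KiB = 256 KiB.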
34-define(MIN_BATCH_SIZE_PER_VIEW, 65536).
35
36% For file sorter and file merger commands.
37-define(PORT_OPTS,
38        [exit_status, use_stdio, stderr_to_stdout, {line, 4096}, binary]).
39
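% Accumulator threaded through the writer process; see do_writes/1,
% flush_writes/1 and maybe_update_btrees/1.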
40-record(writer_acc, {
41    parent,
42    owner,
43    group,
44    last_seqs = orddict:new(),
45    part_versions = orddict:new(),
46    compactor_running,
47    write_queue,
48    initial_build,
49    view_empty_kvs,
50    kvs = [],
51    kvs_size = 0,
52    state = updating_active,
53    final_batch = false,
54    max_seqs,
55    stats = #set_view_updater_stats{},
56    tmp_dir = nil,
57    initial_seqs,
58    max_insert_batch_size,
59    tmp_files = dict:new(),
60    throttle = 0,
61    force_flush = false
62}).
63
64
65-spec update(pid(), #set_view_group{},
66             partition_seqs(), boolean(), string(), [term()]) -> no_return().
67update(Owner, Group, CurSeqs, CompactorRunning, TmpDir, Options) ->
68    #set_view_group{
69        set_name = SetName,
70        type = Type,
71        name = DDocId
72    } = Group,
73
74    ActiveParts = couch_set_view_util:decode_bitmask(?set_abitmask(Group)),
75    PassiveParts = couch_set_view_util:decode_bitmask(?set_pbitmask(Group)),
76    NumChanges = couch_set_view_util:missing_changes_count(CurSeqs, ?set_seqs(Group)),
77
78    process_flag(trap_exit, true),
79
80    BeforeEnterTs = os:timestamp(),
81    Parent = self(),
82    BarrierEntryPid = spawn_link(fun() ->
83        DDocIds = couch_set_view_util:get_ddoc_ids_with_sig(SetName, Group),
84        couch_task_status:add_task([
85            {type, blocked_indexer},
86            {set, SetName},
87            {signature, ?l2b(couch_util:to_hex(Group#set_view_group.sig))},
88            {design_documents, DDocIds},
89            {indexer_type, Type}
90        ]),
91        case Type of
92        main ->
93            ok = couch_index_barrier:enter(couch_main_index_barrier, Parent);
94        replica ->
95            ok = couch_index_barrier:enter(couch_replica_index_barrier, Parent)
96        end,
97        Parent ! {done, self(), (timer:now_diff(os:timestamp(), BeforeEnterTs) / 1000000)},
98        receive shutdown -> ok end
99    end),
100
101    BlockedTime = receive
102    {done, BarrierEntryPid, Duration} ->
103        Duration;
104    {'EXIT', _, Reason} ->
105        exit({updater_error, Reason})
106    end,
107
108    CleanupParts = couch_set_view_util:decode_bitmask(?set_cbitmask(Group)),
109    InitialBuild = couch_set_view_util:is_initial_build(Group),
110    ?LOG_INFO("Updater for set view `~s`, ~s group `~s` started~n"
111              "Active partitions:    ~w~n"
112              "Passive partitions:   ~w~n"
113              "Cleanup partitions:   ~w~n"
114              "Replicas to transfer: ~w~n"
115              "Pending transition:   ~n"
116              "    active:           ~w~n"
117              "    passive:          ~w~n"
118              "    unindexable:      ~w~n"
119              "Initial build:        ~s~n"
120              "Compactor running:    ~s~n"
121              "Min # changes:        ~p~n",
122              [SetName, Type, DDocId,
123               ActiveParts,
124               PassiveParts,
125               CleanupParts,
126               ?set_replicas_on_transfer(Group),
127               ?pending_transition_active(?set_pending_transition(Group)),
128               ?pending_transition_passive(?set_pending_transition(Group)),
129               ?pending_transition_unindexable(?set_pending_transition(Group)),
130               InitialBuild,
131               CompactorRunning,
132               NumChanges
133              ]),
134    ?LOG_DEBUG("Updater set view `~s`, ~s group `~s` Partition versions ~w",
135                       [SetName, Type, DDocId, ?set_partition_versions(Group)]),
136
137
138
139    WriterAcc0 = #writer_acc{
140        parent = self(),
141        owner = Owner,
142        group = Group,
143        initial_build = InitialBuild,
144        max_seqs = CurSeqs,
145        part_versions = ?set_partition_versions(Group),
146        tmp_dir = TmpDir,
147        max_insert_batch_size = list_to_integer(
148            couch_config:get("set_views", "indexer_max_insert_batch_size", "1048576"))
149    },
150    update(WriterAcc0, ActiveParts, PassiveParts,
151            BlockedTime, BarrierEntryPid, NumChanges, CompactorRunning, Options).
152
153
154update(WriterAcc, ActiveParts, PassiveParts, BlockedTime,
155       BarrierEntryPid, NumChanges, CompactorRunning, Options) ->
156    #writer_acc{
157        owner = Owner,
158        group = Group
159    } = WriterAcc,
160    #set_view_group{
161        set_name = SetName,
162        type = Type,
163        name = DDocId,
164        sig = GroupSig
165    } = Group,
166
167    StartTime = os:timestamp(),
168
169    MapQueueOptions = [{max_size, ?MAP_QUEUE_SIZE}, {max_items, infinity}],
170    WriteQueueOptions = [{max_size, ?WRITE_QUEUE_SIZE}, {max_items, infinity}],
171    {ok, MapQueue} = couch_work_queue:new(MapQueueOptions),
172    {ok, WriteQueue} = couch_work_queue:new(WriteQueueOptions),
173
174    Parent = self(),
175
176    Mapper = spawn_link(fun() ->
177        try
178            do_maps(Group, MapQueue, WriteQueue)
179        catch _:Error ->
180            Stacktrace = erlang:get_stacktrace(),
181            ?LOG_ERROR("Set view `~s`, ~s group `~s`, mapper error~n"
182                "error:      ~p~n"
183                "stacktrace: ~p~n",
184                [SetName, Type, DDocId, Error, Stacktrace]),
185            exit(Error)
186        end
187    end),
188
189    Writer = spawn_link(fun() ->
190        BarrierEntryPid ! shutdown,
191        ViewEmptyKVs = [{View, []} || View <- Group#set_view_group.views],
192        WriterAcc2 = init_tmp_files(WriterAcc#writer_acc{
193            parent = Parent,
194            group = Group,
195            write_queue = WriteQueue,
196            view_empty_kvs = ViewEmptyKVs,
197            compactor_running = CompactorRunning,
198            initial_seqs = ?set_seqs(Group),
199            throttle = case lists:member(throttle, Options) of
200                true ->
201                    list_to_integer(
202                        couch_config:get("set_views", "throttle_period", "100"));
203                false ->
204                    0
205                end
206        }),
207        ok = couch_set_view_util:open_raw_read_fd(Group),
208        try
209            WriterAcc3 = do_writes(WriterAcc2),
210            receive
211            {doc_loader_finished, {PartVersions0, RealMaxSeqs}} ->
212                WriterAccStats = WriterAcc3#writer_acc.stats,
213                WriterAccGroup = WriterAcc3#writer_acc.group,
214                WriterAccHeader = WriterAccGroup#set_view_group.index_header,
215                PartVersions = lists:ukeymerge(1, PartVersions0,
216                    WriterAccHeader#set_view_index_header.partition_versions),
217                case WriterAcc3#writer_acc.initial_build of
218                true ->
219                    % The doc loader might not load the mutations up to the
220                    % most recent one, but only to a lower one. Update the
221                    % group header and stats with the correct information.
222                    MaxSeqs = lists:ukeymerge(
223                        1, RealMaxSeqs, WriterAccHeader#set_view_index_header.seqs),
224                    Stats = WriterAccStats#set_view_updater_stats{
225                        seqs = lists:sum([S || {_, S} <- RealMaxSeqs])
226                    };
227                false ->
228                    MaxSeqs = WriterAccHeader#set_view_index_header.seqs,
229                    Stats = WriterAcc3#writer_acc.stats
230                end,
231                FinalWriterAcc = WriterAcc3#writer_acc{
232                    stats = Stats,
233                    group = WriterAccGroup#set_view_group{
234                        index_header = WriterAccHeader#set_view_index_header{
235                            partition_versions = PartVersions,
236                            seqs = MaxSeqs
237                        }
238                    }
239                }
240            end,
241            Parent ! {writer_finished, FinalWriterAcc}
242        catch _:Error ->
243            Stacktrace = erlang:get_stacktrace(),
244            ?LOG_ERROR("Set view `~s`, ~s group `~s`, writer error~n"
245                "error:      ~p~n"
246                "stacktrace: ~p~n",
247                [SetName, Type, DDocId, Error, Stacktrace]),
248            exit(Error)
249        after
250            ok = couch_set_view_util:close_raw_read_fd(Group)
251        end
252    end),
253
254    InitialBuild = WriterAcc#writer_acc.initial_build,
255    NumChanges2 = case InitialBuild of
256        true ->
257            couch_set_view_updater_helper:count_items_from_set(Group, ActiveParts ++ PassiveParts);
258        false ->
259            NumChanges
260    end,
261
262    DocLoader = spawn_link(fun() ->
263        DDocIds = couch_set_view_util:get_ddoc_ids_with_sig(SetName, Group),
264        couch_task_status:add_task([
265            {type, indexer},
266            {set, SetName},
267            {signature, ?l2b(couch_util:to_hex(GroupSig))},
268            {design_documents, DDocIds},
269            {indexer_type, Type},
270            {progress, 0},
271            {changes_done, 0},
272            {initial_build, InitialBuild},
273            {total_changes, NumChanges2}
274        ]),
275        couch_task_status:set_update_frequency(5000),
276        case lists:member(pause, Options) of
277        true ->
            % For reliable unit testing: verify that adding new partitions to
            % the passive state doesn't restart the updater, and that the
            % updater can pick them up and index them in the same run.
281            receive continue -> ok end;
282        false ->
283            ok
284        end,
285        try
286            {PartVersions, MaxSeqs} = load_changes(
287                Owner, Parent, Group, MapQueue, ActiveParts, PassiveParts,
288                WriterAcc#writer_acc.max_seqs,
289                WriterAcc#writer_acc.initial_build),
290            Parent ! {doc_loader_finished, {PartVersions, MaxSeqs}}
291        catch
292        throw:purge ->
293            exit(purge);
294        throw:{rollback, RollbackSeqs} ->
295            exit({rollback, RollbackSeqs});
296        _:Error ->
297            Stacktrace = erlang:get_stacktrace(),
298            ?LOG_ERROR("Set view `~s`, ~s group `~s`, doc loader error~n"
299                "error:      ~p~n"
300                "stacktrace: ~p~n",
301                [SetName, Type, DDocId, Error, Stacktrace]),
302            exit(Error)
303        end,
        % Since the updater progress stats are reported by the doc loader,
        % this process has to stay alive until the updater has completed.
306        receive
307        updater_finished ->
308            ok
309        end
310    end),
311
312    Result = wait_result_loop(StartTime, DocLoader, Mapper, Writer, BlockedTime, Group),
313    case Type of
314    main ->
315        ok = couch_index_barrier:leave(couch_main_index_barrier);
316    replica ->
317        ok = couch_index_barrier:leave(couch_replica_index_barrier)
318    end,
319    case Result of
320    {updater_finished, #set_view_updater_result{group = NewGroup}} ->
321        ?LOG_DEBUG("Updater for ~s set view group `~s`, set `~s`, writer finished:~n"
322                   "  start seqs: ~w~n"
323                   "  end seqs:   ~w~n",
324                   [Type, DDocId, SetName, ?set_seqs(Group), ?set_seqs(NewGroup)]);
325    _ ->
326        ok
327    end,
328    DocLoader ! updater_finished,
329    exit(Result).
330
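% Poll the DCP client for the list of currently active streams, retrying
% after the suggested timeout until a definite list is returned.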
331-spec get_active_dcp_streams(pid()) -> list().
332get_active_dcp_streams(DcpPid) ->
333    case couch_dcp_client:list_streams(DcpPid) of
334    {active_list_streams, ActiveStreams} ->
335        ActiveStreams;
336    {retry_list_streams, Timeout} ->
337        receive
338        after Timeout ->
339            get_active_dcp_streams(DcpPid)
340        end
341    end.
342
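% Close all active DCP streams; streams that are already gone are ignored.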
343-spec stop_dcp_streams(pid()) -> ok.
344stop_dcp_streams(DcpPid) ->
345    ActiveStreams = get_active_dcp_streams(DcpPid),
346    lists:foreach(fun(ActiveStream) ->
347        case couch_dcp_client:remove_stream(DcpPid, ActiveStream) of
348        ok ->
349            ok;
350        {error, vbucket_stream_not_found} ->
351            ok;
352        Error ->
            ?LOG_ERROR("Unexpected error ~p when closing stream of partition ~p",
                [Error, ActiveStream]),
355            throw(Error)
356        end
357    end, ActiveStreams),
358    ok.
359
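% Main receive loop of the updater process: relays messages between the
% owner, the doc loader, the writer and the compactor until a final result
% (or an error) is produced.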
360wait_result_loop(StartTime, DocLoader, Mapper, Writer, BlockedTime, OldGroup) ->
361    #set_view_group{set_name = SetName, name = DDocId, type = Type} = OldGroup,
362    receive
363    {new_passive_partitions, _} = NewPassivePartitions ->
364        Writer ! NewPassivePartitions,
365        DocLoader ! NewPassivePartitions,
366        wait_result_loop(StartTime, DocLoader, Mapper, Writer, BlockedTime, OldGroup);
367    continue ->
368        % Used by unit tests.
369        DocLoader ! continue,
370        wait_result_loop(StartTime, DocLoader, Mapper, Writer, BlockedTime, OldGroup);
371    {doc_loader_finished, {PartVersions, MaxSeqs}} ->
372        Writer ! {doc_loader_finished, {PartVersions, MaxSeqs}},
373        wait_result_loop(StartTime, DocLoader, Mapper, Writer, BlockedTime, OldGroup);
374    {writer_finished, WriterAcc} ->
375        Stats0 = WriterAcc#writer_acc.stats,
376        Result = #set_view_updater_result{
377            group = WriterAcc#writer_acc.group,
378            state = WriterAcc#writer_acc.state,
379            stats = Stats0#set_view_updater_stats{
380                indexing_time = timer:now_diff(os:timestamp(), StartTime) / 1000000,
381                blocked_time = BlockedTime
382            },
383            tmp_file = case WriterAcc#writer_acc.initial_build of
384                true ->
385                    dict:fetch(build_file, WriterAcc#writer_acc.tmp_files);
386                false ->
387                    ""
388                end
389        },
390        {updater_finished, Result};
391    {compactor_started, Pid, Ref} ->
392        ?LOG_INFO("Set view `~s`, ~s group `~s`, updater received "
393                  "compactor ~p notification, ref ~p, writer ~p",
394                   [SetName, Type, DDocId, Pid, Ref, Writer]),
395        Writer ! {compactor_started, self()},
396        erlang:put(compactor_pid, {Pid, Ref}),
397        wait_result_loop(StartTime, DocLoader, Mapper, Writer, BlockedTime, OldGroup);
398    {compactor_started_ack, Writer, GroupSnapshot} ->
399        ?LOG_INFO("Set view `~s`, ~s group `~s`, updater received compaction ack"
400                  " from writer ~p", [SetName, Type, DDocId, Writer]),
401        case erlang:erase(compactor_pid) of
402        {Pid, Ref} ->
403            Pid ! {Ref, {ok, GroupSnapshot}};
404        undefined ->
405            ok
406        end,
407        wait_result_loop(StartTime, DocLoader, Mapper, Writer, BlockedTime, OldGroup);
408    {'EXIT', _, Reason} when Reason =/= normal ->
409        couch_util:shutdown_sync(DocLoader),
410        couch_util:shutdown_sync(Mapper),
411        couch_util:shutdown_sync(Writer),
412        stop_dcp_streams(OldGroup#set_view_group.dcp_pid),
413        {updater_error, Reason};
414    {native_updater_start, Writer} ->
        % We need control over spawning the native updater process.
        % This helps to terminate the native OS processes correctly.
417        Writer ! {ok, native_updater_start},
418        receive
419        {native_updater_pid, NativeUpdater} ->
420            erlang:put(native_updater, NativeUpdater)
421        end,
422        wait_result_loop(StartTime, DocLoader, Mapper, Writer, BlockedTime, OldGroup);
423    stop ->
424        case erlang:erase(native_updater) of
425        undefined ->
426            couch_util:shutdown_sync(DocLoader),
427            couch_util:shutdown_sync(Mapper),
428            couch_util:shutdown_sync(Writer);
429        NativeUpdater ->
430            MRef = erlang:monitor(process, NativeUpdater),
431            NativeUpdater ! stop,
432            receive
433            {'DOWN', MRef, process, NativeUpdater, _} ->
434                couch_util:shutdown_sync(DocLoader),
435                couch_util:shutdown_sync(Mapper),
436                couch_util:shutdown_sync(Writer)
437            end,
438            stop_dcp_streams(OldGroup#set_view_group.dcp_pid)
439        end,
440        exit({updater_error, shutdown});
441    {add_stream, DcpPid, PartId, PartUuid, StartSeq, EndSeq, Flags} ->
442        Result = couch_dcp_client:add_stream(DcpPid, PartId, PartUuid, StartSeq, EndSeq, Flags),
443        DocLoader ! {add_stream_result, Result},
444        wait_result_loop(StartTime, DocLoader, Mapper, Writer, BlockedTime, OldGroup)
445    end.
446
447
448dcp_marker_to_string(Type) ->
449    case Type band ?DCP_SNAPSHOT_TYPE_MASK of
450    ?DCP_SNAPSHOT_TYPE_DISK ->
451        "on-disk";
452    ?DCP_SNAPSHOT_TYPE_MEMORY ->
453        "in-memory";
454    _ ->
455        io_lib:format("unknown (~w)", [Type])
456    end.
457
458
459load_changes(Owner, Updater, Group, MapQueue, ActiveParts, PassiveParts,
460        EndSeqs, InitialBuild) ->
461    #set_view_group{
462        set_name = SetName,
463        name = DDocId,
464        type = GroupType,
465        index_header = #set_view_index_header{
466            seqs = SinceSeqs,
467            partition_versions = PartVersions0
468        },
469        dcp_pid = DcpPid,
470        category = Category
471    } = Group,
472
473    MaxDocSize = list_to_integer(
474        couch_config:get("set_views", "indexer_max_doc_size", "0")),
475    AddStreamFun = fun(Pid, PartId, PartUuid, StartSeq, EndSeq, Flags) ->
476        Updater ! {add_stream, Pid, PartId, PartUuid, StartSeq, EndSeq, Flags},
        receive {add_stream_result, Result} -> Result end
479    end,
480    FoldFun = fun({PartId, EndSeq}, {AccCount, AccSeqs, AccVersions, AccRollbacks}) ->
481        case couch_set_view_util:has_part_seq(PartId, ?set_unindexable_seqs(Group))
482            andalso not lists:member(PartId, ?set_replicas_on_transfer(Group)) of
483        true ->
484            {AccCount, AccSeqs, AccVersions, AccRollbacks};
485        false ->
486            Since = couch_util:get_value(PartId, SinceSeqs, 0),
487            PartVersions = couch_util:get_value(PartId, AccVersions),
488            Flags = case InitialBuild of
489            true ->
490                ?DCP_FLAG_DISKONLY;
491            false ->
492                ?DCP_FLAG_NOFLAG
493            end,
            % For a stream request from 0: if a vbucket got reset in the window
            % of time between the seqno being obtained from stats and the
            % stream request being made, the end_seqno may be higher than the
            % current vb high seqno. Use a special flag to tell the server to
            % set the end_seqno.
498            Flags2 = case PartVersions of
499            [{0, 0}] ->
500                Flags bor ?DCP_FLAG_USELATEST_ENDSEQNO;
501            _ ->
502                Flags
503            end,
504
505            case AccRollbacks of
506            [] ->
507                case EndSeq =:= Since of
508                true ->
509                    {AccCount, AccSeqs, AccVersions, AccRollbacks};
510                false ->
511                    ChangesWrapper = fun
512                        ({part_versions, _} = NewVersions, Acc) ->
513                            queue_doc(
514                                NewVersions, MapQueue, Group,
515                                MaxDocSize, InitialBuild),
516                            Acc;
517                        ({snapshot_marker, {MarkerStartSeq, MarkerEndSeq, MarkerType}}, {Count, _})
518                            when MarkerType band ?DCP_SNAPSHOT_TYPE_MASK =/= 0 ->
519                            ?LOG_DEBUG(
520                                "set view `~s`, ~s (~s) group `~s`: received "
521                                "a snapshot marker (~s) for partition ~p from "
522                                "sequence ~p to ~p",
523                                [SetName, GroupType, Category, DDocId,
524                                    dcp_marker_to_string(MarkerType),
525                                    PartId, MarkerStartSeq, MarkerEndSeq]),
526                            case Count of
527                            % Ignore the snapshot marker that is at the
528                            % beginning of the stream.
529                            % If it wasn't ignored, it would lead to an
530                            % additional forced flush which isn't needed. A
531                            % flush is needed if there are several mutations
532                            % of the same document within one batch. As two
533                            % different partitions can't contain the same
534                            % document ID, we are safe to not force flushing
535                            % between two partitions.
536                            0 ->
537                                {Count, MarkerEndSeq};
538                            _ ->
539                                queue_doc(
540                                    snapshot_marker, MapQueue, Group,
541                                    MaxDocSize, InitialBuild),
542                                {Count, MarkerEndSeq}
543                            end;
544                        ({snapshot_marker, {MarkerStartSeq, MarkerEndSeq, MarkerType}}, Acc) ->
545                            ?LOG_ERROR(
546                                "set view `~s`, ~s (~s) group `~s`: received "
547                                "a snapshot marker (~s) for partition ~p from "
548                                "sequence ~p to ~p",
549                                [SetName, GroupType, Category, DDocId,
550                                    dcp_marker_to_string(MarkerType),
551                                    PartId, MarkerStartSeq, MarkerEndSeq]),
552                            throw({error, unknown_snapshot_marker, MarkerType}),
553                            Acc;
554                        (#dcp_doc{} = Item, {Count, AccEndSeq}) ->
555                            queue_doc(
556                                Item, MapQueue, Group,
557                                MaxDocSize, InitialBuild),
558                            {Count + 1, AccEndSeq}
559                        end,
560                    Result = couch_dcp_client:enum_docs_since(
561                        DcpPid, PartId, PartVersions, Since, EndSeq, Flags2,
562                        ChangesWrapper, {0, 0}, AddStreamFun),
563                    case Result of
564                    {ok, {AccCount2, AccEndSeq}, NewPartVersions} ->
565                        AccSeqs2 = orddict:store(PartId, AccEndSeq, AccSeqs),
566                        AccVersions2 = lists:ukeymerge(
567                            1, [{PartId, NewPartVersions}], AccVersions),
568                        AccRollbacks2 = AccRollbacks;
569                    {rollback, RollbackSeq} ->
570                        AccCount2 = AccCount,
571                        AccSeqs2 = AccSeqs,
572                        AccVersions2 = AccVersions,
573                        AccRollbacks2 = ordsets:add_element(
574                            {PartId, RollbackSeq}, AccRollbacks);
575                    Error ->
576                        AccCount2 = AccCount,
577                        AccSeqs2 = AccSeqs,
578                        AccVersions2 = AccVersions,
579                        AccRollbacks2 = AccRollbacks,
                        ?LOG_ERROR("set view `~s`, ~s (~s) group `~s` error "
                            "while loading changes for partition ~p:~n~p~n",
582                            [SetName, GroupType, Category, DDocId, PartId,
583                                Error]),
584                        throw(Error)
585                    end,
586                    {AccCount2, AccSeqs2, AccVersions2, AccRollbacks2}
587                end;
588            _ ->
                % If a rollback is needed, don't store any new documents in the
                % index; just check for a rollback of another partition (i.e. a
                % request with start seq == end seq).
592                ChangesWrapper = fun(_, _) -> ok end,
593                Result = couch_dcp_client:enum_docs_since(
594                    DcpPid, PartId, PartVersions, Since, Since, Flags2,
595                    ChangesWrapper, ok, AddStreamFun),
596                case Result of
597                {ok, _, _} ->
598                    AccRollbacks2 = AccRollbacks;
599                {rollback, RollbackSeq} ->
600                    AccRollbacks2 = ordsets:add_element(
601                        {PartId, RollbackSeq}, AccRollbacks)
602                end,
603                {AccCount, AccSeqs, AccVersions, AccRollbacks2}
604            end
605        end
606    end,
607
608    notify_owner(Owner, {state, updating_active}, Updater),
609    case ActiveParts of
610    [] ->
611        ActiveChangesCount = 0,
612        MaxSeqs = orddict:new(),
613        PartVersions = PartVersions0,
614        Rollbacks = [];
615    _ ->
616        ?LOG_INFO("Updater reading changes from active partitions to "
617                  "update ~s set view group `~s` from set `~s`",
618                  [GroupType, DDocId, SetName]),
619        {ActiveChangesCount, MaxSeqs, PartVersions, Rollbacks} = lists:foldl(
620            FoldFun, {0, orddict:new(), PartVersions0, ordsets:new()},
621            couch_set_view_util:filter_seqs(ActiveParts, EndSeqs))
622    end,
623    case PassiveParts of
624    [] ->
625        FinalChangesCount = ActiveChangesCount,
626        MaxSeqs2 = MaxSeqs,
627        PartVersions2 = PartVersions,
628        Rollbacks2 = Rollbacks;
629    _ ->
630        ?LOG_INFO("Updater reading changes from passive partitions to "
631                  "update ~s set view group `~s` from set `~s`",
632                  [GroupType, DDocId, SetName]),
633        {FinalChangesCount, MaxSeqs2, PartVersions2, Rollbacks2} = lists:foldl(
634            FoldFun, {ActiveChangesCount, MaxSeqs, PartVersions, Rollbacks},
635            couch_set_view_util:filter_seqs(PassiveParts, EndSeqs))
636    end,
637    {FinalChangesCount3, MaxSeqs3, PartVersions3, Rollbacks3} =
638        load_changes_from_passive_parts_in_mailbox(DcpPid,
639            Group, FoldFun, FinalChangesCount, MaxSeqs2, PartVersions2, Rollbacks2),
640
641    case Rollbacks3 of
642    [] ->
643        ok;
644    _ ->
645        throw({rollback, Rollbacks3})
646    end,
647
648    couch_work_queue:close(MapQueue),
649    ?LOG_INFO("Updater for ~s set view group `~s`, set `~s` (~s), "
650              "read a total of ~p changes",
651              [GroupType, DDocId, SetName, Category, FinalChangesCount3]),
652    ?LOG_DEBUG("Updater for ~s set view group `~s`, set `~s`, max partition seqs found:~n~w",
653               [GroupType, DDocId, SetName, MaxSeqs3]),
654    {PartVersions3, MaxSeqs3}.
655
656
657load_changes_from_passive_parts_in_mailbox(DcpPid,
658        Group, FoldFun, ChangesCount, MaxSeqs0, PartVersions0, Rollbacks) ->
659    #set_view_group{
660        set_name = SetName,
661        name = DDocId,
662        type = GroupType
663    } = Group,
664    receive
665    {new_passive_partitions, Parts0} ->
666        Parts = get_more_passive_partitions(Parts0),
667        AddPartVersions = [{P, [{0, 0}]} || P <- Parts],
668        {ok, AddMaxSeqs} = couch_dcp_client:get_seqs(DcpPid, Parts),
669        PartVersions = lists:ukeymerge(1, AddPartVersions, PartVersions0),
670
671        MaxSeqs = lists:ukeymerge(1, AddMaxSeqs, MaxSeqs0),
672        ?LOG_INFO("Updater reading changes from new passive partitions ~w to "
673                  "update ~s set view group `~s` from set `~s`",
674                  [Parts, GroupType, DDocId, SetName]),
675        {ChangesCount2, MaxSeqs2, PartVersions2, Rollbacks2} = lists:foldl(
676            FoldFun, {ChangesCount, MaxSeqs, PartVersions, Rollbacks}, AddMaxSeqs),
677        load_changes_from_passive_parts_in_mailbox(DcpPid,
678            Group, FoldFun, ChangesCount2, MaxSeqs2, PartVersions2, Rollbacks2)
679    after 0 ->
680        {ChangesCount, MaxSeqs0, PartVersions0, Rollbacks}
681    end.
682
683
684get_more_passive_partitions(Parts) ->
685    receive
686    {new_passive_partitions, Parts2} ->
687        get_more_passive_partitions(Parts ++ Parts2)
688    after 0 ->
689        lists:sort(Parts)
690    end.
691
692
693notify_owner(Owner, Msg, UpdaterPid) ->
694    Owner ! {updater_info, UpdaterPid, Msg}.
695
696
697queue_doc(snapshot_marker, MapQueue, _Group, _MaxDocSize, _InitialBuild) ->
698    couch_work_queue:queue(MapQueue, snapshot_marker);
699queue_doc({part_versions, _} = PartVersions, MapQueue, _Group, _MaxDocSize,
700    _InitialBuild) ->
701    couch_work_queue:queue(MapQueue, PartVersions);
702queue_doc(Doc, MapQueue, Group, MaxDocSize, InitialBuild) ->
703    Doc2 = case Doc#dcp_doc.deleted of
704    true ->
705        case Group#set_view_group.index_xattr_on_deleted_docs of
706        true ->
707            case Doc#dcp_doc.body of
708            <<>> ->
709                Doc#dcp_doc{deleted = true};
710            _ ->
711                Doc#dcp_doc{deleted = false}
712            end;
713        false ->
714            Doc
715        end;
716    false ->
717        Doc
718    end,
719
720    case Doc2#dcp_doc.deleted of
721    true when InitialBuild ->
722        Entry = nil;
723    true ->
724        Entry = Doc2;
725    false ->
726        #set_view_group{
727           set_name = SetName,
728           name = DDocId,
729           type = GroupType
730        } = Group,
731        case couch_util:validate_utf8(Doc2#dcp_doc.id) of
732        true ->
733            case (MaxDocSize > 0) andalso
734                (iolist_size(Doc2#dcp_doc.body) > MaxDocSize) of
735            true ->
736                ?LOG_MAPREDUCE_ERROR("Bucket `~s`, ~s group `~s`, skipping "
737                    "document with ID `~s`: too large body (~p bytes)",
738                    [SetName, GroupType, DDocId,
739                     ?b2l(Doc2#dcp_doc.id), iolist_size(Doc2#dcp_doc.body)]),
740                Entry = Doc2#dcp_doc{deleted = true};
741            false ->
742                Entry = Doc2
743            end;
744        false ->
            % If the id isn't valid UTF-8 (memcached allows that), log an error
            % message and skip indexing the doc. Send it through the queue
            % anyway (marked as deleted) so that its high seq num is recorded;
            % if there are a bunch of these at the end, we keep track of the
            % high seq and don't reprocess them again.
750            ?LOG_MAPREDUCE_ERROR("Bucket `~s`, ~s group `~s`, skipping "
751                "document with non-utf8 id. Doc id bytes: ~w",
752                [SetName, GroupType, DDocId, ?b2l(Doc2#dcp_doc.id)]),
753            Entry = Doc2#dcp_doc{deleted = true}
754        end
755    end,
756    case Entry of
757    nil ->
758        ok;
759    _ ->
760        couch_work_queue:queue(MapQueue, Entry),
761        update_task(1)
762    end.
763
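% Turn the XATTR section of a DCP document body into a JSON fragment of the
% form <<"\"xattrs\":{...}">> and return it together with the remaining
% document body. Illustrative example (hypothetical single XATTR pair
% "meta" -> {"cas":1}):
%   accumulate_xattr(<<15:32, "meta", 0, "{\"cas\":1}", 0, DocBody/binary>>,
%                    <<"\"xattrs\":{">>, 19, 0)
% returns {DocBody, <<"\"xattrs\":{\"meta\":{\"cas\":1}}">>}.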
764-spec accumulate_xattr(binary(), binary(), non_neg_integer(), non_neg_integer()) ->
765    {binary(), binary()}.
766accumulate_xattr(Data, Acc, XATTRSize, AccSum) when AccSum =:= XATTRSize ->
767    {Data, <<Acc/binary, "}">>};
768accumulate_xattr(Body, Acc, XATTRSize, AccSum) ->
769    <<DataSize:32, Rest/binary>> = Body,
770    AccSum2 = AccSum + DataSize + 4,
771    <<Data0:DataSize/binary, Rest2/binary>> = Rest,
    % Remove the trailing zero byte from the XATTR
773    Data = binary:part(Data0, 0, DataSize-1),
774    % Jsonify key and value
775    Data2 = case AccSum2 of
776    XATTRSize ->
777        <<"\"", Data/binary>>;
778    _ ->
779        <<"\"", Data/binary, ",">>
780    end,
781    % Replace zero byte after key with colon
782    Xattr = binary:replace(Data2, <<0>>, <<"\":">>),
783    accumulate_xattr(Rest2, <<Acc/binary, Xattr/binary>>, XATTRSize, AccSum2).
784
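% Mapper loop: dequeue batches of DCP items from MapQueue, run the group's
% map functions on each document and queue the results onto WriteQueue.
% WriteQueue is closed once MapQueue has been closed and drained.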
785do_maps(Group, MapQueue, WriteQueue) ->
786    #set_view_group{
787        set_name = SetName,
788        name = DDocId,
789        type = Type,
790        mod = Mod
791    } = Group,
792    case couch_work_queue:dequeue(MapQueue) of
793    closed ->
794        couch_work_queue:close(WriteQueue);
795    {ok, Queue, _QueueSize} ->
796        ViewCount = length(Group#set_view_group.views),
797        {Items, _} = lists:foldl(
798            fun(#dcp_doc{deleted = true} = DcpDoc, {Acc, Size}) ->
799                #dcp_doc{
800                    id = Id,
801                    partition = PartId,
802                    seq = Seq
803                } = DcpDoc,
804                Item = {Seq, Id, PartId, []},
805                {[Item | Acc], Size};
806            (#dcp_doc{deleted = false} = DcpDoc, {Acc0, Size0}) ->
                % When there are a lot of emits per document, memory can grow
                % almost indefinitely, as the queue size is only limited by the
                % number of documents and not by their emits.
                % In case the accumulator grows huge, queue the items early
                % into the writer queue. Only emits are taken into account; the
                % other cases in this `foldl` never have a significant size.
814                {Acc, Size} = case Size0 > ?QUEUE_ACC_BATCH_SIZE of
815                true ->
816                    couch_work_queue:queue(WriteQueue, lists:reverse(Acc0)),
817                    {[], 0};
818                false ->
819                    {Acc0, Size0}
820                end,
821                #dcp_doc{
822                    id = Id,
823                    body = Body,
824                    partition = PartId,
825                    rev_seq = RevSeq,
826                    seq = Seq,
827                    cas = Cas,
828                    expiration = Expiration,
829                    flags = Flags,
830                    data_type = DcpDataType
831                } = DcpDoc,
832                {DataType, DocBody, XATTRs} = case DcpDataType of
833                ?DCP_DATA_TYPE_RAW ->
834                    {DocBody2, XATTRs2} = accumulate_xattr(Body, <<"\"xattrs\":{">>, 0, 0),
835                    {?CONTENT_META_NON_JSON_MODE, DocBody2, XATTRs2};
836                ?DCP_DATA_TYPE_JSON ->
837                    {DocBody3, XATTRs3} = accumulate_xattr(Body, <<"\"xattrs\":{">>, 0, 0),
838                    {?CONTENT_META_JSON, DocBody3, XATTRs3};
839                ?DCP_DATA_TYPE_BINARY_XATTR ->
840                    <<XATTRSize:32, Rest/binary>> = Body,
841                    {DocBody4, XATTRs4} = accumulate_xattr(Rest, <<"\"xattrs\":{">>, XATTRSize, 0),
842                    {?CONTENT_META_NON_JSON_MODE, DocBody4, XATTRs4};
843                ?DCP_DATA_TYPE_JSON_XATTR ->
844                    <<XATTRSize:32, Rest/binary>> = Body,
845                    {DocBody5, XATTRs5} = accumulate_xattr(Rest, <<"\"xattrs\":{">>, XATTRSize, 0),
846                    {?CONTENT_META_JSON, DocBody5, XATTRs5}
847                end,
848                Doc = #doc{
849                    id = Id,
850                    rev = {RevSeq, <<Cas:64, Expiration:32, Flags:32>>},
851                    body = DocBody,
852                    content_meta = DataType,
853                    deleted = false
854                },
855                try
856                    {ok, Result, LogList} = couch_set_view_mapreduce:map(
857                        Doc, XATTRs, PartId, Seq, Group),
858                    {Result2, _} = lists:foldr(
859                        fun({error, Reason}, {AccRes, Pos}) ->
860                            ErrorMsg = "Bucket `~s`, ~s group `~s`, error mapping"
861                                    " document `~s` for view `~s`: ~s",
862                            ViewName = Mod:view_name(Group, Pos),
863                            Args = [SetName, Type, DDocId, Id, ViewName,
864                                    couch_util:to_binary(Reason)],
865                            ?LOG_MAPREDUCE_ERROR(ErrorMsg, Args),
866                            {[[] | AccRes], Pos - 1};
867                        (KVs, {AccRes, Pos}) ->
868                            {[KVs | AccRes], Pos - 1}
869                        end,
870                        {[], ViewCount}, Result),
871                    lists:foreach(
872                        fun(Msg) ->
873                            DebugMsg = "Bucket `~s`, ~s group `~s`, map function"
874                                " log for document `~s`: ~s",
875                            Args = [SetName, Type, DDocId, Id, binary_to_list(Msg)],
876                            ?LOG_MAPREDUCE_ERROR(DebugMsg, Args)
877                        end, LogList),
878                    Item = {Seq, Id, PartId, Result2},
879                    {[Item | Acc], Size + erlang:external_size(Result2)}
880                catch _:{error, Reason} ->
881                    ErrorMsg = "Bucket `~s`, ~s group `~s`, error mapping document `~s`: ~s",
882                    Args = [SetName, Type, DDocId, Id, couch_util:to_binary(Reason)],
883                    ?LOG_MAPREDUCE_ERROR(ErrorMsg, Args),
884                    {[{Seq, Id, PartId, []} | Acc], Size}
885                end;
886            (snapshot_marker, {Acc, Size}) ->
887                {[snapshot_marker | Acc], Size};
888            ({part_versions, _} = PartVersions, {Acc, Size}) ->
889                {[PartVersions | Acc], Size}
890            end,
891            {[], 0}, Queue),
892        ok = couch_work_queue:queue(WriteQueue, lists:reverse(Items)),
893        do_maps(Group, MapQueue, WriteQueue)
894    end.
895
896
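% Writer loop: dequeue batches of mapped results from WriteQueue, accumulate
% them, and flush once the batch is large enough (see should_flush_writes/1)
% or the queue has been closed (final batch).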
897do_writes(Acc) ->
898    #writer_acc{
899        kvs = Kvs,
900        kvs_size = KvsSize,
901        write_queue = WriteQueue,
902        throttle = Throttle
903    } = Acc,
904    ok = timer:sleep(Throttle),
905    case couch_work_queue:dequeue(WriteQueue) of
906    closed ->
907        flush_writes(Acc#writer_acc{final_batch = true});
908    {ok, Queue0, QueueSize} ->
909        Queue = lists:flatten(Queue0),
910        Kvs2 = Kvs ++ Queue,
911        KvsSize2 = KvsSize + QueueSize,
912
913        Acc2 = Acc#writer_acc{
914            kvs = Kvs2,
915            kvs_size = KvsSize2
916        },
917        case should_flush_writes(Acc2) of
918        true ->
919            Acc3 = flush_writes(Acc2),
920            Acc4 = Acc3#writer_acc{kvs = [], kvs_size = 0};
921        false ->
922            Acc4 = Acc2
923        end,
924        do_writes(Acc4)
925    end.
926
927should_flush_writes(Acc) ->
928    #writer_acc{
929        view_empty_kvs = ViewEmptyKvs,
930        kvs_size = KvsSize
931    } = Acc,
932    KvsSize >= (?MIN_BATCH_SIZE_PER_VIEW * length(ViewEmptyKvs)).
933
934
935flush_writes(#writer_acc{kvs = [], initial_build = false} = Acc) ->
936    Acc2 = maybe_update_btrees(Acc),
937    checkpoint(Acc2);
938
939flush_writes(#writer_acc{initial_build = false} = Acc0) ->
940    #writer_acc{
941        kvs = Kvs,
942        view_empty_kvs = ViewEmptyKVs,
943        group = Group,
944        parent = Parent,
945        owner = Owner,
946        final_batch = IsFinalBatch,
947        part_versions = PartVersions
948    } = Acc0,
949    Mod = Group#set_view_group.mod,
950    % Only incremental updates can contain multiple snapshots
951    {MultipleSnapshots, Kvs2} = merge_snapshots(Kvs),
952    Acc1 = case MultipleSnapshots of
953    true ->
954        Acc2 = maybe_update_btrees(Acc0#writer_acc{force_flush = true}),
955        checkpoint(Acc2);
956    false ->
957        Acc0
958    end,
959    #writer_acc{last_seqs = LastSeqs} = Acc1,
960    {ViewKVs, DocIdViewIdKeys, NewLastSeqs, NewPartVersions} =
961        process_map_results(Mod, Kvs2, ViewEmptyKVs, LastSeqs, PartVersions),
962    Acc3 = Acc1#writer_acc{last_seqs = NewLastSeqs, part_versions = NewPartVersions},
963    Acc4 = write_to_tmp_batch_files(ViewKVs, DocIdViewIdKeys, Acc3),
964    #writer_acc{group = NewGroup} = Acc4,
965    case ?set_seqs(NewGroup) =/= ?set_seqs(Group) of
966    true ->
967        Acc5 = checkpoint(Acc4),
968        case (Acc4#writer_acc.state =:= updating_active) andalso
969            lists:any(fun({PartId, _}) ->
970                ((1 bsl PartId) band ?set_pbitmask(Group) =/= 0)
971            end, NewLastSeqs) of
972        true ->
973            notify_owner(Owner, {state, updating_passive}, Parent),
974            Acc5#writer_acc{state = updating_passive};
975        false ->
976            Acc5
977        end;
978    false ->
979        case IsFinalBatch of
980        true ->
981            checkpoint(Acc4);
982        false ->
983            Acc4
984        end
985    end;
986
987flush_writes(#writer_acc{initial_build = true} = WriterAcc) ->
988    #writer_acc{
989        kvs = Kvs,
990        view_empty_kvs = ViewEmptyKVs,
991        tmp_files = TmpFiles,
992        tmp_dir = TmpDir,
993        group = Group,
994        final_batch = IsFinalBatch,
995        max_seqs = MaxSeqs,
996        part_versions = PartVersions,
997        stats = Stats
998    } = WriterAcc,
999    #set_view_group{
1000        set_name = SetName,
1001        type = Type,
1002        name = DDocId,
1003        mod = Mod
1004    } = Group,
1005    {ViewKVs, DocIdViewIdKeys, MaxSeqs2, PartVersions2} = process_map_results(
1006        Mod, Kvs, ViewEmptyKVs, MaxSeqs, PartVersions),
1007
1008    IdRecords = lists:foldr(
1009        fun({_DocId, {_PartId, []}}, Acc) ->
1010                Acc;
1011            (Kv, Acc) ->
1012                [{KeyBin, ValBin}] = Mod:convert_back_index_kvs_to_binary([Kv], []),
1013                KvBin = [<<(byte_size(KeyBin)):16>>, KeyBin, ValBin],
1014                [[<<(iolist_size(KvBin)):32/native>>, KvBin] | Acc]
1015        end,
1016        [], DocIdViewIdKeys),
1017    #set_view_tmp_file_info{fd = IdFd} = dict:fetch(ids_index, TmpFiles),
1018    ok = file:write(IdFd, IdRecords),
1019
1020    {InsertKVCount, TmpFiles2} = Mod:write_kvs(Group, TmpFiles, ViewKVs),
1021
1022    Stats2 = Stats#set_view_updater_stats{
1023        inserted_kvs = Stats#set_view_updater_stats.inserted_kvs + InsertKVCount,
1024        inserted_ids = Stats#set_view_updater_stats.inserted_ids + length(DocIdViewIdKeys)
1025    },
1026    case IsFinalBatch of
1027    false ->
1028        WriterAcc#writer_acc{
1029            max_seqs = MaxSeqs2,
1030            stats = Stats2,
1031            tmp_files = TmpFiles2
1032        };
1033    true ->
1034        ?LOG_INFO("Updater for set view `~s`, ~s group `~s`, starting btree "
                  "build phase", [SetName, Type, DDocId]),
1036        {Group2, BuildFd} = Mod:finish_build(Group, TmpFiles2, TmpDir),
1037        WriterAcc#writer_acc{
1038            tmp_files = dict:store(build_file, BuildFd, TmpFiles2),
1039            max_seqs = MaxSeqs2,
1040            part_versions = PartVersions2,
1041            stats = Stats2,
1042            group = Group2
1043        }
1044    end.
1045
1046
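% Fold the mapper output into per-view KV lists, back-index entries and
% updated per-partition sequence numbers and partition versions.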
1047process_map_results(Mod, Kvs, ViewEmptyKVs, PartSeqs, PartVersions) ->
1048    lists:foldl(
1049        fun({Seq, DocId, PartId, []}, {ViewKVsAcc, DocIdViewIdKeysAcc, PartIdSeqs, PartIdVersions}) ->
1050            PartIdSeqs2 = update_part_seq(Seq, PartId, PartIdSeqs),
1051            {ViewKVsAcc, [{DocId, {PartId, []}} | DocIdViewIdKeysAcc], PartIdSeqs2, PartIdVersions};
1052        ({Seq, DocId, PartId, QueryResults}, {ViewKVsAcc, DocIdViewIdKeysAcc, PartIdSeqs, PartIdVersions}) ->
1053            {NewViewKVs, NewViewIdKeys} = Mod:view_insert_doc_query_results(
1054                    DocId, PartId, QueryResults, ViewKVsAcc, [], []),
1055            PartIdSeqs2 = update_part_seq(Seq, PartId, PartIdSeqs),
1056            {NewViewKVs, [{DocId, {PartId, NewViewIdKeys}} | DocIdViewIdKeysAcc], PartIdSeqs2, PartIdVersions};
1057        (snapshot_marker, _Acc) ->
1058            throw({error,
1059                <<"The multiple snapshots should have been merged, "
                  "but they weren't">>});
1061        ({part_versions, {PartId, NewVersions}}, {ViewKVsAcc, DocIdViewIdKeysAcc, PartIdSeqs, PartIdVersions}) ->
1062            PartIdVersions2 = update_part_versions(NewVersions, PartId, PartIdVersions),
1063            {ViewKVsAcc, DocIdViewIdKeysAcc, PartIdSeqs, PartIdVersions2}
1064        end,
1065        {ViewEmptyKVs, [], PartSeqs, PartVersions}, Kvs).
1066
1067
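% Replica partitions whose transfer has completed (their indexed seq caught
% up with the target seq) are moved from the passive to the active bitmask
% and removed from the replicas-on-transfer set.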
1068-spec update_transferred_replicas(#set_view_group{},
1069                                  partition_seqs(),
1070                                  partition_seqs()) -> #set_view_group{}.
1071update_transferred_replicas(Group, _MaxSeqs, _PartIdSeqs) when ?set_replicas_on_transfer(Group) =:= [] ->
1072    Group;
1073update_transferred_replicas(Group, MaxSeqs, PartIdSeqs) ->
1074    #set_view_group{index_header = Header} = Group,
1075    RepsTransferred = lists:foldl(
1076        fun({PartId, Seq}, A) ->
1077            case lists:member(PartId, ?set_replicas_on_transfer(Group))
1078                andalso (Seq >= couch_set_view_util:get_part_seq(PartId, MaxSeqs)) of
1079            true ->
1080                ordsets:add_element(PartId, A);
1081            false ->
1082                A
1083            end
1084        end,
1085        ordsets:new(), PartIdSeqs),
1086    ReplicasOnTransfer2 = ordsets:subtract(?set_replicas_on_transfer(Group), RepsTransferred),
1087    {Abitmask2, Pbitmask2} = lists:foldl(
1088        fun(Id, {A, P}) ->
1089            Mask = 1 bsl Id,
1090            Mask = ?set_pbitmask(Group) band Mask,
1091            0 = ?set_abitmask(Group) band Mask,
1092            {A bor Mask, P bxor Mask}
1093        end,
1094        {?set_abitmask(Group), ?set_pbitmask(Group)},
1095        RepsTransferred),
1096    Group#set_view_group{
1097        index_header = Header#set_view_index_header{
1098            abitmask = Abitmask2,
1099            pbitmask = Pbitmask2,
1100            replicas_on_transfer = ReplicasOnTransfer2
1101        }
1102    }.
1103
1104
1105-spec update_part_seq(update_seq(), partition_id(), partition_seqs()) -> partition_seqs().
1106update_part_seq(Seq, PartId, Acc) ->
1107    case couch_set_view_util:find_part_seq(PartId, Acc) of
1108    {ok, Max} when Max >= Seq ->
1109        Acc;
1110    _ ->
1111        orddict:store(PartId, Seq, Acc)
1112    end.
1113
1114
1115-spec update_part_versions(partition_version(), partition_id(), partition_versions()) ->
1116    partition_versions().
1117update_part_versions(NewVersions, PartId, PartVersions) ->
1118    orddict:store(PartId, NewVersions, PartVersions).
1119
1120
1121% Incremental updates.
1122write_to_tmp_batch_files(ViewKeyValuesToAdd, DocIdViewIdKeys, WriterAcc) ->
1123    #writer_acc{
1124        tmp_files = TmpFiles,
1125        group = #set_view_group{
1126            id_btree = IdBtree,
1127            mod = Mod
1128        }
1129    } = WriterAcc,
1130
1131    {AddDocIdViewIdKeys0, RemoveDocIds, LookupDocIds} = lists:foldr(
1132        fun({DocId, {PartId, [] = _ViewIdKeys}}, {A, B, C}) ->
1133                BackKey = make_back_index_key(DocId, PartId),
1134                case is_new_partition(PartId, WriterAcc) of
1135                true ->
1136                    {A, [BackKey | B], C};
1137                false ->
1138                    {A, [BackKey | B], [BackKey | C]}
1139                end;
1140            ({DocId, {PartId, _ViewIdKeys}} = KvPairs, {A, B, C}) ->
1141                BackKey = make_back_index_key(DocId, PartId),
1142                case is_new_partition(PartId, WriterAcc) of
1143                true ->
1144                    {[KvPairs | A], B, C};
1145                false ->
1146                    {[KvPairs | A], B, [BackKey | C]}
1147                end
1148        end,
1149        {[], [], []}, DocIdViewIdKeys),
1150
1151    AddDocIdViewIdKeys = Mod:convert_back_index_kvs_to_binary(
1152        AddDocIdViewIdKeys0, []),
1153
1154    IdsData1 = lists:map(
1155        fun(K) -> couch_set_view_updater_helper:encode_op(remove, K) end,
1156        RemoveDocIds),
1157
1158    IdsData2 = lists:foldl(
1159        fun({K, V}, Acc) ->
1160            Bin = couch_set_view_updater_helper:encode_op(insert, K, V),
1161            [Bin | Acc]
1162        end,
1163        IdsData1,
1164        AddDocIdViewIdKeys),
1165
1166    IdTmpFileInfo = dict:fetch(ids_index, TmpFiles),
1167    case IdTmpFileInfo of
1168    #set_view_tmp_file_info{fd = nil} ->
1169        0 = IdTmpFileInfo#set_view_tmp_file_info.size,
1170        IdTmpFilePath = new_sort_file_name(WriterAcc),
1171        {ok, IdTmpFileFd} = file2:open(IdTmpFilePath, [raw, append, binary]),
1172        IdTmpFileSize = 0;
1173    #set_view_tmp_file_info{
1174            fd = IdTmpFileFd, name = IdTmpFilePath, size = IdTmpFileSize} ->
1175        ok
1176    end,
1177
1178    ok = file:write(IdTmpFileFd, IdsData2),
1179
1180    IdTmpFileInfo2 = IdTmpFileInfo#set_view_tmp_file_info{
1181        fd = IdTmpFileFd,
1182        name = IdTmpFilePath,
1183        size = IdTmpFileSize + iolist_size(IdsData2)
1184    },
1185    TmpFiles2 = dict:store(ids_index, IdTmpFileInfo2, TmpFiles),
1186
1187    case LookupDocIds of
1188    [] ->
1189        LookupResults = [];
1190    _ ->
1191        {ok, LookupResults, IdBtree} =
1192            couch_btree:query_modify(IdBtree, LookupDocIds, [], [])
1193    end,
1194    KeysToRemoveByView = lists:foldl(
1195        fun(LookupResult, KeysToRemoveByViewAcc) ->
1196            case LookupResult of
1197            {ok, {<<_Part:16, DocId/binary>>, <<_Part:16, ViewIdKeys/binary>>}} ->
1198                lists:foldl(
1199                    fun({ViewId, Keys}, KeysToRemoveByViewAcc2) ->
1200                        RemoveKeysDict = lists:foldl(
1201                            fun(Key, RemoveKeysDictAcc) ->
1202                                EncodedKey = Mod:encode_key_docid(Key, DocId),
1203                                dict:store(EncodedKey, nil, RemoveKeysDictAcc)
1204                            end,
1205                        couch_util:dict_find(ViewId, KeysToRemoveByViewAcc2, dict:new()), Keys),
1206                        dict:store(ViewId, RemoveKeysDict, KeysToRemoveByViewAcc2)
1207                    end,
1208                    KeysToRemoveByViewAcc, couch_set_view_util:parse_view_id_keys(ViewIdKeys));
1209            {not_found, _} ->
1210                KeysToRemoveByViewAcc
1211            end
1212        end,
1213        dict:new(), LookupResults),
1214
1215    WriterAcc2 = update_tmp_files(
1216        Mod, WriterAcc#writer_acc{tmp_files = TmpFiles2}, ViewKeyValuesToAdd,
1217        KeysToRemoveByView),
1218    maybe_update_btrees(WriterAcc2).
1219
1220
1221% Update the temporary files with the key-values from the indexer. Return
1222% the updated writer accumulator.
1223-spec update_tmp_files(atom(), #writer_acc{},
1224                       [{#set_view{}, [set_view_key_value()]}],
1225                       dict:dict(non_neg_integer(), dict:dict(binary(), nil)))
1226                      -> #writer_acc{}.
1227update_tmp_files(Mod, WriterAcc, ViewKeyValues, KeysToRemoveByView) ->
1228    #writer_acc{
1229       group = Group,
1230       tmp_files = TmpFiles
1231    } = WriterAcc,
1232    TmpFiles2 = lists:foldl(
1233        fun({#set_view{id_num = ViewId}, AddKeyValues}, AccTmpFiles) ->
1234            AddKeyValuesBinaries = Mod:convert_primary_index_kvs_to_binary(
1235                AddKeyValues, Group, []),
1236            KeysToRemoveDict = couch_util:dict_find(
1237                ViewId, KeysToRemoveByView, dict:new()),
1238
1239            case Mod of
1240            % The b-tree replaces nodes with the same key, hence we don't
1241            % need to delete a node that gets updated anyway
1242            mapreduce_view ->
1243                {KeysToRemoveDict2, BatchData} = lists:foldl(
1244                    fun({K, V}, {KeysToRemoveAcc, BinOpAcc}) ->
1245                        Bin = couch_set_view_updater_helper:encode_op(
1246                            insert, K, V),
1247                        BinOpAcc2 = [Bin | BinOpAcc],
1248                        case dict:find(K, KeysToRemoveAcc) of
1249                        {ok, _} ->
1250                            {dict:erase(K, KeysToRemoveAcc), BinOpAcc2};
1251                        _ ->
1252                            {KeysToRemoveAcc, BinOpAcc2}
1253                        end
1254                    end,
1255                    {KeysToRemoveDict, []}, AddKeyValuesBinaries);
1256            % In r-trees there are multiple possible paths to a key. Hence it's
1257            % not easily possible to replace an existing node with a new one,
1258            % as the insertion could happen in a subtree that is different
1259            % from the subtree of the old value.
1260            spatial_view ->
1261                BatchData = [
1262                    couch_set_view_updater_helper:encode_op(insert, K, V) ||
1263                        {K, V} <- AddKeyValuesBinaries],
1264                KeysToRemoveDict2 = KeysToRemoveDict
1265            end,
1266
1267            BatchData2 = dict:fold(
1268                fun(K, _V, BatchAcc) ->
1269                    Bin = couch_set_view_updater_helper:encode_op(remove, K),
1270                    [Bin | BatchAcc]
1271                end,
1272                BatchData, KeysToRemoveDict2),
1273
1274            ViewTmpFileInfo = dict:fetch(ViewId, TmpFiles),
1275            case ViewTmpFileInfo of
1276            #set_view_tmp_file_info{fd = nil} ->
1277                0 = ViewTmpFileInfo#set_view_tmp_file_info.size,
1278                ViewTmpFilePath = new_sort_file_name(WriterAcc),
1279                {ok, ViewTmpFileFd} = file2:open(
1280                    ViewTmpFilePath, [raw, append, binary]),
1281                ViewTmpFileSize = 0;
1282            #set_view_tmp_file_info{fd = ViewTmpFileFd,
1283                                    size = ViewTmpFileSize,
1284                                    name = ViewTmpFilePath} ->
1285                ok
1286            end,
1287            ok = file:write(ViewTmpFileFd, BatchData2),
1288            ViewTmpFileInfo2 = ViewTmpFileInfo#set_view_tmp_file_info{
1289                fd = ViewTmpFileFd,
1290                name = ViewTmpFilePath,
1291                size = ViewTmpFileSize + iolist_size(BatchData2)
1292            },
1293            dict:store(ViewId, ViewTmpFileInfo2, AccTmpFiles)
1294        end,
1295    TmpFiles, ViewKeyValues),
1296    WriterAcc#writer_acc{
1297        tmp_files = TmpFiles2
1298    }.
1299
1300
1301is_new_partition(PartId, #writer_acc{initial_seqs = InitialSeqs}) ->
1302    couch_util:get_value(PartId, InitialSeqs, 0) == 0.
1303
1304
1305% For incremental index updates.
1306maybe_update_btrees(WriterAcc0) ->
1307    #writer_acc{
1308        view_empty_kvs = ViewEmptyKVs,
1309        tmp_files = TmpFiles,
1310        group = Group0,
1311        final_batch = IsFinalBatch,
1312        owner = Owner,
1313        last_seqs = LastSeqs,
1314        part_versions = PartVersions,
1315        force_flush = ForceFlush
1316    } = WriterAcc0,
1317    IdTmpFileInfo = dict:fetch(ids_index, TmpFiles),
1318    ShouldFlushViews =
1319        lists:any(
1320            fun({#set_view{id_num = Id}, _}) ->
1321                ViewTmpFileInfo = dict:fetch(Id, TmpFiles),
1322                ViewTmpFileInfo#set_view_tmp_file_info.size >= ?INC_MAX_TMP_FILE_SIZE
1323            end, ViewEmptyKVs),
1324    ShouldFlush = IsFinalBatch orelse
1325        ForceFlush orelse
1326        ((IdTmpFileInfo#set_view_tmp_file_info.size >= ?INC_MAX_TMP_FILE_SIZE) andalso
1327        ShouldFlushViews),
1328    case ShouldFlush of
1329    false ->
1330        NewLastSeqs1 = LastSeqs,
1331        case erlang:get(updater_worker) of
1332        undefined ->
1333            WriterAcc = WriterAcc0;
1334        UpdaterWorker when is_reference(UpdaterWorker) ->
1335            receive
1336            {UpdaterWorker, UpGroup, UpStats, CompactFiles} ->
1337                send_log_compact_files(Owner, CompactFiles, ?set_seqs(UpGroup),
1338                    ?set_partition_versions(UpGroup)),
1339                erlang:erase(updater_worker),
1340                WriterAcc = check_if_compactor_started(
1341                    WriterAcc0#writer_acc{group = UpGroup, stats = UpStats})
1342            after 0 ->
1343                WriterAcc = WriterAcc0
1344            end
1345        end;
1346    true ->
1347        ok = close_tmp_fd(IdTmpFileInfo),
1348        ok = lists:foreach(
1349            fun(#set_view{id_num = Id}) ->
1350                ViewTmpFileInfo = dict:fetch(Id, TmpFiles),
1351                ok = close_tmp_fd(ViewTmpFileInfo)
1352            end,
1353            Group0#set_view_group.views),
1354        case erlang:erase(updater_worker) of
1355        undefined ->
1356            WriterAcc1 = WriterAcc0;
1357        UpdaterWorker when is_reference(UpdaterWorker) ->
1358            receive
1359            {UpdaterWorker, UpGroup2, UpStats2, CompactFiles2} ->
1360                send_log_compact_files(Owner, CompactFiles2, ?set_seqs(UpGroup2),
1361                    ?set_partition_versions(UpGroup2)),
1362                WriterAcc1 = check_if_compactor_started(
1363                    WriterAcc0#writer_acc{
1364                        group = UpGroup2,
1365                        stats = UpStats2
1366                    })
1367            end
1368        end,
1369
1370
        % MB-11472: If no id sort file is present, this is the final batch
        % and there is nothing left to update in the btree.
1373        case IdTmpFileInfo#set_view_tmp_file_info.name of
1374        nil ->
1375            WriterAcc = WriterAcc1;
1376        _ ->
1377            WriterAcc2 = check_if_compactor_started(WriterAcc1),
1378            NewUpdaterWorker = spawn_updater_worker(WriterAcc2, LastSeqs, PartVersions),
1379            erlang:put(updater_worker, NewUpdaterWorker),
1380
1381            TmpFiles2 = dict:map(
1382                fun(_, _) -> #set_view_tmp_file_info{} end, TmpFiles),
1383            WriterAcc = WriterAcc2#writer_acc{tmp_files = TmpFiles2}
1384        end,
1385        NewLastSeqs1 = orddict:new()
1386    end,
1387    #writer_acc{
1388        stats = NewStats0,
1389        group = NewGroup0
1390    } = WriterAcc,
1391    case IsFinalBatch of
1392    true ->
1393        case erlang:erase(updater_worker) of
1394        undefined ->
1395            NewGroup = NewGroup0,
1396            NewStats = NewStats0;
1397        UpdaterWorker2 when is_reference(UpdaterWorker2) ->
1398            receive
1399            {UpdaterWorker2, NewGroup, NewStats, CompactFiles3} ->
1400                send_log_compact_files(Owner, CompactFiles3, ?set_seqs(NewGroup),
1401                    ?set_partition_versions(NewGroup))
1402            end
1403        end,
1404        NewLastSeqs = orddict:new();
1405    false ->
1406        NewGroup = NewGroup0,
1407        NewStats = NewStats0,
1408        NewLastSeqs = NewLastSeqs1
1409    end,
1410    NewWriterAcc = WriterAcc#writer_acc{
1411        stats = NewStats,
1412        group = NewGroup,
1413        last_seqs = NewLastSeqs
1414    },
1415    NewWriterAcc.
1416
1417
1418send_log_compact_files(_Owner, [], _Seqs, _PartVersions) ->
1419    ok;
1420send_log_compact_files(Owner, Files, Seqs, PartVersions) ->
1421    Init = case erlang:erase(new_compactor) of
1422    true ->
1423        true;
1424    undefined ->
1425        false
1426    end,
1427    ok = gen_server:cast(Owner, {compact_log_files, Files, Seqs, PartVersions, Init}).
1428
1429
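% Message flow between the writer process (the caller of this function), the
% main updater process and the spawned worker:
%
%   writer -> main updater : {native_updater_start, WriterPid}
%   main updater -> writer : {ok, native_updater_start}
%   (the writer then spawns the worker)
%   writer -> main updater : {native_updater_pid, WorkerPid}
%   worker -> writer       : {Ref, NewGroup, NewStats, CompactFiles}
%
% The returned reference is what maybe_update_btrees/1 matches on later to
% collect the worker's result.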
1430-spec spawn_updater_worker(#writer_acc{}, partition_seqs(),
1431                           partition_versions()) -> reference().
1432spawn_updater_worker(WriterAcc, PartIdSeqs, PartVersions) ->
1433    Parent = self(),
1434    Ref = make_ref(),
1435    #writer_acc{
1436        group = Group,
1437        parent = UpdaterPid,
1438        max_seqs = MaxSeqs
1439    } = WriterAcc,
    % Tell the main updater process that the native updater is about to
    % start, and wait for its ack
1441    UpdaterPid ! {native_updater_start, self()},
1442    receive
1443    {ok, native_updater_start} ->
1444        ok
1445    end,
1446    Pid = spawn_link(fun() ->
1447        case ?set_cbitmask(Group) of
1448        0 ->
1449            CleanupStart = 0;
1450        _ ->
1451            CleanupStart = os:timestamp()
1452        end,
1453        {ok, NewGroup0, CleanupCount, NewStats, NewCompactFiles} = update_btrees(WriterAcc),
1454        case ?set_cbitmask(Group) of
1455        0 ->
1456            CleanupTime = 0.0;
1457        _ ->
1458            CleanupTime = timer:now_diff(os:timestamp(), CleanupStart) / 1000000,
1459            #set_view_group{
1460                set_name = SetName,
1461                name = DDocId,
1462                type = GroupType
1463            } = Group,
1464            ?LOG_INFO("Updater for set view `~s`, ~s group `~s`, performed cleanup "
1465                      "of ~p key/value pairs in ~.3f seconds",
1466                      [SetName, GroupType, DDocId, CleanupCount, CleanupTime])
1467        end,
1468        NewSeqs = update_seqs(PartIdSeqs, ?set_seqs(Group)),
1469        NewPartVersions = update_versions(PartVersions, ?set_partition_versions(Group)),
1470        Header = NewGroup0#set_view_group.index_header,
1471        NewHeader = Header#set_view_index_header{
1472            seqs = NewSeqs,
1473            partition_versions = NewPartVersions
1474        },
1475        NewGroup = NewGroup0#set_view_group{
1476            index_header = NewHeader
1477        },
1478        NewGroup2 = update_transferred_replicas(NewGroup, MaxSeqs, PartIdSeqs),
1479        NumChanges = count_seqs_done(Group, NewSeqs),
1480        NewStats2 = NewStats#set_view_updater_stats{
1481           seqs = NewStats#set_view_updater_stats.seqs + NumChanges,
           cleanup_time = NewStats#set_view_updater_stats.cleanup_time + CleanupTime,
1483           cleanup_kv_count = NewStats#set_view_updater_stats.cleanup_kv_count + CleanupCount
1484        },
1485        Parent ! {Ref, NewGroup2, NewStats2, NewCompactFiles}
1486    end),
1487    UpdaterPid ! {native_updater_pid, Pid},
1488    Ref.
1489
1490% Update id btree and view btrees with current batch of changes
1491update_btrees(WriterAcc) ->
1492    #writer_acc{
1493        stats = Stats,
1494        group = Group0,
1495        tmp_dir = TmpDir,
1496        tmp_files = TmpFiles,
1497        compactor_running = CompactorRunning,
1498        max_insert_batch_size = MaxBatchSize
1499    } = WriterAcc,
1500
1501    % Prepare list of operation logs for each btree
1502    #set_view_tmp_file_info{name = IdFile} = dict:fetch(ids_index, TmpFiles),
1503    ViewFiles = lists:map(
1504        fun(#set_view{id_num = Id}) ->
1505            #set_view_tmp_file_info{
1506                name = ViewFile
1507            } = dict:fetch(Id, TmpFiles),
1508            ViewFile
1509        end, Group0#set_view_group.views),
1510    % `LogFiles` is supplied to the native updater. For spatial views only the
1511    % ID b-tree is updated.
1512    LogFiles = case Group0#set_view_group.mod of
1513    mapreduce_view ->
1514        [IdFile | ViewFiles];
1515    spatial_view ->
1516        [IdFile]
1517    end,
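    % For example, a mapreduce group with two views yields
    % LogFiles = [IdFile, ViewFile1, ViewFile2], whereas a spatial group
    % yields LogFiles = [IdFile] only.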
1518
    % Remove the spatial views from the group. The native updater can
    % currently handle mapreduce views only, though it also takes care of
    % the ID b-tree of spatial view groups.
1522    Group = couch_set_view_util:remove_group_views(Group0, spatial_view),
1523
1524    {ok, NewGroup0, Stats2} = couch_set_view_updater_helper:update_btrees(
1525        Group, TmpDir, LogFiles, MaxBatchSize, false),
1526    {IdsInserted, IdsDeleted, KVsInserted, KVsDeleted, CleanupCount} = Stats2,
1527
1528    % Add back spatial views
1529    NewGroup = couch_set_view_util:update_group_views(
1530        NewGroup0, Group0, spatial_view),
1531
    % The native updater has updated the Id b-tree; now run the Erlang
    % updater for the spatial views
1534    NewGroup2 = case NewGroup#set_view_group.mod of
1535    mapreduce_view ->
1536        NewGroup;
1537    spatial_view = Mod ->
1538        ok = couch_file:refresh_eof(NewGroup#set_view_group.fd),
1539        Views = Mod:update_spatial(NewGroup#set_view_group.views, ViewFiles,
1540            MaxBatchSize),
1541        NewGroup#set_view_group{
1542            views = Views,
1543            index_header = (NewGroup#set_view_group.index_header)#set_view_index_header{
1544                view_states = [Mod:get_state(V#set_view.indexer) || V <- Views]
1545            }
1546        }
1547    end,
1548
1549    NewStats = Stats#set_view_updater_stats{
1550     inserted_ids = Stats#set_view_updater_stats.inserted_ids + IdsInserted,
1551     deleted_ids = Stats#set_view_updater_stats.deleted_ids + IdsDeleted,
1552     inserted_kvs = Stats#set_view_updater_stats.inserted_kvs + KVsInserted,
1553     deleted_kvs = Stats#set_view_updater_stats.deleted_kvs + KVsDeleted
1554    },
1555
    % Remove the sort files if the compactor is not running; otherwise hand
    % them over to the compactor so it can apply the deltas
1558    CompactFiles = lists:foldr(
1559        fun(SortedFile, AccCompactFiles) ->
1560            case CompactorRunning of
1561            true ->
1562                case filename:extension(SortedFile) of
1563                ".compact" ->
                    [SortedFile | AccCompactFiles];
1565                _ ->
1566                    SortedFile2 = new_sort_file_name(TmpDir, true),
1567                    ok = file2:rename(SortedFile, SortedFile2),
1568                    [SortedFile2 | AccCompactFiles]
1569                end;
1570            false ->
1571                ok = file2:delete(SortedFile),
1572                AccCompactFiles
1573            end
1574        end, [], [IdFile | ViewFiles]),
1575    {ok, NewGroup2, CleanupCount, NewStats, CompactFiles}.
1576
1577
1578update_seqs(PartIdSeqs, Seqs) ->
1579    orddict:fold(
1580        fun(PartId, NewSeq, Acc) ->
1581            OldSeq = couch_util:get_value(PartId, Acc, 0),
1582            case NewSeq > OldSeq of
1583            true ->
1584                ok;
1585            false ->
                exit({error, <<"New seq is smaller than or equal to the old seq.">>, PartId, OldSeq, NewSeq})
1587            end,
1588            orddict:store(PartId, NewSeq, Acc)
1589        end,
1590        Seqs, PartIdSeqs).
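
% Example for update_seqs/2 (illustrative): partition 1 advances from seq 4
% to seq 10 while partition 2 stays untouched:
%
%     update_seqs(orddict:from_list([{1, 10}]),
%                 orddict:from_list([{1, 4}, {2, 7}]))
%     %=> [{1, 10}, {2, 7}]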
1591
1592update_versions(PartVersions, AllPartVersions) ->
1593    lists:ukeymerge(1, PartVersions, AllPartVersions).
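
% Both lists given to update_versions/2 are ordered by partition id; when a
% partition appears in both, the entry from `PartVersions` (the newer one)
% wins. Example with illustrative version values:
%
%     update_versions([{1, [{200, 0}]}], [{1, [{100, 0}]}, {2, [{300, 0}]}])
%     %=> [{1, [{200, 0}]}, {2, [{300, 0}]}]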
1594
1595
1596update_task(NumChanges) ->
1597    [Changes, Total] = couch_task_status:get([changes_done, total_changes]),
1598    Changes2 = Changes + NumChanges,
1599    Total2 = erlang:max(Total, Changes2),
1600    Progress = (Changes2 * 100) div Total2,
1601    couch_task_status:update([
1602        {progress, Progress},
1603        {changes_done, Changes2},
1604        {total_changes, Total2}
1605    ]).
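
% Worked example for update_task/1: with changes_done = 150,
% total_changes = 300 and NumChanges = 50, Changes2 = 200, Total2 = 300 and
% the reported progress is 200 * 100 div 300 = 66 percent.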
1606
1607
1608checkpoint(#writer_acc{owner = Owner, parent = Parent, group = Group} = Acc) ->
1609    #set_view_group{
1610        set_name = SetName,
1611        name = DDocId,
1612        type = Type
1613    } = Group,
1614    ?LOG_INFO("Updater checkpointing set view `~s` update for ~s group `~s`",
1615              [SetName, Type, DDocId]),
1616    NewGroup = maybe_fix_group(Group),
1617    ok = couch_file:refresh_eof(NewGroup#set_view_group.fd),
1618    Owner ! {partial_update, Parent, self(), NewGroup},
1619    receive
1620    update_processed ->
1621        ok;
1622    stop ->
1623        exit(shutdown)
1624    end,
1625    Acc#writer_acc{group = NewGroup}.
1626
1627
1628maybe_fix_group(#set_view_group{index_header = Header} = Group) ->
1629    receive
1630    {new_passive_partitions, Parts} ->
1631        Bitmask = couch_set_view_util:build_bitmask(Parts),
1632        {Seqs, PartVersions} = couch_set_view_util:fix_partitions(Group, Parts),
1633        Group#set_view_group{
1634            index_header = Header#set_view_index_header{
1635                seqs = Seqs,
1636                pbitmask = ?set_pbitmask(Group) bor Bitmask,
1637                partition_versions = PartVersions
1638            }
1639        }
1640    after 0 ->
1641        Group
1642    end.
1643
1644
1645check_if_compactor_started(#writer_acc{group = Group0} = Acc) ->
1646    receive
1647    {compactor_started, Pid} ->
1648        erlang:put(new_compactor, true),
1649        Group = maybe_fix_group(Group0),
1650        Pid ! {compactor_started_ack, self(), Group},
1651        Acc#writer_acc{compactor_running = true, group = Group}
1652    after 0 ->
1653        Acc
1654    end.
1655
1656
1657init_tmp_files(WriterAcc) ->
1658    #writer_acc{
1659        group = Group, initial_build = Init, tmp_dir = TmpDir
1660    } = WriterAcc,
1661    case WriterAcc#writer_acc.compactor_running of
1662    true ->
1663        ok = couch_set_view_util:delete_sort_files(TmpDir, updater);
1664    false ->
1665        ok = couch_set_view_util:delete_sort_files(TmpDir, all)
1666    end,
1667    Ids = [ids_index | [V#set_view.id_num || V <- Group#set_view_group.views]],
1668    Files = case Init of
1669    true ->
1670        [begin
1671             FileName = new_sort_file_name(WriterAcc),
1672             {ok, Fd} = file2:open(FileName, [raw, append, binary]),
1673             {Id, #set_view_tmp_file_info{fd = Fd, name = FileName}}
1674         end || Id <- Ids];
1675    false ->
1676         [{Id, #set_view_tmp_file_info{}} || Id <- Ids]
1677    end,
1678    WriterAcc#writer_acc{tmp_files = dict:from_list(Files)}.
1679
1680
1681new_sort_file_name(#writer_acc{tmp_dir = TmpDir, compactor_running = Cr}) ->
1682    new_sort_file_name(TmpDir, Cr).
1683
1684new_sort_file_name(TmpDir, true) ->
1685    couch_set_view_util:new_sort_file_path(TmpDir, compactor);
1686new_sort_file_name(TmpDir, false) ->
1687    couch_set_view_util:new_sort_file_path(TmpDir, updater).
1688
1689
1690make_back_index_key(DocId, PartId) ->
1691    <<PartId:16, DocId/binary>>.
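
% The partition id occupies the first 16 bits of the key, the remainder of
% the binary is the document id. An illustrative inverse (a sketch, not used
% by this module):
%
%     parse_back_index_key(<<PartId:16, DocId/binary>>) ->
%         {PartId, DocId}.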
1692
1693
1694count_seqs_done(Group, NewSeqs) ->
    % NewSeqs might contain new passive partitions that the group's seqs
    % don't have yet (they will be added after a checkpoint period).
1697    lists:foldl(
1698        fun({PartId, SeqDone}, Acc) ->
1699            SeqBefore = couch_util:get_value(PartId, ?set_seqs(Group), 0),
1700            Acc + (SeqDone - SeqBefore)
1701        end,
1702        0, NewSeqs).
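
% Example for count_seqs_done/2: if the group's seqs are [{1, 5}] and NewSeqs
% is [{1, 10}, {2, 3}], the number of changes done is (10 - 5) + (3 - 0) = 8.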
1703
1704
1705close_tmp_fd(#set_view_tmp_file_info{fd = nil}) ->
1706    ok;
1707close_tmp_fd(#set_view_tmp_file_info{fd = Fd}) ->
1708    ok = file:close(Fd).
1709
1710
1711% DCP introduces the concept of snapshots, where a document mutation is only
1712% guaranteed to be unique within a single snapshot. But the flusher expects
1713% unique mutations within the full batch. Merge multiple snapshots (if there
1714% are any) into a single one. The latest mutation wins.
1715merge_snapshots(KVs) ->
1716    merge_snapshots(KVs, false, []).
1717
1718merge_snapshots([], true, Acc) ->
1719    {true, Acc};
1720merge_snapshots([], false, Acc) ->
    % The order of the KVs doesn't really matter, but keeping them in their
    % original order makes debugging easier
1723    {false, lists:reverse(Acc)};
1724merge_snapshots([snapshot_marker | KVs], _MultipleSnapshots, Acc) ->
1725    merge_snapshots(KVs, true, Acc);
1726merge_snapshots([{part_versions, _} = PartVersions | KVs], MultipleSnapshots, Acc) ->
1727    merge_snapshots(KVs, MultipleSnapshots, [PartVersions | Acc]);
1728merge_snapshots([KV | KVs], true, Acc0) ->
1729    {_Seq, DocId, _PartId, _QueryResults} = KV,
1730    Acc = lists:keystore(DocId, 2, Acc0, KV),
1731    merge_snapshots(KVs, true, Acc);
1732merge_snapshots([KV | KVs], false, Acc) ->
1733    merge_snapshots(KVs, false, [KV | Acc]).
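
% Example for merge_snapshots/1 (illustrative; ResultsA and ResultsB stand
% for the emitted view results): two snapshots both mutate <<"doc1">>, so the
% mutation from the later snapshot wins and the result is flagged as merged:
%
%     merge_snapshots([{1, <<"doc1">>, 0, ResultsA},
%                      snapshot_marker,
%                      {2, <<"doc1">>, 0, ResultsB}])
%     %=> {true, [{2, <<"doc1">>, 0, ResultsB}]}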
1734