% -*- Mode: Erlang; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */

% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_set_view_updater).

-export([update/6]).
% Exported for the MapReduce-specific code
-export([new_sort_file_name/1]).

-include("couch_db.hrl").
-include_lib("couch_set_view/include/couch_set_view.hrl").
-include_lib("couch_dcp/include/couch_dcp.hrl").

-define(MAP_QUEUE_SIZE, 256 * 1024).
-define(WRITE_QUEUE_SIZE, 512 * 1024).
% The size of the accumulator where emitted key-values are batched up before
% they are queued in the write queue. The bigger the value, the lower the lock
% contention, but the higher the possible memory consumption.
-define(QUEUE_ACC_BATCH_SIZE, 256 * 1024).

% incremental updates
-define(INC_MAX_TMP_FILE_SIZE, 31457280).
-define(MIN_BATCH_SIZE_PER_VIEW, 65536).

% For file sorter and file merger commands.
-define(PORT_OPTS,
        [exit_status, use_stdio, stderr_to_stdout, {line, 4096}, binary]).

-record(writer_acc, {
    parent,
    owner,
    group,
    last_seqs = orddict:new(),
    part_versions = orddict:new(),
    compactor_running,
    write_queue,
    initial_build,
    view_empty_kvs,
    kvs = [],
    kvs_size = 0,
    state = updating_active,
    final_batch = false,
    max_seqs,
    stats = #set_view_updater_stats{},
    tmp_dir = nil,
    initial_seqs,
    max_insert_batch_size,
    tmp_files = dict:new(),
    throttle = 0,
    force_flush = false
}).


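% Entry point of the updater. Enters the index barrier, logs the partition
% states and then starts the doc loader, mapper and writer processes via
% update/8. Runs in its own process and terminates with exit/1 once the
% update has finished or failed.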
-spec update(pid(), #set_view_group{},
             partition_seqs(), boolean(), string(), [term()]) -> no_return().
update(Owner, Group, CurSeqs, CompactorRunning, TmpDir, Options) ->
    #set_view_group{
        set_name = SetName,
        type = Type,
        name = DDocId
    } = Group,

    ActiveParts = couch_set_view_util:decode_bitmask(?set_abitmask(Group)),
    PassiveParts = couch_set_view_util:decode_bitmask(?set_pbitmask(Group)),
    NumChanges = couch_set_view_util:missing_changes_count(CurSeqs, ?set_seqs(Group)),

    process_flag(trap_exit, true),

    BeforeEnterTs = os:timestamp(),
    Parent = self(),
    BarrierEntryPid = spawn_link(fun() ->
        DDocIds = couch_set_view_util:get_ddoc_ids_with_sig(SetName, Group),
        couch_task_status:add_task([
            {type, blocked_indexer},
            {set, SetName},
            {signature, ?l2b(couch_util:to_hex(Group#set_view_group.sig))},
            {design_documents, DDocIds},
            {indexer_type, Type}
        ]),
        case Type of
        main ->
            ok = couch_index_barrier:enter(couch_main_index_barrier, Parent);
        replica ->
            ok = couch_index_barrier:enter(couch_replica_index_barrier, Parent)
        end,
        Parent ! {done, self(), (timer:now_diff(os:timestamp(), BeforeEnterTs) / 1000000)},
        receive shutdown -> ok end
    end),

    BlockedTime = receive
    {done, BarrierEntryPid, Duration} ->
        Duration;
    {'EXIT', _, Reason} ->
        exit({updater_error, Reason})
    end,

    CleanupParts = couch_set_view_util:decode_bitmask(?set_cbitmask(Group)),
    InitialBuild = couch_set_view_util:is_initial_build(Group),
    ?LOG_INFO("Updater for set view `~s`, ~s group `~s` started~n"
              "Active partitions:    ~w~n"
              "Passive partitions:   ~w~n"
              "Cleanup partitions:   ~w~n"
              "Replicas to transfer: ~w~n"
              "Pending transition:   ~n"
              "    active:           ~w~n"
              "    passive:          ~w~n"
              "    unindexable:      ~w~n"
              "Initial build:        ~s~n"
              "Compactor running:    ~s~n"
              "Min # changes:        ~p~n"
              "Partition versions:   ~w~n",
              [SetName, Type, DDocId,
               ActiveParts,
               PassiveParts,
               CleanupParts,
               ?set_replicas_on_transfer(Group),
               ?pending_transition_active(?set_pending_transition(Group)),
               ?pending_transition_passive(?set_pending_transition(Group)),
               ?pending_transition_unindexable(?set_pending_transition(Group)),
               InitialBuild,
               CompactorRunning,
               NumChanges,
               ?set_partition_versions(Group)
              ]),

    WriterAcc0 = #writer_acc{
        parent = self(),
        owner = Owner,
        group = Group,
        initial_build = InitialBuild,
        max_seqs = CurSeqs,
        part_versions = ?set_partition_versions(Group),
        tmp_dir = TmpDir,
        max_insert_batch_size = list_to_integer(
            couch_config:get("set_views", "indexer_max_insert_batch_size", "1048576"))
    },
    update(WriterAcc0, ActiveParts, PassiveParts,
            BlockedTime, BarrierEntryPid, NumChanges, CompactorRunning, Options).


update(WriterAcc, ActiveParts, PassiveParts, BlockedTime,
       BarrierEntryPid, NumChanges, CompactorRunning, Options) ->
    #writer_acc{
        owner = Owner,
        group = Group
    } = WriterAcc,
    #set_view_group{
        set_name = SetName,
        type = Type,
        name = DDocId,
        sig = GroupSig
    } = Group,

    StartTime = os:timestamp(),

    MapQueueOptions = [{max_size, ?MAP_QUEUE_SIZE}, {max_items, infinity}],
    WriteQueueOptions = [{max_size, ?WRITE_QUEUE_SIZE}, {max_items, infinity}],
    {ok, MapQueue} = couch_work_queue:new(MapQueueOptions),
    {ok, WriteQueue} = couch_work_queue:new(WriteQueueOptions),

    Parent = self(),

    Mapper = spawn_link(fun() ->
        try
            do_maps(Group, MapQueue, WriteQueue)
        catch _:Error ->
            Stacktrace = erlang:get_stacktrace(),
            ?LOG_ERROR("Set view `~s`, ~s group `~s`, mapper error~n"
                "error:      ~p~n"
                "stacktrace: ~p~n",
                [SetName, Type, DDocId, Error, Stacktrace]),
            exit(Error)
        end
    end),

    Writer = spawn_link(fun() ->
        BarrierEntryPid ! shutdown,
        ViewEmptyKVs = [{View, []} || View <- Group#set_view_group.views],
        WriterAcc2 = init_tmp_files(WriterAcc#writer_acc{
            parent = Parent,
            group = Group,
            write_queue = WriteQueue,
            view_empty_kvs = ViewEmptyKVs,
            compactor_running = CompactorRunning,
            initial_seqs = ?set_seqs(Group),
            throttle = case lists:member(throttle, Options) of
                true ->
                    list_to_integer(
                        couch_config:get("set_views", "throttle_period", "100"));
                false ->
                    0
                end
        }),
        ok = couch_set_view_util:open_raw_read_fd(Group),
        try
            WriterAcc3 = do_writes(WriterAcc2),
            receive
            {doc_loader_finished, {PartVersions0, RealMaxSeqs}} ->
                WriterAccStats = WriterAcc3#writer_acc.stats,
                WriterAccGroup = WriterAcc3#writer_acc.group,
                WriterAccHeader = WriterAccGroup#set_view_group.index_header,
                PartVersions = lists:ukeymerge(1, PartVersions0,
                    WriterAccHeader#set_view_index_header.partition_versions),
                case WriterAcc3#writer_acc.initial_build of
                true ->
                    % The doc loader might not load the mutations up to the
                    % most recent one, but only to a lower one. Update the
                    % group header and stats with the correct information.
                    MaxSeqs = lists:ukeymerge(
                        1, RealMaxSeqs, WriterAccHeader#set_view_index_header.seqs),
                    Stats = WriterAccStats#set_view_updater_stats{
                        seqs = lists:sum([S || {_, S} <- RealMaxSeqs])
                    };
                false ->
                    MaxSeqs = WriterAccHeader#set_view_index_header.seqs,
                    Stats = WriterAcc3#writer_acc.stats
                end,
                FinalWriterAcc = WriterAcc3#writer_acc{
                    stats = Stats,
                    group = WriterAccGroup#set_view_group{
                        index_header = WriterAccHeader#set_view_index_header{
                            partition_versions = PartVersions,
                            seqs = MaxSeqs
                        }
                    }
                }
            end,
            Parent ! {writer_finished, FinalWriterAcc}
        catch _:Error ->
            Stacktrace = erlang:get_stacktrace(),
            ?LOG_ERROR("Set view `~s`, ~s group `~s`, writer error~n"
                "error:      ~p~n"
                "stacktrace: ~p~n",
                [SetName, Type, DDocId, Error, Stacktrace]),
            exit(Error)
        after
            ok = couch_set_view_util:close_raw_read_fd(Group)
        end
    end),

    InitialBuild = WriterAcc#writer_acc.initial_build,
    NumChanges2 = case InitialBuild of
        true ->
            couch_set_view_updater_helper:count_items_from_set(Group, ActiveParts ++ PassiveParts);
        false ->
            NumChanges
    end,

    DocLoader = spawn_link(fun() ->
        DDocIds = couch_set_view_util:get_ddoc_ids_with_sig(SetName, Group),
        couch_task_status:add_task([
            {type, indexer},
            {set, SetName},
            {signature, ?l2b(couch_util:to_hex(GroupSig))},
            {design_documents, DDocIds},
            {indexer_type, Type},
            {progress, 0},
            {changes_done, 0},
            {initial_build, InitialBuild},
            {total_changes, NumChanges2}
        ]),
        couch_task_status:set_update_frequency(5000),
        case lists:member(pause, Options) of
        true ->
            % Used by unit tests to verify that adding new partitions to the
            % passive state doesn't restart the updater, and that the updater
            % picks up these new partitions and indexes them in the same run.
            receive continue -> ok end;
        false ->
            ok
        end,
        try
            {PartVersions, MaxSeqs} = load_changes(
                Owner, Parent, Group, MapQueue, ActiveParts, PassiveParts,
                WriterAcc#writer_acc.max_seqs,
                WriterAcc#writer_acc.initial_build),
            Parent ! {doc_loader_finished, {PartVersions, MaxSeqs}}
        catch
        throw:purge ->
            exit(purge);
        throw:{rollback, RollbackSeqs} ->
            exit({rollback, RollbackSeqs});
        _:Error ->
            Stacktrace = erlang:get_stacktrace(),
            ?LOG_ERROR("Set view `~s`, ~s group `~s`, doc loader error~n"
                "error:      ~p~n"
                "stacktrace: ~p~n",
                [SetName, Type, DDocId, Error, Stacktrace]),
            exit(Error)
        end,
        % Since the updater progress stats are reported by the doc loader,
        % this process has to stay alive until the updater has completed.
        receive
        updater_finished ->
            ok
        end
    end),

    Result = wait_result_loop(StartTime, DocLoader, Mapper, Writer, BlockedTime, Group),
    case Type of
    main ->
        ok = couch_index_barrier:leave(couch_main_index_barrier);
    replica ->
        ok = couch_index_barrier:leave(couch_replica_index_barrier)
    end,
    case Result of
    {updater_finished, #set_view_updater_result{group = NewGroup}} ->
        ?LOG_DEBUG("Updater for ~s set view group `~s`, set `~s`, writer finished:~n"
                   "  start seqs: ~w~n"
                   "  end seqs:   ~w~n",
                   [Type, DDocId, SetName, ?set_seqs(Group), ?set_seqs(NewGroup)]);
    _ ->
        ok
    end,
    DocLoader ! updater_finished,
    exit(Result).

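% Returns the list of currently active DCP streams of the given DCP client,
% retrying after the suggested timeout while the client asks us to retry.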
-spec get_active_dcp_streams(pid()) -> list().
get_active_dcp_streams(DcpPid) ->
    case couch_dcp_client:list_streams(DcpPid) of
    {active_list_streams, ActiveStreams} ->
        ActiveStreams;
    {retry_list_streams, Timeout} ->
        receive
        after Timeout ->
            get_active_dcp_streams(DcpPid)
        end
    end.

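% Closes all active DCP streams of the given DCP client. A stream that is
% already gone (vbucket_stream_not_found) is not treated as an error.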
-spec stop_dcp_streams(pid()) -> ok.
stop_dcp_streams(DcpPid) ->
    ActiveStreams = get_active_dcp_streams(DcpPid),
    lists:foreach(fun(ActiveStream) ->
        case couch_dcp_client:remove_stream(DcpPid, ActiveStream) of
        ok ->
            ok;
        {error, vbucket_stream_not_found} ->
            ok;
        Error ->
            ?LOG_ERROR("Unexpected error for closing stream of partition ~p",
                [ActiveStream]),
            throw(Error)
        end
    end, ActiveStreams),
    ok.

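% Main receive loop of the updater process. It relays messages between the
% doc loader, mapper and writer processes and the group owner (new passive
% partitions, compactor notifications, DCP stream requests) until the writer
% reports completion or an error occurs.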
wait_result_loop(StartTime, DocLoader, Mapper, Writer, BlockedTime, OldGroup) ->
    #set_view_group{set_name = SetName, name = DDocId, type = Type} = OldGroup,
    receive
    {new_passive_partitions, _} = NewPassivePartitions ->
        Writer ! NewPassivePartitions,
        DocLoader ! NewPassivePartitions,
        wait_result_loop(StartTime, DocLoader, Mapper, Writer, BlockedTime, OldGroup);
    continue ->
        % Used by unit tests.
        DocLoader ! continue,
        wait_result_loop(StartTime, DocLoader, Mapper, Writer, BlockedTime, OldGroup);
    {doc_loader_finished, {PartVersions, MaxSeqs}} ->
        Writer ! {doc_loader_finished, {PartVersions, MaxSeqs}},
        wait_result_loop(StartTime, DocLoader, Mapper, Writer, BlockedTime, OldGroup);
    {writer_finished, WriterAcc} ->
        Stats0 = WriterAcc#writer_acc.stats,
        Result = #set_view_updater_result{
            group = WriterAcc#writer_acc.group,
            state = WriterAcc#writer_acc.state,
            stats = Stats0#set_view_updater_stats{
                indexing_time = timer:now_diff(os:timestamp(), StartTime) / 1000000,
                blocked_time = BlockedTime
            },
            tmp_file = case WriterAcc#writer_acc.initial_build of
                true ->
                    dict:fetch(build_file, WriterAcc#writer_acc.tmp_files);
                false ->
                    ""
                end
        },
        {updater_finished, Result};
    {compactor_started, Pid, Ref} ->
        ?LOG_INFO("Set view `~s`, ~s group `~s`, updater received "
                  "compactor ~p notification, ref ~p, writer ~p",
                   [SetName, Type, DDocId, Pid, Ref, Writer]),
        Writer ! {compactor_started, self()},
        erlang:put(compactor_pid, {Pid, Ref}),
        wait_result_loop(StartTime, DocLoader, Mapper, Writer, BlockedTime, OldGroup);
    {compactor_started_ack, Writer, GroupSnapshot} ->
        ?LOG_INFO("Set view `~s`, ~s group `~s`, updater received compaction ack"
                  " from writer ~p", [SetName, Type, DDocId, Writer]),
        case erlang:erase(compactor_pid) of
        {Pid, Ref} ->
            Pid ! {Ref, {ok, GroupSnapshot}};
        undefined ->
            ok
        end,
        wait_result_loop(StartTime, DocLoader, Mapper, Writer, BlockedTime, OldGroup);
    {'EXIT', _, Reason} when Reason =/= normal ->
        couch_util:shutdown_sync(DocLoader),
        couch_util:shutdown_sync(Mapper),
        couch_util:shutdown_sync(Writer),
        stop_dcp_streams(OldGroup#set_view_group.dcp_pid),
        {updater_error, Reason};
    {native_updater_start, Writer} ->
        % We need control over spawning the native updater process,
        % so that the native OS processes can be terminated correctly.
        Writer ! {ok, native_updater_start},
        receive
        {native_updater_pid, NativeUpdater} ->
            erlang:put(native_updater, NativeUpdater)
        end,
        wait_result_loop(StartTime, DocLoader, Mapper, Writer, BlockedTime, OldGroup);
    stop ->
        case erlang:erase(native_updater) of
        undefined ->
            couch_util:shutdown_sync(DocLoader),
            couch_util:shutdown_sync(Mapper),
            couch_util:shutdown_sync(Writer);
        NativeUpdater ->
            MRef = erlang:monitor(process, NativeUpdater),
            NativeUpdater ! stop,
            receive
            {'DOWN', MRef, process, NativeUpdater, _} ->
                couch_util:shutdown_sync(DocLoader),
                couch_util:shutdown_sync(Mapper),
                couch_util:shutdown_sync(Writer)
            end,
            stop_dcp_streams(OldGroup#set_view_group.dcp_pid)
        end,
        exit({updater_error, shutdown});
    {add_stream, DcpPid, PartId, PartUuid, StartSeq, EndSeq, Flags} ->
        Result = couch_dcp_client:add_stream(DcpPid, PartId, PartUuid, StartSeq, EndSeq, Flags),
        DocLoader ! {add_stream_result, Result},
        wait_result_loop(StartTime, DocLoader, Mapper, Writer, BlockedTime, OldGroup)
    end.


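% Maps a DCP snapshot marker type to a human readable string for logging.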
dcp_marker_to_string(Type) ->
    case Type band ?DCP_SNAPSHOT_TYPE_MASK of
    ?DCP_SNAPSHOT_TYPE_DISK ->
        "on-disk";
    ?DCP_SNAPSHOT_TYPE_MEMORY ->
        "in-memory";
    _ ->
        io_lib:format("unknown (~w)", [Type])
    end.


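% Runs in the doc loader process. Streams the changes of all active and
% passive partitions from DCP (from the sequence numbers stored in the group
% header up to EndSeqs) and queues every item into the map queue. Returns the
% partition versions and the maximum sequence numbers that were read, or
% throws {rollback, Seqs} if the DCP server requests a rollback.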
load_changes(Owner, Updater, Group, MapQueue, ActiveParts, PassiveParts,
        EndSeqs, InitialBuild) ->
    #set_view_group{
        set_name = SetName,
        name = DDocId,
        type = GroupType,
        index_header = #set_view_index_header{
            seqs = SinceSeqs,
            partition_versions = PartVersions0
        },
        dcp_pid = DcpPid,
        category = Category
    } = Group,

    MaxDocSize = list_to_integer(
        couch_config:get("set_views", "indexer_max_doc_size", "0")),
    AddStreamFun = fun(Pid, PartId, PartUuid, StartSeq, EndSeq, Flags) ->
        Updater ! {add_stream, Pid, PartId, PartUuid, StartSeq, EndSeq, Flags},
        receive {add_stream_result, Result} -> Result end,
        Result
    end,
    FoldFun = fun({PartId, EndSeq}, {AccCount, AccSeqs, AccVersions, AccRollbacks}) ->
        case couch_set_view_util:has_part_seq(PartId, ?set_unindexable_seqs(Group))
            andalso not lists:member(PartId, ?set_replicas_on_transfer(Group)) of
        true ->
            {AccCount, AccSeqs, AccVersions, AccRollbacks};
        false ->
            Since = couch_util:get_value(PartId, SinceSeqs, 0),
            PartVersions = couch_util:get_value(PartId, AccVersions),
            Flags = case InitialBuild of
            true ->
                ?DCP_FLAG_DISKONLY;
            false ->
                ?DCP_FLAG_NOFLAG
            end,
            % For a stream request starting from 0: if a vbucket got reset in
            % the window of time between obtaining the seqno from stats and
            % making the stream request, the end_seqno may be higher than the
            % current vbucket high seqno. Use a special flag to tell the
            % server to set the end_seqno.
            Flags2 = case PartVersions of
            [{0, 0}] ->
                Flags bor ?DCP_FLAG_USELATEST_ENDSEQNO;
            _ ->
                Flags
            end,

            case AccRollbacks of
            [] ->
                case EndSeq =:= Since of
                true ->
                    {AccCount, AccSeqs, AccVersions, AccRollbacks};
                false ->
                    ChangesWrapper = fun
                        ({part_versions, _} = NewVersions, Acc) ->
                            queue_doc(
                                NewVersions, MapQueue, Group,
                                MaxDocSize, InitialBuild),
                            Acc;
                        ({snapshot_marker, {MarkerStartSeq, MarkerEndSeq, MarkerType}}, {Count, _})
                            when MarkerType band ?DCP_SNAPSHOT_TYPE_MASK =/= 0 ->
                            ?LOG_INFO(
                                "set view `~s`, ~s (~s) group `~s`: received "
                                "a snapshot marker (~s) for partition ~p from "
                                "sequence ~p to ~p",
                                [SetName, GroupType, Category, DDocId,
                                    dcp_marker_to_string(MarkerType),
                                    PartId, MarkerStartSeq, MarkerEndSeq]),
                            case Count of
                            % Ignore the snapshot marker that is at the
                            % beginning of the stream.
                            % If it wasn't ignored, it would lead to an
                            % additional forced flush which isn't needed. A
                            % flush is only needed if there are several
                            % mutations of the same document within one
                            % batch. As two different partitions can't
                            % contain the same document ID, it is safe not to
                            % force a flush between two partitions.
                            0 ->
                                {Count, MarkerEndSeq};
                            _ ->
                                queue_doc(
                                    snapshot_marker, MapQueue, Group,
                                    MaxDocSize, InitialBuild),
                                {Count, MarkerEndSeq}
                            end;
                        ({snapshot_marker, {MarkerStartSeq, MarkerEndSeq, MarkerType}}, Acc) ->
                            ?LOG_ERROR(
                                "set view `~s`, ~s (~s) group `~s`: received "
                                "a snapshot marker (~s) for partition ~p from "
                                "sequence ~p to ~p",
                                [SetName, GroupType, Category, DDocId,
                                    dcp_marker_to_string(MarkerType),
                                    PartId, MarkerStartSeq, MarkerEndSeq]),
                            throw({error, unknown_snapshot_marker, MarkerType}),
                            Acc;
                        (#dcp_doc{} = Item, {Count, AccEndSeq}) ->
                            queue_doc(
                                Item, MapQueue, Group,
                                MaxDocSize, InitialBuild),
                            {Count + 1, AccEndSeq}
                        end,
                    Result = couch_dcp_client:enum_docs_since(
                        DcpPid, PartId, PartVersions, Since, EndSeq, Flags2,
                        ChangesWrapper, {0, 0}, AddStreamFun),
                    case Result of
                    {ok, {AccCount2, AccEndSeq}, NewPartVersions} ->
                        AccSeqs2 = orddict:store(PartId, AccEndSeq, AccSeqs),
                        AccVersions2 = lists:ukeymerge(
                            1, [{PartId, NewPartVersions}], AccVersions),
                        AccRollbacks2 = AccRollbacks;
                    {rollback, RollbackSeq} ->
                        AccCount2 = AccCount,
                        AccSeqs2 = AccSeqs,
                        AccVersions2 = AccVersions,
                        AccRollbacks2 = ordsets:add_element(
                            {PartId, RollbackSeq}, AccRollbacks);
                    Error ->
                        AccCount2 = AccCount,
                        AccSeqs2 = AccSeqs,
                        AccVersions2 = AccVersions,
                        AccRollbacks2 = AccRollbacks,
                        ?LOG_ERROR("set view `~s`, ~s (~s) group `~s` error "
                            "while loading changes for partition ~p:~n~p~n",
                            [SetName, GroupType, Category, DDocId, PartId,
                                Error]),
                        throw(Error)
                    end,
                    {AccCount2, AccSeqs2, AccVersions2, AccRollbacks2}
                end;
            _ ->
                % If a rollback is needed, don't store any new documents in
                % the index, but just check for a rollback of another
                % partition (i.e. a request with start seq == end seq).
                ChangesWrapper = fun(_, _) -> ok end,
                Result = couch_dcp_client:enum_docs_since(
                    DcpPid, PartId, PartVersions, Since, Since, Flags2,
                    ChangesWrapper, ok, AddStreamFun),
                case Result of
                {ok, _, _} ->
                    AccRollbacks2 = AccRollbacks;
                {rollback, RollbackSeq} ->
                    AccRollbacks2 = ordsets:add_element(
                        {PartId, RollbackSeq}, AccRollbacks)
                end,
                {AccCount, AccSeqs, AccVersions, AccRollbacks2}
            end
        end
    end,

    notify_owner(Owner, {state, updating_active}, Updater),
    case ActiveParts of
    [] ->
        ActiveChangesCount = 0,
        MaxSeqs = orddict:new(),
        PartVersions = PartVersions0,
        Rollbacks = [];
    _ ->
        ?LOG_INFO("Updater reading changes from active partitions to "
                  "update ~s set view group `~s` from set `~s`",
                  [GroupType, DDocId, SetName]),
        {ActiveChangesCount, MaxSeqs, PartVersions, Rollbacks} = lists:foldl(
            FoldFun, {0, orddict:new(), PartVersions0, ordsets:new()},
            couch_set_view_util:filter_seqs(ActiveParts, EndSeqs))
    end,
    case PassiveParts of
    [] ->
        FinalChangesCount = ActiveChangesCount,
        MaxSeqs2 = MaxSeqs,
        PartVersions2 = PartVersions,
        Rollbacks2 = Rollbacks;
    _ ->
        ?LOG_INFO("Updater reading changes from passive partitions to "
                  "update ~s set view group `~s` from set `~s`",
                  [GroupType, DDocId, SetName]),
        {FinalChangesCount, MaxSeqs2, PartVersions2, Rollbacks2} = lists:foldl(
            FoldFun, {ActiveChangesCount, MaxSeqs, PartVersions, Rollbacks},
            couch_set_view_util:filter_seqs(PassiveParts, EndSeqs))
    end,
    {FinalChangesCount3, MaxSeqs3, PartVersions3, Rollbacks3} =
        load_changes_from_passive_parts_in_mailbox(DcpPid,
            Group, FoldFun, FinalChangesCount, MaxSeqs2, PartVersions2, Rollbacks2),

    case Rollbacks3 of
    [] ->
        ok;
    _ ->
        throw({rollback, Rollbacks3})
    end,

    couch_work_queue:close(MapQueue),
    ?LOG_INFO("Updater for ~s set view group `~s`, set `~s` (~s), "
              "read a total of ~p changes",
              [GroupType, DDocId, SetName, Category, FinalChangesCount3]),
    ?LOG_DEBUG("Updater for ~s set view group `~s`, set `~s`, max partition seqs found:~n~w",
               [GroupType, DDocId, SetName, MaxSeqs3]),
    {PartVersions3, MaxSeqs3}.


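% Drains any {new_passive_partitions, Parts} messages that arrived in the
% doc loader's mailbox while changes were being read, and loads the changes
% of those newly added passive partitions with the same fold function.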
load_changes_from_passive_parts_in_mailbox(DcpPid,
        Group, FoldFun, ChangesCount, MaxSeqs0, PartVersions0, Rollbacks) ->
    #set_view_group{
        set_name = SetName,
        name = DDocId,
        type = GroupType
    } = Group,
    receive
    {new_passive_partitions, Parts0} ->
        Parts = get_more_passive_partitions(Parts0),
        AddPartVersions = [{P, [{0, 0}]} || P <- Parts],
        {ok, AddMaxSeqs} = couch_dcp_client:get_seqs(DcpPid, Parts),
        PartVersions = lists:ukeymerge(1, AddPartVersions, PartVersions0),

        MaxSeqs = lists:ukeymerge(1, AddMaxSeqs, MaxSeqs0),
        ?LOG_INFO("Updater reading changes from new passive partitions ~w to "
                  "update ~s set view group `~s` from set `~s`",
                  [Parts, GroupType, DDocId, SetName]),
        {ChangesCount2, MaxSeqs2, PartVersions2, Rollbacks2} = lists:foldl(
            FoldFun, {ChangesCount, MaxSeqs, PartVersions, Rollbacks}, AddMaxSeqs),
        load_changes_from_passive_parts_in_mailbox(DcpPid,
            Group, FoldFun, ChangesCount2, MaxSeqs2, PartVersions2, Rollbacks2)
    after 0 ->
        {ChangesCount, MaxSeqs0, PartVersions0, Rollbacks}
    end.


get_more_passive_partitions(Parts) ->
    receive
    {new_passive_partitions, Parts2} ->
        get_more_passive_partitions(Parts ++ Parts2)
    after 0 ->
        lists:sort(Parts)
    end.


notify_owner(Owner, Msg, UpdaterPid) ->
    Owner ! {updater_info, UpdaterPid, Msg}.


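% Queues a single DCP item (document, snapshot marker or partition versions)
% into the map queue. Oversized documents and documents with a non-UTF-8 id
% are queued as deleted so that only their sequence number is recorded.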
queue_doc(snapshot_marker, MapQueue, _Group, _MaxDocSize, _InitialBuild) ->
    couch_work_queue:queue(MapQueue, snapshot_marker);
queue_doc({part_versions, _} = PartVersions, MapQueue, _Group, _MaxDocSize,
    _InitialBuild) ->
    couch_work_queue:queue(MapQueue, PartVersions);
queue_doc(Doc, MapQueue, Group, MaxDocSize, InitialBuild) ->
    Doc2 = case Doc#dcp_doc.deleted of
    true ->
        case Group#set_view_group.index_xattr_on_deleted_docs of
        true ->
            case Doc#dcp_doc.body of
            <<>> ->
                Doc#dcp_doc{deleted = true};
            _ ->
                Doc#dcp_doc{deleted = false}
            end;
        false ->
            Doc
        end;
    false ->
        Doc
    end,

    case Doc2#dcp_doc.deleted of
    true when InitialBuild ->
        Entry = nil;
    true ->
        Entry = Doc2;
    false ->
        #set_view_group{
           set_name = SetName,
           name = DDocId,
           type = GroupType
        } = Group,
        case couch_util:validate_utf8(Doc2#dcp_doc.id) of
        true ->
            case (MaxDocSize > 0) andalso
                (iolist_size(Doc2#dcp_doc.body) > MaxDocSize) of
            true ->
                ?LOG_MAPREDUCE_ERROR("Bucket `~s`, ~s group `~s`, skipping "
                    "document with ID `~s`: too large body (~p bytes)",
                    [SetName, GroupType, DDocId,
                     ?b2l(Doc2#dcp_doc.id), iolist_size(Doc2#dcp_doc.body)]),
                Entry = Doc2#dcp_doc{deleted = true};
            false ->
                Entry = Doc2
            end;
        false ->
            % If the id isn't valid UTF-8 (memcached allows that), log an
            % error message and skip indexing the doc. Send it through the
            % queue anyway (marked as deleted) so that the high seq num is
            % recorded; if there are a bunch of these at the end, we want to
            % keep track of the high seq and not reprocess them again.
            ?LOG_MAPREDUCE_ERROR("Bucket `~s`, ~s group `~s`, skipping "
                "document with non-utf8 id. Doc id bytes: ~w",
                [SetName, GroupType, DDocId, ?b2l(Doc2#dcp_doc.id)]),
            Entry = Doc2#dcp_doc{deleted = true}
        end
    end,
    case Entry of
    nil ->
        ok;
    _ ->
        couch_work_queue:queue(MapQueue, Entry),
        update_task(1)
    end.

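% Parses the XATTR section of a DCP document body as handled below: each
% XATTR pair is prefixed by its 32-bit size and encoded as
% <<Key/binary, 0, Value/binary, 0>>, and XATTRSize is the total size of all
% pairs including their 4-byte size prefixes. Returns {RemainingDocBody,
% XattrsBin}, where XattrsBin is a binary such as
% <<"\"xattrs\":{\"key\":value}">> (values are inserted verbatim, so they are
% expected to already be JSON). With XATTRSize = 0 the body is returned
% unchanged together with an empty <<"\"xattrs\":{}">> object.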
-spec accumulate_xattr(binary(), binary(), non_neg_integer(), non_neg_integer()) ->
    {binary(), binary()}.
accumulate_xattr(Data, Acc, XATTRSize, AccSum) when AccSum =:= XATTRSize ->
    {Data, <<Acc/binary, "}">>};
accumulate_xattr(Body, Acc, XATTRSize, AccSum) ->
    <<DataSize:32, Rest/binary>> = Body,
    AccSum2 = AccSum + DataSize + 4,
    <<Data0:DataSize/binary, Rest2/binary>> = Rest,
    % Remove the trailing zero byte from the XATTR
    Data = binary:part(Data0, 0, DataSize-1),
    % Jsonify key and value
    Data2 = case AccSum2 of
    XATTRSize ->
        <<"\"", Data/binary>>;
    _ ->
        <<"\"", Data/binary, ",">>
    end,
    % Replace zero byte after key with colon
    Xattr = binary:replace(Data2, <<0>>, <<"\":">>),
    accumulate_xattr(Rest2, <<Acc/binary, Xattr/binary>>, XATTRSize, AccSum2).

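% Runs in the mapper process. Dequeues batches from the map queue, runs the
% map functions of all views over each document and queues the resulting
% {Seq, Id, PartId, Results} items into the write queue, until the map queue
% is closed. Map errors are logged and result in empty emits for the
% affected view (or document).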
do_maps(Group, MapQueue, WriteQueue) ->
    #set_view_group{
        set_name = SetName,
        name = DDocId,
        type = Type,
        mod = Mod
    } = Group,
    case couch_work_queue:dequeue(MapQueue) of
    closed ->
        couch_work_queue:close(WriteQueue);
    {ok, Queue, _QueueSize} ->
        ViewCount = length(Group#set_view_group.views),
        {Items, _} = lists:foldl(
            fun(#dcp_doc{deleted = true} = DcpDoc, {Acc, Size}) ->
                #dcp_doc{
                    id = Id,
                    partition = PartId,
                    seq = Seq
                } = DcpDoc,
                Item = {Seq, Id, PartId, []},
                {[Item | Acc], Size};
            (#dcp_doc{deleted = false} = DcpDoc, {Acc0, Size0}) ->
                % When there are a lot of emits per document, memory can grow
                % almost indefinitely, as the queue size is only limited by
                % the number of documents and not by their emits.
                % In case the accumulator grows huge, queue the items early
                % into the write queue. Only emits are taken into account;
                % the other cases in this `foldl` never have a significant
                % size.
                {Acc, Size} = case Size0 > ?QUEUE_ACC_BATCH_SIZE of
                true ->
                    couch_work_queue:queue(WriteQueue, lists:reverse(Acc0)),
                    {[], 0};
                false ->
                    {Acc0, Size0}
                end,
                #dcp_doc{
                    id = Id,
                    body = Body,
                    partition = PartId,
                    rev_seq = RevSeq,
                    seq = Seq,
                    cas = Cas,
                    expiration = Expiration,
                    flags = Flags,
                    data_type = DcpDataType
                } = DcpDoc,
                {DataType, DocBody, XATTRs} = case DcpDataType of
                ?DCP_DATA_TYPE_RAW ->
                    {DocBody2, XATTRs2} = accumulate_xattr(Body, <<"\"xattrs\":{">>, 0, 0),
                    {?CONTENT_META_NON_JSON_MODE, DocBody2, XATTRs2};
                ?DCP_DATA_TYPE_JSON ->
                    {DocBody3, XATTRs3} = accumulate_xattr(Body, <<"\"xattrs\":{">>, 0, 0),
                    {?CONTENT_META_JSON, DocBody3, XATTRs3};
                ?DCP_DATA_TYPE_BINARY_XATTR ->
                    <<XATTRSize:32, Rest/binary>> = Body,
                    {DocBody4, XATTRs4} = accumulate_xattr(Rest, <<"\"xattrs\":{">>, XATTRSize, 0),
                    {?CONTENT_META_NON_JSON_MODE, DocBody4, XATTRs4};
                ?DCP_DATA_TYPE_JSON_XATTR ->
                    <<XATTRSize:32, Rest/binary>> = Body,
                    {DocBody5, XATTRs5} = accumulate_xattr(Rest, <<"\"xattrs\":{">>, XATTRSize, 0),
                    {?CONTENT_META_JSON, DocBody5, XATTRs5}
                end,
                Doc = #doc{
                    id = Id,
                    rev = {RevSeq, <<Cas:64, Expiration:32, Flags:32>>},
                    body = DocBody,
                    content_meta = DataType,
                    deleted = false
                },
                try
                    {ok, Result, LogList} = couch_set_view_mapreduce:map(
                        Doc, XATTRs, PartId, Seq, Group),
                    {Result2, _} = lists:foldr(
                        fun({error, Reason}, {AccRes, Pos}) ->
                            ErrorMsg = "Bucket `~s`, ~s group `~s`, error mapping"
                                    " document `~s` for view `~s`: ~s",
                            ViewName = Mod:view_name(Group, Pos),
                            Args = [SetName, Type, DDocId, Id, ViewName,
                                    couch_util:to_binary(Reason)],
                            ?LOG_MAPREDUCE_ERROR(ErrorMsg, Args),
                            {[[] | AccRes], Pos - 1};
                        (KVs, {AccRes, Pos}) ->
                            {[KVs | AccRes], Pos - 1}
                        end,
                        {[], ViewCount}, Result),
                    lists:foreach(
                        fun(Msg) ->
                            DebugMsg = "Bucket `~s`, ~s group `~s`, map function"
                                " log for document `~s`: ~s",
                            Args = [SetName, Type, DDocId, Id, binary_to_list(Msg)],
                            ?LOG_MAPREDUCE_ERROR(DebugMsg, Args)
                        end, LogList),
                    Item = {Seq, Id, PartId, Result2},
                    {[Item | Acc], Size + erlang:external_size(Result2)}
                catch _:{error, Reason} ->
                    ErrorMsg = "Bucket `~s`, ~s group `~s`, error mapping document `~s`: ~s",
                    Args = [SetName, Type, DDocId, Id, couch_util:to_binary(Reason)],
                    ?LOG_MAPREDUCE_ERROR(ErrorMsg, Args),
                    {[{Seq, Id, PartId, []} | Acc], Size}
                end;
            (snapshot_marker, {Acc, Size}) ->
                {[snapshot_marker | Acc], Size};
            ({part_versions, _} = PartVersions, {Acc, Size}) ->
                {[PartVersions | Acc], Size}
            end,
            {[], 0}, Queue),
        ok = couch_work_queue:queue(WriteQueue, lists:reverse(Items)),
        do_maps(Group, MapQueue, WriteQueue)
    end.


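% Runs in the writer process. Dequeues mapped batches from the write queue,
% accumulates them until should_flush_writes/1 says the batch is big enough,
% then flushes them to the temporary files and btrees. The final (closing)
% batch is always flushed.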
do_writes(Acc) ->
    #writer_acc{
        kvs = Kvs,
        kvs_size = KvsSize,
        write_queue = WriteQueue,
        throttle = Throttle
    } = Acc,
    ok = timer:sleep(Throttle),
    case couch_work_queue:dequeue(WriteQueue) of
    closed ->
        flush_writes(Acc#writer_acc{final_batch = true});
    {ok, Queue0, QueueSize} ->
        Queue = lists:flatten(Queue0),
        Kvs2 = Kvs ++ Queue,
        KvsSize2 = KvsSize + QueueSize,

        Acc2 = Acc#writer_acc{
            kvs = Kvs2,
            kvs_size = KvsSize2
        },
        case should_flush_writes(Acc2) of
        true ->
            Acc3 = flush_writes(Acc2),
            Acc4 = Acc3#writer_acc{kvs = [], kvs_size = 0};
        false ->
            Acc4 = Acc2
        end,
        do_writes(Acc4)
    end.

should_flush_writes(Acc) ->
    #writer_acc{
        view_empty_kvs = ViewEmptyKvs,
        kvs_size = KvsSize
    } = Acc,
    KvsSize >= (?MIN_BATCH_SIZE_PER_VIEW * length(ViewEmptyKvs)).


flush_writes(#writer_acc{kvs = [], initial_build = false} = Acc) ->
    Acc2 = maybe_update_btrees(Acc),
    checkpoint(Acc2);

flush_writes(#writer_acc{initial_build = false} = Acc0) ->
    #writer_acc{
        kvs = Kvs,
        view_empty_kvs = ViewEmptyKVs,
        group = Group,
        parent = Parent,
        owner = Owner,
        final_batch = IsFinalBatch,
        part_versions = PartVersions
    } = Acc0,
    Mod = Group#set_view_group.mod,
    % Only incremental updates can contain multiple snapshots
    {MultipleSnapshots, Kvs2} = merge_snapshots(Kvs),
    Acc1 = case MultipleSnapshots of
    true ->
        Acc2 = maybe_update_btrees(Acc0#writer_acc{force_flush = true}),
        checkpoint(Acc2);
    false ->
        Acc0
    end,
    #writer_acc{last_seqs = LastSeqs} = Acc1,
    {ViewKVs, DocIdViewIdKeys, NewLastSeqs, NewPartVersions} =
        process_map_results(Mod, Kvs2, ViewEmptyKVs, LastSeqs, PartVersions),
    Acc3 = Acc1#writer_acc{last_seqs = NewLastSeqs, part_versions = NewPartVersions},
    Acc4 = write_to_tmp_batch_files(ViewKVs, DocIdViewIdKeys, Acc3),
    #writer_acc{group = NewGroup} = Acc4,
    case ?set_seqs(NewGroup) =/= ?set_seqs(Group) of
    true ->
        Acc5 = checkpoint(Acc4),
        case (Acc4#writer_acc.state =:= updating_active) andalso
            lists:any(fun({PartId, _}) ->
                ((1 bsl PartId) band ?set_pbitmask(Group) =/= 0)
            end, NewLastSeqs) of
        true ->
            notify_owner(Owner, {state, updating_passive}, Parent),
            Acc5#writer_acc{state = updating_passive};
        false ->
            Acc5
        end;
    false ->
        case IsFinalBatch of
        true ->
            checkpoint(Acc4);
        false ->
            Acc4
        end
    end;

flush_writes(#writer_acc{initial_build = true} = WriterAcc) ->
    #writer_acc{
        kvs = Kvs,
        view_empty_kvs = ViewEmptyKVs,
        tmp_files = TmpFiles,
        tmp_dir = TmpDir,
        group = Group,
        final_batch = IsFinalBatch,
        max_seqs = MaxSeqs,
        part_versions = PartVersions,
        stats = Stats
    } = WriterAcc,
    #set_view_group{
        set_name = SetName,
        type = Type,
        name = DDocId,
        mod = Mod
    } = Group,
    {ViewKVs, DocIdViewIdKeys, MaxSeqs2, PartVersions2} = process_map_results(
        Mod, Kvs, ViewEmptyKVs, MaxSeqs, PartVersions),

    IdRecords = lists:foldr(
        fun({_DocId, {_PartId, []}}, Acc) ->
                Acc;
            (Kv, Acc) ->
                [{KeyBin, ValBin}] = Mod:convert_back_index_kvs_to_binary([Kv], []),
                KvBin = [<<(byte_size(KeyBin)):16>>, KeyBin, ValBin],
                [[<<(iolist_size(KvBin)):32/native>>, KvBin] | Acc]
        end,
        [], DocIdViewIdKeys),
    #set_view_tmp_file_info{fd = IdFd} = dict:fetch(ids_index, TmpFiles),
    ok = file:write(IdFd, IdRecords),

    {InsertKVCount, TmpFiles2} = Mod:write_kvs(Group, TmpFiles, ViewKVs),

    Stats2 = Stats#set_view_updater_stats{
        inserted_kvs = Stats#set_view_updater_stats.inserted_kvs + InsertKVCount,
        inserted_ids = Stats#set_view_updater_stats.inserted_ids + length(DocIdViewIdKeys)
    },
    case IsFinalBatch of
    false ->
        WriterAcc#writer_acc{
            max_seqs = MaxSeqs2,
            stats = Stats2,
            tmp_files = TmpFiles2
        };
    true ->
        ?LOG_INFO("Updater for set view `~s`, ~s group `~s`, starting btree "
                  "build phase" , [SetName, Type, DDocId]),
        {Group2, BuildFd} = Mod:finish_build(Group, TmpFiles2, TmpDir),
        WriterAcc#writer_acc{
            tmp_files = dict:store(build_file, BuildFd, TmpFiles2),
            max_seqs = MaxSeqs2,
            part_versions = PartVersions2,
            stats = Stats2,
            group = Group2
        }
    end.


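% Folds over the mapped items and splits them into the per-view key-values
% to add and the back-index (doc id -> view id keys) entries, while tracking
% the highest sequence number and the partition versions seen per partition.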
process_map_results(Mod, Kvs, ViewEmptyKVs, PartSeqs, PartVersions) ->
    lists:foldl(
        fun({Seq, DocId, PartId, []}, {ViewKVsAcc, DocIdViewIdKeysAcc, PartIdSeqs, PartIdVersions}) ->
            PartIdSeqs2 = update_part_seq(Seq, PartId, PartIdSeqs),
            {ViewKVsAcc, [{DocId, {PartId, []}} | DocIdViewIdKeysAcc], PartIdSeqs2, PartIdVersions};
        ({Seq, DocId, PartId, QueryResults}, {ViewKVsAcc, DocIdViewIdKeysAcc, PartIdSeqs, PartIdVersions}) ->
            {NewViewKVs, NewViewIdKeys} = Mod:view_insert_doc_query_results(
                    DocId, PartId, QueryResults, ViewKVsAcc, [], []),
            PartIdSeqs2 = update_part_seq(Seq, PartId, PartIdSeqs),
            {NewViewKVs, [{DocId, {PartId, NewViewIdKeys}} | DocIdViewIdKeysAcc], PartIdSeqs2, PartIdVersions};
        (snapshot_marker, _Acc) ->
            throw({error,
                <<"The multiple snapshots should have been merged, "
                  "but they weren't">>});
        ({part_versions, {PartId, NewVersions}}, {ViewKVsAcc, DocIdViewIdKeysAcc, PartIdSeqs, PartIdVersions}) ->
            PartIdVersions2 = update_part_versions(NewVersions, PartId, PartIdVersions),
            {ViewKVsAcc, DocIdViewIdKeysAcc, PartIdSeqs, PartIdVersions2}
        end,
        {ViewEmptyKVs, [], PartSeqs, PartVersions}, Kvs).


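% Once all changes of a replica partition that is being transferred have been
% indexed (its indexed seq has reached the partition's max seq), move it from
% the passive bitmask to the active bitmask and drop it from the
% replicas-on-transfer list.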
-spec update_transferred_replicas(#set_view_group{},
                                  partition_seqs(),
                                  partition_seqs()) -> #set_view_group{}.
update_transferred_replicas(Group, _MaxSeqs, _PartIdSeqs) when ?set_replicas_on_transfer(Group) =:= [] ->
    Group;
update_transferred_replicas(Group, MaxSeqs, PartIdSeqs) ->
    #set_view_group{index_header = Header} = Group,
    RepsTransferred = lists:foldl(
        fun({PartId, Seq}, A) ->
            case lists:member(PartId, ?set_replicas_on_transfer(Group))
                andalso (Seq >= couch_set_view_util:get_part_seq(PartId, MaxSeqs)) of
            true ->
                ordsets:add_element(PartId, A);
            false ->
                A
            end
        end,
        ordsets:new(), PartIdSeqs),
    ReplicasOnTransfer2 = ordsets:subtract(?set_replicas_on_transfer(Group), RepsTransferred),
    {Abitmask2, Pbitmask2} = lists:foldl(
        fun(Id, {A, P}) ->
            Mask = 1 bsl Id,
            Mask = ?set_pbitmask(Group) band Mask,
            0 = ?set_abitmask(Group) band Mask,
            {A bor Mask, P bxor Mask}
        end,
        {?set_abitmask(Group), ?set_pbitmask(Group)},
        RepsTransferred),
    Group#set_view_group{
        index_header = Header#set_view_index_header{
            abitmask = Abitmask2,
            pbitmask = Pbitmask2,
            replicas_on_transfer = ReplicasOnTransfer2
        }
    }.


-spec update_part_seq(update_seq(), partition_id(), partition_seqs()) -> partition_seqs().
update_part_seq(Seq, PartId, Acc) ->
    case couch_set_view_util:find_part_seq(PartId, Acc) of
    {ok, Max} when Max >= Seq ->
        Acc;
    _ ->
        orddict:store(PartId, Seq, Acc)
    end.


-spec update_part_versions(partition_version(), partition_id(), partition_versions()) ->
    partition_versions().
update_part_versions(NewVersions, PartId, PartVersions) ->
    orddict:store(PartId, NewVersions, PartVersions).


% Incremental updates.
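% Appends the batch of back-index and view operations to the per-view
% temporary sort files and looks up the old view-id keys of updated
% documents, so that their obsolete view entries can be removed.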
write_to_tmp_batch_files(ViewKeyValuesToAdd, DocIdViewIdKeys, WriterAcc) ->
    #writer_acc{
        tmp_files = TmpFiles,
        group = #set_view_group{
            id_btree = IdBtree,
            mod = Mod
        }
    } = WriterAcc,

    {AddDocIdViewIdKeys0, RemoveDocIds, LookupDocIds} = lists:foldr(
        fun({DocId, {PartId, [] = _ViewIdKeys}}, {A, B, C}) ->
                BackKey = make_back_index_key(DocId, PartId),
                case is_new_partition(PartId, WriterAcc) of
                true ->
                    {A, [BackKey | B], C};
                false ->
                    {A, [BackKey | B], [BackKey | C]}
                end;
            ({DocId, {PartId, _ViewIdKeys}} = KvPairs, {A, B, C}) ->
                BackKey = make_back_index_key(DocId, PartId),
                case is_new_partition(PartId, WriterAcc) of
                true ->
                    {[KvPairs | A], B, C};
                false ->
                    {[KvPairs | A], B, [BackKey | C]}
                end
        end,
        {[], [], []}, DocIdViewIdKeys),

    AddDocIdViewIdKeys = Mod:convert_back_index_kvs_to_binary(
        AddDocIdViewIdKeys0, []),

    IdsData1 = lists:map(
        fun(K) -> couch_set_view_updater_helper:encode_op(remove, K) end,
        RemoveDocIds),

    IdsData2 = lists:foldl(
        fun({K, V}, Acc) ->
            Bin = couch_set_view_updater_helper:encode_op(insert, K, V),
            [Bin | Acc]
        end,
        IdsData1,
        AddDocIdViewIdKeys),

    IdTmpFileInfo = dict:fetch(ids_index, TmpFiles),
    case IdTmpFileInfo of
    #set_view_tmp_file_info{fd = nil} ->
        0 = IdTmpFileInfo#set_view_tmp_file_info.size,
        IdTmpFilePath = new_sort_file_name(WriterAcc),
        {ok, IdTmpFileFd} = file2:open(IdTmpFilePath, [raw, append, binary]),
        IdTmpFileSize = 0;
    #set_view_tmp_file_info{
            fd = IdTmpFileFd, name = IdTmpFilePath, size = IdTmpFileSize} ->
        ok
    end,

    ok = file:write(IdTmpFileFd, IdsData2),

    IdTmpFileInfo2 = IdTmpFileInfo#set_view_tmp_file_info{
        fd = IdTmpFileFd,
        name = IdTmpFilePath,
        size = IdTmpFileSize + iolist_size(IdsData2)
    },
    TmpFiles2 = dict:store(ids_index, IdTmpFileInfo2, TmpFiles),

    case LookupDocIds of
    [] ->
        LookupResults = [];
    _ ->
        {ok, LookupResults, IdBtree} =
            couch_btree:query_modify(IdBtree, LookupDocIds, [], [])
    end,
    KeysToRemoveByView = lists:foldl(
        fun(LookupResult, KeysToRemoveByViewAcc) ->
            case LookupResult of
            {ok, {<<_Part:16, DocId/binary>>, <<_Part:16, ViewIdKeys/binary>>}} ->
                lists:foldl(
                    fun({ViewId, Keys}, KeysToRemoveByViewAcc2) ->
                        RemoveKeysDict = lists:foldl(
                            fun(Key, RemoveKeysDictAcc) ->
                                EncodedKey = Mod:encode_key_docid(Key, DocId),
                                dict:store(EncodedKey, nil, RemoveKeysDictAcc)
                            end,
                        couch_util:dict_find(ViewId, KeysToRemoveByViewAcc2, dict:new()), Keys),
                        dict:store(ViewId, RemoveKeysDict, KeysToRemoveByViewAcc2)
                    end,
                    KeysToRemoveByViewAcc, couch_set_view_util:parse_view_id_keys(ViewIdKeys));
            {not_found, _} ->
                KeysToRemoveByViewAcc
            end
        end,
        dict:new(), LookupResults),

    WriterAcc2 = update_tmp_files(
        Mod, WriterAcc#writer_acc{tmp_files = TmpFiles2}, ViewKeyValuesToAdd,
        KeysToRemoveByView),
    maybe_update_btrees(WriterAcc2).


% Update the temporary files with the key-values from the indexer. Return
% the updated writer accumulator.
-spec update_tmp_files(atom(), #writer_acc{},
                       [{#set_view{}, [set_view_key_value()]}],
                       dict:dict(non_neg_integer(), dict:dict(binary(), nil)))
                      -> #writer_acc{}.
update_tmp_files(Mod, WriterAcc, ViewKeyValues, KeysToRemoveByView) ->
    #writer_acc{
       group = Group,
       tmp_files = TmpFiles
    } = WriterAcc,
    TmpFiles2 = lists:foldl(
        fun({#set_view{id_num = ViewId}, AddKeyValues}, AccTmpFiles) ->
            AddKeyValuesBinaries = Mod:convert_primary_index_kvs_to_binary(
                AddKeyValues, Group, []),
            KeysToRemoveDict = couch_util:dict_find(
                ViewId, KeysToRemoveByView, dict:new()),

            case Mod of
            % The b-tree replaces nodes with the same key, hence we don't
            % need to delete a node that gets updated anyway
            mapreduce_view ->
                {KeysToRemoveDict2, BatchData} = lists:foldl(
                    fun({K, V}, {KeysToRemoveAcc, BinOpAcc}) ->
                        Bin = couch_set_view_updater_helper:encode_op(
                            insert, K, V),
                        BinOpAcc2 = [Bin | BinOpAcc],
                        case dict:find(K, KeysToRemoveAcc) of
                        {ok, _} ->
                            {dict:erase(K, KeysToRemoveAcc), BinOpAcc2};
                        _ ->
                            {KeysToRemoveAcc, BinOpAcc2}
                        end
                    end,
                    {KeysToRemoveDict, []}, AddKeyValuesBinaries);
            % In r-trees there are multiple possible paths to a key. Hence it's
            % not easily possible to replace an existing node with a new one,
            % as the insertion could happen in a subtree that is different
            % from the subtree of the old value.
            spatial_view ->
                BatchData = [
                    couch_set_view_updater_helper:encode_op(insert, K, V) ||
                        {K, V} <- AddKeyValuesBinaries],
                KeysToRemoveDict2 = KeysToRemoveDict
            end,

            BatchData2 = dict:fold(
                fun(K, _V, BatchAcc) ->
                    Bin = couch_set_view_updater_helper:encode_op(remove, K),
                    [Bin | BatchAcc]
                end,
                BatchData, KeysToRemoveDict2),

            ViewTmpFileInfo = dict:fetch(ViewId, TmpFiles),
            case ViewTmpFileInfo of
            #set_view_tmp_file_info{fd = nil} ->
                0 = ViewTmpFileInfo#set_view_tmp_file_info.size,
                ViewTmpFilePath = new_sort_file_name(WriterAcc),
                {ok, ViewTmpFileFd} = file2:open(
                    ViewTmpFilePath, [raw, append, binary]),
                ViewTmpFileSize = 0;
            #set_view_tmp_file_info{fd = ViewTmpFileFd,
                                    size = ViewTmpFileSize,
                                    name = ViewTmpFilePath} ->
                ok
            end,
            ok = file:write(ViewTmpFileFd, BatchData2),
            ViewTmpFileInfo2 = ViewTmpFileInfo#set_view_tmp_file_info{
                fd = ViewTmpFileFd,
                name = ViewTmpFilePath,
                size = ViewTmpFileSize + iolist_size(BatchData2)
            },
            dict:store(ViewId, ViewTmpFileInfo2, AccTmpFiles)
        end,
    TmpFiles, ViewKeyValues),
    WriterAcc#writer_acc{
        tmp_files = TmpFiles2
    }.


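% A partition is considered new when it had no indexed sequence number at
% the time this updater run started, i.e. nothing of it is in the index yet.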
is_new_partition(PartId, #writer_acc{initial_seqs = InitialSeqs}) ->
    couch_util:get_value(PartId, InitialSeqs, 0) == 0.


% For incremental index updates.
maybe_update_btrees(WriterAcc0) ->
    #writer_acc{
        view_empty_kvs = ViewEmptyKVs,
        tmp_files = TmpFiles,
        group = Group0,
        final_batch = IsFinalBatch,
        owner = Owner,
        last_seqs = LastSeqs,
        part_versions = PartVersions,
        force_flush = ForceFlush
    } = WriterAcc0,
    IdTmpFileInfo = dict:fetch(ids_index, TmpFiles),
    ShouldFlushViews =
        lists:any(
            fun({#set_view{id_num = Id}, _}) ->
                ViewTmpFileInfo = dict:fetch(Id, TmpFiles),
                ViewTmpFileInfo#set_view_tmp_file_info.size >= ?INC_MAX_TMP_FILE_SIZE
            end, ViewEmptyKVs),
    ShouldFlush = IsFinalBatch orelse
        ForceFlush orelse
        ((IdTmpFileInfo#set_view_tmp_file_info.size >= ?INC_MAX_TMP_FILE_SIZE) andalso
        ShouldFlushViews),
    case ShouldFlush of
    false ->
        NewLastSeqs1 = LastSeqs,
        case erlang:get(updater_worker) of
        undefined ->
            WriterAcc = WriterAcc0;
        UpdaterWorker when is_reference(UpdaterWorker) ->
            receive
            {UpdaterWorker, UpGroup, UpStats, CompactFiles} ->
                send_log_compact_files(Owner, CompactFiles, ?set_seqs(UpGroup),
                    ?set_partition_versions(UpGroup)),
                erlang:erase(updater_worker),
                WriterAcc = check_if_compactor_started(
                    WriterAcc0#writer_acc{group = UpGroup, stats = UpStats})
            after 0 ->
                WriterAcc = WriterAcc0
            end
        end;
1344    true ->
1345        ok = close_tmp_fd(IdTmpFileInfo),
1346        ok = lists:foreach(
1347            fun(#set_view{id_num = Id}) ->
1348                ViewTmpFileInfo = dict:fetch(Id, TmpFiles),
1349                ok = close_tmp_fd(ViewTmpFileInfo)
1350            end,
1351            Group0#set_view_group.views),
1352        case erlang:erase(updater_worker) of
1353        undefined ->
1354            WriterAcc1 = WriterAcc0;
1355        UpdaterWorker when is_reference(UpdaterWorker) ->
1356            receive
1357            {UpdaterWorker, UpGroup2, UpStats2, CompactFiles2} ->
1358                send_log_compact_files(Owner, CompactFiles2, ?set_seqs(UpGroup2),
1359                    ?set_partition_versions(UpGroup2)),
1360                WriterAcc1 = check_if_compactor_started(
1361                    WriterAcc0#writer_acc{
1362                        group = UpGroup2,
1363                        stats = UpStats2
1364                    })
1365            end
1366        end,
1367
        % MB-11472: There is no id sort file present, hence this is the final
        % batch and there is nothing left to update in the btree.
1371        case IdTmpFileInfo#set_view_tmp_file_info.name of
1372        nil ->
1373            WriterAcc = WriterAcc1;
1374        _ ->
1375            WriterAcc2 = check_if_compactor_started(WriterAcc1),
1376            NewUpdaterWorker = spawn_updater_worker(WriterAcc2, LastSeqs, PartVersions),
1377            erlang:put(updater_worker, NewUpdaterWorker),
1378
1379            TmpFiles2 = dict:map(
1380                fun(_, _) -> #set_view_tmp_file_info{} end, TmpFiles),
1381            WriterAcc = WriterAcc2#writer_acc{tmp_files = TmpFiles2}
1382        end,
1383        NewLastSeqs1 = orddict:new()
1384    end,
1385    #writer_acc{
1386        stats = NewStats0,
1387        group = NewGroup0
1388    } = WriterAcc,
1389    case IsFinalBatch of
1390    true ->
1391        case erlang:erase(updater_worker) of
1392        undefined ->
1393            NewGroup = NewGroup0,
1394            NewStats = NewStats0;
1395        UpdaterWorker2 when is_reference(UpdaterWorker2) ->
1396            receive
1397            {UpdaterWorker2, NewGroup, NewStats, CompactFiles3} ->
1398                send_log_compact_files(Owner, CompactFiles3, ?set_seqs(NewGroup),
1399                    ?set_partition_versions(NewGroup))
1400            end
1401        end,
1402        NewLastSeqs = orddict:new();
1403    false ->
1404        NewGroup = NewGroup0,
1405        NewStats = NewStats0,
1406        NewLastSeqs = NewLastSeqs1
1407    end,
1408    NewWriterAcc = WriterAcc#writer_acc{
1409        stats = NewStats,
1410        group = NewGroup,
1411        last_seqs = NewLastSeqs
1412    },
1413    NewWriterAcc.
1414
1415
1416send_log_compact_files(_Owner, [], _Seqs, _PartVersions) ->
1417    ok;
1418send_log_compact_files(Owner, Files, Seqs, PartVersions) ->
1419    Init = case erlang:erase(new_compactor) of
1420    true ->
1421        true;
1422    undefined ->
1423        false
1424    end,
1425    ok = gen_server:cast(Owner, {compact_log_files, Files, Seqs, PartVersions, Init}).
1426
1427
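% Spawns a worker process that applies the accumulated log files to the btrees
% (see update_btrees/1). The writer first syncs with the main updater process:
% it sends {native_updater_start, self()}, waits for {ok, native_updater_start},
% spawns the worker and announces it with {native_updater_pid, Pid}. The worker
% later replies with {Ref, NewGroup, NewStats, CompactFiles}, where Ref is the
% reference returned by this function.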
1428-spec spawn_updater_worker(#writer_acc{}, partition_seqs(),
1429                           partition_versions()) -> reference().
1430spawn_updater_worker(WriterAcc, PartIdSeqs, PartVersions) ->
1431    Parent = self(),
1432    Ref = make_ref(),
1433    #writer_acc{
1434        group = Group,
1435        parent = UpdaterPid,
1436        max_seqs = MaxSeqs
1437    } = WriterAcc,
1438    % Wait for main updater process to ack
1439    UpdaterPid ! {native_updater_start, self()},
1440    receive
1441    {ok, native_updater_start} ->
1442        ok
1443    end,
1444    Pid = spawn_link(fun() ->
1445        case ?set_cbitmask(Group) of
1446        0 ->
1447            CleanupStart = 0;
1448        _ ->
1449            CleanupStart = os:timestamp()
1450        end,
1451        {ok, NewGroup0, CleanupCount, NewStats, NewCompactFiles} = update_btrees(WriterAcc),
1452        case ?set_cbitmask(Group) of
1453        0 ->
1454            CleanupTime = 0.0;
1455        _ ->
1456            CleanupTime = timer:now_diff(os:timestamp(), CleanupStart) / 1000000,
1457            #set_view_group{
1458                set_name = SetName,
1459                name = DDocId,
1460                type = GroupType
1461            } = Group,
1462            ?LOG_INFO("Updater for set view `~s`, ~s group `~s`, performed cleanup "
1463                      "of ~p key/value pairs in ~.3f seconds",
1464                      [SetName, GroupType, DDocId, CleanupCount, CleanupTime])
1465        end,
1466        NewSeqs = update_seqs(PartIdSeqs, ?set_seqs(Group)),
1467        NewPartVersions = update_versions(PartVersions, ?set_partition_versions(Group)),
1468        Header = NewGroup0#set_view_group.index_header,
1469        NewHeader = Header#set_view_index_header{
1470            seqs = NewSeqs,
1471            partition_versions = NewPartVersions
1472        },
1473        NewGroup = NewGroup0#set_view_group{
1474            index_header = NewHeader
1475        },
1476        NewGroup2 = update_transferred_replicas(NewGroup, MaxSeqs, PartIdSeqs),
1477        NumChanges = count_seqs_done(Group, NewSeqs),
1478        NewStats2 = NewStats#set_view_updater_stats{
1479           seqs = NewStats#set_view_updater_stats.seqs + NumChanges,
           cleanup_time = NewStats#set_view_updater_stats.cleanup_time + CleanupTime,
1481           cleanup_kv_count = NewStats#set_view_updater_stats.cleanup_kv_count + CleanupCount
1482        },
1483        Parent ! {Ref, NewGroup2, NewStats2, NewCompactFiles}
1484    end),
1485    UpdaterPid ! {native_updater_pid, Pid},
1486    Ref.
1487
% Update the id btree and the view btrees with the current batch of changes.
1489update_btrees(WriterAcc) ->
1490    #writer_acc{
1491        stats = Stats,
1492        group = Group0,
1493        tmp_dir = TmpDir,
1494        tmp_files = TmpFiles,
1495        compactor_running = CompactorRunning,
1496        max_insert_batch_size = MaxBatchSize
1497    } = WriterAcc,
1498
1499    % Prepare list of operation logs for each btree
1500    #set_view_tmp_file_info{name = IdFile} = dict:fetch(ids_index, TmpFiles),
1501    ViewFiles = lists:map(
1502        fun(#set_view{id_num = Id}) ->
1503            #set_view_tmp_file_info{
1504                name = ViewFile
1505            } = dict:fetch(Id, TmpFiles),
1506            ViewFile
1507        end, Group0#set_view_group.views),
1508    % `LogFiles` is supplied to the native updater. For spatial views only the
1509    % ID b-tree is updated.
1510    LogFiles = case Group0#set_view_group.mod of
1511    mapreduce_view ->
1512        [IdFile | ViewFiles];
1513    spatial_view ->
1514        [IdFile]
1515    end,
1516
1517    % Remove spatial views from group
    % The native updater can currently handle mapreduce views only, but it
    % also handles the ID b-tree of spatial views.
1520    Group = couch_set_view_util:remove_group_views(Group0, spatial_view),
1521
1522    {ok, NewGroup0, Stats2} = couch_set_view_updater_helper:update_btrees(
1523        Group, TmpDir, LogFiles, MaxBatchSize, false),
1524    {IdsInserted, IdsDeleted, KVsInserted, KVsDeleted, CleanupCount} = Stats2,
1525
1526    % Add back spatial views
1527    NewGroup = couch_set_view_util:update_group_views(
1528        NewGroup0, Group0, spatial_view),
1529
    % The native update of the ID b-tree has run; now run the Erlang updater
    % for the spatial views.
1532    NewGroup2 = case NewGroup#set_view_group.mod of
1533    mapreduce_view ->
1534        NewGroup;
1535    spatial_view = Mod ->
1536        ok = couch_file:refresh_eof(NewGroup#set_view_group.fd),
1537        Views = Mod:update_spatial(NewGroup#set_view_group.views, ViewFiles,
1538            MaxBatchSize),
1539        NewGroup#set_view_group{
1540            views = Views,
1541            index_header = (NewGroup#set_view_group.index_header)#set_view_index_header{
1542                view_states = [Mod:get_state(V#set_view.indexer) || V <- Views]
1543            }
1544        }
1545    end,
1546
1547    NewStats = Stats#set_view_updater_stats{
1548     inserted_ids = Stats#set_view_updater_stats.inserted_ids + IdsInserted,
1549     deleted_ids = Stats#set_view_updater_stats.deleted_ids + IdsDeleted,
1550     inserted_kvs = Stats#set_view_updater_stats.inserted_kvs + KVsInserted,
1551     deleted_kvs = Stats#set_view_updater_stats.deleted_kvs + KVsDeleted
1552    },
1553
    % Remove the files if the compactor is not running; otherwise send them
    % to the compactor so it can apply the deltas.
1556    CompactFiles = lists:foldr(
1557        fun(SortedFile, AccCompactFiles) ->
1558            case CompactorRunning of
1559            true ->
1560                case filename:extension(SortedFile) of
1561                ".compact" ->
                    [SortedFile | AccCompactFiles];
1563                _ ->
1564                    SortedFile2 = new_sort_file_name(TmpDir, true),
1565                    ok = file2:rename(SortedFile, SortedFile2),
1566                    [SortedFile2 | AccCompactFiles]
1567                end;
1568            false ->
1569                ok = file2:delete(SortedFile),
1570                AccCompactFiles
1571            end
1572        end, [], [IdFile | ViewFiles]),
1573    {ok, NewGroup2, CleanupCount, NewStats, CompactFiles}.
1574
1575
1576update_seqs(PartIdSeqs, Seqs) ->
1577    orddict:fold(
1578        fun(PartId, NewSeq, Acc) ->
1579            OldSeq = couch_util:get_value(PartId, Acc, 0),
1580            case NewSeq > OldSeq of
1581            true ->
1582                ok;
1583            false ->
                exit({error, <<"New seq is smaller than or equal to old seq.">>, PartId, OldSeq, NewSeq})
1585            end,
1586            orddict:store(PartId, NewSeq, Acc)
1587        end,
1588        Seqs, PartIdSeqs).
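% Example (illustrative): update_seqs([{1, 10}], [{1, 5}, {2, 7}]) returns
% [{1, 10}, {2, 7}]. A new seq that is not greater than the stored one makes
% the process exit.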
1589
1590update_versions(PartVersions, AllPartVersions) ->
1591    lists:ukeymerge(1, PartVersions, AllPartVersions).
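% lists:ukeymerge/3 keeps the entry from PartVersions when both (sorted) lists
% contain the same partition id. Example (illustrative, with opaque version
% lists VersA/VersB/VersC):
% update_versions([{1, VersA}], [{1, VersB}, {2, VersC}]) returns
% [{1, VersA}, {2, VersC}].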
1592
1593
1594update_task(NumChanges) ->
1595    [Changes, Total] = couch_task_status:get([changes_done, total_changes]),
1596    Changes2 = Changes + NumChanges,
1597    Total2 = erlang:max(Total, Changes2),
1598    Progress = (Changes2 * 100) div Total2,
1599    couch_task_status:update([
1600        {progress, Progress},
1601        {changes_done, Changes2},
1602        {total_changes, Total2}
1603    ]).
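% Example (illustrative): with changes_done = 50, total_changes = 200 and
% NumChanges = 30, the task is updated to changes_done = 80, total_changes = 200
% and progress = (80 * 100) div 200 = 40.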
1604
1605
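% Sends a partial group update to the owner process and blocks until it replies
% with update_processed, or exits on stop. Any pending new passive partitions
% are merged into the group header first via maybe_fix_group/1.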
1606checkpoint(#writer_acc{owner = Owner, parent = Parent, group = Group} = Acc) ->
1607    #set_view_group{
1608        set_name = SetName,
1609        name = DDocId,
1610        type = Type
1611    } = Group,
1612    ?LOG_INFO("Updater checkpointing set view `~s` update for ~s group `~s`",
1613              [SetName, Type, DDocId]),
1614    NewGroup = maybe_fix_group(Group),
1615    ok = couch_file:refresh_eof(NewGroup#set_view_group.fd),
1616    Owner ! {partial_update, Parent, self(), NewGroup},
1617    receive
1618    update_processed ->
1619        ok;
1620    stop ->
1621        exit(shutdown)
1622    end,
1623    Acc#writer_acc{group = NewGroup}.
1624
1625
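% Non-blocking check (after 0) for a {new_passive_partitions, Parts} message.
% If one is pending, the new partitions are added to the passive bitmask and
% their seqs and partition versions are merged into the index header.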
1626maybe_fix_group(#set_view_group{index_header = Header} = Group) ->
1627    receive
1628    {new_passive_partitions, Parts} ->
1629        Bitmask = couch_set_view_util:build_bitmask(Parts),
1630        {Seqs, PartVersions} = couch_set_view_util:fix_partitions(Group, Parts),
1631        Group#set_view_group{
1632            index_header = Header#set_view_index_header{
1633                seqs = Seqs,
1634                pbitmask = ?set_pbitmask(Group) bor Bitmask,
1635                partition_versions = PartVersions
1636            }
1637        }
1638    after 0 ->
1639        Group
1640    end.
1641
1642
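% Non-blocking check for a {compactor_started, Pid} notification. When a new
% compactor has started, the (possibly fixed-up) group is acked back to it and
% compactor_running is set, so later flushes hand their sort files to the
% compactor instead of deleting them (see update_btrees/1).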
1643check_if_compactor_started(#writer_acc{group = Group0} = Acc) ->
1644    receive
1645    {compactor_started, Pid} ->
1646        erlang:put(new_compactor, true),
1647        Group = maybe_fix_group(Group0),
1648        Pid ! {compactor_started_ack, self(), Group},
1649        Acc#writer_acc{compactor_running = true, group = Group}
1650    after 0 ->
1651        Acc
1652    end.
1653
1654
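% Deletes stale sort files (only the updater's own ones while a compactor is
% running) and sets up one tmp file slot per view plus one for the id btree.
% On an initial build the files are opened right away; on incremental updates
% they are created lazily on first write.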
1655init_tmp_files(WriterAcc) ->
1656    #writer_acc{
1657        group = Group, initial_build = Init, tmp_dir = TmpDir
1658    } = WriterAcc,
1659    case WriterAcc#writer_acc.compactor_running of
1660    true ->
1661        ok = couch_set_view_util:delete_sort_files(TmpDir, updater);
1662    false ->
1663        ok = couch_set_view_util:delete_sort_files(TmpDir, all)
1664    end,
1665    Ids = [ids_index | [V#set_view.id_num || V <- Group#set_view_group.views]],
1666    Files = case Init of
1667    true ->
1668        [begin
1669             FileName = new_sort_file_name(WriterAcc),
1670             {ok, Fd} = file2:open(FileName, [raw, append, binary]),
1671             {Id, #set_view_tmp_file_info{fd = Fd, name = FileName}}
1672         end || Id <- Ids];
1673    false ->
1674         [{Id, #set_view_tmp_file_info{}} || Id <- Ids]
1675    end,
1676    WriterAcc#writer_acc{tmp_files = dict:from_list(Files)}.
1677
1678
1679new_sort_file_name(#writer_acc{tmp_dir = TmpDir, compactor_running = Cr}) ->
1680    new_sort_file_name(TmpDir, Cr).
1681
1682new_sort_file_name(TmpDir, true) ->
1683    couch_set_view_util:new_sort_file_path(TmpDir, compactor);
1684new_sort_file_name(TmpDir, false) ->
1685    couch_set_view_util:new_sort_file_path(TmpDir, updater).
1686
1687
1688make_back_index_key(DocId, PartId) ->
1689    <<PartId:16, DocId/binary>>.
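% Example (illustrative): make_back_index_key(<<"doc1">>, 3) yields
% <<0, 3, "doc1">>, i.e. the partition id as a 16 bit big-endian prefix
% followed by the document id.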
1690
1691
1692count_seqs_done(Group, NewSeqs) ->
    % NewSeqs might contain new passive partitions that are not yet in the
    % group's seqs (they will be added after a checkpoint period).
1695    lists:foldl(
1696        fun({PartId, SeqDone}, Acc) ->
1697            SeqBefore = couch_util:get_value(PartId, ?set_seqs(Group), 0),
1698            Acc + (SeqDone - SeqBefore)
1699        end,
1700        0, NewSeqs).
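% Example (illustrative): with ?set_seqs(Group) = [{1, 10}, {2, 20}] and
% NewSeqs = [{1, 15}, {2, 25}, {3, 5}], the result is 5 + 5 + 5 = 15; a
% partition unknown to the group contributes its full seq.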
1701
1702
1703close_tmp_fd(#set_view_tmp_file_info{fd = nil}) ->
1704    ok;
1705close_tmp_fd(#set_view_tmp_file_info{fd = Fd}) ->
1706    ok = file:close(Fd).
1707
1708
1709% DCP introduces the concept of snapshots, where a document mutation is only
1710% guaranteed to be unique within a single snapshot. But the flusher expects
1711% unique mutations within the full batch. Merge multiple snapshots (if there
1712% are any) into a single one. The latest mutation wins.
1713merge_snapshots(KVs) ->
1714    merge_snapshots(KVs, false, []).
1715
1716merge_snapshots([], true, Acc) ->
1717    {true, Acc};
1718merge_snapshots([], false, Acc) ->
    % The order of the KVs doesn't really matter, but keeping them in their
    % original order makes debugging easier.
1721    {false, lists:reverse(Acc)};
1722merge_snapshots([snapshot_marker | KVs], _MultipleSnapshots, Acc) ->
1723    merge_snapshots(KVs, true, Acc);
1724merge_snapshots([{part_versions, _} = PartVersions | KVs], MultipleSnapshots, Acc) ->
1725    merge_snapshots(KVs, MultipleSnapshots, [PartVersions | Acc]);
1726merge_snapshots([KV | KVs], true, Acc0) ->
1727    {_Seq, DocId, _PartId, _QueryResults} = KV,
1728    Acc = lists:keystore(DocId, 2, Acc0, KV),
1729    merge_snapshots(KVs, true, Acc);
1730merge_snapshots([KV | KVs], false, Acc) ->
1731    merge_snapshots(KVs, false, [KV | Acc]).
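% Example (illustrative): with two snapshots that both mutate <<"docA">>,
% merge_snapshots([{1, <<"docA">>, 0, ResA}, snapshot_marker,
%                  {2, <<"docA">>, 0, ResB}])
% returns {true, [{2, <<"docA">>, 0, ResB}]}: the later mutation wins and the
% boolean indicates that more than one snapshot was merged.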
1732