1% -*- Mode: Erlang; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2
3% Licensed under the Apache License, Version 2.0 (the "License"); you may not
4% use this file except in compliance with the License. You may obtain a copy of
5% the License at
6%
7%   http://www.apache.org/licenses/LICENSE-2.0
8%
9% Unless required by applicable law or agreed to in writing, software
10% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12% License for the specific language governing permissions and limitations under
13% the License.
14
15-module(couch_set_view_updater).
16
17-export([update/6]).
% Exported for the MapReduce-specific code
19-export([new_sort_file_name/1]).
20
% Imported to condense partition lists in log output
22-import(couch_set_view_util, [condense/1]).
23
24-include("couch_db.hrl").
25-include_lib("couch_set_view/include/couch_set_view.hrl").
26-include_lib("couch_dcp/include/couch_dcp.hrl").
27
28-define(MAP_QUEUE_SIZE, 256 * 1024).
29-define(WRITE_QUEUE_SIZE, 512 * 1024).
% Size of the accumulator that emitted key-values are batched in before they
% are actually queued. The bigger the value, the less the lock contention,
% but the higher the possible memory consumption.
33-define(QUEUE_ACC_BATCH_SIZE, 256 * 1024).
34
35% incremental updates
36-define(INC_MAX_TMP_FILE_SIZE, 31457280).
37-define(MIN_BATCH_SIZE_PER_VIEW, 65536).
38
39% For file sorter and file merger commands.
40-define(PORT_OPTS,
41        [exit_status, use_stdio, stderr_to_stdout, {line, 4096}, binary]).
42
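% Accumulator threaded through the writer process: the group being updated,
% the write queue it consumes, the key-values collected for the current
% batch, the temporary sort files, and flags that control flushing and
% checkpointing.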
43-record(writer_acc, {
44    parent,
45    owner,
46    group,
47    last_seqs = orddict:new(),
48    part_versions = orddict:new(),
49    compactor_running,
50    write_queue,
51    initial_build,
52    view_empty_kvs,
53    kvs = [],
54    kvs_size = 0,
55    state = updating_active,
56    final_batch = false,
57    max_seqs,
58    stats = #set_view_updater_stats{},
59    tmp_dir = nil,
60    initial_seqs,
61    max_insert_batch_size,
62    tmp_files = dict:new(),
63    throttle = 0,
64    force_flush = false
65}).
66
67
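% Entry point of the updater process. It registers with the index barrier,
% logs the partition states and then hands over to update/8, which spawns
% the doc loader, mapper and writer processes.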
68-spec update(pid(), #set_view_group{},
69             partition_seqs(), boolean(), string(), [term()]) -> no_return().
70update(Owner, Group, CurSeqs, CompactorRunning, TmpDir, Options) ->
71    #set_view_group{
72        set_name = SetName,
73        type = Type,
74        name = DDocId
75    } = Group,
76
77    ActiveParts = couch_set_view_util:decode_bitmask(?set_abitmask(Group)),
78    PassiveParts = couch_set_view_util:decode_bitmask(?set_pbitmask(Group)),
79    NumChanges = couch_set_view_util:missing_changes_count(CurSeqs, ?set_seqs(Group)),
80
81    process_flag(trap_exit, true),
82
83    BeforeEnterTs = os:timestamp(),
84    Parent = self(),
85    BarrierEntryPid = spawn_link(fun() ->
86        DDocIds = couch_set_view_util:get_ddoc_ids_with_sig(SetName, Group),
87        couch_task_status:add_task([
88            {type, blocked_indexer},
89            {set, SetName},
90            {signature, ?l2b(couch_util:to_hex(Group#set_view_group.sig))},
91            {design_documents, DDocIds},
92            {indexer_type, Type}
93        ]),
94        case Type of
95        main ->
96            ok = couch_index_barrier:enter(couch_main_index_barrier, Parent);
97        replica ->
98            ok = couch_index_barrier:enter(couch_replica_index_barrier, Parent)
99        end,
100        Parent ! {done, self(), (timer:now_diff(os:timestamp(), BeforeEnterTs) / 1000000)},
101        receive shutdown -> ok end
102    end),
103
104    BlockedTime = receive
105    {done, BarrierEntryPid, Duration} ->
106        Duration;
107    {'EXIT', _, Reason} ->
108        exit({updater_error, Reason})
109    end,
110
111    CleanupParts = couch_set_view_util:decode_bitmask(?set_cbitmask(Group)),
112    InitialBuild = couch_set_view_util:is_initial_build(Group),
113    ?LOG_INFO("Updater for set view `~s`, ~s group `~s` started~n"
114              "Active partitions:    ~s~n"
115              "Passive partitions:   ~s~n"
116              "Cleanup partitions:   ~s~n"
117              "Replicas to transfer: ~s~n"
118              "Pending transition:   ~n"
119              "    active:           ~s~n"
120              "    passive:          ~s~n"
121              "    unindexable:      ~s~n"
122              "Initial build:        ~s~n"
123              "Compactor running:    ~s~n"
124              "Min # changes:        ~p~n",
125              [?LOG_USERDATA(SetName), Type, ?LOG_USERDATA(DDocId),
126               condense(ActiveParts),
127               condense(PassiveParts),
128               condense(CleanupParts),
129               condense(?set_replicas_on_transfer(Group)),
130               condense(?pending_transition_active(?set_pending_transition(Group))),
131               condense(?pending_transition_passive(?set_pending_transition(Group))),
132               condense(?pending_transition_unindexable(?set_pending_transition(Group))),
133               InitialBuild,
134               CompactorRunning,
135               NumChanges
136              ]),
137    ?LOG_DEBUG("Updater set view `~s`, ~s group `~s` Partition versions ~w",
138                       [?LOG_USERDATA(SetName), Type, ?LOG_USERDATA(DDocId), ?set_partition_versions(Group)]),
139
140
141
142    WriterAcc0 = #writer_acc{
143        parent = self(),
144        owner = Owner,
145        group = Group,
146        initial_build = InitialBuild,
147        max_seqs = CurSeqs,
148        part_versions = ?set_partition_versions(Group),
149        tmp_dir = TmpDir,
150        max_insert_batch_size = list_to_integer(
151            couch_config:get("set_views", "indexer_max_insert_batch_size", "1048576"))
152    },
153    update(WriterAcc0, ActiveParts, PassiveParts,
154            BlockedTime, BarrierEntryPid, NumChanges, CompactorRunning, Options).
155
156
157update(WriterAcc, ActiveParts, PassiveParts, BlockedTime,
158       BarrierEntryPid, NumChanges, CompactorRunning, Options) ->
159    #writer_acc{
160        owner = Owner,
161        group = Group
162    } = WriterAcc,
163    #set_view_group{
164        set_name = SetName,
165        type = Type,
166        name = DDocId,
167        sig = GroupSig
168    } = Group,
169
170    StartTime = os:timestamp(),
171
172    MapQueueOptions = [{max_size, ?MAP_QUEUE_SIZE}, {max_items, infinity}],
173    WriteQueueOptions = [{max_size, ?WRITE_QUEUE_SIZE}, {max_items, infinity}],
174    {ok, MapQueue} = couch_work_queue:new(MapQueueOptions),
175    {ok, WriteQueue} = couch_work_queue:new(WriteQueueOptions),
176
177    Parent = self(),
178
179    Mapper = spawn_link(fun() ->
180        try
181            do_maps(Group, MapQueue, WriteQueue)
182        catch _:Error ->
183            Stacktrace = erlang:get_stacktrace(),
184            ?LOG_ERROR("Set view `~s`, ~s group `~s`, mapper error~n"
185                "error:      ~p~n"
186                "stacktrace: ~s~n",
187                [?LOG_USERDATA(SetName), Type, ?LOG_USERDATA(DDocId), Error, ?LOG_USERDATA(Stacktrace)]),
188            exit(Error)
189        end
190    end),
191
192    Writer = spawn_link(fun() ->
193        BarrierEntryPid ! shutdown,
194        ViewEmptyKVs = [{View, []} || View <- Group#set_view_group.views],
195        WriterAcc2 = init_tmp_files(WriterAcc#writer_acc{
196            parent = Parent,
197            group = Group,
198            write_queue = WriteQueue,
199            view_empty_kvs = ViewEmptyKVs,
200            compactor_running = CompactorRunning,
201            initial_seqs = ?set_seqs(Group),
202            throttle = case lists:member(throttle, Options) of
203                true ->
204                    list_to_integer(
205                        couch_config:get("set_views", "throttle_period", "100"));
206                false ->
207                    0
208                end
209        }),
210        ok = couch_set_view_util:open_raw_read_fd(Group),
211        try
212            WriterAcc3 = do_writes(WriterAcc2),
213            receive
214            {doc_loader_finished, {PartVersions0, RealMaxSeqs}} ->
215                WriterAccStats = WriterAcc3#writer_acc.stats,
216                WriterAccGroup = WriterAcc3#writer_acc.group,
217                WriterAccHeader = WriterAccGroup#set_view_group.index_header,
218                PartVersions = lists:ukeymerge(1, PartVersions0,
219                    WriterAccHeader#set_view_index_header.partition_versions),
220                case WriterAcc3#writer_acc.initial_build of
221                true ->
222                    % The doc loader might not load the mutations up to the
223                    % most recent one, but only to a lower one. Update the
224                    % group header and stats with the correct information.
225                    MaxSeqs = lists:ukeymerge(
226                        1, RealMaxSeqs, WriterAccHeader#set_view_index_header.seqs),
227                    Stats = WriterAccStats#set_view_updater_stats{
228                        seqs = lists:sum([S || {_, S} <- RealMaxSeqs])
229                    };
230                false ->
231                    MaxSeqs = WriterAccHeader#set_view_index_header.seqs,
232                    Stats = WriterAcc3#writer_acc.stats
233                end,
234                FinalWriterAcc = WriterAcc3#writer_acc{
235                    stats = Stats,
236                    group = WriterAccGroup#set_view_group{
237                        index_header = WriterAccHeader#set_view_index_header{
238                            partition_versions = PartVersions,
239                            seqs = MaxSeqs
240                        }
241                    }
242                }
243            end,
244            Parent ! {writer_finished, FinalWriterAcc}
245        catch _:Error ->
246            Stacktrace = erlang:get_stacktrace(),
247            ?LOG_ERROR("Set view `~s`, ~s group `~s`, writer error~n"
248                "error:      ~p~n"
249                "stacktrace: ~s~n",
250                [?LOG_USERDATA(SetName), Type, ?LOG_USERDATA(DDocId), ?LOG_USERDATA(Error), ?LOG_USERDATA(Stacktrace)]),
251            exit(Error)
252        after
253            ok = couch_set_view_util:close_raw_read_fd(Group)
254        end
255    end),
256
257    InitialBuild = WriterAcc#writer_acc.initial_build,
258    NumChanges2 = case InitialBuild of
259        true ->
260            couch_set_view_updater_helper:count_items_from_set(Group, ActiveParts ++ PassiveParts);
261        false ->
262            NumChanges
263    end,
264
265    DocLoader = spawn_link(fun() ->
266        DDocIds = couch_set_view_util:get_ddoc_ids_with_sig(SetName, Group),
267        couch_task_status:add_task([
268            {type, indexer},
269            {set, SetName},
270            {signature, ?l2b(couch_util:to_hex(GroupSig))},
271            {design_documents, DDocIds},
272            {indexer_type, Type},
273            {progress, 0},
274            {changes_done, 0},
275            {initial_build, InitialBuild},
276            {total_changes, NumChanges2}
277        ]),
278        couch_task_status:set_update_frequency(5000),
279        case lists:member(pause, Options) of
280        true ->
            % For reliable unit testing: verify that adding new partitions
            % to the passive state doesn't restart the updater, and that the
            % updater notices them and indexes the new partitions in the
            % same run.
284            receive continue -> ok end;
285        false ->
286            ok
287        end,
288        try
289            {PartVersions, MaxSeqs} = load_changes(
290                Owner, Parent, Group, MapQueue, ActiveParts, PassiveParts,
291                WriterAcc#writer_acc.max_seqs,
292                WriterAcc#writer_acc.initial_build),
293            Parent ! {doc_loader_finished, {PartVersions, MaxSeqs}}
294        catch
295        throw:purge ->
296            exit(purge);
297        throw:{rollback, RollbackSeqs} ->
298            exit({rollback, RollbackSeqs});
299        _:Error ->
300            Stacktrace = erlang:get_stacktrace(),
301            ?LOG_ERROR("Set view `~s`, ~s group `~s`, doc loader error~n"
302                "error:      ~p~n"
303                "stacktrace: ~s~n",
304                [?LOG_USERDATA(SetName), Type, ?LOG_USERDATA(DDocId), Error, ?LOG_USERDATA(Stacktrace)]),
305            exit(Error)
306        end,
        % Since the updater progress stats are reported by this doc loader
        % process, it has to stay alive until the updater has completed.
309        receive
310        updater_finished ->
311            ok
312        end
313    end),
314
315    Result = wait_result_loop(StartTime, DocLoader, Mapper, Writer, BlockedTime, Group),
316    case Type of
317    main ->
318        ok = couch_index_barrier:leave(couch_main_index_barrier);
319    replica ->
320        ok = couch_index_barrier:leave(couch_replica_index_barrier)
321    end,
322    case Result of
323    {updater_finished, #set_view_updater_result{group = NewGroup}} ->
324        ?LOG_DEBUG("Updater for ~s set view group `~s`, set `~s`, writer finished:~n"
325                   "  start seqs: ~w~n"
326                   "  end seqs:   ~w~n",
327                   [Type, ?LOG_USERDATA(DDocId), ?LOG_USERDATA(SetName), ?set_seqs(Group), ?set_seqs(NewGroup)]);
328    _ ->
329        ok
330    end,
331    DocLoader ! updater_finished,
332    exit(Result).
333
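% Ask the DCP client for the list of currently active streams, retrying
% after the suggested timeout when the client is not ready to answer.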
334-spec get_active_dcp_streams(pid()) -> list().
335get_active_dcp_streams(DcpPid) ->
336    case couch_dcp_client:list_streams(DcpPid) of
337    {active_list_streams, ActiveStreams} ->
338        ActiveStreams;
339    {retry_list_streams, Timeout} ->
340        receive
341        after Timeout ->
342            get_active_dcp_streams(DcpPid)
343        end
344    end.
345
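% Remove all active DCP streams. A stream that is already gone
% (vbucket_stream_not_found) is not an error; anything else is fatal.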
346-spec stop_dcp_streams(pid()) -> ok.
347stop_dcp_streams(DcpPid) ->
348    ActiveStreams = get_active_dcp_streams(DcpPid),
349    lists:foreach(fun(ActiveStream) ->
350        case couch_dcp_client:remove_stream(DcpPid, ActiveStream) of
351        ok ->
352            ok;
353        {error, vbucket_stream_not_found} ->
354            ok;
355        Error ->
            ?LOG_ERROR("Unexpected error ~p when closing the stream of partition ~p",
                [Error, ActiveStream]),
358            throw(Error)
359        end
360    end, ActiveStreams),
361    ok.
362
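% Main loop of the updater process. It relays messages between the owner,
% doc loader, mapper, writer, compactor and DCP client until the writer
% finishes, an error occurs or a stop is requested.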
363wait_result_loop(StartTime, DocLoader, Mapper, Writer, BlockedTime, OldGroup) ->
364    #set_view_group{set_name = SetName, name = DDocId, type = Type} = OldGroup,
365    receive
366    {new_passive_partitions, _} = NewPassivePartitions ->
367        Writer ! NewPassivePartitions,
368        DocLoader ! NewPassivePartitions,
369        wait_result_loop(StartTime, DocLoader, Mapper, Writer, BlockedTime, OldGroup);
370    continue ->
371        % Used by unit tests.
372        DocLoader ! continue,
373        wait_result_loop(StartTime, DocLoader, Mapper, Writer, BlockedTime, OldGroup);
374    {doc_loader_finished, {PartVersions, MaxSeqs}} ->
375        Writer ! {doc_loader_finished, {PartVersions, MaxSeqs}},
376        wait_result_loop(StartTime, DocLoader, Mapper, Writer, BlockedTime, OldGroup);
377    {writer_finished, WriterAcc} ->
378        Stats0 = WriterAcc#writer_acc.stats,
379        Result = #set_view_updater_result{
380            group = WriterAcc#writer_acc.group,
381            state = WriterAcc#writer_acc.state,
382            stats = Stats0#set_view_updater_stats{
383                indexing_time = timer:now_diff(os:timestamp(), StartTime) / 1000000,
384                blocked_time = BlockedTime
385            },
386            tmp_file = case WriterAcc#writer_acc.initial_build of
387                true ->
388                    dict:fetch(build_file, WriterAcc#writer_acc.tmp_files);
389                false ->
390                    ""
391                end
392        },
393        {updater_finished, Result};
394    {compactor_started, Pid, Ref} ->
395        ?LOG_INFO("Set view `~s`, ~s group `~s`, updater received "
396                  "compactor ~p notification, ref ~p, writer ~p",
397                   [?LOG_USERDATA(SetName), Type, ?LOG_USERDATA(DDocId), Pid, Ref, Writer]),
398        Writer ! {compactor_started, self()},
399        erlang:put(compactor_pid, {Pid, Ref}),
400        wait_result_loop(StartTime, DocLoader, Mapper, Writer, BlockedTime, OldGroup);
401    {compactor_started_ack, Writer, GroupSnapshot} ->
402        ?LOG_INFO("Set view `~s`, ~s group `~s`, updater received compaction ack"
403                  " from writer ~p", [?LOG_USERDATA(SetName), Type, ?LOG_USERDATA(DDocId), Writer]),
404        case erlang:erase(compactor_pid) of
405        {Pid, Ref} ->
406            Pid ! {Ref, {ok, GroupSnapshot}};
407        undefined ->
408            ok
409        end,
410        wait_result_loop(StartTime, DocLoader, Mapper, Writer, BlockedTime, OldGroup);
411    {'EXIT', _, Reason} when Reason =/= normal ->
412        couch_util:shutdown_sync(DocLoader),
413        couch_util:shutdown_sync(Mapper),
414        couch_util:shutdown_sync(Writer),
415        stop_dcp_streams(OldGroup#set_view_group.dcp_pid),
416        {updater_error, Reason};
417    {native_updater_start, Writer} ->
        % We need control over spawning the native updater process.
        % This helps to terminate the native OS processes correctly.
420        Writer ! {ok, native_updater_start},
421        receive
422        {native_updater_pid, NativeUpdater} ->
423            erlang:put(native_updater, NativeUpdater)
424        end,
425        wait_result_loop(StartTime, DocLoader, Mapper, Writer, BlockedTime, OldGroup);
426    stop ->
427        case erlang:erase(native_updater) of
428        undefined ->
429            couch_util:shutdown_sync(DocLoader),
430            couch_util:shutdown_sync(Mapper),
431            couch_util:shutdown_sync(Writer);
432        NativeUpdater ->
433            MRef = erlang:monitor(process, NativeUpdater),
434            NativeUpdater ! stop,
435            receive
436            {'DOWN', MRef, process, NativeUpdater, _} ->
437                couch_util:shutdown_sync(DocLoader),
438                couch_util:shutdown_sync(Mapper),
439                couch_util:shutdown_sync(Writer)
440            end
441        end,
442        stop_dcp_streams(OldGroup#set_view_group.dcp_pid),
443        exit({updater_error, shutdown});
444    {add_stream, DcpPid, PartId, PartUuid, StartSeq, EndSeq, Flags} ->
445        Result = couch_dcp_client:add_stream(DcpPid, PartId, PartUuid, StartSeq, EndSeq, Flags),
446        DocLoader ! {add_stream_result, Result},
447        wait_result_loop(StartTime, DocLoader, Mapper, Writer, BlockedTime, OldGroup)
448    end.
449
450
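% Describe the snapshot type bits of a DCP snapshot marker for logging.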
451dcp_marker_to_string(Type) ->
452    case Type band ?DCP_SNAPSHOT_TYPE_MASK of
453    ?DCP_SNAPSHOT_TYPE_DISK ->
454        "on-disk";
455    ?DCP_SNAPSHOT_TYPE_MEMORY ->
456        "in-memory";
457    _ ->
458        io_lib:format("unknown (~w)", [Type])
459    end.
460
461
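% Doc loader: stream the changes of all active and passive partitions (plus
% any new passive partitions announced while running) from DCP into the map
% queue. Returns the partition versions and the highest sequence numbers
% that were read. Throws {rollback, Seqs} when the server demands a rollback.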
462load_changes(Owner, Updater, Group, MapQueue, ActiveParts, PassiveParts,
463        EndSeqs, InitialBuild) ->
464    #set_view_group{
465        set_name = SetName,
466        name = DDocId,
467        type = GroupType,
468        index_header = #set_view_index_header{
469            seqs = SinceSeqs,
470            partition_versions = PartVersions0
471        },
472        dcp_pid = DcpPid,
473        category = Category
474    } = Group,
475
476    MaxDocSize = list_to_integer(
477        couch_config:get("set_views", "indexer_max_doc_size", "0")),
478    AddStreamFun = fun(Pid, PartId, PartUuid, StartSeq, EndSeq, Flags) ->
479        Updater ! {add_stream, Pid, PartId, PartUuid, StartSeq, EndSeq, Flags},
480        receive {add_stream_result, Result} -> Result end,
481        Result
482    end,
483    FoldFun = fun({PartId, EndSeq}, {AccCount, AccSeqs, AccVersions, AccRollbacks}) ->
484        case couch_set_view_util:has_part_seq(PartId, ?set_unindexable_seqs(Group))
485            andalso not lists:member(PartId, ?set_replicas_on_transfer(Group)) of
486        true ->
487            {AccCount, AccSeqs, AccVersions, AccRollbacks};
488        false ->
489            Since = couch_util:get_value(PartId, SinceSeqs, 0),
490            PartVersions = couch_util:get_value(PartId, AccVersions),
491            Flags = case InitialBuild of
492            true ->
493                ?DCP_FLAG_DISKONLY;
494            false ->
495                ?DCP_FLAG_NOFLAG
496            end,
            % For a stream request from 0: if a vbucket got reset in the
            % window of time between the seqno being obtained from stats and
            % the stream request being made, the end_seqno may be higher
            % than the current vb high seqno. Use a special flag to tell the
            % server to use its latest end_seqno instead.
501            Flags2 = case PartVersions of
502            [{0, 0}] ->
503                Flags bor ?DCP_FLAG_USELATEST_ENDSEQNO;
504            _ ->
505                Flags
506            end,
507
508            case AccRollbacks of
509            [] ->
510                case EndSeq =:= Since of
511                true ->
512                    {AccCount, AccSeqs, AccVersions, AccRollbacks};
513                false ->
514                    ChangesWrapper = fun
515                        ({part_versions, _} = NewVersions, Acc) ->
516                            queue_doc(
517                                NewVersions, MapQueue, Group,
518                                MaxDocSize, InitialBuild),
519                            Acc;
520                        ({snapshot_marker, {MarkerStartSeq, MarkerEndSeq, MarkerType}}, {Count, _})
521                            when MarkerType band ?DCP_SNAPSHOT_TYPE_MASK =/= 0 ->
522                            ?LOG_DEBUG(
523                                "set view `~s`, ~s (~s) group `~s`: received "
524                                "a snapshot marker (~s) for partition ~p from "
525                                "sequence ~p to ~p",
526                                [?LOG_USERDATA(SetName), GroupType, Category, ?LOG_USERDATA(DDocId),
527                                    dcp_marker_to_string(MarkerType),
528                                    PartId, MarkerStartSeq, MarkerEndSeq]),
529                            case Count of
530                            % Ignore the snapshot marker that is at the
531                            % beginning of the stream.
532                            % If it wasn't ignored, it would lead to an
533                            % additional forced flush which isn't needed. A
534                            % flush is needed if there are several mutations
535                            % of the same document within one batch. As two
536                            % different partitions can't contain the same
537                            % document ID, we are safe to not force flushing
538                            % between two partitions.
539                            0 ->
540                                {Count, MarkerEndSeq};
541                            _ ->
542                                queue_doc(
543                                    snapshot_marker, MapQueue, Group,
544                                    MaxDocSize, InitialBuild),
545                                {Count, MarkerEndSeq}
546                            end;
547                        ({snapshot_marker, {MarkerStartSeq, MarkerEndSeq, MarkerType}}, Acc) ->
548                            ?LOG_ERROR(
549                                "set view `~s`, ~s (~s) group `~s`: received "
550                                "a snapshot marker (~s) for partition ~p from "
551                                "sequence ~p to ~p",
552                                [?LOG_USERDATA(SetName), GroupType, Category, ?LOG_USERDATA(DDocId),
553                                    dcp_marker_to_string(MarkerType),
554                                    PartId, MarkerStartSeq, MarkerEndSeq]),
555                            throw({error, unknown_snapshot_marker, MarkerType}),
556                            Acc;
557                        (#dcp_doc{} = Item, {Count, AccEndSeq}) ->
558                            queue_doc(
559                                Item, MapQueue, Group,
560                                MaxDocSize, InitialBuild),
561                            {Count + 1, AccEndSeq}
562                        end,
563                    Result = couch_dcp_client:enum_docs_since(
564                        DcpPid, PartId, PartVersions, Since, EndSeq, Flags2,
565                        ChangesWrapper, {0, 0}, AddStreamFun),
566                    case Result of
567                    {ok, {AccCount2, AccEndSeq}, NewPartVersions} ->
568                        AccSeqs2 = orddict:store(PartId, AccEndSeq, AccSeqs),
569                        AccVersions2 = lists:ukeymerge(
570                            1, [{PartId, NewPartVersions}], AccVersions),
571                        AccRollbacks2 = AccRollbacks;
572                    {rollback, RollbackSeq} ->
573                        AccCount2 = AccCount,
574                        AccSeqs2 = AccSeqs,
575                        AccVersions2 = AccVersions,
576                        AccRollbacks2 = ordsets:add_element(
577                            {PartId, RollbackSeq}, AccRollbacks);
578                    Error ->
579                        AccCount2 = AccCount,
580                        AccSeqs2 = AccSeqs,
581                        AccVersions2 = AccVersions,
582                        AccRollbacks2 = AccRollbacks,
583                        ?LOG_ERROR("set view `~s`, ~s (~s) group `~s` error"
584                            " while loading changes for partition ~p:~n~p~n",
585                            [?LOG_USERDATA(SetName), GroupType, Category, ?LOG_USERDATA(DDocId), PartId,
586                                Error]),
587                        throw(Error)
588                    end,
589                    {AccCount2, AccSeqs2, AccVersions2, AccRollbacks2}
590                end;
591            _ ->
592                % If there is a rollback needed, don't store any new documents
593                % in the index, but just check for a rollback of another
594                % partition (i.e. a request with start seq == end seq)
595                ChangesWrapper = fun(_, _) -> ok end,
596                Result = couch_dcp_client:enum_docs_since(
597                    DcpPid, PartId, PartVersions, Since, Since, Flags2,
598                    ChangesWrapper, ok, AddStreamFun),
599                case Result of
600                {ok, _, _} ->
601                    AccRollbacks2 = AccRollbacks;
602                {rollback, RollbackSeq} ->
603                    AccRollbacks2 = ordsets:add_element(
604                        {PartId, RollbackSeq}, AccRollbacks)
605                end,
606                {AccCount, AccSeqs, AccVersions, AccRollbacks2}
607            end
608        end
609    end,
610
611    notify_owner(Owner, {state, updating_active}, Updater),
612    case ActiveParts of
613    [] ->
614        ActiveChangesCount = 0,
615        MaxSeqs = orddict:new(),
616        PartVersions = PartVersions0,
617        Rollbacks = [];
618    _ ->
619        ?LOG_INFO("Updater reading changes from active partitions to "
620                  "update ~s set view group `~s` from set `~s`",
621                  [GroupType, ?LOG_USERDATA(DDocId), ?LOG_USERDATA(SetName)]),
622        {ActiveChangesCount, MaxSeqs, PartVersions, Rollbacks} = lists:foldl(
623            FoldFun, {0, orddict:new(), PartVersions0, ordsets:new()},
624            couch_set_view_util:filter_seqs(ActiveParts, EndSeqs))
625    end,
626    case PassiveParts of
627    [] ->
628        FinalChangesCount = ActiveChangesCount,
629        MaxSeqs2 = MaxSeqs,
630        PartVersions2 = PartVersions,
631        Rollbacks2 = Rollbacks;
632    _ ->
633        ?LOG_INFO("Updater reading changes from passive partitions to "
634                  "update ~s set view group `~s` from set `~s`",
635                  [GroupType, ?LOG_USERDATA(DDocId), ?LOG_USERDATA(SetName)]),
636        {FinalChangesCount, MaxSeqs2, PartVersions2, Rollbacks2} = lists:foldl(
637            FoldFun, {ActiveChangesCount, MaxSeqs, PartVersions, Rollbacks},
638            couch_set_view_util:filter_seqs(PassiveParts, EndSeqs))
639    end,
640    {FinalChangesCount3, MaxSeqs3, PartVersions3, Rollbacks3} =
641        load_changes_from_passive_parts_in_mailbox(DcpPid,
642            Group, FoldFun, FinalChangesCount, MaxSeqs2, PartVersions2, Rollbacks2),
643
644    case Rollbacks3 of
645    [] ->
646        ok;
647    _ ->
648        throw({rollback, Rollbacks3})
649    end,
650
651    couch_work_queue:close(MapQueue),
652    ?LOG_INFO("Updater for ~s set view group `~s`, set `~s` (~s), "
653              "read a total of ~p changes",
654              [GroupType, ?LOG_USERDATA(DDocId), ?LOG_USERDATA(SetName), Category, FinalChangesCount3]),
655    ?LOG_DEBUG("Updater for ~s set view group `~s`, set `~s`, max partition seqs found:~n~w",
656               [GroupType, ?LOG_USERDATA(DDocId), ?LOG_USERDATA(SetName), MaxSeqs3]),
657    {PartVersions3, MaxSeqs3}.
658
659
660load_changes_from_passive_parts_in_mailbox(DcpPid,
661        Group, FoldFun, ChangesCount, MaxSeqs0, PartVersions0, Rollbacks) ->
662    #set_view_group{
663        set_name = SetName,
664        name = DDocId,
665        type = GroupType
666    } = Group,
667    receive
668    {new_passive_partitions, Parts0} ->
669        Parts = get_more_passive_partitions(Parts0),
670        AddPartVersions = [{P, [{0, 0}]} || P <- Parts],
671        {ok, AddMaxSeqs} = couch_dcp_client:get_seqs(DcpPid, Parts),
672        PartVersions = lists:ukeymerge(1, AddPartVersions, PartVersions0),
673
674        MaxSeqs = lists:ukeymerge(1, AddMaxSeqs, MaxSeqs0),
675        ?LOG_INFO("Updater reading changes from new passive partitions ~s to "
676                  "update ~s set view group `~s` from set `~s`",
677                  [condense(Parts), GroupType, ?LOG_USERDATA(DDocId), ?LOG_USERDATA(SetName)]),
678        {ChangesCount2, MaxSeqs2, PartVersions2, Rollbacks2} = lists:foldl(
679            FoldFun, {ChangesCount, MaxSeqs, PartVersions, Rollbacks}, AddMaxSeqs),
680        load_changes_from_passive_parts_in_mailbox(DcpPid,
681            Group, FoldFun, ChangesCount2, MaxSeqs2, PartVersions2, Rollbacks2)
682    after 0 ->
683        {ChangesCount, MaxSeqs0, PartVersions0, Rollbacks}
684    end.
685
686
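% Drain any further new_passive_partitions messages already in the mailbox
% and return the combined, sorted partition list.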
687get_more_passive_partitions(Parts) ->
688    receive
689    {new_passive_partitions, Parts2} ->
690        get_more_passive_partitions(Parts ++ Parts2)
691    after 0 ->
692        lists:sort(Parts)
693    end.
694
695
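% Notify the group owner about updater progress, e.g. the transition from
% updating_active to updating_passive.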
696notify_owner(Owner, Msg, UpdaterPid) ->
697    Owner ! {updater_info, UpdaterPid, Msg}.
698
699
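% Queue an item on the map queue. Snapshot markers and partition versions
% are passed through as-is. Documents with a too large body or a non-UTF-8
% id are queued as deletions so they get removed from the index, and during
% an initial build incoming deletions are dropped entirely.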
700queue_doc(snapshot_marker, MapQueue, _Group, _MaxDocSize, _InitialBuild) ->
701    couch_work_queue:queue(MapQueue, snapshot_marker);
702queue_doc({part_versions, _} = PartVersions, MapQueue, _Group, _MaxDocSize,
703    _InitialBuild) ->
704    couch_work_queue:queue(MapQueue, PartVersions);
705queue_doc(Doc, MapQueue, Group, MaxDocSize, InitialBuild) ->
706    Doc2 = case Doc#dcp_doc.deleted of
707    true ->
708        case Group#set_view_group.index_xattr_on_deleted_docs of
709        true ->
710            case Doc#dcp_doc.body of
711            <<>> ->
712                Doc#dcp_doc{deleted = true};
713            _ ->
714                Doc#dcp_doc{deleted = false}
715            end;
716        false ->
717            Doc
718        end;
719    false ->
720        Doc
721    end,
722
723    case Doc2#dcp_doc.deleted of
724    true when InitialBuild ->
725        Entry = nil;
726    true ->
727        Entry = Doc2;
728    false ->
729        #set_view_group{
730           set_name = SetName,
731           name = DDocId,
732           type = GroupType
733        } = Group,
734        case couch_util:validate_utf8(Doc2#dcp_doc.id) of
735        true ->
736            case (MaxDocSize > 0) andalso
737                (iolist_size(Doc2#dcp_doc.body) > MaxDocSize) of
738            true ->
739                ?LOG_MAPREDUCE_ERROR("Bucket `~s`, ~s group `~s`, skipping "
740                    "document with ID `~s`: too large body (~p bytes)",
741                    [?LOG_USERDATA(SetName), GroupType, ?LOG_USERDATA(DDocId),
742                     ?LOG_USERDATA(?b2l(Doc2#dcp_doc.id)), iolist_size(Doc2#dcp_doc.body)]),
743                Entry = Doc2#dcp_doc{deleted = true};
744            false ->
745                Entry = Doc2
746            end;
747        false ->
            % If the id isn't valid UTF-8 (memcached allows that), log an
            % error message and skip the doc. Send it through the queue
            % anyway, marked as deleted, so that we record its high seq num:
            % if there is a bunch of these at the end, we want to keep track
            % of the high seq and not reprocess them again.
753            ?LOG_MAPREDUCE_ERROR("Bucket `~s`, ~s group `~s`, skipping "
754                "document with non-utf8 id. Doc id bytes: ~w",
755                [?LOG_USERDATA(SetName), GroupType, ?LOG_USERDATA(DDocId), ?LOG_USERDATA(?b2l(Doc2#dcp_doc.id))]),
756            Entry = Doc2#dcp_doc{deleted = true}
757        end
758    end,
759    case Entry of
760    nil ->
761        ok;
762    _ ->
763        couch_work_queue:queue(MapQueue, Entry),
764        update_task(1)
765    end.
766
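% Turn the XATTR section at the head of a DCP document body into a JSON
% fragment. Each extended attribute is encoded as a 32-bit size followed by
% <<Key/binary, 0, Value/binary, 0>>; the pairs are rewritten as
% "Key":Value members of an "xattrs" JSON object. Returns the remaining
% document body and the accumulated JSON fragment.
% Illustrative example: with XATTRSize = 8, the body
% <<4:32, "k", 0, "1", 0, "{}">> yields
% {<<"{}">>, <<"\"xattrs\":{\"k\":1}">>}.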
767-spec accumulate_xattr(binary(), binary(), non_neg_integer(), non_neg_integer()) ->
768    {binary(), binary()}.
769accumulate_xattr(Data, Acc, XATTRSize, AccSum) when AccSum =:= XATTRSize ->
770    {Data, <<Acc/binary, "}">>};
771accumulate_xattr(Body, Acc, XATTRSize, AccSum) ->
772    <<DataSize:32, Rest/binary>> = Body,
773    AccSum2 = AccSum + DataSize + 4,
774    <<Data0:DataSize/binary, Rest2/binary>> = Rest,
    % Remove the trailing zero byte from the XATTR data
776    Data = binary:part(Data0, 0, DataSize-1),
777    % Jsonify key and value
778    Data2 = case AccSum2 of
779    XATTRSize ->
780        <<"\"", Data/binary>>;
781    _ ->
782        <<"\"", Data/binary, ",">>
783    end,
784    % Replace zero byte after key with colon
785    Xattr = binary:replace(Data2, <<0>>, <<"\":">>),
786    accumulate_xattr(Rest2, <<Acc/binary, Xattr/binary>>, XATTRSize, AccSum2).
787
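% Mapper: dequeue batches from the map queue, run each document through the
% group's map functions and queue the results (together with snapshot
% markers and partition versions) on the write queue. Map errors are logged
% and lead to empty emits for the affected view or document rather than
% aborting the batch.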
788do_maps(Group, MapQueue, WriteQueue) ->
789    #set_view_group{
790        set_name = SetName,
791        name = DDocId,
792        type = Type,
793        mod = Mod
794    } = Group,
795    case couch_work_queue:dequeue(MapQueue) of
796    closed ->
797        couch_work_queue:close(WriteQueue);
798    {ok, Queue, _QueueSize} ->
799        ViewCount = length(Group#set_view_group.views),
800        {Items, _} = lists:foldl(
801            fun(#dcp_doc{deleted = true} = DcpDoc, {Acc, Size}) ->
802                #dcp_doc{
803                    id = Id,
804                    partition = PartId,
805                    seq = Seq
806                } = DcpDoc,
807                Item = {Seq, Id, PartId, []},
808                {[Item | Acc], Size};
809            (#dcp_doc{deleted = false} = DcpDoc, {Acc0, Size0}) ->
                % When there are a lot of emits per document, memory can
                % grow almost indefinitely, as the queue size is only
                % limited by the number of documents and not by their emits.
                % In case the accumulator grows huge, queue the items early
                % into the writer queue. Only emits are taken into account;
                % the other cases in this `foldl` never have a significant
                % size.
817                {Acc, Size} = case Size0 > ?QUEUE_ACC_BATCH_SIZE of
818                true ->
819                    couch_work_queue:queue(WriteQueue, lists:reverse(Acc0)),
820                    {[], 0};
821                false ->
822                    {Acc0, Size0}
823                end,
824                #dcp_doc{
825                    id = Id,
826                    body = Body,
827                    partition = PartId,
828                    rev_seq = RevSeq,
829                    seq = Seq,
830                    cas = Cas,
831                    expiration = Expiration,
832                    flags = Flags,
833                    data_type = DcpDataType
834                } = DcpDoc,
835                {DataType, DocBody, XATTRs} = case DcpDataType of
836                ?DCP_DATA_TYPE_RAW ->
837                    {DocBody2, XATTRs2} = accumulate_xattr(Body, <<"\"xattrs\":{">>, 0, 0),
838                    {?CONTENT_META_NON_JSON_MODE, DocBody2, XATTRs2};
839                ?DCP_DATA_TYPE_JSON ->
840                    {DocBody3, XATTRs3} = accumulate_xattr(Body, <<"\"xattrs\":{">>, 0, 0),
841                    {?CONTENT_META_JSON, DocBody3, XATTRs3};
842                ?DCP_DATA_TYPE_BINARY_XATTR ->
843                    <<XATTRSize:32, Rest/binary>> = Body,
844                    {DocBody4, XATTRs4} = accumulate_xattr(Rest, <<"\"xattrs\":{">>, XATTRSize, 0),
845                    {?CONTENT_META_NON_JSON_MODE, DocBody4, XATTRs4};
846                ?DCP_DATA_TYPE_JSON_XATTR ->
847                    <<XATTRSize:32, Rest/binary>> = Body,
848                    {DocBody5, XATTRs5} = accumulate_xattr(Rest, <<"\"xattrs\":{">>, XATTRSize, 0),
849                    {?CONTENT_META_JSON, DocBody5, XATTRs5}
850                end,
851                Doc = #doc{
852                    id = Id,
853                    rev = {RevSeq, <<Cas:64, Expiration:32, Flags:32>>},
854                    body = DocBody,
855                    content_meta = DataType,
856                    deleted = false
857                },
858                try
859                    {ok, Result, LogList} = couch_set_view_mapreduce:map(
860                        Doc, XATTRs, PartId, Seq, Group),
861                    {Result2, _} = lists:foldr(
862                        fun({error, Reason}, {AccRes, Pos}) ->
863                            ErrorMsg = "Bucket `~s`, ~s group `~s`, error mapping"
864                                    " document `~s` for view `~s`: ~s",
865                            ViewName = Mod:view_name(Group, Pos),
866                            Args = [?LOG_USERDATA(SetName), Type, ?LOG_USERDATA(DDocId),
867                                    ?LOG_USERDATA(Id), ?LOG_USERDATA(ViewName),
868                                    ?LOG_USERDATA(couch_util:to_binary(Reason))],
869                            ?LOG_MAPREDUCE_ERROR(ErrorMsg, Args),
870                            {[[] | AccRes], Pos - 1};
871                        (KVs, {AccRes, Pos}) ->
872                            {[KVs | AccRes], Pos - 1}
873                        end,
874                        {[], ViewCount}, Result),
875                    lists:foreach(
876                        fun(Msg) ->
877                            DebugMsg = "Bucket `~s`, ~s group `~s`, map function"
878                                " log for document `~s`: ~s",
879                            Args = [?LOG_USERDATA(SetName), Type, ?LOG_USERDATA(DDocId),
880                                    ?LOG_USERDATA(Id), binary_to_list(Msg)],
881                            ?LOG_MAPREDUCE_ERROR(DebugMsg, Args)
882                        end, LogList),
883                    Item = {Seq, Id, PartId, Result2},
884                    {[Item | Acc], Size + erlang:external_size(Result2)}
885                catch _:{error, Reason} ->
886                    ErrorMsg = "Bucket `~s`, ~s group `~s`, error mapping document `~s`: ~s",
887                    Args = [?LOG_USERDATA(SetName), Type, ?LOG_USERDATA(DDocId),
888                            ?LOG_USERDATA(Id), ?LOG_USERDATA(couch_util:to_binary(Reason))],
889                    ?LOG_MAPREDUCE_ERROR(ErrorMsg, Args),
890                    {[{Seq, Id, PartId, []} | Acc], Size}
891                end;
892            (snapshot_marker, {Acc, Size}) ->
893                {[snapshot_marker | Acc], Size};
894            ({part_versions, _} = PartVersions, {Acc, Size}) ->
895                {[PartVersions | Acc], Size}
896            end,
897            {[], 0}, Queue),
898        ok = couch_work_queue:queue(WriteQueue, lists:reverse(Items)),
899        do_maps(Group, MapQueue, WriteQueue)
900    end.
901
902
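% Writer: dequeue mapped batches from the write queue and accumulate them
% until the batch is large enough to flush (see should_flush_writes/1),
% sleeping between dequeues when throttling is enabled.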
903do_writes(Acc) ->
904    #writer_acc{
905        kvs = Kvs,
906        kvs_size = KvsSize,
907        write_queue = WriteQueue,
908        throttle = Throttle
909    } = Acc,
910    ok = timer:sleep(Throttle),
911    case couch_work_queue:dequeue(WriteQueue) of
912    closed ->
913        flush_writes(Acc#writer_acc{final_batch = true});
914    {ok, Queue0, QueueSize} ->
915        Queue = lists:flatten(Queue0),
916        Kvs2 = Kvs ++ Queue,
917        KvsSize2 = KvsSize + QueueSize,
918
919        Acc2 = Acc#writer_acc{
920            kvs = Kvs2,
921            kvs_size = KvsSize2
922        },
923        case should_flush_writes(Acc2) of
924        true ->
925            Acc3 = flush_writes(Acc2),
926            Acc4 = Acc3#writer_acc{kvs = [], kvs_size = 0};
927        false ->
928            Acc4 = Acc2
929        end,
930        do_writes(Acc4)
931    end.
932
933should_flush_writes(Acc) ->
934    #writer_acc{
935        view_empty_kvs = ViewEmptyKvs,
936        kvs_size = KvsSize
937    } = Acc,
938    KvsSize >= (?MIN_BATCH_SIZE_PER_VIEW * length(ViewEmptyKvs)).
939
940
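% Flush the accumulated key-values. For incremental updates they are turned
% into operations on the temporary batch files (and eventually the btrees);
% for an initial build they are appended to the per-view sort files and the
% btree build phase is started on the final batch.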
941flush_writes(#writer_acc{kvs = [], initial_build = false} = Acc) ->
942    Acc2 = maybe_update_btrees(Acc),
943    checkpoint(Acc2);
944
945flush_writes(#writer_acc{initial_build = false} = Acc0) ->
946    #writer_acc{
947        kvs = Kvs,
948        view_empty_kvs = ViewEmptyKVs,
949        group = Group,
950        parent = Parent,
951        owner = Owner,
952        final_batch = IsFinalBatch,
953        part_versions = PartVersions
954    } = Acc0,
955    Mod = Group#set_view_group.mod,
956    % Only incremental updates can contain multiple snapshots
957    {MultipleSnapshots, Kvs2} = merge_snapshots(Kvs),
958    Acc1 = case MultipleSnapshots of
959    true ->
960        Acc2 = maybe_update_btrees(Acc0#writer_acc{force_flush = true}),
961        checkpoint(Acc2);
962    false ->
963        Acc0
964    end,
965    #writer_acc{last_seqs = LastSeqs} = Acc1,
966    {ViewKVs, DocIdViewIdKeys, NewLastSeqs, NewPartVersions} =
967        process_map_results(Mod, Kvs2, ViewEmptyKVs, LastSeqs, PartVersions),
968    Acc3 = Acc1#writer_acc{last_seqs = NewLastSeqs, part_versions = NewPartVersions},
969    Acc4 = write_to_tmp_batch_files(ViewKVs, DocIdViewIdKeys, Acc3),
970    #writer_acc{group = NewGroup} = Acc4,
971    case ?set_seqs(NewGroup) =/= ?set_seqs(Group) of
972    true ->
973        Acc5 = checkpoint(Acc4),
974        case (Acc4#writer_acc.state =:= updating_active) andalso
975            lists:any(fun({PartId, _}) ->
976                ((1 bsl PartId) band ?set_pbitmask(Group) =/= 0)
977            end, NewLastSeqs) of
978        true ->
979            notify_owner(Owner, {state, updating_passive}, Parent),
980            Acc5#writer_acc{state = updating_passive};
981        false ->
982            Acc5
983        end;
984    false ->
985        case IsFinalBatch of
986        true ->
987            checkpoint(Acc4);
988        false ->
989            Acc4
990        end
991    end;
992
993flush_writes(#writer_acc{initial_build = true} = WriterAcc) ->
994    #writer_acc{
995        kvs = Kvs,
996        view_empty_kvs = ViewEmptyKVs,
997        tmp_files = TmpFiles,
998        tmp_dir = TmpDir,
999        group = Group,
1000        final_batch = IsFinalBatch,
1001        max_seqs = MaxSeqs,
1002        part_versions = PartVersions,
1003        stats = Stats
1004    } = WriterAcc,
1005    #set_view_group{
1006        set_name = SetName,
1007        type = Type,
1008        name = DDocId,
1009        mod = Mod
1010    } = Group,
1011    {ViewKVs, DocIdViewIdKeys, MaxSeqs2, PartVersions2} = process_map_results(
1012        Mod, Kvs, ViewEmptyKVs, MaxSeqs, PartVersions),
1013
1014    IdRecords = lists:foldr(
1015        fun({_DocId, {_PartId, []}}, Acc) ->
1016                Acc;
1017            (Kv, Acc) ->
1018                [{KeyBin, ValBin}] = Mod:convert_back_index_kvs_to_binary([Kv], []),
1019                KvBin = [<<(byte_size(KeyBin)):16>>, KeyBin, ValBin],
1020                [[<<(iolist_size(KvBin)):32/native>>, KvBin] | Acc]
1021        end,
1022        [], DocIdViewIdKeys),
1023    #set_view_tmp_file_info{fd = IdFd} = dict:fetch(ids_index, TmpFiles),
1024    ok = file:write(IdFd, IdRecords),
1025
1026    {InsertKVCount, TmpFiles2} = Mod:write_kvs(Group, TmpFiles, ViewKVs),
1027
1028    Stats2 = Stats#set_view_updater_stats{
1029        inserted_kvs = Stats#set_view_updater_stats.inserted_kvs + InsertKVCount,
1030        inserted_ids = Stats#set_view_updater_stats.inserted_ids + length(DocIdViewIdKeys)
1031    },
1032    case IsFinalBatch of
1033    false ->
1034        WriterAcc#writer_acc{
1035            max_seqs = MaxSeqs2,
1036            stats = Stats2,
1037            tmp_files = TmpFiles2
1038        };
1039    true ->
1040        ?LOG_INFO("Updater for set view `~s`, ~s group `~s`, starting btree "
1041                  "build phase" , [?LOG_USERDATA(SetName), Type, ?LOG_USERDATA(DDocId)]),
1042        {Group2, BuildFd} = Mod:finish_build(Group, TmpFiles2, TmpDir),
1043        WriterAcc#writer_acc{
1044            tmp_files = dict:store(build_file, BuildFd, TmpFiles2),
1045            max_seqs = MaxSeqs2,
1046            part_versions = PartVersions2,
1047            stats = Stats2,
1048            group = Group2
1049        }
1050    end.
1051
1052
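% Fold the mapped results into per-view KV lists and back-index entries,
% while tracking the highest sequence number and the partition versions
% seen for every partition.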
1053process_map_results(Mod, Kvs, ViewEmptyKVs, PartSeqs, PartVersions) ->
1054    lists:foldl(
1055        fun({Seq, DocId, PartId, []}, {ViewKVsAcc, DocIdViewIdKeysAcc, PartIdSeqs, PartIdVersions}) ->
1056            PartIdSeqs2 = update_part_seq(Seq, PartId, PartIdSeqs),
1057            {ViewKVsAcc, [{DocId, {PartId, []}} | DocIdViewIdKeysAcc], PartIdSeqs2, PartIdVersions};
1058        ({Seq, DocId, PartId, QueryResults}, {ViewKVsAcc, DocIdViewIdKeysAcc, PartIdSeqs, PartIdVersions}) ->
1059            {NewViewKVs, NewViewIdKeys} = Mod:view_insert_doc_query_results(
1060                    DocId, PartId, QueryResults, ViewKVsAcc, [], []),
1061            PartIdSeqs2 = update_part_seq(Seq, PartId, PartIdSeqs),
1062            {NewViewKVs, [{DocId, {PartId, NewViewIdKeys}} | DocIdViewIdKeysAcc], PartIdSeqs2, PartIdVersions};
1063        (snapshot_marker, _Acc) ->
1064            throw({error,
1065                <<"The multiple snapshots should have been merged, "
                  "but they weren't">>});
1067        ({part_versions, {PartId, NewVersions}}, {ViewKVsAcc, DocIdViewIdKeysAcc, PartIdSeqs, PartIdVersions}) ->
1068            PartIdVersions2 = update_part_versions(NewVersions, PartId, PartIdVersions),
1069            {ViewKVsAcc, DocIdViewIdKeysAcc, PartIdSeqs, PartIdVersions2}
1070        end,
1071        {ViewEmptyKVs, [], PartSeqs, PartVersions}, Kvs).
1072
1073
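% Replica partitions that have been indexed up to their target sequence
% number are removed from replicas_on_transfer and their bit is moved from
% the passive to the active bitmask.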
1074-spec update_transferred_replicas(#set_view_group{},
1075                                  partition_seqs(),
1076                                  partition_seqs()) -> #set_view_group{}.
1077update_transferred_replicas(Group, _MaxSeqs, _PartIdSeqs) when ?set_replicas_on_transfer(Group) =:= [] ->
1078    Group;
1079update_transferred_replicas(Group, MaxSeqs, PartIdSeqs) ->
1080    #set_view_group{index_header = Header} = Group,
1081    RepsTransferred = lists:foldl(
1082        fun({PartId, Seq}, A) ->
1083            case lists:member(PartId, ?set_replicas_on_transfer(Group))
1084                andalso (Seq >= couch_set_view_util:get_part_seq(PartId, MaxSeqs)) of
1085            true ->
1086                ordsets:add_element(PartId, A);
1087            false ->
1088                A
1089            end
1090        end,
1091        ordsets:new(), PartIdSeqs),
1092    ReplicasOnTransfer2 = ordsets:subtract(?set_replicas_on_transfer(Group), RepsTransferred),
1093    {Abitmask2, Pbitmask2} = lists:foldl(
1094        fun(Id, {A, P}) ->
1095            Mask = 1 bsl Id,
1096            Mask = ?set_pbitmask(Group) band Mask,
1097            0 = ?set_abitmask(Group) band Mask,
1098            {A bor Mask, P bxor Mask}
1099        end,
1100        {?set_abitmask(Group), ?set_pbitmask(Group)},
1101        RepsTransferred),
1102    Group#set_view_group{
1103        index_header = Header#set_view_index_header{
1104            abitmask = Abitmask2,
1105            pbitmask = Pbitmask2,
1106            replicas_on_transfer = ReplicasOnTransfer2
1107        }
1108    }.
1109
1110
1111-spec update_part_seq(update_seq(), partition_id(), partition_seqs()) -> partition_seqs().
1112update_part_seq(Seq, PartId, Acc) ->
1113    case couch_set_view_util:find_part_seq(PartId, Acc) of
1114    {ok, Max} when Max >= Seq ->
1115        Acc;
1116    _ ->
1117        orddict:store(PartId, Seq, Acc)
1118    end.
1119
1120
1121-spec update_part_versions(partition_version(), partition_id(), partition_versions()) ->
1122    partition_versions().
1123update_part_versions(NewVersions, PartId, PartVersions) ->
1124    orddict:store(PartId, NewVersions, PartVersions).
1125
1126
1127% Incremental updates.
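% Split the back-index changes into inserts and removes, look up the
% existing view id keys of the updated documents in the id btree, and append
% the resulting operations to the id and per-view temporary files. The
% actual btree update is triggered from maybe_update_btrees/1.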
1128write_to_tmp_batch_files(ViewKeyValuesToAdd, DocIdViewIdKeys, WriterAcc) ->
1129    #writer_acc{
1130        tmp_files = TmpFiles,
1131        group = #set_view_group{
1132            id_btree = IdBtree,
1133            mod = Mod
1134        }
1135    } = WriterAcc,
1136
1137    {AddDocIdViewIdKeys0, RemoveDocIds, LookupDocIds} = lists:foldr(
1138        fun({DocId, {PartId, [] = _ViewIdKeys}}, {A, B, C}) ->
1139                BackKey = make_back_index_key(DocId, PartId),
1140                case is_new_partition(PartId, WriterAcc) of
1141                true ->
1142                    {A, [BackKey | B], C};
1143                false ->
1144                    {A, [BackKey | B], [BackKey | C]}
1145                end;
1146            ({DocId, {PartId, _ViewIdKeys}} = KvPairs, {A, B, C}) ->
1147                BackKey = make_back_index_key(DocId, PartId),
1148                case is_new_partition(PartId, WriterAcc) of
1149                true ->
1150                    {[KvPairs | A], B, C};
1151                false ->
1152                    {[KvPairs | A], B, [BackKey | C]}
1153                end
1154        end,
1155        {[], [], []}, DocIdViewIdKeys),
1156
1157    AddDocIdViewIdKeys = Mod:convert_back_index_kvs_to_binary(
1158        AddDocIdViewIdKeys0, []),
1159
1160    IdsData1 = lists:map(
1161        fun(K) -> couch_set_view_updater_helper:encode_op(remove, K) end,
1162        RemoveDocIds),
1163
1164    IdsData2 = lists:foldl(
1165        fun({K, V}, Acc) ->
1166            Bin = couch_set_view_updater_helper:encode_op(insert, K, V),
1167            [Bin | Acc]
1168        end,
1169        IdsData1,
1170        AddDocIdViewIdKeys),
1171
1172    IdTmpFileInfo = dict:fetch(ids_index, TmpFiles),
1173    case IdTmpFileInfo of
1174    #set_view_tmp_file_info{fd = nil} ->
1175        0 = IdTmpFileInfo#set_view_tmp_file_info.size,
1176        IdTmpFilePath = new_sort_file_name(WriterAcc),
1177        {ok, IdTmpFileFd} = file2:open(IdTmpFilePath, [raw, append, binary]),
1178        IdTmpFileSize = 0;
1179    #set_view_tmp_file_info{
1180            fd = IdTmpFileFd, name = IdTmpFilePath, size = IdTmpFileSize} ->
1181        ok
1182    end,
1183
1184    ok = file:write(IdTmpFileFd, IdsData2),
1185
1186    IdTmpFileInfo2 = IdTmpFileInfo#set_view_tmp_file_info{
1187        fd = IdTmpFileFd,
1188        name = IdTmpFilePath,
1189        size = IdTmpFileSize + iolist_size(IdsData2)
1190    },
1191    TmpFiles2 = dict:store(ids_index, IdTmpFileInfo2, TmpFiles),
1192
1193    case LookupDocIds of
1194    [] ->
1195        LookupResults = [];
1196    _ ->
1197        {ok, LookupResults, IdBtree} =
1198            couch_btree:query_modify(IdBtree, LookupDocIds, [], [])
1199    end,
1200    KeysToRemoveByView = lists:foldl(
1201        fun(LookupResult, KeysToRemoveByViewAcc) ->
1202            case LookupResult of
1203            {ok, {<<_Part:16, DocId/binary>>, <<_Part:16, ViewIdKeys/binary>>}} ->
1204                lists:foldl(
1205                    fun({ViewId, Keys}, KeysToRemoveByViewAcc2) ->
1206                        RemoveKeysDict = lists:foldl(
1207                            fun(Key, RemoveKeysDictAcc) ->
1208                                EncodedKey = Mod:encode_key_docid(Key, DocId),
1209                                dict:store(EncodedKey, nil, RemoveKeysDictAcc)
1210                            end,
1211                        couch_util:dict_find(ViewId, KeysToRemoveByViewAcc2, dict:new()), Keys),
1212                        dict:store(ViewId, RemoveKeysDict, KeysToRemoveByViewAcc2)
1213                    end,
1214                    KeysToRemoveByViewAcc, couch_set_view_util:parse_view_id_keys(ViewIdKeys));
1215            {not_found, _} ->
1216                KeysToRemoveByViewAcc
1217            end
1218        end,
1219        dict:new(), LookupResults),
1220
1221    WriterAcc2 = update_tmp_files(
1222        Mod, WriterAcc#writer_acc{tmp_files = TmpFiles2}, ViewKeyValuesToAdd,
1223        KeysToRemoveByView),
1224    maybe_update_btrees(WriterAcc2).
1225
1226
1227% Update the temporary files with the key-values from the indexer. Return
1228% the updated writer accumulator.
1229-spec update_tmp_files(atom(), #writer_acc{},
1230                       [{#set_view{}, [set_view_key_value()]}],
1231                       dict:dict(non_neg_integer(), dict:dict(binary(), nil)))
1232                      -> #writer_acc{}.
update_tmp_files(Mod, WriterAcc, ViewKeyValues, KeysToRemoveByView) ->
    #writer_acc{
       group = Group,
       tmp_files = TmpFiles
    } = WriterAcc,
    TmpFiles2 = lists:foldl(
        fun({#set_view{id_num = ViewId}, AddKeyValues}, AccTmpFiles) ->
            AddKeyValuesBinaries = Mod:convert_primary_index_kvs_to_binary(
                AddKeyValues, Group, []),
            KeysToRemoveDict = couch_util:dict_find(
                ViewId, KeysToRemoveByView, dict:new()),

            case Mod of
            % The b-tree replaces nodes with the same key, hence we don't
            % need to delete a node that gets updated anyway
            mapreduce_view ->
                {KeysToRemoveDict2, BatchData} = lists:foldl(
                    fun({K, V}, {KeysToRemoveAcc, BinOpAcc}) ->
                        Bin = couch_set_view_updater_helper:encode_op(
                            insert, K, V),
                        BinOpAcc2 = [Bin | BinOpAcc],
                        case dict:find(K, KeysToRemoveAcc) of
                        {ok, _} ->
                            {dict:erase(K, KeysToRemoveAcc), BinOpAcc2};
                        _ ->
                            {KeysToRemoveAcc, BinOpAcc2}
                        end
                    end,
                    {KeysToRemoveDict, []}, AddKeyValuesBinaries);
            % In r-trees there are multiple possible paths to a key. Hence it's
            % not easily possible to replace an existing node with a new one,
            % as the insertion could happen in a subtree that is different
            % from the subtree of the old value.
            spatial_view ->
                BatchData = [
                    couch_set_view_updater_helper:encode_op(insert, K, V) ||
                        {K, V} <- AddKeyValuesBinaries],
                KeysToRemoveDict2 = KeysToRemoveDict
            end,

            BatchData2 = dict:fold(
                fun(K, _V, BatchAcc) ->
                    Bin = couch_set_view_updater_helper:encode_op(remove, K),
                    [Bin | BatchAcc]
                end,
                BatchData, KeysToRemoveDict2),

            ViewTmpFileInfo = dict:fetch(ViewId, TmpFiles),
            case ViewTmpFileInfo of
            #set_view_tmp_file_info{fd = nil} ->
                0 = ViewTmpFileInfo#set_view_tmp_file_info.size,
                ViewTmpFilePath = new_sort_file_name(WriterAcc),
                {ok, ViewTmpFileFd} = file2:open(
                    ViewTmpFilePath, [raw, append, binary]),
                ViewTmpFileSize = 0;
            #set_view_tmp_file_info{fd = ViewTmpFileFd,
                                    size = ViewTmpFileSize,
                                    name = ViewTmpFilePath} ->
                ok
            end,
            ok = file:write(ViewTmpFileFd, BatchData2),
            ViewTmpFileInfo2 = ViewTmpFileInfo#set_view_tmp_file_info{
                fd = ViewTmpFileFd,
                name = ViewTmpFilePath,
                size = ViewTmpFileSize + iolist_size(BatchData2)
            },
            dict:store(ViewId, ViewTmpFileInfo2, AccTmpFiles)
        end,
    TmpFiles, ViewKeyValues),
    WriterAcc#writer_acc{
        tmp_files = TmpFiles2
    }.


is_new_partition(PartId, #writer_acc{initial_seqs = InitialSeqs}) ->
    couch_util:get_value(PartId, InitialSeqs, 0) == 0.


% For incremental index updates.
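% The temporary files accumulate operations until a flush is warranted: on the
% final batch, on force_flush, or once the id file has reached
% ?INC_MAX_TMP_FILE_SIZE and at least one view file has as well (see the
% ShouldFlush computation below).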
maybe_update_btrees(WriterAcc0) ->
    #writer_acc{
        view_empty_kvs = ViewEmptyKVs,
        tmp_files = TmpFiles,
        group = Group0,
        final_batch = IsFinalBatch,
        owner = Owner,
        last_seqs = LastSeqs,
        part_versions = PartVersions,
        force_flush = ForceFlush
    } = WriterAcc0,
    IdTmpFileInfo = dict:fetch(ids_index, TmpFiles),
    ShouldFlushViews =
        lists:any(
            fun({#set_view{id_num = Id}, _}) ->
                ViewTmpFileInfo = dict:fetch(Id, TmpFiles),
                ViewTmpFileInfo#set_view_tmp_file_info.size >= ?INC_MAX_TMP_FILE_SIZE
            end, ViewEmptyKVs),
    ShouldFlush = IsFinalBatch orelse
        ForceFlush orelse
        ((IdTmpFileInfo#set_view_tmp_file_info.size >= ?INC_MAX_TMP_FILE_SIZE) andalso
        ShouldFlushViews),
    case ShouldFlush of
    false ->
        NewLastSeqs1 = LastSeqs,
        case erlang:get(updater_worker) of
        undefined ->
            WriterAcc = WriterAcc0;
        UpdaterWorker when is_reference(UpdaterWorker) ->
            receive
            {UpdaterWorker, UpGroup, UpStats, CompactFiles} ->
                send_log_compact_files(Owner, CompactFiles, ?set_seqs(UpGroup),
                    ?set_partition_versions(UpGroup)),
                erlang:erase(updater_worker),
                WriterAcc = check_if_compactor_started(
                    WriterAcc0#writer_acc{group = UpGroup, stats = UpStats})
            after 0 ->
                WriterAcc = WriterAcc0
            end
        end;
    true ->
        ok = close_tmp_fd(IdTmpFileInfo),
        ok = lists:foreach(
            fun(#set_view{id_num = Id}) ->
                ViewTmpFileInfo = dict:fetch(Id, TmpFiles),
                ok = close_tmp_fd(ViewTmpFileInfo)
            end,
            Group0#set_view_group.views),
        case erlang:erase(updater_worker) of
        undefined ->
            WriterAcc1 = WriterAcc0;
        UpdaterWorker when is_reference(UpdaterWorker) ->
            receive
            {UpdaterWorker, UpGroup2, UpStats2, CompactFiles2} ->
                send_log_compact_files(Owner, CompactFiles2, ?set_seqs(UpGroup2),
                    ?set_partition_versions(UpGroup2)),
                WriterAcc1 = check_if_compactor_started(
                    WriterAcc0#writer_acc{
                        group = UpGroup2,
                        stats = UpStats2
                    })
            end
        end,


        % MB-11472: There is no id sort file present, hence this is the final
        % batch and there is nothing left to update in the btree.
        case IdTmpFileInfo#set_view_tmp_file_info.name of
        nil ->
            WriterAcc = WriterAcc1;
        _ ->
            WriterAcc2 = check_if_compactor_started(WriterAcc1),
            NewUpdaterWorker = spawn_updater_worker(WriterAcc2, LastSeqs, PartVersions),
            erlang:put(updater_worker, NewUpdaterWorker),

            TmpFiles2 = dict:map(
                fun(_, _) -> #set_view_tmp_file_info{} end, TmpFiles),
            WriterAcc = WriterAcc2#writer_acc{tmp_files = TmpFiles2}
        end,
        NewLastSeqs1 = orddict:new()
    end,
    #writer_acc{
        stats = NewStats0,
        group = NewGroup0
    } = WriterAcc,
    case IsFinalBatch of
    true ->
        case erlang:erase(updater_worker) of
        undefined ->
            NewGroup = NewGroup0,
            NewStats = NewStats0;
        UpdaterWorker2 when is_reference(UpdaterWorker2) ->
            receive
            {UpdaterWorker2, NewGroup, NewStats, CompactFiles3} ->
                send_log_compact_files(Owner, CompactFiles3, ?set_seqs(NewGroup),
                    ?set_partition_versions(NewGroup))
            end
        end,
        NewLastSeqs = orddict:new();
    false ->
        NewGroup = NewGroup0,
        NewStats = NewStats0,
        NewLastSeqs = NewLastSeqs1
    end,
    NewWriterAcc = WriterAcc#writer_acc{
        stats = NewStats,
        group = NewGroup,
        last_seqs = NewLastSeqs
    },
    NewWriterAcc.


send_log_compact_files(_Owner, [], _Seqs, _PartVersions) ->
    ok;
send_log_compact_files(Owner, Files, Seqs, PartVersions) ->
    Init = case erlang:erase(new_compactor) of
    true ->
        true;
    undefined ->
        false
    end,
    ok = gen_server:cast(Owner, {compact_log_files, Files, Seqs, PartVersions, Init}).


-spec spawn_updater_worker(#writer_acc{}, partition_seqs(),
                           partition_versions()) -> reference().
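% Handshake sketch (mirrors the code below): the writer tells the main updater
% that a native update is about to start, waits for {ok, native_updater_start},
% spawns the worker, and finally sends its pid back as {native_updater_pid, Pid},
% presumably so the main updater can track it and shut it down if needed.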
spawn_updater_worker(WriterAcc, PartIdSeqs, PartVersions) ->
    Parent = self(),
    Ref = make_ref(),
    #writer_acc{
        group = Group,
        parent = UpdaterPid,
        max_seqs = MaxSeqs
    } = WriterAcc,
    % Wait for main updater process to ack
    UpdaterPid ! {native_updater_start, self()},
    receive
    {ok, native_updater_start} ->
        ok
    end,
    Pid = spawn_link(fun() ->
        case ?set_cbitmask(Group) of
        0 ->
            CleanupStart = 0;
        _ ->
            CleanupStart = os:timestamp()
        end,
        {ok, NewGroup0, CleanupCount, NewStats, NewCompactFiles} = update_btrees(WriterAcc),
        case ?set_cbitmask(Group) of
        0 ->
            CleanupTime = 0.0;
        _ ->
            CleanupTime = timer:now_diff(os:timestamp(), CleanupStart) / 1000000,
            #set_view_group{
                set_name = SetName,
                name = DDocId,
                type = GroupType
            } = Group,
            ?LOG_INFO("Updater for set view `~s`, ~s group `~s`, performed cleanup "
                      "of ~p key/value pairs in ~.3f seconds",
                      [?LOG_USERDATA(SetName), GroupType, ?LOG_USERDATA(DDocId), CleanupCount, CleanupTime])
        end,
        NewSeqs = update_seqs(PartIdSeqs, ?set_seqs(Group)),
        NewPartVersions = update_versions(PartVersions, ?set_partition_versions(Group)),
        Header = NewGroup0#set_view_group.index_header,
        NewHeader = Header#set_view_index_header{
            seqs = NewSeqs,
            partition_versions = NewPartVersions
        },
        NewGroup = NewGroup0#set_view_group{
            index_header = NewHeader
        },
        NewGroup2 = update_transferred_replicas(NewGroup, MaxSeqs, PartIdSeqs),
        NumChanges = count_seqs_done(Group, NewSeqs),
        NewStats2 = NewStats#set_view_updater_stats{
           seqs = NewStats#set_view_updater_stats.seqs + NumChanges,
           cleanup_time = NewStats#set_view_updater_stats.cleanup_time + CleanupTime,
           cleanup_kv_count = NewStats#set_view_updater_stats.cleanup_kv_count + CleanupCount
        },
        Parent ! {Ref, NewGroup2, NewStats2, NewCompactFiles}
    end),
    UpdaterPid ! {native_updater_pid, Pid},
    Ref.

% Update id btree and view btrees with current batch of changes
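% Rough outline: feed the per-btree operation logs to the native updater (the
% id btree plus, for mapreduce groups, all view btrees), then for spatial
% groups run the Erlang spatial updater on the view files, accumulate stats,
% and either delete the sort files or hand them to a running compactor.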
update_btrees(WriterAcc) ->
    #writer_acc{
        stats = Stats,
        group = Group0,
        tmp_dir = TmpDir,
        tmp_files = TmpFiles,
        compactor_running = CompactorRunning,
        max_insert_batch_size = MaxBatchSize
    } = WriterAcc,

    % Prepare list of operation logs for each btree
    #set_view_tmp_file_info{name = IdFile} = dict:fetch(ids_index, TmpFiles),
    ViewFiles = lists:map(
        fun(#set_view{id_num = Id}) ->
            #set_view_tmp_file_info{
                name = ViewFile
            } = dict:fetch(Id, TmpFiles),
            ViewFile
        end, Group0#set_view_group.views),
    % `LogFiles` is supplied to the native updater. For spatial views only the
    % ID b-tree is updated.
    LogFiles = case Group0#set_view_group.mod of
    mapreduce_view ->
        [IdFile | ViewFiles];
    spatial_view ->
        [IdFile]
    end,

    % Remove the spatial views from the group. The native updater currently
    % handles mapreduce views only, though it does update the ID b-tree of
    % spatial views.
    Group = couch_set_view_util:remove_group_views(Group0, spatial_view),

    {ok, NewGroup0, Stats2} = couch_set_view_updater_helper:update_btrees(
        Group, TmpDir, LogFiles, MaxBatchSize, false),
    {IdsInserted, IdsDeleted, KVsInserted, KVsDeleted, CleanupCount} = Stats2,

    % Add back the spatial views
    NewGroup = couch_set_view_util:update_group_views(
        NewGroup0, Group0, spatial_view),

    % The native update of the ID b-tree is done; now run the Erlang updater
    % for the spatial views
    NewGroup2 = case NewGroup#set_view_group.mod of
    mapreduce_view ->
        NewGroup;
    spatial_view = Mod ->
        process_flag(trap_exit, true),
        ok = couch_file:refresh_eof(NewGroup#set_view_group.fd),
        Parent = self(),
        Pid = spawn_link(fun() ->
            Views = Mod:update_spatial(NewGroup#set_view_group.views, ViewFiles,
                MaxBatchSize),
            Parent ! {spatial_views_updater_result, Views},
            exit(Parent, normal)
            end),
        receive
        {spatial_views_updater_result, Views} ->
            NewGroup#set_view_group{
                views = Views,
                index_header = (NewGroup#set_view_group.index_header)#set_view_index_header{
                    view_states = [Mod:get_state(V#set_view.indexer) || V <- Views]
                }
            };
        {'EXIT', Pid, Reason} ->
            exit(Reason);
        stop ->
            #set_view_group{
                set_name = SetName,
                name = DDocId,
                type = Type
            } = NewGroup,
            ?LOG_INFO("Set view `~s`, ~s group `~s`, index updater (spatial "
                      "update) stopped successfully.", [?LOG_USERDATA(SetName), Type, ?LOG_USERDATA(DDocId)]),
            couch_util:shutdown_sync(Pid),
            exit(shutdown)
        end
    end,

    NewStats = Stats#set_view_updater_stats{
     inserted_ids = Stats#set_view_updater_stats.inserted_ids + IdsInserted,
     deleted_ids = Stats#set_view_updater_stats.deleted_ids + IdsDeleted,
     inserted_kvs = Stats#set_view_updater_stats.inserted_kvs + KVsInserted,
     deleted_kvs = Stats#set_view_updater_stats.deleted_kvs + KVsDeleted
    },

    % Remove files if compactor is not running
    % Otherwise send them to compactor to apply deltas
    CompactFiles = lists:foldr(
        fun(SortedFile, AccCompactFiles) ->
            case CompactorRunning of
            true ->
                case filename:extension(SortedFile) of
                ".compact" ->
                    [SortedFile | AccCompactFiles];
                _ ->
                    SortedFile2 = new_sort_file_name(TmpDir, true),
                    ok = file2:rename(SortedFile, SortedFile2),
                    [SortedFile2 | AccCompactFiles]
                end;
            false ->
                ok = file2:delete(SortedFile),
                AccCompactFiles
            end
        end, [], [IdFile | ViewFiles]),
    {ok, NewGroup2, CleanupCount, NewStats, CompactFiles}.


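% Merges the batch's partition seqs into the group's seqs, insisting that
% sequence numbers only move forward. A hypothetical example:
%   update_seqs([{1, 10}], [{1, 5}, {2, 7}])  ->  [{1, 10}, {2, 7}]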
update_seqs(PartIdSeqs, Seqs) ->
    orddict:fold(
        fun(PartId, NewSeq, Acc) ->
            OldSeq = couch_util:get_value(PartId, Acc, 0),
            case NewSeq > OldSeq of
            true ->
                ok;
            false ->
                exit({error, <<"New seq is smaller than or equal to old seq.">>, PartId, OldSeq, NewSeq})
            end,
            orddict:store(PartId, NewSeq, Acc)
        end,
        Seqs, PartIdSeqs).

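% lists:ukeymerge/3 merges both partition-version lists (sorted by partition
% id); on a collision the entry from PartVersions (the current batch) wins
% over the one already stored in the group.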
update_versions(PartVersions, AllPartVersions) ->
    lists:ukeymerge(1, PartVersions, AllPartVersions).


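% Example of the progress arithmetic below, with hypothetical numbers: 40
% changes already done, 10 new changes and 80 total changes gives
% (50 * 100) div 80 = 62 percent.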
update_task(NumChanges) ->
    [Changes, Total] = couch_task_status:get([changes_done, total_changes]),
    Changes2 = Changes + NumChanges,
    Total2 = erlang:max(Total, Changes2),
    Progress = (Changes2 * 100) div Total2,
    couch_task_status:update([
        {progress, Progress},
        {changes_done, Changes2},
        {total_changes, Total2}
    ]).


checkpoint(#writer_acc{owner = Owner, parent = Parent, group = Group} = Acc) ->
    #set_view_group{
        set_name = SetName,
        name = DDocId,
        type = Type
    } = Group,
    ?LOG_DEBUG("Updater checkpointing set view `~s` update for ~s group `~s`",
              [?LOG_USERDATA(SetName), Type, ?LOG_USERDATA(DDocId)]),
    NewGroup = maybe_fix_group(Group),
    ok = couch_file:refresh_eof(NewGroup#set_view_group.fd),
    Owner ! {partial_update, Parent, self(), NewGroup},
    receive
    update_processed ->
        ok;
    stop ->
        exit(shutdown)
    end,
    Acc#writer_acc{group = NewGroup}.


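% Absorbs a pending {new_passive_partitions, Parts} message, if there is one,
% folding the new partitions into the group: couch_set_view_util:fix_partitions/2
% supplies their seqs and partition versions, and the passive bitmask is
% extended accordingly.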
maybe_fix_group(#set_view_group{index_header = Header} = Group) ->
    receive
    {new_passive_partitions, Parts} ->
        Bitmask = couch_set_view_util:build_bitmask(Parts),
        {Seqs, PartVersions} = couch_set_view_util:fix_partitions(Group, Parts),
        Group#set_view_group{
            index_header = Header#set_view_index_header{
                seqs = Seqs,
                pbitmask = ?set_pbitmask(Group) bor Bitmask,
                partition_versions = PartVersions
            }
        }
    after 0 ->
        Group
    end.


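% If a compactor started since the last check, acknowledge it with the current
% (possibly fixed-up) group, presumably so it works from a consistent snapshot.
% The new_compactor flag set here is consumed later by send_log_compact_files/4.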
check_if_compactor_started(#writer_acc{group = Group0} = Acc) ->
    receive
    {compactor_started, Pid} ->
        erlang:put(new_compactor, true),
        Group = maybe_fix_group(Group0),
        Pid ! {compactor_started_ack, self(), Group},
        Acc#writer_acc{compactor_running = true, group = Group}
    after 0 ->
        Acc
    end.


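% For an initial build every temporary file is created and opened up front; for
% incremental updates only empty placeholder records are stored and the files
% are opened lazily in update_tmp_files/4. Stale sort files are deleted first
% (only the updater's own files when a compactor is running, all of them
% otherwise).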
init_tmp_files(WriterAcc) ->
    #writer_acc{
        group = Group, initial_build = Init, tmp_dir = TmpDir
    } = WriterAcc,
    case WriterAcc#writer_acc.compactor_running of
    true ->
        ok = couch_set_view_util:delete_sort_files(TmpDir, updater);
    false ->
        ok = couch_set_view_util:delete_sort_files(TmpDir, all)
    end,
    Ids = [ids_index | [V#set_view.id_num || V <- Group#set_view_group.views]],
    Files = case Init of
    true ->
        [begin
             FileName = new_sort_file_name(WriterAcc),
             {ok, Fd} = file2:open(FileName, [raw, append, binary]),
             {Id, #set_view_tmp_file_info{fd = Fd, name = FileName}}
         end || Id <- Ids];
    false ->
         [{Id, #set_view_tmp_file_info{}} || Id <- Ids]
    end,
    WriterAcc#writer_acc{tmp_files = dict:from_list(Files)}.


new_sort_file_name(#writer_acc{tmp_dir = TmpDir, compactor_running = Cr}) ->
    new_sort_file_name(TmpDir, Cr).

new_sort_file_name(TmpDir, true) ->
    couch_set_view_util:new_sort_file_path(TmpDir, compactor);
new_sort_file_name(TmpDir, false) ->
    couch_set_view_util:new_sort_file_path(TmpDir, updater).


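% The back-index key is the partition id as a 16-bit big-endian integer
% followed by the raw document id, e.g. (hypothetical values)
% make_back_index_key(<<"doc1">>, 5) yields <<0, 5, "doc1">>.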
make_back_index_key(DocId, PartId) ->
    <<PartId:16, DocId/binary>>.


count_seqs_done(Group, NewSeqs) ->
    % NewSeqs might have new passive partitions that Group's seqs don't have
    % yet (they will be added after a checkpoint period).
    lists:foldl(
        fun({PartId, SeqDone}, Acc) ->
            SeqBefore = couch_util:get_value(PartId, ?set_seqs(Group), 0),
            Acc + (SeqDone - SeqBefore)
        end,
        0, NewSeqs).


close_tmp_fd(#set_view_tmp_file_info{fd = nil}) ->
    ok;
close_tmp_fd(#set_view_tmp_file_info{fd = Fd}) ->
    ok = file:close(Fd).


% DCP introduces the concept of snapshots, where a document mutation is only
% guaranteed to be unique within a single snapshot. But the flusher expects
% unique mutations within the full batch. Merge multiple snapshots (if there
% are any) into a single one. The latest mutation wins.
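% A hypothetical example, with R1 and R2 standing for arbitrary query results:
%   merge_snapshots([{1, <<"a">>, 0, R1}, snapshot_marker, {2, <<"a">>, 0, R2}])
% returns {true, [{2, <<"a">>, 0, R2}]}; the marker signals that multiple
% snapshots were merged and the later mutation of doc "a" replaced the earlier one.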
merge_snapshots(KVs) ->
    merge_snapshots(KVs, false, []).

merge_snapshots([], true, Acc) ->
    {true, Acc};
merge_snapshots([], false, Acc) ->
    % The order of the KVs doesn't really matter, but having them sorted the
    % same way will make life easier when debugging
    {false, lists:reverse(Acc)};
merge_snapshots([snapshot_marker | KVs], _MultipleSnapshots, Acc) ->
    merge_snapshots(KVs, true, Acc);
merge_snapshots([{part_versions, _} = PartVersions | KVs], MultipleSnapshots, Acc) ->
    merge_snapshots(KVs, MultipleSnapshots, [PartVersions | Acc]);
merge_snapshots([KV | KVs], true, Acc0) ->
    {_Seq, DocId, _PartId, _QueryResults} = KV,
    Acc = lists:keystore(DocId, 2, Acc0, KV),
    merge_snapshots(KVs, true, Acc);
merge_snapshots([KV | KVs], false, Acc) ->
    merge_snapshots(KVs, false, [KV | Acc]).