% -*- Mode: Erlang; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */

% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_set_view_updater).

-export([update/6]).
% Exported for the MapReduce-specific code.
-export([new_sort_file_name/1]).

-include("couch_db.hrl").
-include_lib("couch_set_view/include/couch_set_view.hrl").
-include_lib("couch_dcp/include/couch_dcp.hrl").

-define(MAP_QUEUE_SIZE, 256 * 1024).
-define(WRITE_QUEUE_SIZE, 512 * 1024).
% Size of the accumulator in which emitted key-values are batched up before
% they are actually pushed to the write queue. The bigger the value, the lower
% the lock contention, but the higher the possible memory consumption.
-define(QUEUE_ACC_BATCH_SIZE, 256 * 1024).

% Incremental updates.
-define(INC_MAX_TMP_FILE_SIZE, 31457280).
-define(MIN_BATCH_SIZE_PER_VIEW, 65536).

% For file sorter and file merger commands.
-define(PORT_OPTS,
        [exit_status, use_stdio, stderr_to_stdout, {line, 4096}, binary]).

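% Accumulator state threaded through the writer process. It tracks the group
% snapshot being built, the key-values buffered for the next flush (kvs,
% kvs_size), per-partition progress (last_seqs, max_seqs, part_versions) and
% the temporary sort files used for batched btree updates (tmp_files).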
-record(writer_acc, {
    parent,
    owner,
    group,
    last_seqs = orddict:new(),
    part_versions = orddict:new(),
    compactor_running,
    write_queue,
    initial_build,
    view_empty_kvs,
    kvs = [],
    kvs_size = 0,
    state = updating_active,
    final_batch = false,
    max_seqs,
    stats = #set_view_updater_stats{},
    tmp_dir = nil,
    initial_seqs,
    max_insert_batch_size,
    tmp_files = dict:new(),
    throttle = 0,
    force_flush = false
}).


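% Entry point. Blocks on the index barrier, then spawns the doc loader,
% mapper and writer pipeline via update/8 and exits with the updater result.
% The owner receives progress notifications through notify_owner/3 and the
% final outcome via this process' exit reason.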
-spec update(pid(), #set_view_group{},
             partition_seqs(), boolean(), string(), [term()]) -> no_return().
update(Owner, Group, CurSeqs, CompactorRunning, TmpDir, Options) ->
    #set_view_group{
        set_name = SetName,
        type = Type,
        name = DDocId
    } = Group,

    ActiveParts = couch_set_view_util:decode_bitmask(?set_abitmask(Group)),
    PassiveParts = couch_set_view_util:decode_bitmask(?set_pbitmask(Group)),
    NumChanges = couch_set_view_util:missing_changes_count(CurSeqs, ?set_seqs(Group)),

    process_flag(trap_exit, true),

    BeforeEnterTs = os:timestamp(),
    Parent = self(),
    BarrierEntryPid = spawn_link(fun() ->
        DDocIds = couch_set_view_util:get_ddoc_ids_with_sig(SetName, Group),
        couch_task_status:add_task([
            {type, blocked_indexer},
            {set, SetName},
            {signature, ?l2b(couch_util:to_hex(Group#set_view_group.sig))},
            {design_documents, DDocIds},
            {indexer_type, Type}
        ]),
        case Type of
        main ->
            ok = couch_index_barrier:enter(couch_main_index_barrier, Parent);
        replica ->
            ok = couch_index_barrier:enter(couch_replica_index_barrier, Parent)
        end,
        Parent ! {done, self(), (timer:now_diff(os:timestamp(), BeforeEnterTs) / 1000000)},
        receive shutdown -> ok end
    end),

    BlockedTime = receive
    {done, BarrierEntryPid, Duration} ->
        Duration;
    {'EXIT', _, Reason} ->
        exit({updater_error, Reason})
    end,

    CleanupParts = couch_set_view_util:decode_bitmask(?set_cbitmask(Group)),
    InitialBuild = couch_set_view_util:is_initial_build(Group),
    ?LOG_INFO("Updater for set view `~s`, ~s group `~s` started~n"
              "Active partitions:    ~w~n"
              "Passive partitions:   ~w~n"
              "Cleanup partitions:   ~w~n"
              "Replicas to transfer: ~w~n"
              "Pending transition:   ~n"
              "    active:           ~w~n"
              "    passive:          ~w~n"
              "    unindexable:      ~w~n"
              "Initial build:        ~s~n"
              "Compactor running:    ~s~n"
              "Min # changes:        ~p~n"
              "Partition versions:   ~w~n",
              [SetName, Type, DDocId,
               ActiveParts,
               PassiveParts,
               CleanupParts,
               ?set_replicas_on_transfer(Group),
               ?pending_transition_active(?set_pending_transition(Group)),
               ?pending_transition_passive(?set_pending_transition(Group)),
               ?pending_transition_unindexable(?set_pending_transition(Group)),
               InitialBuild,
               CompactorRunning,
               NumChanges,
               ?set_partition_versions(Group)
              ]),

    WriterAcc0 = #writer_acc{
        parent = self(),
        owner = Owner,
        group = Group,
        initial_build = InitialBuild,
        max_seqs = CurSeqs,
        part_versions = ?set_partition_versions(Group),
        tmp_dir = TmpDir,
        max_insert_batch_size = list_to_integer(
            couch_config:get("set_views", "indexer_max_insert_batch_size", "1048576"))
    },
    update(WriterAcc0, ActiveParts, PassiveParts,
            BlockedTime, BarrierEntryPid, NumChanges, CompactorRunning, Options).


update(WriterAcc, ActiveParts, PassiveParts, BlockedTime,
       BarrierEntryPid, NumChanges, CompactorRunning, Options) ->
    #writer_acc{
        owner = Owner,
        group = Group
    } = WriterAcc,
    #set_view_group{
        set_name = SetName,
        type = Type,
        name = DDocId,
        sig = GroupSig
    } = Group,

    StartTime = os:timestamp(),

    MapQueueOptions = [{max_size, ?MAP_QUEUE_SIZE}, {max_items, infinity}],
    WriteQueueOptions = [{max_size, ?WRITE_QUEUE_SIZE}, {max_items, infinity}],
    {ok, MapQueue} = couch_work_queue:new(MapQueueOptions),
    {ok, WriteQueue} = couch_work_queue:new(WriteQueueOptions),

    Parent = self(),

    Mapper = spawn_link(fun() ->
        try
            do_maps(Group, MapQueue, WriteQueue)
        catch _:Error ->
            Stacktrace = erlang:get_stacktrace(),
            ?LOG_ERROR("Set view `~s`, ~s group `~s`, mapper error~n"
                "error:      ~p~n"
                "stacktrace: ~p~n",
                [SetName, Type, DDocId, Error, Stacktrace]),
            exit(Error)
        end
    end),

    Writer = spawn_link(fun() ->
        BarrierEntryPid ! shutdown,
        ViewEmptyKVs = [{View, []} || View <- Group#set_view_group.views],
        WriterAcc2 = init_tmp_files(WriterAcc#writer_acc{
            parent = Parent,
            group = Group,
            write_queue = WriteQueue,
            view_empty_kvs = ViewEmptyKVs,
            compactor_running = CompactorRunning,
            initial_seqs = ?set_seqs(Group),
            throttle = case lists:member(throttle, Options) of
                true ->
                    list_to_integer(
                        couch_config:get("set_views", "throttle_period", "100"));
                false ->
                    0
                end
        }),
        ok = couch_set_view_util:open_raw_read_fd(Group),
        try
            WriterAcc3 = do_writes(WriterAcc2),
            receive
            {doc_loader_finished, {PartVersions0, RealMaxSeqs}} ->
                WriterAccStats = WriterAcc3#writer_acc.stats,
                WriterAccGroup = WriterAcc3#writer_acc.group,
                WriterAccHeader = WriterAccGroup#set_view_group.index_header,
                PartVersions = lists:ukeymerge(1, PartVersions0,
                    WriterAccHeader#set_view_index_header.partition_versions),
                case WriterAcc3#writer_acc.initial_build of
                true ->
                    % The doc loader might not load the mutations up to the
                    % most recent one, but only to a lower one. Update the
                    % group header and stats with the correct information.
                    MaxSeqs = lists:ukeymerge(
                        1, RealMaxSeqs, WriterAccHeader#set_view_index_header.seqs),
                    Stats = WriterAccStats#set_view_updater_stats{
                        seqs = lists:sum([S || {_, S} <- RealMaxSeqs])
                    };
                false ->
                    MaxSeqs = WriterAccHeader#set_view_index_header.seqs,
                    Stats = WriterAcc3#writer_acc.stats
                end,
                FinalWriterAcc = WriterAcc3#writer_acc{
                    stats = Stats,
                    group = WriterAccGroup#set_view_group{
                        index_header = WriterAccHeader#set_view_index_header{
                            partition_versions = PartVersions,
                            seqs = MaxSeqs
                        }
                    }
                }
            end,
            Parent ! {writer_finished, FinalWriterAcc}
        catch _:Error ->
            Stacktrace = erlang:get_stacktrace(),
            ?LOG_ERROR("Set view `~s`, ~s group `~s`, writer error~n"
                "error:      ~p~n"
                "stacktrace: ~p~n",
                [SetName, Type, DDocId, Error, Stacktrace]),
            exit(Error)
        after
            ok = couch_set_view_util:close_raw_read_fd(Group)
        end
    end),

    InitialBuild = WriterAcc#writer_acc.initial_build,
    NumChanges2 = case InitialBuild of
        true ->
            couch_set_view_updater_helper:count_items_from_set(Group, ActiveParts ++ PassiveParts);
        false ->
            NumChanges
    end,

    DocLoader = spawn_link(fun() ->
        DDocIds = couch_set_view_util:get_ddoc_ids_with_sig(SetName, Group),
        couch_task_status:add_task([
            {type, indexer},
            {set, SetName},
            {signature, ?l2b(couch_util:to_hex(GroupSig))},
            {design_documents, DDocIds},
            {indexer_type, Type},
            {progress, 0},
            {changes_done, 0},
            {initial_build, InitialBuild},
            {total_changes, NumChanges2}
        ]),
        couch_task_status:set_update_frequency(5000),
        case lists:member(pause, Options) of
        true ->
            % Used by unit tests to verify that adding new partitions to the
            % passive state doesn't restart the updater, and that the updater
            % picks them up and indexes them in the same run.
            receive continue -> ok end;
        false ->
            ok
        end,
        try
            {PartVersions, MaxSeqs} = load_changes(
                Owner, Parent, Group, MapQueue, ActiveParts, PassiveParts,
                WriterAcc#writer_acc.max_seqs,
                WriterAcc#writer_acc.initial_build),
            Parent ! {doc_loader_finished, {PartVersions, MaxSeqs}}
        catch
        throw:purge ->
            exit(purge);
        throw:{rollback, RollbackSeqs} ->
            exit({rollback, RollbackSeqs});
        _:Error ->
            Stacktrace = erlang:get_stacktrace(),
            ?LOG_ERROR("Set view `~s`, ~s group `~s`, doc loader error~n"
                "error:      ~p~n"
                "stacktrace: ~p~n",
                [SetName, Type, DDocId, Error, Stacktrace]),
            exit(Error)
        end,
        % Since the updater progress stats are reported by the doc loader,
        % this process has to stay alive until the updater has completed.
        receive
        updater_finished ->
            ok
        end
    end),

    Result = wait_result_loop(StartTime, DocLoader, Mapper, Writer, BlockedTime, Group),
    case Type of
    main ->
        ok = couch_index_barrier:leave(couch_main_index_barrier);
    replica ->
        ok = couch_index_barrier:leave(couch_replica_index_barrier)
    end,
    case Result of
    {updater_finished, #set_view_updater_result{group = NewGroup}} ->
        ?LOG_DEBUG("Updater for ~s set view group `~s`, set `~s`, writer finished:~n"
                   "  start seqs: ~w~n"
                   "  end seqs:   ~w~n",
                   [Type, DDocId, SetName, ?set_seqs(Group), ?set_seqs(NewGroup)]);
    _ ->
        ok
    end,
    DocLoader ! updater_finished,
    exit(Result).


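% Supervises the doc loader, mapper and writer processes and relays control
% messages between them and the group owner: new passive partitions, doc
% loader completion, compactor start/ack handshakes, native updater
% registration and stop requests. Returns when the writer finishes or a
% linked process exits abnormally; a `stop` request makes it exit with
% {updater_error, shutdown}.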
wait_result_loop(StartTime, DocLoader, Mapper, Writer, BlockedTime, OldGroup) ->
    #set_view_group{set_name = SetName, name = DDocId, type = Type} = OldGroup,
    receive
    {new_passive_partitions, _} = NewPassivePartitions ->
        Writer ! NewPassivePartitions,
        DocLoader ! NewPassivePartitions,
        wait_result_loop(StartTime, DocLoader, Mapper, Writer, BlockedTime, OldGroup);
    continue ->
        % Used by unit tests.
        DocLoader ! continue,
        wait_result_loop(StartTime, DocLoader, Mapper, Writer, BlockedTime, OldGroup);
    {doc_loader_finished, {PartVersions, MaxSeqs}} ->
        Writer ! {doc_loader_finished, {PartVersions, MaxSeqs}},
        wait_result_loop(StartTime, DocLoader, Mapper, Writer, BlockedTime, OldGroup);
    {writer_finished, WriterAcc} ->
        Stats0 = WriterAcc#writer_acc.stats,
        Result = #set_view_updater_result{
            group = WriterAcc#writer_acc.group,
            state = WriterAcc#writer_acc.state,
            stats = Stats0#set_view_updater_stats{
                indexing_time = timer:now_diff(os:timestamp(), StartTime) / 1000000,
                blocked_time = BlockedTime
            },
            tmp_file = case WriterAcc#writer_acc.initial_build of
                true ->
                    dict:fetch(build_file, WriterAcc#writer_acc.tmp_files);
                false ->
                    ""
                end
        },
        {updater_finished, Result};
    {compactor_started, Pid, Ref} ->
        ?LOG_INFO("Set view `~s`, ~s group `~s`, updater received "
                  "compactor ~p notification, ref ~p, writer ~p",
                   [SetName, Type, DDocId, Pid, Ref, Writer]),
        Writer ! {compactor_started, self()},
        erlang:put(compactor_pid, {Pid, Ref}),
        wait_result_loop(StartTime, DocLoader, Mapper, Writer, BlockedTime, OldGroup);
    {compactor_started_ack, Writer, GroupSnapshot} ->
        ?LOG_INFO("Set view `~s`, ~s group `~s`, updater received compaction ack"
                  " from writer ~p", [SetName, Type, DDocId, Writer]),
        case erlang:erase(compactor_pid) of
        {Pid, Ref} ->
            Pid ! {Ref, {ok, GroupSnapshot}};
        undefined ->
            ok
        end,
        wait_result_loop(StartTime, DocLoader, Mapper, Writer, BlockedTime, OldGroup);
    {'EXIT', _, Reason} when Reason =/= normal ->
        couch_util:shutdown_sync(DocLoader),
        couch_util:shutdown_sync(Mapper),
        couch_util:shutdown_sync(Writer),
        {updater_error, Reason};
    {native_updater_start, Writer} ->
        % We need control over spawning the native updater process.
        % This helps to terminate the native OS processes correctly.
        Writer ! {ok, native_updater_start},
        receive
        {native_updater_pid, NativeUpdater} ->
            erlang:put(native_updater, NativeUpdater)
        end,
        wait_result_loop(StartTime, DocLoader, Mapper, Writer, BlockedTime, OldGroup);
    stop ->
        case erlang:erase(native_updater) of
        undefined ->
            couch_util:shutdown_sync(DocLoader),
            couch_util:shutdown_sync(Mapper),
            couch_util:shutdown_sync(Writer);
        NativeUpdater ->
            MRef = erlang:monitor(process, NativeUpdater),
            NativeUpdater ! stop,
            receive
            {'DOWN', MRef, process, NativeUpdater, _} ->
                couch_util:shutdown_sync(DocLoader),
                couch_util:shutdown_sync(Mapper),
                couch_util:shutdown_sync(Writer)
            end
        end,
        exit({updater_error, shutdown})
    end.


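% Renders a DCP snapshot marker type as a human readable string for logging.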
dcp_marker_to_string(Type) ->
    case Type band ?DCP_SNAPSHOT_TYPE_MASK of
    ?DCP_SNAPSHOT_TYPE_DISK ->
        "on-disk";
    ?DCP_SNAPSHOT_TYPE_MEMORY ->
        "in-memory";
    _ ->
        io_lib:format("unknown (~w)", [Type])
    end.


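% Streams the missing mutations for all active and passive partitions from
% DCP into the map queue. For each partition it requests changes from the
% last indexed seq up to the target seq, queues snapshot markers and partition
% version updates alongside the documents, and collects the highest seq and
% partition version seen. If any stream requires a rollback, the collected
% rollback seqs are thrown as {rollback, Rollbacks}.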
load_changes(Owner, Updater, Group, MapQueue, ActiveParts, PassiveParts,
        EndSeqs, InitialBuild) ->
    #set_view_group{
        set_name = SetName,
        name = DDocId,
        type = GroupType,
        index_header = #set_view_index_header{
            seqs = SinceSeqs,
            partition_versions = PartVersions0
        },
        dcp_pid = DcpPid,
        category = Category
    } = Group,

    MaxDocSize = list_to_integer(
        couch_config:get("set_views", "indexer_max_doc_size", "0")),
    FoldFun = fun({PartId, EndSeq}, {AccCount, AccSeqs, AccVersions, AccRollbacks}) ->
        case couch_set_view_util:has_part_seq(PartId, ?set_unindexable_seqs(Group))
            andalso not lists:member(PartId, ?set_replicas_on_transfer(Group)) of
        true ->
            {AccCount, AccSeqs, AccVersions, AccRollbacks};
        false ->
            Since = couch_util:get_value(PartId, SinceSeqs, 0),
            PartVersions = couch_util:get_value(PartId, AccVersions),
            Flags = case InitialBuild of
            true ->
                ?DCP_FLAG_DISKONLY;
            false ->
                ?DCP_FLAG_NOFLAG
            end,
            % For a stream request from 0: if a vbucket got reset in the window
            % of time between the seqno being obtained from stats and the stream
            % request being made, the end_seqno may be higher than the current
            % vb high seqno. Use a special flag to tell the server to set the
            % end_seqno.
            Flags2 = case PartVersions of
            [{0, 0}] ->
                Flags bor ?DCP_FLAG_USELATEST_ENDSEQNO;
            _ ->
                Flags
            end,

            case AccRollbacks of
            [] ->
                case EndSeq =:= Since of
                true ->
                    {AccCount, AccSeqs, AccVersions, AccRollbacks};
                false ->
                    ChangesWrapper = fun
                        ({part_versions, _} = NewVersions, Acc) ->
                            queue_doc(
                                NewVersions, MapQueue, Group,
                                MaxDocSize, InitialBuild),
                            Acc;
                        ({snapshot_marker, {MarkerStartSeq, MarkerEndSeq, MarkerType}}, {Count, _})
                            when MarkerType band ?DCP_SNAPSHOT_TYPE_MASK =/= 0 ->
                            ?LOG_INFO(
                                "set view `~s`, ~s (~s) group `~s`: received "
                                "a snapshot marker (~s) for partition ~p from "
                                "sequence ~p to ~p",
                                [SetName, GroupType, Category, DDocId,
                                    dcp_marker_to_string(MarkerType),
                                    PartId, MarkerStartSeq, MarkerEndSeq]),
                            case Count of
                            % Ignore the snapshot marker that is at the
                            % beginning of the stream.
                            % If it wasn't ignored, it would lead to an
                            % additional forced flush which isn't needed. A
                            % flush is needed if there are several mutations
                            % of the same document within one batch. As two
                            % different partitions can't contain the same
                            % document ID, we are safe to not force flushing
                            % between two partitions.
                            0 ->
                                {Count, MarkerEndSeq};
                            _ ->
                                queue_doc(
                                    snapshot_marker, MapQueue, Group,
                                    MaxDocSize, InitialBuild),
                                {Count, MarkerEndSeq}
                            end;
                        ({snapshot_marker, {MarkerStartSeq, MarkerEndSeq, MarkerType}}, Acc) ->
                            ?LOG_ERROR(
                                "set view `~s`, ~s (~s) group `~s`: received "
                                "a snapshot marker (~s) for partition ~p from "
                                "sequence ~p to ~p",
                                [SetName, GroupType, Category, DDocId,
                                    dcp_marker_to_string(MarkerType),
                                    PartId, MarkerStartSeq, MarkerEndSeq]),
                            throw({error, unknown_snapshot_marker, MarkerType}),
                            Acc;
                        (#dcp_doc{} = Item, {Count, AccEndSeq}) ->
                            queue_doc(
                                Item, MapQueue, Group,
                                MaxDocSize, InitialBuild),
                            {Count + 1, AccEndSeq}
                        end,
                    Result = couch_dcp_client:enum_docs_since(
                        DcpPid, PartId, PartVersions, Since, EndSeq, Flags2,
                        ChangesWrapper, {0, 0}),
                    case Result of
                    {ok, {AccCount2, AccEndSeq}, NewPartVersions} ->
                        AccSeqs2 = orddict:store(PartId, AccEndSeq, AccSeqs),
                        AccVersions2 = lists:ukeymerge(
                            1, [{PartId, NewPartVersions}], AccVersions),
                        AccRollbacks2 = AccRollbacks;
                    {rollback, RollbackSeq} ->
                        AccCount2 = AccCount,
                        AccSeqs2 = AccSeqs,
                        AccVersions2 = AccVersions,
                        AccRollbacks2 = ordsets:add_element(
                            {PartId, RollbackSeq}, AccRollbacks);
                    Error ->
                        AccCount2 = AccCount,
                        AccSeqs2 = AccSeqs,
                        AccVersions2 = AccVersions,
                        AccRollbacks2 = AccRollbacks,
                        ?LOG_ERROR("set view `~s`, ~s (~s) group `~s` error "
                            "while loading changes for partition ~p:~n~p~n",
                            [SetName, GroupType, Category, DDocId, PartId,
                                Error]),
                        throw(Error)
                    end,
                    {AccCount2, AccSeqs2, AccVersions2, AccRollbacks2}
                end;
            _ ->
                % If there is a rollback needed, don't store any new documents
                % in the index, but just check for a rollback of another
                % partition (i.e. a request with start seq == end seq)
                ChangesWrapper = fun(_, _) -> ok end,
                Result = couch_dcp_client:enum_docs_since(
                    DcpPid, PartId, PartVersions, Since, Since, Flags2,
                    ChangesWrapper, ok),
                case Result of
                {ok, _, _} ->
                    AccRollbacks2 = AccRollbacks;
                {rollback, RollbackSeq} ->
                    AccRollbacks2 = ordsets:add_element(
                        {PartId, RollbackSeq}, AccRollbacks)
                end,
                {AccCount, AccSeqs, AccVersions, AccRollbacks2}
            end
        end
    end,

    notify_owner(Owner, {state, updating_active}, Updater),
    case ActiveParts of
    [] ->
        ActiveChangesCount = 0,
        MaxSeqs = orddict:new(),
        PartVersions = PartVersions0,
        Rollbacks = [];
    _ ->
        ?LOG_INFO("Updater reading changes from active partitions to "
                  "update ~s set view group `~s` from set `~s`",
                  [GroupType, DDocId, SetName]),
        {ActiveChangesCount, MaxSeqs, PartVersions, Rollbacks} = lists:foldl(
            FoldFun, {0, orddict:new(), PartVersions0, ordsets:new()},
            couch_set_view_util:filter_seqs(ActiveParts, EndSeqs))
    end,
    case PassiveParts of
    [] ->
        FinalChangesCount = ActiveChangesCount,
        MaxSeqs2 = MaxSeqs,
        PartVersions2 = PartVersions,
        Rollbacks2 = Rollbacks;
    _ ->
        ?LOG_INFO("Updater reading changes from passive partitions to "
                  "update ~s set view group `~s` from set `~s`",
                  [GroupType, DDocId, SetName]),
        {FinalChangesCount, MaxSeqs2, PartVersions2, Rollbacks2} = lists:foldl(
            FoldFun, {ActiveChangesCount, MaxSeqs, PartVersions, Rollbacks},
            couch_set_view_util:filter_seqs(PassiveParts, EndSeqs))
    end,
    {FinalChangesCount3, MaxSeqs3, PartVersions3, Rollbacks3} =
        load_changes_from_passive_parts_in_mailbox(DcpPid,
            Group, FoldFun, FinalChangesCount, MaxSeqs2, PartVersions2, Rollbacks2),

    case Rollbacks3 of
    [] ->
        ok;
    _ ->
        throw({rollback, Rollbacks3})
    end,

    couch_work_queue:close(MapQueue),
    ?LOG_INFO("Updater for ~s set view group `~s`, set `~s` (~s), "
              "read a total of ~p changes",
              [GroupType, DDocId, SetName, Category, FinalChangesCount3]),
    ?LOG_DEBUG("Updater for ~s set view group `~s`, set `~s`, max partition seqs found:~n~w",
               [GroupType, DDocId, SetName, MaxSeqs3]),
    {PartVersions3, MaxSeqs3}.


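% Drains {new_passive_partitions, Parts} messages that arrived in the mailbox
% while the initial fold was running, and folds over the newly added
% partitions with the same FoldFun so they get indexed in the same run.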
load_changes_from_passive_parts_in_mailbox(DcpPid,
        Group, FoldFun, ChangesCount, MaxSeqs0, PartVersions0, Rollbacks) ->
    #set_view_group{
        set_name = SetName,
        name = DDocId,
        type = GroupType
    } = Group,
    receive
    {new_passive_partitions, Parts0} ->
        Parts = get_more_passive_partitions(Parts0),
        AddPartVersions = [{P, [{0, 0}]} || P <- Parts],
        {ok, AddMaxSeqs} = couch_dcp_client:get_seqs(DcpPid, Parts),
        PartVersions = lists:ukeymerge(1, AddPartVersions, PartVersions0),

        MaxSeqs = lists:ukeymerge(1, AddMaxSeqs, MaxSeqs0),
        ?LOG_INFO("Updater reading changes from new passive partitions ~w to "
                  "update ~s set view group `~s` from set `~s`",
                  [Parts, GroupType, DDocId, SetName]),
        {ChangesCount2, MaxSeqs2, PartVersions2, Rollbacks2} = lists:foldl(
            FoldFun, {ChangesCount, MaxSeqs, PartVersions, Rollbacks}, AddMaxSeqs),
        load_changes_from_passive_parts_in_mailbox(DcpPid,
            Group, FoldFun, ChangesCount2, MaxSeqs2, PartVersions2, Rollbacks2)
    after 0 ->
        {ChangesCount, MaxSeqs0, PartVersions0, Rollbacks}
    end.


get_more_passive_partitions(Parts) ->
    receive
    {new_passive_partitions, Parts2} ->
        get_more_passive_partitions(Parts ++ Parts2)
    after 0 ->
        lists:sort(Parts)
    end.


notify_owner(Owner, Msg, UpdaterPid) ->
    Owner ! {updater_info, UpdaterPid, Msg}.


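% Pushes a single item onto the map queue. Deletions are dropped during an
% initial build, documents with a non-utf8 id or a body larger than the
% configured maximum are queued as deletions (so their seq is still recorded),
% and snapshot markers and partition version updates are passed through as-is.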
queue_doc(snapshot_marker, MapQueue, _Group, _MaxDocSize, _InitialBuild) ->
    couch_work_queue:queue(MapQueue, snapshot_marker);
queue_doc({part_versions, _} = PartVersions, MapQueue, _Group, _MaxDocSize,
    _InitialBuild) ->
    couch_work_queue:queue(MapQueue, PartVersions);
queue_doc(Doc, MapQueue, Group, MaxDocSize, InitialBuild) ->
    case Doc#dcp_doc.deleted of
    true when InitialBuild ->
        Entry = nil;
    true ->
        Entry = Doc;
    false ->
        #set_view_group{
           set_name = SetName,
           name = DDocId,
           type = GroupType
        } = Group,
        case couch_util:validate_utf8(Doc#dcp_doc.id) of
        true ->
            case (MaxDocSize > 0) andalso
                (iolist_size(Doc#dcp_doc.body) > MaxDocSize) of
            true ->
                ?LOG_MAPREDUCE_ERROR("Bucket `~s`, ~s group `~s`, skipping "
                    "document with ID `~s`: too large body (~p bytes)",
                    [SetName, GroupType, DDocId,
                     ?b2l(Doc#dcp_doc.id), iolist_size(Doc#dcp_doc.body)]),
                Entry = Doc#dcp_doc{deleted = true};
            false ->
                Entry = Doc
            end;
        false ->
            % If the id isn't utf8 (memcached allows that), log an error
            % message and skip the doc. Send it through the queue anyway, as a
            % deletion, so that the high seq number is still recorded. If a
            % bunch of these appear at the end of the stream, we want to keep
            % track of the high seq and not reprocess them again.
            ?LOG_MAPREDUCE_ERROR("Bucket `~s`, ~s group `~s`, skipping "
                "document with non-utf8 id. Doc id bytes: ~w",
                [SetName, GroupType, DDocId, ?b2l(Doc#dcp_doc.id)]),
            Entry = Doc#dcp_doc{deleted = true}
        end
    end,
    case Entry of
    nil ->
        ok;
    _ ->
        couch_work_queue:queue(MapQueue, Entry),
        update_task(1)
    end.


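% Dequeues batches from the map queue, runs the group's map functions on each
% document and queues the results (one list of key-values per view) into the
% write queue. Deletions are emitted with an empty result list so the writer
% can remove their old entries. Map errors are logged and the affected
% document contributes an empty result, like a deletion.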
do_maps(Group, MapQueue, WriteQueue) ->
    #set_view_group{
        set_name = SetName,
        name = DDocId,
        type = Type,
        mod = Mod
    } = Group,
    case couch_work_queue:dequeue(MapQueue) of
    closed ->
        couch_work_queue:close(WriteQueue);
    {ok, Queue, _QueueSize} ->
        ViewCount = length(Group#set_view_group.views),
        {Items, _} = lists:foldl(
            fun(#dcp_doc{deleted = true} = DcpDoc, {Acc, Size}) ->
                #dcp_doc{
                    id = Id,
                    partition = PartId,
                    seq = Seq
                } = DcpDoc,
                Item = {Seq, Id, PartId, []},
                {[Item | Acc], Size};
            (#dcp_doc{deleted = false} = DcpDoc, {Acc0, Size0}) ->
                % When there are a lot of emits per document, memory can grow
                % almost indefinitely, as the queue size is only limited by
                % the number of documents and not by their emits. If the
                % accumulator grows too large, queue the items early into the
                % write queue. Only emits are taken into account; the other
                % cases in this `foldl` never have a significant size.
                {Acc, Size} = case Size0 > ?QUEUE_ACC_BATCH_SIZE of
                true ->
                    couch_work_queue:queue(WriteQueue, lists:reverse(Acc0)),
                    {[], 0};
                false ->
                    {Acc0, Size0}
                end,
                #dcp_doc{
                    id = Id,
                    body = Body,
                    partition = PartId,
                    rev_seq = RevSeq,
                    seq = Seq,
                    cas = Cas,
                    expiration = Expiration,
                    flags = Flags,
                    data_type = DcpDataType
                } = DcpDoc,
                DataType = case DcpDataType of
                ?DCP_DATA_TYPE_RAW ->
                    ?CONTENT_META_NON_JSON_MODE;
                ?DCP_DATA_TYPE_JSON ->
                    ?CONTENT_META_JSON
                end,
                Doc = #doc{
                    id = Id,
                    rev = {RevSeq, <<Cas:64, Expiration:32, Flags:32>>},
                    body = Body,
                    content_meta = DataType,
                    deleted = false
                },
                try
                    {ok, Result, LogList} = couch_set_view_mapreduce:map(
                        Doc, PartId, Seq, Group),
                    {Result2, _} = lists:foldr(
                        fun({error, Reason}, {AccRes, Pos}) ->
                            ErrorMsg = "Bucket `~s`, ~s group `~s`, error mapping"
                                    " document `~s` for view `~s`: ~s",
                            ViewName = Mod:view_name(Group, Pos),
                            Args = [SetName, Type, DDocId, Id, ViewName,
                                    couch_util:to_binary(Reason)],
                            ?LOG_MAPREDUCE_ERROR(ErrorMsg, Args),
                            {[[] | AccRes], Pos - 1};
                        (KVs, {AccRes, Pos}) ->
                            {[KVs | AccRes], Pos - 1}
                        end,
                        {[], ViewCount}, Result),
                    lists:foreach(
                        fun(Msg) ->
                            DebugMsg = "Bucket `~s`, ~s group `~s`, map function"
                                " log for document `~s`: ~s",
                            Args = [SetName, Type, DDocId, Id, binary_to_list(Msg)],
                            ?LOG_MAPREDUCE_ERROR(DebugMsg, Args)
                        end, LogList),
                    Item = {Seq, Id, PartId, Result2},
                    {[Item | Acc], Size + erlang:external_size(Result2)}
                catch _:{error, Reason} ->
                    ErrorMsg = "Bucket `~s`, ~s group `~s`, error mapping document `~s`: ~s",
                    Args = [SetName, Type, DDocId, Id, couch_util:to_binary(Reason)],
                    ?LOG_MAPREDUCE_ERROR(ErrorMsg, Args),
                    {[{Seq, Id, PartId, []} | Acc], Size}
                end;
            (snapshot_marker, {Acc, Size}) ->
                {[snapshot_marker | Acc], Size};
            ({part_versions, _} = PartVersions, {Acc, Size}) ->
                {[PartVersions | Acc], Size}
            end,
            {[], 0}, Queue),
        ok = couch_work_queue:queue(WriteQueue, lists:reverse(Items)),
        do_maps(Group, MapQueue, WriteQueue)
    end.


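% Dequeues mapped batches from the write queue, accumulates them in the
% writer accumulator and flushes them (see flush_writes/1) once the batch is
% big enough or the queue is closed. The optional throttle_period sleep slows
% the writer down when the `throttle` option is set.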
do_writes(Acc) ->
    #writer_acc{
        kvs = Kvs,
        kvs_size = KvsSize,
        write_queue = WriteQueue,
        throttle = Throttle
    } = Acc,
    ok = timer:sleep(Throttle),
    case couch_work_queue:dequeue(WriteQueue) of
    closed ->
        flush_writes(Acc#writer_acc{final_batch = true});
    {ok, Queue0, QueueSize} ->
        Queue = lists:flatten(Queue0),
        Kvs2 = Kvs ++ Queue,
        KvsSize2 = KvsSize + QueueSize,

        Acc2 = Acc#writer_acc{
            kvs = Kvs2,
            kvs_size = KvsSize2
        },
        case should_flush_writes(Acc2) of
        true ->
            Acc3 = flush_writes(Acc2),
            Acc4 = Acc3#writer_acc{kvs = [], kvs_size = 0};
        false ->
            Acc4 = Acc2
        end,
        do_writes(Acc4)
    end.

should_flush_writes(Acc) ->
    #writer_acc{
        view_empty_kvs = ViewEmptyKvs,
        kvs_size = KvsSize
    } = Acc,
    KvsSize >= (?MIN_BATCH_SIZE_PER_VIEW * length(ViewEmptyKvs)).


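% Flushes the currently accumulated key-values. There are three clauses:
% an empty incremental batch (only checks whether the btrees need updating),
% an incremental batch (merges snapshots, writes to the per-view temporary
% batch files and checkpoints), and the initial build case, which appends
% directly to the per-view sort files and runs the btree build phase on the
% final batch.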
flush_writes(#writer_acc{kvs = [], initial_build = false} = Acc) ->
    Acc2 = maybe_update_btrees(Acc),
    checkpoint(Acc2);

flush_writes(#writer_acc{initial_build = false} = Acc0) ->
    #writer_acc{
        kvs = Kvs,
        view_empty_kvs = ViewEmptyKVs,
        group = Group,
        parent = Parent,
        owner = Owner,
        final_batch = IsFinalBatch,
        part_versions = PartVersions
    } = Acc0,
    Mod = Group#set_view_group.mod,
    % Only incremental updates can contain multiple snapshots
    {MultipleSnapshots, Kvs2} = merge_snapshots(Kvs),
    Acc1 = case MultipleSnapshots of
    true ->
        Acc2 = maybe_update_btrees(Acc0#writer_acc{force_flush = true}),
        checkpoint(Acc2);
    false ->
        Acc0
    end,
    #writer_acc{last_seqs = LastSeqs} = Acc1,
    {ViewKVs, DocIdViewIdKeys, NewLastSeqs, NewPartVersions} =
        process_map_results(Mod, Kvs2, ViewEmptyKVs, LastSeqs, PartVersions),
    Acc3 = Acc1#writer_acc{last_seqs = NewLastSeqs, part_versions = NewPartVersions},
    Acc4 = write_to_tmp_batch_files(ViewKVs, DocIdViewIdKeys, Acc3),
    #writer_acc{group = NewGroup} = Acc4,
    case ?set_seqs(NewGroup) =/= ?set_seqs(Group) of
    true ->
        Acc5 = checkpoint(Acc4),
        case (Acc4#writer_acc.state =:= updating_active) andalso
            lists:any(fun({PartId, _}) ->
                ((1 bsl PartId) band ?set_pbitmask(Group) =/= 0)
            end, NewLastSeqs) of
        true ->
            notify_owner(Owner, {state, updating_passive}, Parent),
            Acc5#writer_acc{state = updating_passive};
        false ->
            Acc5
        end;
    false ->
        case IsFinalBatch of
        true ->
            checkpoint(Acc4);
        false ->
            Acc4
        end
    end;

flush_writes(#writer_acc{initial_build = true} = WriterAcc) ->
    #writer_acc{
        kvs = Kvs,
        view_empty_kvs = ViewEmptyKVs,
        tmp_files = TmpFiles,
        tmp_dir = TmpDir,
        group = Group,
        final_batch = IsFinalBatch,
        max_seqs = MaxSeqs,
        part_versions = PartVersions,
        stats = Stats
    } = WriterAcc,
    #set_view_group{
        set_name = SetName,
        type = Type,
        name = DDocId,
        mod = Mod
    } = Group,
    {ViewKVs, DocIdViewIdKeys, MaxSeqs2, PartVersions2} = process_map_results(
        Mod, Kvs, ViewEmptyKVs, MaxSeqs, PartVersions),

    IdRecords = lists:foldr(
        fun({_DocId, {_PartId, []}}, Acc) ->
                Acc;
            (Kv, Acc) ->
                [{KeyBin, ValBin}] = Mod:convert_back_index_kvs_to_binary([Kv], []),
                KvBin = [<<(byte_size(KeyBin)):16>>, KeyBin, ValBin],
                [[<<(iolist_size(KvBin)):32/native>>, KvBin] | Acc]
        end,
        [], DocIdViewIdKeys),
    #set_view_tmp_file_info{fd = IdFd} = dict:fetch(ids_index, TmpFiles),
    ok = file:write(IdFd, IdRecords),

    {InsertKVCount, TmpFiles2} = Mod:write_kvs(Group, TmpFiles, ViewKVs),

    Stats2 = Stats#set_view_updater_stats{
        inserted_kvs = Stats#set_view_updater_stats.inserted_kvs + InsertKVCount,
        inserted_ids = Stats#set_view_updater_stats.inserted_ids + length(DocIdViewIdKeys)
    },
    case IsFinalBatch of
    false ->
        WriterAcc#writer_acc{
            max_seqs = MaxSeqs2,
            stats = Stats2,
            tmp_files = TmpFiles2
        };
    true ->
        ?LOG_INFO("Updater for set view `~s`, ~s group `~s`, starting btree "
                  "build phase", [SetName, Type, DDocId]),
        {Group2, BuildFd} = Mod:finish_build(Group, TmpFiles2, TmpDir),
        WriterAcc#writer_acc{
            tmp_files = dict:store(build_file, BuildFd, TmpFiles2),
            max_seqs = MaxSeqs2,
            part_versions = PartVersions2,
            stats = Stats2,
            group = Group2
        }
    end.


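% Folds over one batch of mapped items, merging the emitted key-values into
% the per-view accumulators and building the back-index entries (doc id ->
% view id keys), while tracking the highest seq and the partition versions
% seen per partition.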
process_map_results(Mod, Kvs, ViewEmptyKVs, PartSeqs, PartVersions) ->
    lists:foldl(
        fun({Seq, DocId, PartId, []}, {ViewKVsAcc, DocIdViewIdKeysAcc, PartIdSeqs, PartIdVersions}) ->
            PartIdSeqs2 = update_part_seq(Seq, PartId, PartIdSeqs),
            {ViewKVsAcc, [{DocId, {PartId, []}} | DocIdViewIdKeysAcc], PartIdSeqs2, PartIdVersions};
        ({Seq, DocId, PartId, QueryResults}, {ViewKVsAcc, DocIdViewIdKeysAcc, PartIdSeqs, PartIdVersions}) ->
            {NewViewKVs, NewViewIdKeys} = Mod:view_insert_doc_query_results(
                    DocId, PartId, QueryResults, ViewKVsAcc, [], []),
            PartIdSeqs2 = update_part_seq(Seq, PartId, PartIdSeqs),
            {NewViewKVs, [{DocId, {PartId, NewViewIdKeys}} | DocIdViewIdKeysAcc], PartIdSeqs2, PartIdVersions};
        (snapshot_marker, _Acc) ->
            throw({error,
                <<"The multiple snapshots should have been merged, "
                  "but they weren't">>});
        ({part_versions, {PartId, NewVersions}}, {ViewKVsAcc, DocIdViewIdKeysAcc, PartIdSeqs, PartIdVersions}) ->
            PartIdVersions2 = update_part_versions(NewVersions, PartId, PartIdVersions),
            {ViewKVsAcc, DocIdViewIdKeysAcc, PartIdSeqs, PartIdVersions2}
        end,
        {ViewEmptyKVs, [], PartSeqs, PartVersions}, Kvs).


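% Once a replica-on-transfer partition has been indexed up to its target seq,
% remove it from the replicas-on-transfer set and move it from the passive to
% the active bitmask in the index header.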
-spec update_transferred_replicas(#set_view_group{},
                                  partition_seqs(),
                                  partition_seqs()) -> #set_view_group{}.
update_transferred_replicas(Group, _MaxSeqs, _PartIdSeqs) when ?set_replicas_on_transfer(Group) =:= [] ->
    Group;
update_transferred_replicas(Group, MaxSeqs, PartIdSeqs) ->
    #set_view_group{index_header = Header} = Group,
    RepsTransferred = lists:foldl(
        fun({PartId, Seq}, A) ->
            case lists:member(PartId, ?set_replicas_on_transfer(Group))
                andalso (Seq >= couch_set_view_util:get_part_seq(PartId, MaxSeqs)) of
            true ->
                ordsets:add_element(PartId, A);
            false ->
                A
            end
        end,
        ordsets:new(), PartIdSeqs),
    ReplicasOnTransfer2 = ordsets:subtract(?set_replicas_on_transfer(Group), RepsTransferred),
    {Abitmask2, Pbitmask2} = lists:foldl(
        fun(Id, {A, P}) ->
            Mask = 1 bsl Id,
            Mask = ?set_pbitmask(Group) band Mask,
            0 = ?set_abitmask(Group) band Mask,
            {A bor Mask, P bxor Mask}
        end,
        {?set_abitmask(Group), ?set_pbitmask(Group)},
        RepsTransferred),
    Group#set_view_group{
        index_header = Header#set_view_index_header{
            abitmask = Abitmask2,
            pbitmask = Pbitmask2,
            replicas_on_transfer = ReplicasOnTransfer2
        }
    }.


-spec update_part_seq(update_seq(), partition_id(), partition_seqs()) -> partition_seqs().
update_part_seq(Seq, PartId, Acc) ->
    case couch_set_view_util:find_part_seq(PartId, Acc) of
    {ok, Max} when Max >= Seq ->
        Acc;
    _ ->
        orddict:store(PartId, Seq, Acc)
    end.


-spec update_part_versions(partition_version(), partition_id(), partition_versions()) ->
    partition_versions().
update_part_versions(NewVersions, PartId, PartVersions) ->
    orddict:store(PartId, NewVersions, PartVersions).


% Incremental updates.
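% Appends the new batch to the id-index and per-view temporary batch files
% (insert ops for new/updated entries, remove ops for deletions), looks up the
% old back-index entries so outdated keys can be removed, and then lets
% maybe_update_btrees/1 decide whether the batch files are large enough to be
% applied to the btrees.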
write_to_tmp_batch_files(ViewKeyValuesToAdd, DocIdViewIdKeys, WriterAcc) ->
    #writer_acc{
        tmp_files = TmpFiles,
        group = #set_view_group{
            id_btree = IdBtree,
            mod = Mod
        }
    } = WriterAcc,

    {AddDocIdViewIdKeys0, RemoveDocIds, LookupDocIds} = lists:foldr(
        fun({DocId, {PartId, [] = _ViewIdKeys}}, {A, B, C}) ->
                BackKey = make_back_index_key(DocId, PartId),
                case is_new_partition(PartId, WriterAcc) of
                true ->
                    {A, [BackKey | B], C};
                false ->
                    {A, [BackKey | B], [BackKey | C]}
                end;
            ({DocId, {PartId, _ViewIdKeys}} = KvPairs, {A, B, C}) ->
                BackKey = make_back_index_key(DocId, PartId),
                case is_new_partition(PartId, WriterAcc) of
                true ->
                    {[KvPairs | A], B, C};
                false ->
                    {[KvPairs | A], B, [BackKey | C]}
                end
        end,
        {[], [], []}, DocIdViewIdKeys),

    AddDocIdViewIdKeys = Mod:convert_back_index_kvs_to_binary(
        AddDocIdViewIdKeys0, []),

    IdsData1 = lists:map(
        fun(K) -> couch_set_view_updater_helper:encode_op(remove, K) end,
        RemoveDocIds),

    IdsData2 = lists:foldl(
        fun({K, V}, Acc) ->
            Bin = couch_set_view_updater_helper:encode_op(insert, K, V),
            [Bin | Acc]
        end,
        IdsData1,
        AddDocIdViewIdKeys),

    IdTmpFileInfo = dict:fetch(ids_index, TmpFiles),
    case IdTmpFileInfo of
    #set_view_tmp_file_info{fd = nil} ->
        0 = IdTmpFileInfo#set_view_tmp_file_info.size,
        IdTmpFilePath = new_sort_file_name(WriterAcc),
        {ok, IdTmpFileFd} = file2:open(IdTmpFilePath, [raw, append, binary]),
        IdTmpFileSize = 0;
    #set_view_tmp_file_info{
            fd = IdTmpFileFd, name = IdTmpFilePath, size = IdTmpFileSize} ->
        ok
    end,

    ok = file:write(IdTmpFileFd, IdsData2),

    IdTmpFileInfo2 = IdTmpFileInfo#set_view_tmp_file_info{
        fd = IdTmpFileFd,
        name = IdTmpFilePath,
        size = IdTmpFileSize + iolist_size(IdsData2)
    },
    TmpFiles2 = dict:store(ids_index, IdTmpFileInfo2, TmpFiles),

    case LookupDocIds of
    [] ->
        LookupResults = [];
    _ ->
        {ok, LookupResults, IdBtree} =
            couch_btree:query_modify(IdBtree, LookupDocIds, [], [])
    end,
    KeysToRemoveByView = lists:foldl(
        fun(LookupResult, KeysToRemoveByViewAcc) ->
            case LookupResult of
            {ok, {<<_Part:16, DocId/binary>>, <<_Part:16, ViewIdKeys/binary>>}} ->
                lists:foldl(
                    fun({ViewId, Keys}, KeysToRemoveByViewAcc2) ->
                        RemoveKeysDict = lists:foldl(
                            fun(Key, RemoveKeysDictAcc) ->
                                EncodedKey = Mod:encode_key_docid(Key, DocId),
                                dict:store(EncodedKey, nil, RemoveKeysDictAcc)
                            end,
                        couch_util:dict_find(ViewId, KeysToRemoveByViewAcc2, dict:new()), Keys),
                        dict:store(ViewId, RemoveKeysDict, KeysToRemoveByViewAcc2)
                    end,
                    KeysToRemoveByViewAcc, couch_set_view_util:parse_view_id_keys(ViewIdKeys));
            {not_found, _} ->
                KeysToRemoveByViewAcc
            end
        end,
        dict:new(), LookupResults),

    WriterAcc2 = update_tmp_files(
        Mod, WriterAcc#writer_acc{tmp_files = TmpFiles2}, ViewKeyValuesToAdd,
        KeysToRemoveByView),
    maybe_update_btrees(WriterAcc2).


% Update the temporary files with the key-values from the indexer. Return
% the updated writer accumulator.
-spec update_tmp_files(atom(), #writer_acc{},
                       [{#set_view{}, [set_view_key_value()]}],
                       dict:dict(non_neg_integer(), dict:dict(binary(), nil)))
                      -> #writer_acc{}.
update_tmp_files(Mod, WriterAcc, ViewKeyValues, KeysToRemoveByView) ->
    #writer_acc{
       group = Group,
       tmp_files = TmpFiles
    } = WriterAcc,
    TmpFiles2 = lists:foldl(
        fun({#set_view{id_num = ViewId}, AddKeyValues}, AccTmpFiles) ->
            AddKeyValuesBinaries = Mod:convert_primary_index_kvs_to_binary(
                AddKeyValues, Group, []),
            KeysToRemoveDict = couch_util:dict_find(
                ViewId, KeysToRemoveByView, dict:new()),

            case Mod of
            % The b-tree replaces nodes with the same key, hence we don't
            % need to delete a node that gets updated anyway
            mapreduce_view ->
                {KeysToRemoveDict2, BatchData} = lists:foldl(
                    fun({K, V}, {KeysToRemoveAcc, BinOpAcc}) ->
                        Bin = couch_set_view_updater_helper:encode_op(
                            insert, K, V),
                        BinOpAcc2 = [Bin | BinOpAcc],
                        case dict:find(K, KeysToRemoveAcc) of
                        {ok, _} ->
                            {dict:erase(K, KeysToRemoveAcc), BinOpAcc2};
                        _ ->
                            {KeysToRemoveAcc, BinOpAcc2}
                        end
                    end,
                    {KeysToRemoveDict, []}, AddKeyValuesBinaries);
            % In r-trees there are multiple possible paths to a key. Hence it's
            % not easily possible to replace an existing node with a new one,
            % as the insertion could happen in a subtree that is different
            % from the subtree of the old value.
            spatial_view ->
                BatchData = [
                    couch_set_view_updater_helper:encode_op(insert, K, V) ||
                        {K, V} <- AddKeyValuesBinaries],
                KeysToRemoveDict2 = KeysToRemoveDict
            end,

            BatchData2 = dict:fold(
                fun(K, _V, BatchAcc) ->
                    Bin = couch_set_view_updater_helper:encode_op(remove, K),
                    [Bin | BatchAcc]
                end,
                BatchData, KeysToRemoveDict2),

            ViewTmpFileInfo = dict:fetch(ViewId, TmpFiles),
            case ViewTmpFileInfo of
            #set_view_tmp_file_info{fd = nil} ->
                0 = ViewTmpFileInfo#set_view_tmp_file_info.size,
                ViewTmpFilePath = new_sort_file_name(WriterAcc),
                {ok, ViewTmpFileFd} = file2:open(
                    ViewTmpFilePath, [raw, append, binary]),
                ViewTmpFileSize = 0;
            #set_view_tmp_file_info{fd = ViewTmpFileFd,
                                    size = ViewTmpFileSize,
                                    name = ViewTmpFilePath} ->
                ok
            end,
            ok = file:write(ViewTmpFileFd, BatchData2),
            ViewTmpFileInfo2 = ViewTmpFileInfo#set_view_tmp_file_info{
                fd = ViewTmpFileFd,
                name = ViewTmpFilePath,
                size = ViewTmpFileSize + iolist_size(BatchData2)
            },
            dict:store(ViewId, ViewTmpFileInfo2, AccTmpFiles)
        end,
    TmpFiles, ViewKeyValues),
    WriterAcc#writer_acc{
        tmp_files = TmpFiles2
    }.


is_new_partition(PartId, #writer_acc{initial_seqs = InitialSeqs}) ->
    couch_util:get_value(PartId, InitialSeqs, 0) == 0.


% For incremental index updates.
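% Decides whether the accumulated temporary batch files are large enough (or
% the batch is final, or a flush was forced) to be applied to the btrees. The
% actual btree update runs in a separate worker (see spawn_updater_worker/3);
% the result of a previous round is collected here and any compaction log
% files are forwarded to the group owner.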
maybe_update_btrees(WriterAcc0) ->
    #writer_acc{
        view_empty_kvs = ViewEmptyKVs,
        tmp_files = TmpFiles,
        group = Group0,
        final_batch = IsFinalBatch,
        owner = Owner,
        last_seqs = LastSeqs,
        part_versions = PartVersions,
        force_flush = ForceFlush
    } = WriterAcc0,
    IdTmpFileInfo = dict:fetch(ids_index, TmpFiles),
    ShouldFlushViews =
        lists:any(
            fun({#set_view{id_num = Id}, _}) ->
                ViewTmpFileInfo = dict:fetch(Id, TmpFiles),
                ViewTmpFileInfo#set_view_tmp_file_info.size >= ?INC_MAX_TMP_FILE_SIZE
            end, ViewEmptyKVs),
    ShouldFlush = IsFinalBatch orelse
        ForceFlush orelse
        ((IdTmpFileInfo#set_view_tmp_file_info.size >= ?INC_MAX_TMP_FILE_SIZE) andalso
        ShouldFlushViews),
    case ShouldFlush of
    false ->
        NewLastSeqs1 = LastSeqs,
        case erlang:get(updater_worker) of
        undefined ->
            WriterAcc = WriterAcc0;
        UpdaterWorker when is_reference(UpdaterWorker) ->
            receive
            {UpdaterWorker, UpGroup, UpStats, CompactFiles} ->
                send_log_compact_files(Owner, CompactFiles, ?set_seqs(UpGroup),
                    ?set_partition_versions(UpGroup)),
                erlang:erase(updater_worker),
                WriterAcc = check_if_compactor_started(
                    WriterAcc0#writer_acc{group = UpGroup, stats = UpStats})
            after 0 ->
                WriterAcc = WriterAcc0
            end
        end;
    true ->
        ok = close_tmp_fd(IdTmpFileInfo),
        ok = lists:foreach(
            fun(#set_view{id_num = Id}) ->
                ViewTmpFileInfo = dict:fetch(Id, TmpFiles),
                ok = close_tmp_fd(ViewTmpFileInfo)
            end,
            Group0#set_view_group.views),
        case erlang:erase(updater_worker) of
        undefined ->
            WriterAcc1 = WriterAcc0;
        UpdaterWorker when is_reference(UpdaterWorker) ->
            receive
            {UpdaterWorker, UpGroup2, UpStats2, CompactFiles2} ->
                send_log_compact_files(Owner, CompactFiles2, ?set_seqs(UpGroup2),
                    ?set_partition_versions(UpGroup2)),
                WriterAcc1 = check_if_compactor_started(
                    WriterAcc0#writer_acc{
                        group = UpGroup2,
                        stats = UpStats2
                    })
            end
        end,


        % MB-11472: There is no id sortfile present, hence this is the final
        % batch and there is nothing left to be updated in the btree.
        case IdTmpFileInfo#set_view_tmp_file_info.name of
        nil ->
            WriterAcc = WriterAcc1;
        _ ->
            WriterAcc2 = check_if_compactor_started(WriterAcc1),
            NewUpdaterWorker = spawn_updater_worker(WriterAcc2, LastSeqs, PartVersions),
            erlang:put(updater_worker, NewUpdaterWorker),

            TmpFiles2 = dict:map(
                fun(_, _) -> #set_view_tmp_file_info{} end, TmpFiles),
            WriterAcc = WriterAcc2#writer_acc{tmp_files = TmpFiles2}
        end,
        NewLastSeqs1 = orddict:new()
    end,
    #writer_acc{
        stats = NewStats0,
        group = NewGroup0
    } = WriterAcc,
    case IsFinalBatch of
    true ->
        case erlang:erase(updater_worker) of
        undefined ->
            NewGroup = NewGroup0,
            NewStats = NewStats0;
        UpdaterWorker2 when is_reference(UpdaterWorker2) ->
            receive
            {UpdaterWorker2, NewGroup, NewStats, CompactFiles3} ->
                send_log_compact_files(Owner, CompactFiles3, ?set_seqs(NewGroup),
                    ?set_partition_versions(NewGroup))
            end
        end,
        NewLastSeqs = orddict:new();
    false ->
        NewGroup = NewGroup0,
        NewStats = NewStats0,
        NewLastSeqs = NewLastSeqs1
    end,
    NewWriterAcc = WriterAcc#writer_acc{
        stats = NewStats,
        group = NewGroup,
        last_seqs = NewLastSeqs
    },
    NewWriterAcc.


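% Forward log/sort files to the group owner so a running compactor can apply
% the deltas. The Init flag (derived from the `new_compactor` entry in the
% process dictionary, set in check_if_compactor_started/1) marks the first such
% message for a newly started compactor.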
send_log_compact_files(_Owner, [], _Seqs, _PartVersions) ->
    ok;
send_log_compact_files(Owner, Files, Seqs, PartVersions) ->
    Init = case erlang:erase(new_compactor) of
    true ->
        true;
    undefined ->
        false
    end,
    ok = gen_server:cast(Owner, {compact_log_files, Files, Seqs, PartVersions, Init}).


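% Spawn a worker process that applies the current batch to the btrees via
% update_btrees/1. The worker first performs the native_updater_start handshake
% with the main updater process, then sends its result back to the writer,
% tagged with the returned reference.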
-spec spawn_updater_worker(#writer_acc{}, partition_seqs(),
                           partition_versions()) -> reference().
spawn_updater_worker(WriterAcc, PartIdSeqs, PartVersions) ->
    Parent = self(),
    Ref = make_ref(),
    #writer_acc{
        group = Group,
        parent = UpdaterPid,
        max_seqs = MaxSeqs
    } = WriterAcc,
    % Wait for main updater process to ack
    UpdaterPid ! {native_updater_start, self()},
    receive
    {ok, native_updater_start} ->
        ok
    end,
    Pid = spawn_link(fun() ->
        case ?set_cbitmask(Group) of
        0 ->
            CleanupStart = 0;
        _ ->
            CleanupStart = os:timestamp()
        end,
        {ok, NewGroup0, CleanupCount, NewStats, NewCompactFiles} = update_btrees(WriterAcc),
        case ?set_cbitmask(Group) of
        0 ->
            CleanupTime = 0.0;
        _ ->
            CleanupTime = timer:now_diff(os:timestamp(), CleanupStart) / 1000000,
            #set_view_group{
                set_name = SetName,
                name = DDocId,
                type = GroupType
            } = Group,
            ?LOG_INFO("Updater for set view `~s`, ~s group `~s`, performed cleanup "
                      "of ~p key/value pairs in ~.3f seconds",
                      [SetName, GroupType, DDocId, CleanupCount, CleanupTime])
        end,
        NewSeqs = update_seqs(PartIdSeqs, ?set_seqs(Group)),
        NewPartVersions = update_versions(PartVersions, ?set_partition_versions(Group)),
        Header = NewGroup0#set_view_group.index_header,
        NewHeader = Header#set_view_index_header{
            seqs = NewSeqs,
            partition_versions = NewPartVersions
        },
        NewGroup = NewGroup0#set_view_group{
            index_header = NewHeader
        },
        NewGroup2 = update_transferred_replicas(NewGroup, MaxSeqs, PartIdSeqs),
        NumChanges = count_seqs_done(Group, NewSeqs),
        NewStats2 = NewStats#set_view_updater_stats{
           seqs = NewStats#set_view_updater_stats.seqs + NumChanges,
           cleanup_time = NewStats#set_view_updater_stats.cleanup_time + CleanupTime,
           cleanup_kv_count = NewStats#set_view_updater_stats.cleanup_kv_count + CleanupCount
        },
        Parent ! {Ref, NewGroup2, NewStats2, NewCompactFiles}
    end),
    UpdaterPid ! {native_updater_pid, Pid},
    Ref.

% Update id btree and view btrees with current batch of changes
update_btrees(WriterAcc) ->
    #writer_acc{
        stats = Stats,
        group = Group0,
        tmp_dir = TmpDir,
        tmp_files = TmpFiles,
        compactor_running = CompactorRunning,
        max_insert_batch_size = MaxBatchSize
    } = WriterAcc,

    % Prepare list of operation logs for each btree
    #set_view_tmp_file_info{name = IdFile} = dict:fetch(ids_index, TmpFiles),
    ViewFiles = lists:map(
        fun(#set_view{id_num = Id}) ->
            #set_view_tmp_file_info{
                name = ViewFile
            } = dict:fetch(Id, TmpFiles),
            ViewFile
        end, Group0#set_view_group.views),
    % `LogFiles` is supplied to the native updater. For spatial views only the
    % ID b-tree is updated.
    LogFiles = case Group0#set_view_group.mod of
    mapreduce_view ->
        [IdFile | ViewFiles];
    spatial_view ->
        [IdFile]
    end,

    % Remove the spatial views from the group.
    % The native updater can currently handle mapreduce views only, but it
    % also handles the ID b-tree of spatial views.
    Group = couch_set_view_util:remove_group_views(Group0, spatial_view),

    {ok, NewGroup0, Stats2} = couch_set_view_updater_helper:update_btrees(
        Group, TmpDir, LogFiles, MaxBatchSize, false),
    {IdsInserted, IdsDeleted, KVsInserted, KVsDeleted, CleanupCount} = Stats2,

    % Add back spatial views
    NewGroup = couch_set_view_util:update_group_views(
        NewGroup0, Group0, spatial_view),

    % The native update of the ID b-tree has run; now run the Erlang updater
    % for the spatial views.
    NewGroup2 = case NewGroup#set_view_group.mod of
    mapreduce_view ->
        NewGroup;
    spatial_view = Mod ->
        process_flag(trap_exit, true),
        ok = couch_file:refresh_eof(NewGroup#set_view_group.fd),
        Pid = spawn_link(fun() ->
            Views = Mod:update_spatial(NewGroup#set_view_group.views, ViewFiles,
                MaxBatchSize),
            exit({spatial_views_updater_result, Views})
            end),
        receive
        {'EXIT', Pid, Result} ->
            case Result of
            {spatial_views_updater_result, Views} ->
                NewGroup#set_view_group{
                    views = Views,
                    index_header = (NewGroup#set_view_group.index_header)#set_view_index_header{
                        view_states = [Mod:get_state(V#set_view.indexer) || V <- Views]
                    }
                };
            _ ->
                exit(Result)
            end;
        stop ->
            #set_view_group{
                set_name = SetName,
                name = DDocId,
                type = Type
            } = NewGroup,
            ?LOG_INFO("Set view `~s`, ~s group `~s`, index updater (spatial "
                      "update) stopped successfully.", [SetName, Type, DDocId]),
            couch_util:shutdown_sync(Pid),
            exit(shutdown)
        end
    end,

    NewStats = Stats#set_view_updater_stats{
     inserted_ids = Stats#set_view_updater_stats.inserted_ids + IdsInserted,
     deleted_ids = Stats#set_view_updater_stats.deleted_ids + IdsDeleted,
     inserted_kvs = Stats#set_view_updater_stats.inserted_kvs + KVsInserted,
     deleted_kvs = Stats#set_view_updater_stats.deleted_kvs + KVsDeleted
    },

    % Remove files if compactor is not running
    % Otherwise send them to compactor to apply deltas
    CompactFiles = lists:foldr(
        fun(SortedFile, AccCompactFiles) ->
            case CompactorRunning of
            true ->
                case filename:extension(SortedFile) of
                ".compact" ->
                     [SortedFile | AccCompactFiles];
                _ ->
                    SortedFile2 = new_sort_file_name(TmpDir, true),
                    ok = file2:rename(SortedFile, SortedFile2),
                    [SortedFile2 | AccCompactFiles]
                end;
            false ->
                ok = file2:delete(SortedFile),
                AccCompactFiles
            end
        end, [], [IdFile | ViewFiles]),
    {ok, NewGroup2, CleanupCount, NewStats, CompactFiles}.


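% Fold the new per-partition sequence numbers into the existing ones. Every new
% sequence number must be strictly greater than the currently stored one.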
update_seqs(PartIdSeqs, Seqs) ->
    orddict:fold(
        fun(PartId, NewSeq, Acc) ->
            OldSeq = couch_util:get_value(PartId, Acc, 0),
            case NewSeq > OldSeq of
            true ->
                ok;
            false ->
                exit({error, <<"New seq is smaller than or equal to old seq.">>, PartId, OldSeq, NewSeq})
            end,
            orddict:store(PartId, NewSeq, Acc)
        end,
        Seqs, PartIdSeqs).

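% Merge the new partition versions into the full list, keyed by partition ID.
% On a key collision lists:ukeymerge/3 keeps the entry from the first list,
% i.e. the new version wins.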
update_versions(PartVersions, AllPartVersions) ->
    lists:ukeymerge(1, PartVersions, AllPartVersions).


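% Advance the indexer task status by NumChanges, growing total_changes if
% changes_done would otherwise exceed it.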
update_task(NumChanges) ->
    [Changes, Total] = couch_task_status:get([changes_done, total_changes]),
    Changes2 = Changes + NumChanges,
    Total2 = erlang:max(Total, Changes2),
    Progress = (Changes2 * 100) div Total2,
    couch_task_status:update([
        {progress, Progress},
        {changes_done, Changes2},
        {total_changes, Total2}
    ]).


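% Send a partial group update to the owner and wait until it has been processed
% (or until we are told to stop).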
checkpoint(#writer_acc{owner = Owner, parent = Parent, group = Group} = Acc) ->
    #set_view_group{
        set_name = SetName,
        name = DDocId,
        type = Type
    } = Group,
    ?LOG_INFO("Updater checkpointing set view `~s` update for ~s group `~s`",
              [SetName, Type, DDocId]),
    NewGroup = maybe_fix_group(Group),
    ok = couch_file:refresh_eof(NewGroup#set_view_group.fd),
    Owner ! {partial_update, Parent, self(), NewGroup},
    receive
    update_processed ->
        ok;
    stop ->
        exit(shutdown)
    end,
    Acc#writer_acc{group = NewGroup}.


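% Pick up any new passive partitions announced while the writer was busy and
% fold them into the group header.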
maybe_fix_group(#set_view_group{index_header = Header} = Group) ->
    receive
    {new_passive_partitions, Parts} ->
        Bitmask = couch_set_view_util:build_bitmask(Parts),
        {Seqs, PartVersions} = couch_set_view_util:fix_partitions(Group, Parts),
        Group#set_view_group{
            index_header = Header#set_view_index_header{
                seqs = Seqs,
                pbitmask = ?set_pbitmask(Group) bor Bitmask,
                partition_versions = PartVersions
            }
        }
    after 0 ->
        Group
    end.


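% If a compactor was started since the last check, acknowledge it with the
% current group and note in the process dictionary that the next
% compact_log_files message is the first one for this compactor.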
check_if_compactor_started(#writer_acc{group = Group0} = Acc) ->
    receive
    {compactor_started, Pid} ->
        erlang:put(new_compactor, true),
        Group = maybe_fix_group(Group0),
        Pid ! {compactor_started_ack, self(), Group},
        Acc#writer_acc{compactor_running = true, group = Group}
    after 0 ->
        Acc
    end.


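% Delete stale sort files and set up one temporary file entry per btree (the ID
% btree plus one per view). Files are only opened eagerly during an initial
% index build.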
init_tmp_files(WriterAcc) ->
    #writer_acc{
        group = Group, initial_build = Init, tmp_dir = TmpDir
    } = WriterAcc,
    case WriterAcc#writer_acc.compactor_running of
    true ->
        ok = couch_set_view_util:delete_sort_files(TmpDir, updater);
    false ->
        ok = couch_set_view_util:delete_sort_files(TmpDir, all)
    end,
    Ids = [ids_index | [V#set_view.id_num || V <- Group#set_view_group.views]],
    Files = case Init of
    true ->
        [begin
             FileName = new_sort_file_name(WriterAcc),
             {ok, Fd} = file2:open(FileName, [raw, append, binary]),
             {Id, #set_view_tmp_file_info{fd = Fd, name = FileName}}
         end || Id <- Ids];
    false ->
         [{Id, #set_view_tmp_file_info{}} || Id <- Ids]
    end,
    WriterAcc#writer_acc{tmp_files = dict:from_list(Files)}.


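% Sort files are created in the compactor namespace while a compaction is
% running (they may later be handed over via send_log_compact_files/4),
% otherwise in the updater namespace.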
new_sort_file_name(#writer_acc{tmp_dir = TmpDir, compactor_running = Cr}) ->
    new_sort_file_name(TmpDir, Cr).

new_sort_file_name(TmpDir, true) ->
    couch_set_view_util:new_sort_file_path(TmpDir, compactor);
new_sort_file_name(TmpDir, false) ->
    couch_set_view_util:new_sort_file_path(TmpDir, updater).


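% A back-index key is the 16-bit partition ID followed by the document ID.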
make_back_index_key(DocId, PartId) ->
    <<PartId:16, DocId/binary>>.


count_seqs_done(Group, NewSeqs) ->
    % NewSeqs might contain new passive partitions that the group's seqs don't
    % have yet (they will be added after a checkpoint period).
    lists:foldl(
        fun({PartId, SeqDone}, Acc) ->
            SeqBefore = couch_util:get_value(PartId, ?set_seqs(Group), 0),
            Acc + (SeqDone - SeqBefore)
        end,
        0, NewSeqs).


close_tmp_fd(#set_view_tmp_file_info{fd = nil}) ->
    ok;
close_tmp_fd(#set_view_tmp_file_info{fd = Fd}) ->
    ok = file:close(Fd).


% DCP introduces the concept of snapshots, where a document mutation is only
% guaranteed to be unique within a single snapshot. But the flusher expects
% unique mutations within the full batch. Merge multiple snapshots (if there
% are any) into a single one. The latest mutation wins.
merge_snapshots(KVs) ->
    merge_snapshots(KVs, false, []).

merge_snapshots([], true, Acc) ->
    {true, Acc};
merge_snapshots([], false, Acc) ->
    % The order of the KVs doesn't really matter, but having them sorted the
    % same way will make life easier when debugging
    {false, lists:reverse(Acc)};
merge_snapshots([snapshot_marker | KVs], _MultipleSnapshots, Acc) ->
    merge_snapshots(KVs, true, Acc);
merge_snapshots([{part_versions, _} = PartVersions | KVs], MultipleSnapshots, Acc) ->
    merge_snapshots(KVs, MultipleSnapshots, [PartVersions | Acc]);
merge_snapshots([KV | KVs], true, Acc0) ->
    {_Seq, DocId, _PartId, _QueryResults} = KV,
    Acc = lists:keystore(DocId, 2, Acc0, KV),
    merge_snapshots(KVs, true, Acc);
merge_snapshots([KV | KVs], false, Acc) ->
    merge_snapshots(KVs, false, [KV | Acc]).