1% -*- Mode: Erlang; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2
3% Licensed under the Apache License, Version 2.0 (the "License"); you may not
4% use this file except in compliance with the License. You may obtain a copy of
5% the License at
6%
7%   http://www.apache.org/licenses/LICENSE-2.0
8%
9% Unless required by applicable law or agreed to in writing, software
10% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12% License for the specific language governing permissions and limitations under
13% the License.
14
15-module(couch_set_view_updater).
16
17-export([update/6]).
18% Exported for the MapReduce specific stuff
19-export([new_sort_file_name/1]).
20
21-include("couch_db.hrl").
22-include_lib("couch_set_view/include/couch_set_view.hrl").
23-include_lib("couch_dcp/include/couch_dcp.hrl").
24
25-define(MAP_QUEUE_SIZE, 256 * 1024).
26-define(WRITE_QUEUE_SIZE, 512 * 1024).
27% The size of the accumulator the emitted key-values are queued up in before
28% they get actually queued. The bigger the value, the less the lock contention,
29% but the higher the possible memory consumption is.
30-define(QUEUE_ACC_BATCH_SIZE, 256 * 1024).
31
32% incremental updates
33-define(INC_MAX_TMP_FILE_SIZE, 31457280).
34-define(MIN_BATCH_SIZE_PER_VIEW, 65536).
35
36% For file sorter and file merger commands.
37-define(PORT_OPTS,
38        [exit_status, use_stdio, stderr_to_stdout, {line, 4096}, binary]).
39
40-record(writer_acc, {
41    parent,
42    owner,
43    group,
44    last_seqs = orddict:new(),
45    part_versions = orddict:new(),
46    compactor_running,
47    write_queue,
48    initial_build,
49    view_empty_kvs,
50    kvs = [],
51    kvs_size = 0,
52    state = updating_active,
53    final_batch = false,
54    max_seqs,
55    stats = #set_view_updater_stats{},
56    tmp_dir = nil,
57    initial_seqs,
58    max_insert_batch_size,
59    tmp_files = dict:new(),
60    throttle = 0,
61    force_flush = false
62}).
63
64
65-spec update(pid(), #set_view_group{},
66             partition_seqs(), boolean(), string(), [term()]) -> no_return().
67update(Owner, Group, CurSeqs, CompactorRunning, TmpDir, Options) ->
68    #set_view_group{
69        set_name = SetName,
70        type = Type,
71        name = DDocId
72    } = Group,
73
74    ActiveParts = couch_set_view_util:decode_bitmask(?set_abitmask(Group)),
75    PassiveParts = couch_set_view_util:decode_bitmask(?set_pbitmask(Group)),
76    NumChanges = couch_set_view_util:missing_changes_count(CurSeqs, ?set_seqs(Group)),
77
78    process_flag(trap_exit, true),
79
80    BeforeEnterTs = os:timestamp(),
81    Parent = self(),
82    BarrierEntryPid = spawn_link(fun() ->
83        DDocIds = couch_set_view_util:get_ddoc_ids_with_sig(SetName, Group),
84        couch_task_status:add_task([
85            {type, blocked_indexer},
86            {set, SetName},
87            {signature, ?l2b(couch_util:to_hex(Group#set_view_group.sig))},
88            {design_documents, DDocIds},
89            {indexer_type, Type}
90        ]),
91        case Type of
92        main ->
93            ok = couch_index_barrier:enter(couch_main_index_barrier, Parent);
94        replica ->
95            ok = couch_index_barrier:enter(couch_replica_index_barrier, Parent)
96        end,
97        Parent ! {done, self(), (timer:now_diff(os:timestamp(), BeforeEnterTs) / 1000000)},
98        receive shutdown -> ok end
99    end),
100
101    BlockedTime = receive
102    {done, BarrierEntryPid, Duration} ->
103        Duration;
104    {'EXIT', _, Reason} ->
105        exit({updater_error, Reason})
106    end,
107
108    CleanupParts = couch_set_view_util:decode_bitmask(?set_cbitmask(Group)),
109    InitialBuild = couch_set_view_util:is_initial_build(Group),
110    ?LOG_INFO("Updater for set view `~s`, ~s group `~s` started~n"
111              "Active partitions:    ~w~n"
112              "Passive partitions:   ~w~n"
113              "Cleanup partitions:   ~w~n"
114              "Replicas to transfer: ~w~n"
115              "Pending transition:   ~n"
116              "    active:           ~w~n"
117              "    passive:          ~w~n"
118              "    unindexable:      ~w~n"
119              "Initial build:        ~s~n"
120              "Compactor running:    ~s~n"
121              "Min # changes:        ~p~n"
122              "Partition versions:   ~w~n",
123              [SetName, Type, DDocId,
124               ActiveParts,
125               PassiveParts,
126               CleanupParts,
127               ?set_replicas_on_transfer(Group),
128               ?pending_transition_active(?set_pending_transition(Group)),
129               ?pending_transition_passive(?set_pending_transition(Group)),
130               ?pending_transition_unindexable(?set_pending_transition(Group)),
131               InitialBuild,
132               CompactorRunning,
133               NumChanges,
134               ?set_partition_versions(Group)
135              ]),
136
137    WriterAcc0 = #writer_acc{
138        parent = self(),
139        owner = Owner,
140        group = Group,
141        initial_build = InitialBuild,
142        max_seqs = CurSeqs,
143        part_versions = ?set_partition_versions(Group),
144        tmp_dir = TmpDir,
145        max_insert_batch_size = list_to_integer(
146            couch_config:get("set_views", "indexer_max_insert_batch_size", "1048576"))
147    },
148    update(WriterAcc0, ActiveParts, PassiveParts,
149            BlockedTime, BarrierEntryPid, NumChanges, CompactorRunning, Options).
150
151
152update(WriterAcc, ActiveParts, PassiveParts, BlockedTime,
153       BarrierEntryPid, NumChanges, CompactorRunning, Options) ->
154    #writer_acc{
155        owner = Owner,
156        group = Group
157    } = WriterAcc,
158    #set_view_group{
159        set_name = SetName,
160        type = Type,
161        name = DDocId,
162        sig = GroupSig
163    } = Group,
164
165    StartTime = os:timestamp(),
166
167    MapQueueOptions = [{max_size, ?MAP_QUEUE_SIZE}, {max_items, infinity}],
168    WriteQueueOptions = [{max_size, ?WRITE_QUEUE_SIZE}, {max_items, infinity}],
169    {ok, MapQueue} = couch_work_queue:new(MapQueueOptions),
170    {ok, WriteQueue} = couch_work_queue:new(WriteQueueOptions),
171
172    Mapper = spawn_link(fun() ->
173        try
174            do_maps(Group, MapQueue, WriteQueue)
175        catch _:Error ->
176            Stacktrace = erlang:get_stacktrace(),
177            ?LOG_ERROR("Set view `~s`, ~s group `~s`, mapper error~n"
178                "error:      ~p~n"
179                "stacktrace: ~p~n",
180                [SetName, Type, DDocId, Error, Stacktrace]),
181            exit(Error)
182        end
183    end),
184
185    Parent = self(),
186    Writer = spawn_link(fun() ->
187        BarrierEntryPid ! shutdown,
188        ViewEmptyKVs = [{View, []} || View <- Group#set_view_group.views],
189        WriterAcc2 = init_tmp_files(WriterAcc#writer_acc{
190            parent = Parent,
191            group = Group,
192            write_queue = WriteQueue,
193            view_empty_kvs = ViewEmptyKVs,
194            compactor_running = CompactorRunning,
195            initial_seqs = ?set_seqs(Group),
196            throttle = case lists:member(throttle, Options) of
197                true ->
198                    list_to_integer(
199                        couch_config:get("set_views", "throttle_period", "100"));
200                false ->
201                    0
202                end
203        }),
204        ok = couch_set_view_util:open_raw_read_fd(Group),
205        try
206            WriterAcc3 = do_writes(WriterAcc2),
207            receive
208            {doc_loader_finished, {PartVersions0, RealMaxSeqs}} ->
209                WriterAccStats = WriterAcc3#writer_acc.stats,
210                WriterAccGroup = WriterAcc3#writer_acc.group,
211                WriterAccHeader = WriterAccGroup#set_view_group.index_header,
212                PartVersions = lists:ukeymerge(1, PartVersions0,
213                    WriterAccHeader#set_view_index_header.partition_versions),
214                case WriterAcc3#writer_acc.initial_build of
215                true ->
216                    % The doc loader might not load the mutations up to the
217                    % most recent one, but only to a lower one. Update the
218                    % group header and stats with the correct information.
219                    MaxSeqs = lists:ukeymerge(
220                        1, RealMaxSeqs, WriterAccHeader#set_view_index_header.seqs),
221                    Stats = WriterAccStats#set_view_updater_stats{
222                        seqs = lists:sum([S || {_, S} <- RealMaxSeqs])
223                    };
224                false ->
225                    MaxSeqs = WriterAccHeader#set_view_index_header.seqs,
226                    Stats = WriterAcc3#writer_acc.stats
227                end,
228                FinalWriterAcc = WriterAcc3#writer_acc{
229                    stats = Stats,
230                    group = WriterAccGroup#set_view_group{
231                        index_header = WriterAccHeader#set_view_index_header{
232                            partition_versions = PartVersions,
233                            seqs = MaxSeqs
234                        }
235                    }
236                }
237            end,
238            Parent ! {writer_finished, FinalWriterAcc}
239        catch _:Error ->
240            Stacktrace = erlang:get_stacktrace(),
241            ?LOG_ERROR("Set view `~s`, ~s group `~s`, writer error~n"
242                "error:      ~p~n"
243                "stacktrace: ~p~n",
244                [SetName, Type, DDocId, Error, Stacktrace]),
245            exit(Error)
246        after
247            ok = couch_set_view_util:close_raw_read_fd(Group)
248        end
249    end),
250
251    InitialBuild = WriterAcc#writer_acc.initial_build,
252    NumChanges2 = case InitialBuild of
253        true ->
254            couch_set_view_updater_helper:count_items_from_set(Group, ActiveParts ++ PassiveParts);
255        false ->
256            NumChanges
257    end,
258
259    DocLoader = spawn_link(fun() ->
260        DDocIds = couch_set_view_util:get_ddoc_ids_with_sig(SetName, Group),
261        couch_task_status:add_task([
262            {type, indexer},
263            {set, SetName},
264            {signature, ?l2b(couch_util:to_hex(GroupSig))},
265            {design_documents, DDocIds},
266            {indexer_type, Type},
267            {progress, 0},
268            {changes_done, 0},
269            {initial_build, InitialBuild},
270            {total_changes, NumChanges2}
271        ]),
272        couch_task_status:set_update_frequency(5000),
273        case lists:member(pause, Options) of
274        true ->
275            % For reliable unit testing, to verify that adding new partitions
276            % to the passive state doesn't restart the updater and the updater
277            % can be aware of it and index these new partitions in the same run.
278            receive continue -> ok end;
279        false ->
280            ok
281        end,
282        try
283            {PartVersions, MaxSeqs} = load_changes(
284                Owner, Parent, Group, MapQueue, ActiveParts, PassiveParts,
285                WriterAcc#writer_acc.max_seqs, WriterAcc#writer_acc.initial_build),
286            Parent ! {doc_loader_finished, {PartVersions, MaxSeqs}}
287        catch
288        throw:purge ->
289            exit(purge);
290        throw:{rollback, RollbackSeqs} ->
291            exit({rollback, RollbackSeqs});
292        _:Error ->
293            Stacktrace = erlang:get_stacktrace(),
294            ?LOG_ERROR("Set view `~s`, ~s group `~s`, doc loader error~n"
295                "error:      ~p~n"
296                "stacktrace: ~p~n",
297                [SetName, Type, DDocId, Error, Stacktrace]),
298            exit(Error)
299        end,
300        % Since updater progress stats is added from docloader,
301        % this process has to stay till updater has completed.
302        receive
303        updater_finished ->
304            ok
305        end
306    end),
307
308    Result = wait_result_loop(StartTime, DocLoader, Mapper, Writer, BlockedTime, Group),
309    case Type of
310    main ->
311        ok = couch_index_barrier:leave(couch_main_index_barrier);
312    replica ->
313        ok = couch_index_barrier:leave(couch_replica_index_barrier)
314    end,
315    case Result of
316    {updater_finished, #set_view_updater_result{group = NewGroup}} ->
317        ?LOG_DEBUG("Updater for ~s set view group `~s`, set `~s`, writer finished:~n"
318                   "  start seqs: ~w~n"
319                   "  end seqs:   ~w~n",
320                   [Type, DDocId, SetName, ?set_seqs(Group), ?set_seqs(NewGroup)]);
321    _ ->
322        ok
323    end,
324    DocLoader ! updater_finished,
325    exit(Result).
326
327
328wait_result_loop(StartTime, DocLoader, Mapper, Writer, BlockedTime, OldGroup) ->
329    #set_view_group{set_name = SetName, name = DDocId, type = Type} = OldGroup,
330    receive
331    {new_passive_partitions, _} = NewPassivePartitions ->
332        Writer ! NewPassivePartitions,
333        DocLoader ! NewPassivePartitions,
334        wait_result_loop(StartTime, DocLoader, Mapper, Writer, BlockedTime, OldGroup);
335    continue ->
336        % Used by unit tests.
337        DocLoader ! continue,
338        wait_result_loop(StartTime, DocLoader, Mapper, Writer, BlockedTime, OldGroup);
339    {doc_loader_finished, {PartVersions, MaxSeqs}} ->
340        Writer ! {doc_loader_finished, {PartVersions, MaxSeqs}},
341        wait_result_loop(StartTime, DocLoader, Mapper, Writer, BlockedTime, OldGroup);
342    {writer_finished, WriterAcc} ->
343        Stats0 = WriterAcc#writer_acc.stats,
344        Result = #set_view_updater_result{
345            group = WriterAcc#writer_acc.group,
346            state = WriterAcc#writer_acc.state,
347            stats = Stats0#set_view_updater_stats{
348                indexing_time = timer:now_diff(os:timestamp(), StartTime) / 1000000,
349                blocked_time = BlockedTime
350            },
351            tmp_file = case WriterAcc#writer_acc.initial_build of
352                true ->
353                    dict:fetch(build_file, WriterAcc#writer_acc.tmp_files);
354                false ->
355                    ""
356                end
357        },
358        {updater_finished, Result};
359    {compactor_started, Pid, Ref} ->
360        ?LOG_INFO("Set view `~s`, ~s group `~s`, updater received "
361                  "compactor ~p notification, ref ~p, writer ~p",
362                   [SetName, Type, DDocId, Pid, Ref, Writer]),
363        Writer ! {compactor_started, self()},
364        erlang:put(compactor_pid, {Pid, Ref}),
365        wait_result_loop(StartTime, DocLoader, Mapper, Writer, BlockedTime, OldGroup);
366    {compactor_started_ack, Writer, GroupSnapshot} ->
367        ?LOG_INFO("Set view `~s`, ~s group `~s`, updater received compaction ack"
368                  " from writer ~p", [SetName, Type, DDocId, Writer]),
369        case erlang:erase(compactor_pid) of
370        {Pid, Ref} ->
371            Pid ! {Ref, {ok, GroupSnapshot}};
372        undefined ->
373            ok
374        end,
375        wait_result_loop(StartTime, DocLoader, Mapper, Writer, BlockedTime, OldGroup);
376    {'EXIT', _, Reason} when Reason =/= normal ->
377        couch_util:shutdown_sync(DocLoader),
378        couch_util:shutdown_sync(Mapper),
379        couch_util:shutdown_sync(Writer),
380        {updater_error, Reason};
381    {native_updater_start, Writer} ->
382        % We need control over spawning native updater process
383        % This helps to terminate native os processes correctly
384        Writer ! {ok, native_updater_start},
385        receive
386        {native_updater_pid, NativeUpdater} ->
387            erlang:put(native_updater, NativeUpdater)
388        end,
389        wait_result_loop(StartTime, DocLoader, Mapper, Writer, BlockedTime, OldGroup);
390    stop ->
391        case erlang:erase(native_updater) of
392        undefined ->
393            couch_util:shutdown_sync(DocLoader),
394            couch_util:shutdown_sync(Mapper),
395            couch_util:shutdown_sync(Writer);
396        NativeUpdater ->
397            MRef = erlang:monitor(process, NativeUpdater),
398            NativeUpdater ! stop,
399            receive
400            {'DOWN', MRef, process, NativeUpdater, _} ->
401                couch_util:shutdown_sync(DocLoader),
402                couch_util:shutdown_sync(Mapper),
403                couch_util:shutdown_sync(Writer)
404            end
405        end,
406        exit({updater_error, shutdown})
407    end.
408
409
410dcp_marker_to_string(Type) ->
411    case Type band ?DCP_SNAPSHOT_TYPE_MASK of
412    ?DCP_SNAPSHOT_TYPE_DISK ->
413        "on-disk";
414    ?DCP_SNAPSHOT_TYPE_MEMORY ->
415        "in-memory";
416    _ ->
417        io_lib:format("unknown (~w)", [Type])
418    end.
419
420
421load_changes(Owner, Updater, Group, MapQueue, ActiveParts, PassiveParts,
422        EndSeqs, InitialBuild) ->
423    #set_view_group{
424        set_name = SetName,
425        name = DDocId,
426        type = GroupType,
427        index_header = #set_view_index_header{
428            seqs = SinceSeqs,
429            partition_versions = PartVersions0
430        },
431        dcp_pid = DcpPid,
432        category = Category
433    } = Group,
434
435    MaxDocSize = list_to_integer(
436        couch_config:get("set_views", "indexer_max_doc_size", "0")),
437    FoldFun = fun({PartId, EndSeq}, {AccCount, AccSeqs, AccVersions, AccRollbacks}) ->
438        case couch_set_view_util:has_part_seq(PartId, ?set_unindexable_seqs(Group))
439            andalso not lists:member(PartId, ?set_replicas_on_transfer(Group)) of
440        true ->
441            {AccCount, AccSeqs, AccVersions, AccRollbacks};
442        false ->
443            Since = couch_util:get_value(PartId, SinceSeqs, 0),
444            PartVersions = couch_util:get_value(PartId, AccVersions),
445            Flags = case InitialBuild of
446            true ->
447                ?DCP_FLAG_DISKONLY;
448            false ->
449                ?DCP_FLAG_NOFLAG
450            end,
451            % For stream request from 0, If a vbucket got reset in the window
452            % of time between seqno was obtained from stats and stream request
453            % was made, the end_seqno may be higher than current vb high seqno.
454            % Use a special flag to tell server to set end_seqno.
455            Flags2 = case PartVersions of
456            [{0, 0}] ->
457                Flags bor ?DCP_FLAG_USELATEST_ENDSEQNO;
458            _ ->
459                Flags
460            end,
461            case AccRollbacks of
462            [] ->
463                case EndSeq =:= Since of
464                true ->
465                    {AccCount, AccSeqs, AccVersions, AccRollbacks};
466                false ->
467                    ChangesWrapper = fun
468                        ({part_versions, _} = NewVersions, Acc) ->
469                            queue_doc(
470                                NewVersions, MapQueue, Group,
471                                MaxDocSize, InitialBuild),
472                            Acc;
473                        ({snapshot_marker, {MarkerStartSeq, MarkerEndSeq, MarkerType}}, {Count, _})
474                            when MarkerType band ?DCP_SNAPSHOT_TYPE_MASK =/= 0 ->
475                            ?LOG_INFO(
476                                "set view `~s`, ~s (~s) group `~s`: received "
477                                "a snapshot marker (~s) for partition ~p from "
478                                "sequence ~p to ~p",
479                                [SetName, GroupType, Category, DDocId,
480                                    dcp_marker_to_string(MarkerType),
481                                    PartId, MarkerStartSeq, MarkerEndSeq]),
482                            case Count of
483                            % Ignore the snapshot marker that is at the
484                            % beginning of the stream.
485                            % If it wasn't ignored, it would lead to an
486                            % additional forced flush which isn't needed. A
487                            % flush is needed if there are several mutations
488                            % of the same document within one batch. As two
489                            % different partitions can't contain the same
490                            % document ID, we are safe to not force flushing
491                            % between two partitions.
492                            0 ->
493                                {Count, MarkerEndSeq};
494                            _ ->
495                                queue_doc(
496                                    snapshot_marker, MapQueue, Group,
497                                    MaxDocSize, InitialBuild),
498                                {Count, MarkerEndSeq}
499                            end;
500                        ({snapshot_marker, {MarkerStartSeq, MarkerEndSeq, MarkerType}}, Acc) ->
501                            ?LOG_ERROR(
502                                "set view `~s`, ~s (~s) group `~s`: received "
503                                "a snapshot marker (~s) for partition ~p from "
504                                "sequence ~p to ~p",
505                                [SetName, GroupType, Category, DDocId,
506                                    dcp_marker_to_string(MarkerType),
507                                    PartId, MarkerStartSeq, MarkerEndSeq]),
508                            throw({error, unknown_snapshot_marker, MarkerType}),
509                            Acc;
510                        (#dcp_doc{} = Item, {Count, AccEndSeq}) ->
511                            queue_doc(
512                                Item, MapQueue, Group,
513                                MaxDocSize, InitialBuild),
514                            {Count + 1, AccEndSeq}
515                        end,
516                    Result = couch_dcp_client:enum_docs_since(
517                        DcpPid, PartId, PartVersions, Since, EndSeq, Flags2,
518                        ChangesWrapper, {0, 0}),
519                    case Result of
520                    {ok, {AccCount2, AccEndSeq}, NewPartVersions} ->
521                        AccSeqs2 = orddict:store(PartId, AccEndSeq, AccSeqs),
522                        AccVersions2 = lists:ukeymerge(
523                            1, [{PartId, NewPartVersions}], AccVersions),
524                        AccRollbacks2 = AccRollbacks;
525                    {rollback, RollbackSeq} ->
526                        AccCount2 = AccCount,
527                        AccSeqs2 = AccSeqs,
528                        AccVersions2 = AccVersions,
529                        AccRollbacks2 = ordsets:add_element(
530                            {PartId, RollbackSeq}, AccRollbacks);
531                    Error ->
532                        AccCount2 = AccCount,
533                        AccSeqs2 = AccSeqs,
534                        AccVersions2 = AccVersions,
535                        AccRollbacks2 = AccRollbacks,
536                        ?LOG_ERROR("set view `~s`, ~s (~s) group `~s` error"
537                            "while loading changes for partition ~p:~n~p~n",
538                            [SetName, GroupType, Category, DDocId, PartId,
539                                Error]),
540                        throw(Error)
541                    end,
542                    {AccCount2, AccSeqs2, AccVersions2, AccRollbacks2}
543                end;
544            _ ->
545                % If there is a rollback needed, don't store any new documents
546                % in the index, but just check for a rollback of another
547                % partition (i.e. a request with start seq == end seq)
548                ChangesWrapper = fun(_, _) -> ok end,
549                Result = couch_dcp_client:enum_docs_since(
550                    DcpPid, PartId, PartVersions, Since, Since, Flags2,
551                    ChangesWrapper, ok),
552                case Result of
553                {ok, _, _} ->
554                    AccRollbacks2 = AccRollbacks;
555                {rollback, RollbackSeq} ->
556                    AccRollbacks2 = ordsets:add_element(
557                        {PartId, RollbackSeq}, AccRollbacks)
558                end,
559                {AccCount, AccSeqs, AccVersions, AccRollbacks2}
560            end
561        end
562    end,
563
564    notify_owner(Owner, {state, updating_active}, Updater),
565    case ActiveParts of
566    [] ->
567        ActiveChangesCount = 0,
568        MaxSeqs = orddict:new(),
569        PartVersions = PartVersions0,
570        Rollbacks = [];
571    _ ->
572        ?LOG_INFO("Updater reading changes from active partitions to "
573                  "update ~s set view group `~s` from set `~s`",
574                  [GroupType, DDocId, SetName]),
575        {ActiveChangesCount, MaxSeqs, PartVersions, Rollbacks} = lists:foldl(
576            FoldFun, {0, orddict:new(), PartVersions0, ordsets:new()},
577            couch_set_view_util:filter_seqs(ActiveParts, EndSeqs))
578    end,
579    case PassiveParts of
580    [] ->
581        FinalChangesCount = ActiveChangesCount,
582        MaxSeqs2 = MaxSeqs,
583        PartVersions2 = PartVersions,
584        Rollbacks2 = Rollbacks;
585    _ ->
586        ?LOG_INFO("Updater reading changes from passive partitions to "
587                  "update ~s set view group `~s` from set `~s`",
588                  [GroupType, DDocId, SetName]),
589        {FinalChangesCount, MaxSeqs2, PartVersions2, Rollbacks2} = lists:foldl(
590            FoldFun, {ActiveChangesCount, MaxSeqs, PartVersions, Rollbacks},
591            couch_set_view_util:filter_seqs(PassiveParts, EndSeqs))
592    end,
593    {FinalChangesCount3, MaxSeqs3, PartVersions3, Rollbacks3} =
594        load_changes_from_passive_parts_in_mailbox(DcpPid,
595            Group, FoldFun, FinalChangesCount, MaxSeqs2, PartVersions2, Rollbacks2),
596
597    case Rollbacks3 of
598    [] ->
599        ok;
600    _ ->
601        throw({rollback, Rollbacks3})
602    end,
603
604    couch_work_queue:close(MapQueue),
605    ?LOG_INFO("Updater for ~s set view group `~s`, set `~s`, read a total of ~p changes",
606              [GroupType, DDocId, SetName, FinalChangesCount3]),
607    ?LOG_DEBUG("Updater for ~s set view group `~s`, set `~s`, max partition seqs found:~n~w",
608               [GroupType, DDocId, SetName, MaxSeqs3]),
609    {PartVersions3, MaxSeqs3}.
610
611
612load_changes_from_passive_parts_in_mailbox(DcpPid,
613        Group, FoldFun, ChangesCount, MaxSeqs0, PartVersions0, Rollbacks) ->
614    #set_view_group{
615        set_name = SetName,
616        name = DDocId,
617        type = GroupType
618    } = Group,
619    receive
620    {new_passive_partitions, Parts0} ->
621        Parts = get_more_passive_partitions(Parts0),
622        AddPartVersions = [{P, [{0, 0}]} || P <- Parts],
623        {ok, AddMaxSeqs} = couch_dcp_client:get_seqs(DcpPid, Parts),
624        PartVersions = lists:ukeymerge(1, AddPartVersions, PartVersions0),
625
626        MaxSeqs = lists:ukeymerge(1, AddMaxSeqs, MaxSeqs0),
627        ?LOG_INFO("Updater reading changes from new passive partitions ~w to "
628                  "update ~s set view group `~s` from set `~s`",
629                  [Parts, GroupType, DDocId, SetName]),
630        {ChangesCount2, MaxSeqs2, PartVersions2, Rollbacks2} = lists:foldl(
631            FoldFun, {ChangesCount, MaxSeqs, PartVersions, Rollbacks}, AddMaxSeqs),
632        load_changes_from_passive_parts_in_mailbox(DcpPid,
633            Group, FoldFun, ChangesCount2, MaxSeqs2, PartVersions2, Rollbacks2)
634    after 0 ->
635        {ChangesCount, MaxSeqs0, PartVersions0, Rollbacks}
636    end.
637
638
639get_more_passive_partitions(Parts) ->
640    receive
641    {new_passive_partitions, Parts2} ->
642        get_more_passive_partitions(Parts ++ Parts2)
643    after 0 ->
644        lists:sort(Parts)
645    end.
646
647
648notify_owner(Owner, Msg, UpdaterPid) ->
649    Owner ! {updater_info, UpdaterPid, Msg}.
650
651
652queue_doc(snapshot_marker, MapQueue, _Group, _MaxDocSize, _InitialBuild) ->
653    couch_work_queue:queue(MapQueue, snapshot_marker);
654queue_doc({part_versions, _} = PartVersions, MapQueue, _Group, _MaxDocSize,
655    _InitialBuild) ->
656    couch_work_queue:queue(MapQueue, PartVersions);
657queue_doc(Doc, MapQueue, Group, MaxDocSize, InitialBuild) ->
658    case Doc#dcp_doc.deleted of
659    true when InitialBuild ->
660        Entry = nil;
661    true ->
662        Entry = Doc;
663    false ->
664        #set_view_group{
665           set_name = SetName,
666           name = DDocId,
667           type = GroupType
668        } = Group,
669        case couch_util:validate_utf8(Doc#dcp_doc.id) of
670        true ->
671            case (MaxDocSize > 0) andalso
672                (iolist_size(Doc#dcp_doc.body) > MaxDocSize) of
673            true ->
674                ?LOG_MAPREDUCE_ERROR("Bucket `~s`, ~s group `~s`, skipping "
675                    "document with ID `~s`: too large body (~p bytes)",
676                    [SetName, GroupType, DDocId,
677                     ?b2l(Doc#dcp_doc.id), iolist_size(Doc#dcp_doc.body)]),
678                Entry = Doc#dcp_doc{deleted = true};
679            false ->
680                Entry = Doc
681            end;
682        false ->
683            % If the id isn't utf8 (memcached allows it), then log an error
684            % message and skip the doc. Send it through the queue anyway
685            % so we record the high seq num in case there are a bunch of
686            % these at the end, we want to keep track of the high seq and
687            % not reprocess again.
688            ?LOG_MAPREDUCE_ERROR("Bucket `~s`, ~s group `~s`, skipping "
689                "document with non-utf8 id. Doc id bytes: ~w",
690                [SetName, GroupType, DDocId, ?b2l(Doc#dcp_doc.id)]),
691            Entry = Doc#dcp_doc{deleted = true}
692        end
693    end,
694    case Entry of
695    nil ->
696        ok;
697    _ ->
698        couch_work_queue:queue(MapQueue, Entry),
699        update_task(1)
700    end.
701
702
703do_maps(Group, MapQueue, WriteQueue) ->
704    #set_view_group{
705        set_name = SetName,
706        name = DDocId,
707        type = Type,
708        mod = Mod
709    } = Group,
710    case couch_work_queue:dequeue(MapQueue) of
711    closed ->
712        couch_work_queue:close(WriteQueue);
713    {ok, Queue, _QueueSize} ->
714        ViewCount = length(Group#set_view_group.views),
715        {Items, _} = lists:foldl(
716            fun(#dcp_doc{deleted = true} = DcpDoc, {Acc, Size}) ->
717                #dcp_doc{
718                    id = Id,
719                    partition = PartId,
720                    seq = Seq
721                } = DcpDoc,
722                Item = {Seq, Id, PartId, []},
723                {[Item | Acc], Size};
724            (#dcp_doc{deleted = false} = DcpDoc, {Acc0, Size0}) ->
725                % When there are a lot of emits per document the memory can
726                % grow almost indefinitely as the queue size is only limited
727                % by the number of documents and not their emits.
728                % In case the accumulator grows huge, queue the items early
729                % into the writer queue. Only take emits into account, the
730                % other cases in this `foldl` won't ever have an significant
731                % size.
732                {Acc, Size} = case Size0 > ?QUEUE_ACC_BATCH_SIZE of
733                true ->
734                    couch_work_queue:queue(WriteQueue, lists:reverse(Acc0)),
735                    {[], 0};
736                false ->
737                    {Acc0, Size0}
738                end,
739                #dcp_doc{
740                    id = Id,
741                    body = Body,
742                    partition = PartId,
743                    rev_seq = RevSeq,
744                    seq = Seq,
745                    cas = Cas,
746                    expiration = Expiration,
747                    flags = Flags,
748                    data_type = DcpDataType
749                } = DcpDoc,
750                DataType = case DcpDataType of
751                ?DCP_DATA_TYPE_RAW ->
752                    ?CONTENT_META_NON_JSON_MODE;
753                ?DCP_DATA_TYPE_JSON ->
754                    ?CONTENT_META_JSON
755                end,
756                Doc = #doc{
757                    id = Id,
758                    rev = {RevSeq, <<Cas:64, Expiration:32, Flags:32>>},
759                    body = Body,
760                    content_meta = DataType,
761                    deleted = false
762                },
763                try
764                    {ok, Result, LogList} = couch_set_view_mapreduce:map(
765                        Doc, PartId, Seq, Group),
766                    {Result2, _} = lists:foldr(
767                        fun({error, Reason}, {AccRes, Pos}) ->
768                            ErrorMsg = "Bucket `~s`, ~s group `~s`, error mapping"
769                                    " document `~s` for view `~s`: ~s",
770                            ViewName = Mod:view_name(Group, Pos),
771                            Args = [SetName, Type, DDocId, Id, ViewName,
772                                    couch_util:to_binary(Reason)],
773                            ?LOG_MAPREDUCE_ERROR(ErrorMsg, Args),
774                            {[[] | AccRes], Pos - 1};
775                        (KVs, {AccRes, Pos}) ->
776                            {[KVs | AccRes], Pos - 1}
777                        end,
778                        {[], ViewCount}, Result),
779                    lists:foreach(
780                        fun(Msg) ->
781                            DebugMsg = "Bucket `~s`, ~s group `~s`, map function"
782                                " log for document `~s`: ~s",
783                            Args = [SetName, Type, DDocId, Id, binary_to_list(Msg)],
784                            ?LOG_MAPREDUCE_ERROR(DebugMsg, Args)
785                        end, LogList),
786                    Item = {Seq, Id, PartId, Result2},
787                    {[Item | Acc], Size + erlang:external_size(Result2)}
788                catch _:{error, Reason} ->
789                    ErrorMsg = "Bucket `~s`, ~s group `~s`, error mapping document `~s`: ~s",
790                    Args = [SetName, Type, DDocId, Id, couch_util:to_binary(Reason)],
791                    ?LOG_MAPREDUCE_ERROR(ErrorMsg, Args),
792                    {[{Seq, Id, PartId, []} | Acc], Size}
793                end;
794            (snapshot_marker, {Acc, Size}) ->
795                {[snapshot_marker | Acc], Size};
796            ({part_versions, _} = PartVersions, {Acc, Size}) ->
797                {[PartVersions | Acc], Size}
798            end,
799            {[], 0}, Queue),
800        ok = couch_work_queue:queue(WriteQueue, lists:reverse(Items)),
801        do_maps(Group, MapQueue, WriteQueue)
802    end.
803
804
805do_writes(Acc) ->
806    #writer_acc{
807        kvs = Kvs,
808        kvs_size = KvsSize,
809        write_queue = WriteQueue,
810        throttle = Throttle
811    } = Acc,
812    ok = timer:sleep(Throttle),
813    case couch_work_queue:dequeue(WriteQueue) of
814    closed ->
815        flush_writes(Acc#writer_acc{final_batch = true});
816    {ok, Queue0, QueueSize} ->
817        Queue = lists:flatten(Queue0),
818        Kvs2 = Kvs ++ Queue,
819        KvsSize2 = KvsSize + QueueSize,
820
821        Acc2 = Acc#writer_acc{
822            kvs = Kvs2,
823            kvs_size = KvsSize2
824        },
825        case should_flush_writes(Acc2) of
826        true ->
827            Acc3 = flush_writes(Acc2),
828            Acc4 = Acc3#writer_acc{kvs = [], kvs_size = 0};
829        false ->
830            Acc4 = Acc2
831        end,
832        do_writes(Acc4)
833    end.
834
835should_flush_writes(Acc) ->
836    #writer_acc{
837        view_empty_kvs = ViewEmptyKvs,
838        kvs_size = KvsSize
839    } = Acc,
840    KvsSize >= (?MIN_BATCH_SIZE_PER_VIEW * length(ViewEmptyKvs)).
841
842
843flush_writes(#writer_acc{kvs = [], initial_build = false} = Acc) ->
844    Acc2 = maybe_update_btrees(Acc),
845    checkpoint(Acc2);
846
847flush_writes(#writer_acc{initial_build = false} = Acc0) ->
848    #writer_acc{
849        kvs = Kvs,
850        view_empty_kvs = ViewEmptyKVs,
851        group = Group,
852        parent = Parent,
853        owner = Owner,
854        final_batch = IsFinalBatch,
855        part_versions = PartVersions
856    } = Acc0,
857    Mod = Group#set_view_group.mod,
858    % Only incremental updates can contain multiple snapshots
859    {MultipleSnapshots, Kvs2} = merge_snapshots(Kvs),
860    Acc1 = case MultipleSnapshots of
861    true ->
862        Acc2 = maybe_update_btrees(Acc0#writer_acc{force_flush = true}),
863        checkpoint(Acc2);
864    false ->
865        Acc0
866    end,
867    #writer_acc{last_seqs = LastSeqs} = Acc1,
868    {ViewKVs, DocIdViewIdKeys, NewLastSeqs, NewPartVersions} =
869        process_map_results(Mod, Kvs2, ViewEmptyKVs, LastSeqs, PartVersions),
870    Acc3 = Acc1#writer_acc{last_seqs = NewLastSeqs, part_versions = NewPartVersions},
871    Acc4 = write_to_tmp_batch_files(ViewKVs, DocIdViewIdKeys, Acc3),
872    #writer_acc{group = NewGroup} = Acc4,
873    case ?set_seqs(NewGroup) =/= ?set_seqs(Group) of
874    true ->
875        Acc5 = checkpoint(Acc4),
876        case (Acc4#writer_acc.state =:= updating_active) andalso
877            lists:any(fun({PartId, _}) ->
878                ((1 bsl PartId) band ?set_pbitmask(Group) =/= 0)
879            end, NewLastSeqs) of
880        true ->
881            notify_owner(Owner, {state, updating_passive}, Parent),
882            Acc5#writer_acc{state = updating_passive};
883        false ->
884            Acc5
885        end;
886    false ->
887        case IsFinalBatch of
888        true ->
889            checkpoint(Acc4);
890        false ->
891            Acc4
892        end
893    end;
894
895flush_writes(#writer_acc{initial_build = true} = WriterAcc) ->
896    #writer_acc{
897        kvs = Kvs,
898        view_empty_kvs = ViewEmptyKVs,
899        tmp_files = TmpFiles,
900        tmp_dir = TmpDir,
901        group = Group,
902        final_batch = IsFinalBatch,
903        max_seqs = MaxSeqs,
904        part_versions = PartVersions,
905        stats = Stats
906    } = WriterAcc,
907    #set_view_group{
908        set_name = SetName,
909        type = Type,
910        name = DDocId,
911        mod = Mod
912    } = Group,
913    {ViewKVs, DocIdViewIdKeys, MaxSeqs2, PartVersions2} = process_map_results(
914        Mod, Kvs, ViewEmptyKVs, MaxSeqs, PartVersions),
915
916    IdRecords = lists:foldr(
917        fun({_DocId, {_PartId, []}}, Acc) ->
918                Acc;
919            (Kv, Acc) ->
920                [{KeyBin, ValBin}] = Mod:convert_back_index_kvs_to_binary([Kv], []),
921                KvBin = [<<(byte_size(KeyBin)):16>>, KeyBin, ValBin],
922                [[<<(iolist_size(KvBin)):32/native>>, KvBin] | Acc]
923        end,
924        [], DocIdViewIdKeys),
925    #set_view_tmp_file_info{fd = IdFd} = dict:fetch(ids_index, TmpFiles),
926    ok = file:write(IdFd, IdRecords),
927
928    {InsertKVCount, TmpFiles2} = Mod:write_kvs(Group, TmpFiles, ViewKVs),
929
930    Stats2 = Stats#set_view_updater_stats{
931        inserted_kvs = Stats#set_view_updater_stats.inserted_kvs + InsertKVCount,
932        inserted_ids = Stats#set_view_updater_stats.inserted_ids + length(DocIdViewIdKeys)
933    },
934    case IsFinalBatch of
935    false ->
936        WriterAcc#writer_acc{
937            max_seqs = MaxSeqs2,
938            stats = Stats2,
939            tmp_files = TmpFiles2
940        };
941    true ->
942        ?LOG_INFO("Updater for set view `~s`, ~s group `~s`, starting btree "
943                  "build phase" , [SetName, Type, DDocId]),
944        {Group2, BuildFd} = Mod:finish_build(Group, TmpFiles2, TmpDir),
945        WriterAcc#writer_acc{
946            tmp_files = dict:store(build_file, BuildFd, TmpFiles2),
947            max_seqs = MaxSeqs2,
948            part_versions = PartVersions2,
949            stats = Stats2,
950            group = Group2
951        }
952    end.
953
954
955process_map_results(Mod, Kvs, ViewEmptyKVs, PartSeqs, PartVersions) ->
956    lists:foldl(
957        fun({Seq, DocId, PartId, []}, {ViewKVsAcc, DocIdViewIdKeysAcc, PartIdSeqs, PartIdVersions}) ->
958            PartIdSeqs2 = update_part_seq(Seq, PartId, PartIdSeqs),
959            {ViewKVsAcc, [{DocId, {PartId, []}} | DocIdViewIdKeysAcc], PartIdSeqs2, PartIdVersions};
960        ({Seq, DocId, PartId, QueryResults}, {ViewKVsAcc, DocIdViewIdKeysAcc, PartIdSeqs, PartIdVersions}) ->
961            {NewViewKVs, NewViewIdKeys} = Mod:view_insert_doc_query_results(
962                    DocId, PartId, QueryResults, ViewKVsAcc, [], []),
963            PartIdSeqs2 = update_part_seq(Seq, PartId, PartIdSeqs),
964            {NewViewKVs, [{DocId, {PartId, NewViewIdKeys}} | DocIdViewIdKeysAcc], PartIdSeqs2, PartIdVersions};
965        (snapshot_marker, _Acc) ->
966            throw({error,
967                <<"The multiple snapshots should have been merged, "
968                  "but theu weren't">>});
969        ({part_versions, {PartId, NewVersions}}, {ViewKVsAcc, DocIdViewIdKeysAcc, PartIdSeqs, PartIdVersions}) ->
970            PartIdVersions2 = update_part_versions(NewVersions, PartId, PartIdVersions),
971            {ViewKVsAcc, DocIdViewIdKeysAcc, PartIdSeqs, PartIdVersions2}
972        end,
973        {ViewEmptyKVs, [], PartSeqs, PartVersions}, Kvs).
974
975
976-spec update_transferred_replicas(#set_view_group{},
977                                  partition_seqs(),
978                                  partition_seqs()) -> #set_view_group{}.
979update_transferred_replicas(Group, _MaxSeqs, _PartIdSeqs) when ?set_replicas_on_transfer(Group) =:= [] ->
980    Group;
981update_transferred_replicas(Group, MaxSeqs, PartIdSeqs) ->
982    #set_view_group{index_header = Header} = Group,
983    RepsTransferred = lists:foldl(
984        fun({PartId, Seq}, A) ->
985            case lists:member(PartId, ?set_replicas_on_transfer(Group))
986                andalso (Seq >= couch_set_view_util:get_part_seq(PartId, MaxSeqs)) of
987            true ->
988                ordsets:add_element(PartId, A);
989            false ->
990                A
991            end
992        end,
993        ordsets:new(), PartIdSeqs),
994    ReplicasOnTransfer2 = ordsets:subtract(?set_replicas_on_transfer(Group), RepsTransferred),
995    {Abitmask2, Pbitmask2} = lists:foldl(
996        fun(Id, {A, P}) ->
997            Mask = 1 bsl Id,
998            Mask = ?set_pbitmask(Group) band Mask,
999            0 = ?set_abitmask(Group) band Mask,
1000            {A bor Mask, P bxor Mask}
1001        end,
1002        {?set_abitmask(Group), ?set_pbitmask(Group)},
1003        RepsTransferred),
1004    Group#set_view_group{
1005        index_header = Header#set_view_index_header{
1006            abitmask = Abitmask2,
1007            pbitmask = Pbitmask2,
1008            replicas_on_transfer = ReplicasOnTransfer2
1009        }
1010    }.
1011
1012
1013-spec update_part_seq(update_seq(), partition_id(), partition_seqs()) -> partition_seqs().
1014update_part_seq(Seq, PartId, Acc) ->
1015    case couch_set_view_util:find_part_seq(PartId, Acc) of
1016    {ok, Max} when Max >= Seq ->
1017        Acc;
1018    _ ->
1019        orddict:store(PartId, Seq, Acc)
1020    end.
1021
1022
1023-spec update_part_versions(partition_version(), partition_id(), partition_versions()) ->
1024    partition_versions().
1025update_part_versions(NewVersions, PartId, PartVersions) ->
1026    orddict:store(PartId, NewVersions, PartVersions).
1027
1028
1029% Incremental updates.
1030write_to_tmp_batch_files(ViewKeyValuesToAdd, DocIdViewIdKeys, WriterAcc) ->
1031    #writer_acc{
1032        tmp_files = TmpFiles,
1033        group = #set_view_group{
1034            id_btree = IdBtree,
1035            mod = Mod
1036        }
1037    } = WriterAcc,
1038
1039    {AddDocIdViewIdKeys0, RemoveDocIds, LookupDocIds} = lists:foldr(
1040        fun({DocId, {PartId, [] = _ViewIdKeys}}, {A, B, C}) ->
1041                BackKey = make_back_index_key(DocId, PartId),
1042                case is_new_partition(PartId, WriterAcc) of
1043                true ->
1044                    {A, [BackKey | B], C};
1045                false ->
1046                    {A, [BackKey | B], [BackKey | C]}
1047                end;
1048            ({DocId, {PartId, _ViewIdKeys}} = KvPairs, {A, B, C}) ->
1049                BackKey = make_back_index_key(DocId, PartId),
1050                case is_new_partition(PartId, WriterAcc) of
1051                true ->
1052                    {[KvPairs | A], B, C};
1053                false ->
1054                    {[KvPairs | A], B, [BackKey | C]}
1055                end
1056        end,
1057        {[], [], []}, DocIdViewIdKeys),
1058
1059    AddDocIdViewIdKeys = Mod:convert_back_index_kvs_to_binary(
1060        AddDocIdViewIdKeys0, []),
1061
1062    IdsData1 = lists:map(
1063        fun(K) -> couch_set_view_updater_helper:encode_op(remove, K) end,
1064        RemoveDocIds),
1065
1066    IdsData2 = lists:foldl(
1067        fun({K, V}, Acc) ->
1068            Bin = couch_set_view_updater_helper:encode_op(insert, K, V),
1069            [Bin | Acc]
1070        end,
1071        IdsData1,
1072        AddDocIdViewIdKeys),
1073
1074    IdTmpFileInfo = dict:fetch(ids_index, TmpFiles),
1075    case IdTmpFileInfo of
1076    #set_view_tmp_file_info{fd = nil} ->
1077        0 = IdTmpFileInfo#set_view_tmp_file_info.size,
1078        IdTmpFilePath = new_sort_file_name(WriterAcc),
1079        {ok, IdTmpFileFd} = file2:open(IdTmpFilePath, [raw, append, binary]),
1080        IdTmpFileSize = 0;
1081    #set_view_tmp_file_info{
1082            fd = IdTmpFileFd, name = IdTmpFilePath, size = IdTmpFileSize} ->
1083        ok
1084    end,
1085
1086    ok = file:write(IdTmpFileFd, IdsData2),
1087
1088    IdTmpFileInfo2 = IdTmpFileInfo#set_view_tmp_file_info{
1089        fd = IdTmpFileFd,
1090        name = IdTmpFilePath,
1091        size = IdTmpFileSize + iolist_size(IdsData2)
1092    },
1093    TmpFiles2 = dict:store(ids_index, IdTmpFileInfo2, TmpFiles),
1094
1095    case LookupDocIds of
1096    [] ->
1097        LookupResults = [];
1098    _ ->
1099        {ok, LookupResults, IdBtree} =
1100            couch_btree:query_modify(IdBtree, LookupDocIds, [], [])
1101    end,
1102    KeysToRemoveByView = lists:foldl(
1103        fun(LookupResult, KeysToRemoveByViewAcc) ->
1104            case LookupResult of
1105            {ok, {<<_Part:16, DocId/binary>>, <<_Part:16, ViewIdKeys/binary>>}} ->
1106                lists:foldl(
1107                    fun({ViewId, Keys}, KeysToRemoveByViewAcc2) ->
1108                        RemoveKeysDict = lists:foldl(
1109                            fun(Key, RemoveKeysDictAcc) ->
1110                                EncodedKey = Mod:encode_key_docid(Key, DocId),
1111                                dict:store(EncodedKey, nil, RemoveKeysDictAcc)
1112                            end,
1113                        couch_util:dict_find(ViewId, KeysToRemoveByViewAcc2, dict:new()), Keys),
1114                        dict:store(ViewId, RemoveKeysDict, KeysToRemoveByViewAcc2)
1115                    end,
1116                    KeysToRemoveByViewAcc, couch_set_view_util:parse_view_id_keys(ViewIdKeys));
1117            {not_found, _} ->
1118                KeysToRemoveByViewAcc
1119            end
1120        end,
1121        dict:new(), LookupResults),
1122
1123    WriterAcc2 = update_tmp_files(
1124        Mod, WriterAcc#writer_acc{tmp_files = TmpFiles2}, ViewKeyValuesToAdd,
1125        KeysToRemoveByView),
1126    maybe_update_btrees(WriterAcc2).
1127
1128
1129% Update the temporary files with the key-values from the indexer. Return
1130% the updated writer accumulator.
1131-spec update_tmp_files(atom(), #writer_acc{},
1132                       [{#set_view{}, [set_view_key_value()]}],
1133                       dict:dict(non_neg_integer(), dict:dict(binary(), nil)))
1134                      -> #writer_acc{}.
1135update_tmp_files(Mod, WriterAcc, ViewKeyValues, KeysToRemoveByView) ->
1136    #writer_acc{
1137       group = Group,
1138       tmp_files = TmpFiles
1139    } = WriterAcc,
1140    TmpFiles2 = lists:foldl(
1141        fun({#set_view{id_num = ViewId}, AddKeyValues}, AccTmpFiles) ->
1142            AddKeyValuesBinaries = Mod:convert_primary_index_kvs_to_binary(
1143                AddKeyValues, Group, []),
1144            KeysToRemoveDict = couch_util:dict_find(
1145                ViewId, KeysToRemoveByView, dict:new()),
1146
1147            case Mod of
1148            % The b-tree replaces nodes with the same key, hence we don't
1149            % need to delete a node that gets updated anyway
1150            mapreduce_view ->
1151                {KeysToRemoveDict2, BatchData} = lists:foldl(
1152                    fun({K, V}, {KeysToRemoveAcc, BinOpAcc}) ->
1153                        Bin = couch_set_view_updater_helper:encode_op(
1154                            insert, K, V),
1155                        BinOpAcc2 = [Bin | BinOpAcc],
1156                        case dict:find(K, KeysToRemoveAcc) of
1157                        {ok, _} ->
1158                            {dict:erase(K, KeysToRemoveAcc), BinOpAcc2};
1159                        _ ->
1160                            {KeysToRemoveAcc, BinOpAcc2}
1161                        end
1162                    end,
1163                    {KeysToRemoveDict, []}, AddKeyValuesBinaries);
1164            % In r-trees there are multiple possible paths to a key. Hence it's
1165            % not easily possible to replace an existing node with a new one,
1166            % as the insertion could happen in a subtree that is different
1167            % from the subtree of the old value.
1168            spatial_view ->
1169                BatchData = [
1170                    couch_set_view_updater_helper:encode_op(insert, K, V) ||
1171                        {K, V} <- AddKeyValuesBinaries],
1172                KeysToRemoveDict2 = KeysToRemoveDict
1173            end,
1174
1175            BatchData2 = dict:fold(
1176                fun(K, _V, BatchAcc) ->
1177                    Bin = couch_set_view_updater_helper:encode_op(remove, K),
1178                    [Bin | BatchAcc]
1179                end,
1180                BatchData, KeysToRemoveDict2),
1181
1182            ViewTmpFileInfo = dict:fetch(ViewId, TmpFiles),
1183            case ViewTmpFileInfo of
1184            #set_view_tmp_file_info{fd = nil} ->
1185                0 = ViewTmpFileInfo#set_view_tmp_file_info.size,
1186                ViewTmpFilePath = new_sort_file_name(WriterAcc),
1187                {ok, ViewTmpFileFd} = file2:open(
1188                    ViewTmpFilePath, [raw, append, binary]),
1189                ViewTmpFileSize = 0;
1190            #set_view_tmp_file_info{fd = ViewTmpFileFd,
1191                                    size = ViewTmpFileSize,
1192                                    name = ViewTmpFilePath} ->
1193                ok
1194            end,
1195            ok = file:write(ViewTmpFileFd, BatchData2),
1196            ViewTmpFileInfo2 = ViewTmpFileInfo#set_view_tmp_file_info{
1197                fd = ViewTmpFileFd,
1198                name = ViewTmpFilePath,
1199                size = ViewTmpFileSize + iolist_size(BatchData2)
1200            },
1201            dict:store(ViewId, ViewTmpFileInfo2, AccTmpFiles)
1202        end,
1203    TmpFiles, ViewKeyValues),
1204    WriterAcc#writer_acc{
1205        tmp_files = TmpFiles2
1206    }.
1207
1208
1209is_new_partition(PartId, #writer_acc{initial_seqs = InitialSeqs}) ->
1210    couch_util:get_value(PartId, InitialSeqs, 0) == 0.
1211
1212
1213% For incremental index updates.
1214maybe_update_btrees(WriterAcc0) ->
1215    #writer_acc{
1216        view_empty_kvs = ViewEmptyKVs,
1217        tmp_files = TmpFiles,
1218        group = Group0,
1219        final_batch = IsFinalBatch,
1220        owner = Owner,
1221        last_seqs = LastSeqs,
1222        part_versions = PartVersions,
1223        force_flush = ForceFlush
1224    } = WriterAcc0,
1225    IdTmpFileInfo = dict:fetch(ids_index, TmpFiles),
1226    ShouldFlushViews =
1227        lists:any(
1228            fun({#set_view{id_num = Id}, _}) ->
1229                ViewTmpFileInfo = dict:fetch(Id, TmpFiles),
1230                ViewTmpFileInfo#set_view_tmp_file_info.size >= ?INC_MAX_TMP_FILE_SIZE
1231            end, ViewEmptyKVs),
1232    ShouldFlush = IsFinalBatch orelse
1233        ForceFlush orelse
1234        ((IdTmpFileInfo#set_view_tmp_file_info.size >= ?INC_MAX_TMP_FILE_SIZE) andalso
1235        ShouldFlushViews),
1236    case ShouldFlush of
1237    false ->
1238        NewLastSeqs1 = LastSeqs,
1239        case erlang:get(updater_worker) of
1240        undefined ->
1241            WriterAcc = WriterAcc0;
1242        UpdaterWorker when is_reference(UpdaterWorker) ->
1243            receive
1244            {UpdaterWorker, UpGroup, UpStats, CompactFiles} ->
1245                send_log_compact_files(Owner, CompactFiles, ?set_seqs(UpGroup),
1246                    ?set_partition_versions(UpGroup)),
1247                erlang:erase(updater_worker),
1248                WriterAcc = check_if_compactor_started(
1249                    WriterAcc0#writer_acc{group = UpGroup, stats = UpStats})
1250            after 0 ->
1251                WriterAcc = WriterAcc0
1252            end
1253        end;
1254    true ->
1255        ok = close_tmp_fd(IdTmpFileInfo),
1256        ok = lists:foreach(
1257            fun(#set_view{id_num = Id}) ->
1258                ViewTmpFileInfo = dict:fetch(Id, TmpFiles),
1259                ok = close_tmp_fd(ViewTmpFileInfo)
1260            end,
1261            Group0#set_view_group.views),
1262        case erlang:erase(updater_worker) of
1263        undefined ->
1264            WriterAcc1 = WriterAcc0;
1265        UpdaterWorker when is_reference(UpdaterWorker) ->
1266            receive
1267            {UpdaterWorker, UpGroup2, UpStats2, CompactFiles2} ->
1268                send_log_compact_files(Owner, CompactFiles2, ?set_seqs(UpGroup2),
1269                    ?set_partition_versions(UpGroup2)),
1270                WriterAcc1 = check_if_compactor_started(
1271                    WriterAcc0#writer_acc{
1272                        group = UpGroup2,
1273                        stats = UpStats2
1274                    })
1275            end
1276        end,
1277
1278
1279        % MB-11472: There is no id sortfile present and hence this is the final batch
1280        % and nothing left to be updated to the btree
1281        case IdTmpFileInfo#set_view_tmp_file_info.name of
1282        nil ->
1283            WriterAcc = WriterAcc1;
1284        _ ->
1285            WriterAcc2 = check_if_compactor_started(WriterAcc1),
1286            NewUpdaterWorker = spawn_updater_worker(WriterAcc2, LastSeqs, PartVersions),
1287            erlang:put(updater_worker, NewUpdaterWorker),
1288
1289            TmpFiles2 = dict:map(
1290                fun(_, _) -> #set_view_tmp_file_info{} end, TmpFiles),
1291            WriterAcc = WriterAcc2#writer_acc{tmp_files = TmpFiles2}
1292        end,
1293        NewLastSeqs1 = orddict:new()
1294    end,
1295    #writer_acc{
1296        stats = NewStats0,
1297        group = NewGroup0
1298    } = WriterAcc,
1299    case IsFinalBatch of
1300    true ->
1301        case erlang:erase(updater_worker) of
1302        undefined ->
1303            NewGroup = NewGroup0,
1304            NewStats = NewStats0;
1305        UpdaterWorker2 when is_reference(UpdaterWorker2) ->
1306            receive
1307            {UpdaterWorker2, NewGroup, NewStats, CompactFiles3} ->
1308                send_log_compact_files(Owner, CompactFiles3, ?set_seqs(NewGroup),
1309                    ?set_partition_versions(NewGroup))
1310            end
1311        end,
1312        NewLastSeqs = orddict:new();
1313    false ->
1314        NewGroup = NewGroup0,
1315        NewStats = NewStats0,
1316        NewLastSeqs = NewLastSeqs1
1317    end,
1318    NewWriterAcc = WriterAcc#writer_acc{
1319        stats = NewStats,
1320        group = NewGroup,
1321        last_seqs = NewLastSeqs
1322    },
1323    NewWriterAcc.
1324
1325
1326send_log_compact_files(_Owner, [], _Seqs, _PartVersions) ->
1327    ok;
1328send_log_compact_files(Owner, Files, Seqs, PartVersions) ->
1329    Init = case erlang:erase(new_compactor) of
1330    true ->
1331        true;
1332    undefined ->
1333        false
1334    end,
1335    ok = gen_server:cast(Owner, {compact_log_files, Files, Seqs, PartVersions, Init}).
1336
1337
1338-spec spawn_updater_worker(#writer_acc{}, partition_seqs(),
1339                           partition_versions()) -> reference().
1340spawn_updater_worker(WriterAcc, PartIdSeqs, PartVersions) ->
1341    Parent = self(),
1342    Ref = make_ref(),
1343    #writer_acc{
1344        group = Group,
1345        parent = UpdaterPid,
1346        max_seqs = MaxSeqs
1347    } = WriterAcc,
1348    % Wait for main updater process to ack
1349    UpdaterPid ! {native_updater_start, self()},
1350    receive
1351    {ok, native_updater_start} ->
1352        ok
1353    end,
1354    Pid = spawn_link(fun() ->
1355        case ?set_cbitmask(Group) of
1356        0 ->
1357            CleanupStart = 0;
1358        _ ->
1359            CleanupStart = os:timestamp()
1360        end,
1361        {ok, NewGroup0, CleanupCount, NewStats, NewCompactFiles} = update_btrees(WriterAcc),
1362        case ?set_cbitmask(Group) of
1363        0 ->
1364            CleanupTime = 0.0;
1365        _ ->
1366            CleanupTime = timer:now_diff(os:timestamp(), CleanupStart) / 1000000,
1367            #set_view_group{
1368                set_name = SetName,
1369                name = DDocId,
1370                type = GroupType
1371            } = Group,
1372            ?LOG_INFO("Updater for set view `~s`, ~s group `~s`, performed cleanup "
1373                      "of ~p key/value pairs in ~.3f seconds",
1374                      [SetName, GroupType, DDocId, CleanupCount, CleanupTime])
1375        end,
1376        NewSeqs = update_seqs(PartIdSeqs, ?set_seqs(Group)),
1377        NewPartVersions = update_versions(PartVersions, ?set_partition_versions(Group)),
1378        Header = NewGroup0#set_view_group.index_header,
1379        NewHeader = Header#set_view_index_header{
1380            seqs = NewSeqs,
1381            partition_versions = NewPartVersions
1382        },
1383        NewGroup = NewGroup0#set_view_group{
1384            index_header = NewHeader
1385        },
1386        NewGroup2 = update_transferred_replicas(NewGroup, MaxSeqs, PartIdSeqs),
1387        NumChanges = count_seqs_done(Group, NewSeqs),
1388        NewStats2 = NewStats#set_view_updater_stats{
1389           seqs = NewStats#set_view_updater_stats.seqs + NumChanges,
1390           cleanup_time = NewStats#set_view_updater_stats.seqs + CleanupTime,
1391           cleanup_kv_count = NewStats#set_view_updater_stats.cleanup_kv_count + CleanupCount
1392        },
1393        Parent ! {Ref, NewGroup2, NewStats2, NewCompactFiles}
1394    end),
1395    UpdaterPid ! {native_updater_pid, Pid},
1396    Ref.
1397
1398% Update id btree and view btrees with current batch of changes
1399update_btrees(WriterAcc) ->
1400    #writer_acc{
1401        stats = Stats,
1402        group = Group0,
1403        tmp_dir = TmpDir,
1404        tmp_files = TmpFiles,
1405        compactor_running = CompactorRunning,
1406        max_insert_batch_size = MaxBatchSize
1407    } = WriterAcc,
1408
1409    % Prepare list of operation logs for each btree
1410    #set_view_tmp_file_info{name = IdFile} = dict:fetch(ids_index, TmpFiles),
1411    ViewFiles = lists:map(
1412        fun(#set_view{id_num = Id}) ->
1413            #set_view_tmp_file_info{
1414                name = ViewFile
1415            } = dict:fetch(Id, TmpFiles),
1416            ViewFile
1417        end, Group0#set_view_group.views),
1418    % `LogFiles` is supplied to the native updater. For spatial views only the
1419    % ID b-tree is updated.
1420    LogFiles = case Group0#set_view_group.mod of
1421    mapreduce_view ->
1422        [IdFile | ViewFiles];
1423    spatial_view ->
1424        [IdFile]
1425    end,
1426
1427    % Remove spatial views from group
1428    % The native updater can currently handle mapreduce views only, but it
1429    % handles the ID b-tree of the spatial view
1430    Group = couch_set_view_util:remove_group_views(Group0, spatial_view),
1431
1432    {ok, NewGroup0, Stats2} = couch_set_view_updater_helper:update_btrees(
1433        Group, TmpDir, LogFiles, MaxBatchSize, false),
1434    {IdsInserted, IdsDeleted, KVsInserted, KVsDeleted, CleanupCount} = Stats2,
1435
1436    % Add back spatial views
1437    NewGroup = couch_set_view_util:update_group_views(
1438        NewGroup0, Group0, spatial_view),
1439
1440    % The native update for the Id b-tree was run, now it's time to run the
1441    % Erlang updater for the spatial views
1442    NewGroup2 = case NewGroup#set_view_group.mod of
1443    mapreduce_view ->
1444        NewGroup;
1445    spatial_view = Mod ->
1446        ok = couch_file:refresh_eof(NewGroup#set_view_group.fd),
1447        Views = Mod:update_spatial(NewGroup#set_view_group.views, ViewFiles,
1448            MaxBatchSize),
1449        NewGroup#set_view_group{
1450            views = Views,
1451            index_header = (NewGroup#set_view_group.index_header)#set_view_index_header{
1452                view_states = [Mod:get_state(V#set_view.indexer) || V <- Views]
1453            }
1454        }
1455    end,
1456
1457    NewStats = Stats#set_view_updater_stats{
1458     inserted_ids = Stats#set_view_updater_stats.inserted_ids + IdsInserted,
1459     deleted_ids = Stats#set_view_updater_stats.deleted_ids + IdsDeleted,
1460     inserted_kvs = Stats#set_view_updater_stats.inserted_kvs + KVsInserted,
1461     deleted_kvs = Stats#set_view_updater_stats.deleted_kvs + KVsDeleted
1462    },
1463
1464    % Remove files if compactor is not running
1465    % Otherwise send them to compactor to apply deltas
1466    CompactFiles = lists:foldr(
1467        fun(SortedFile, AccCompactFiles) ->
1468            case CompactorRunning of
1469            true ->
1470                case filename:extension(SortedFile) of
1471                ".compact" ->
1472                     [SortedFile | AccCompactFiles];
1473                _ ->
1474                    SortedFile2 = new_sort_file_name(TmpDir, true),
1475                    ok = file2:rename(SortedFile, SortedFile2),
1476                    [SortedFile2 | AccCompactFiles]
1477                end;
1478            false ->
1479                ok = file2:delete(SortedFile),
1480                AccCompactFiles
1481            end
1482        end, [], [IdFile | ViewFiles]),
1483    {ok, NewGroup2, CleanupCount, NewStats, CompactFiles}.
1484
1485
1486update_seqs(PartIdSeqs, Seqs) ->
1487    orddict:fold(
1488        fun(PartId, NewSeq, Acc) ->
1489            OldSeq = couch_util:get_value(PartId, Acc, 0),
1490            case NewSeq > OldSeq of
1491            true ->
1492                ok;
1493            false ->
1494                exit({error, <<"New seq smaller or equal than old seq.">>, PartId, OldSeq, NewSeq})
1495            end,
1496            orddict:store(PartId, NewSeq, Acc)
1497        end,
1498        Seqs, PartIdSeqs).
1499
1500update_versions(PartVersions, AllPartVersions) ->
1501    lists:ukeymerge(1, PartVersions, AllPartVersions).
1502
1503
1504update_task(NumChanges) ->
1505    [Changes, Total] = couch_task_status:get([changes_done, total_changes]),
1506    Changes2 = Changes + NumChanges,
1507    Total2 = erlang:max(Total, Changes2),
1508    Progress = (Changes2 * 100) div Total2,
1509    couch_task_status:update([
1510        {progress, Progress},
1511        {changes_done, Changes2},
1512        {total_changes, Total2}
1513    ]).
1514
1515
1516checkpoint(#writer_acc{owner = Owner, parent = Parent, group = Group} = Acc) ->
1517    #set_view_group{
1518        set_name = SetName,
1519        name = DDocId,
1520        type = Type
1521    } = Group,
1522    ?LOG_INFO("Updater checkpointing set view `~s` update for ~s group `~s`",
1523              [SetName, Type, DDocId]),
1524    NewGroup = maybe_fix_group(Group),
1525    ok = couch_file:refresh_eof(NewGroup#set_view_group.fd),
1526    Owner ! {partial_update, Parent, self(), NewGroup},
1527    receive
1528    update_processed ->
1529        ok;
1530    stop ->
1531        exit(shutdown)
1532    end,
1533    Acc#writer_acc{group = NewGroup}.
1534
1535
1536maybe_fix_group(#set_view_group{index_header = Header} = Group) ->
1537    receive
1538    {new_passive_partitions, Parts} ->
1539        Bitmask = couch_set_view_util:build_bitmask(Parts),
1540        {Seqs, PartVersions} = lists:foldl(
1541            fun(PartId, {SeqAcc, PartVersionsAcc} = Acc) ->
1542                case couch_set_view_util:has_part_seq(PartId, SeqAcc) of
1543                true ->
1544                    Acc;
1545                false ->
1546                    {ordsets:add_element({PartId, 0}, SeqAcc),
1547                        ordsets:add_element({PartId, [{0, 0}]},
1548                            PartVersionsAcc)}
1549                end
1550            end,
1551            {?set_seqs(Group), ?set_partition_versions(Group)}, Parts),
1552        Group#set_view_group{
1553            index_header = Header#set_view_index_header{
1554                seqs = Seqs,
1555                pbitmask = ?set_pbitmask(Group) bor Bitmask,
1556                partition_versions = PartVersions
1557            }
1558        }
1559    after 0 ->
1560        Group
1561    end.
1562
1563
1564check_if_compactor_started(#writer_acc{group = Group0} = Acc) ->
1565    receive
1566    {compactor_started, Pid} ->
1567        erlang:put(new_compactor, true),
1568        Group = maybe_fix_group(Group0),
1569        Pid ! {compactor_started_ack, self(), Group},
1570        Acc#writer_acc{compactor_running = true, group = Group}
1571    after 0 ->
1572        Acc
1573    end.
1574
1575
1576init_tmp_files(WriterAcc) ->
1577    #writer_acc{
1578        group = Group, initial_build = Init, tmp_dir = TmpDir
1579    } = WriterAcc,
1580    case WriterAcc#writer_acc.compactor_running of
1581    true ->
1582        ok = couch_set_view_util:delete_sort_files(TmpDir, updater);
1583    false ->
1584        ok = couch_set_view_util:delete_sort_files(TmpDir, all)
1585    end,
1586    Ids = [ids_index | [V#set_view.id_num || V <- Group#set_view_group.views]],
1587    Files = case Init of
1588    true ->
1589        [begin
1590             FileName = new_sort_file_name(WriterAcc),
1591             {ok, Fd} = file2:open(FileName, [raw, append, binary]),
1592             {Id, #set_view_tmp_file_info{fd = Fd, name = FileName}}
1593         end || Id <- Ids];
1594    false ->
1595         [{Id, #set_view_tmp_file_info{}} || Id <- Ids]
1596    end,
1597    WriterAcc#writer_acc{tmp_files = dict:from_list(Files)}.
1598
1599
1600new_sort_file_name(#writer_acc{tmp_dir = TmpDir, compactor_running = Cr}) ->
1601    new_sort_file_name(TmpDir, Cr).
1602
1603new_sort_file_name(TmpDir, true) ->
1604    couch_set_view_util:new_sort_file_path(TmpDir, compactor);
1605new_sort_file_name(TmpDir, false) ->
1606    couch_set_view_util:new_sort_file_path(TmpDir, updater).
1607
1608
1609make_back_index_key(DocId, PartId) ->
1610    <<PartId:16, DocId/binary>>.
1611
1612
1613count_seqs_done(Group, NewSeqs) ->
1614    % NewSeqs might have new passive partitions that Group's seqs doesn't
1615    % have yet (will get them after a checkpoint period).
1616    lists:foldl(
1617        fun({PartId, SeqDone}, Acc) ->
1618            SeqBefore = couch_util:get_value(PartId, ?set_seqs(Group), 0),
1619            Acc + (SeqDone - SeqBefore)
1620        end,
1621        0, NewSeqs).
1622
1623
1624close_tmp_fd(#set_view_tmp_file_info{fd = nil}) ->
1625    ok;
1626close_tmp_fd(#set_view_tmp_file_info{fd = Fd}) ->
1627    ok = file:close(Fd).
1628
1629
1630% DCP introduces the concept of snapshots, where a document mutation is only
1631% guaranteed to be unique within a single snapshot. But the flusher expects
1632% unique mutations within the full batch. Merge multiple snapshots (if there
1633% are any) into a single one. The latest mutation wins.
1634merge_snapshots(KVs) ->
1635    merge_snapshots(KVs, false, []).
1636
1637merge_snapshots([], true, Acc) ->
1638    {true, Acc};
1639merge_snapshots([], false, Acc) ->
1640    % The order of the KVs doesn't really matter, but having them sorted the
1641    % same way will make life easier when debugging
1642    {false, lists:reverse(Acc)};
1643merge_snapshots([snapshot_marker | KVs], _MultipleSnapshots, Acc) ->
1644    merge_snapshots(KVs, true, Acc);
1645merge_snapshots([{part_versions, _} = PartVersions | KVs], MultipleSnapshots, Acc) ->
1646    merge_snapshots(KVs, MultipleSnapshots, [PartVersions | Acc]);
1647merge_snapshots([KV | KVs], true, Acc0) ->
1648    {_Seq, DocId, _PartId, _QueryResults} = KV,
1649    Acc = lists:keystore(DocId, 2, Acc0, KV),
1650    merge_snapshots(KVs, true, Acc);
1651merge_snapshots([KV | KVs], false, Acc) ->
1652    merge_snapshots(KVs, false, [KV | Acc]).
1653