1% -*- Mode: Erlang; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2
3% Licensed under the Apache License, Version 2.0 (the "License"); you may not
4% use this file except in compliance with the License. You may obtain a copy of
5% the License at
6%
7%   http://www.apache.org/licenses/LICENSE-2.0
8%
9% Unless required by applicable law or agreed to in writing, software
10% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12% License for the specific language governing permissions and limitations under
13% the License.
14
15-module(couch_set_view_updater).
16
17-export([update/6]).
18% Exported for the MapReduce specific stuff
19-export([new_sort_file_name/1]).
20
21-include("couch_db.hrl").
22-include_lib("couch_set_view/include/couch_set_view.hrl").
23-include_lib("couch_dcp/include/couch_dcp.hrl").
24
25-define(MAP_QUEUE_SIZE, 256 * 1024).
26-define(WRITE_QUEUE_SIZE, 512 * 1024).
27% The size of the accumulator the emitted key-values are queued up in before
28% they get actually queued. The bigger the value, the less the lock contention,
29% but the higher the possible memory consumption is.
30-define(QUEUE_ACC_BATCH_SIZE, 256 * 1024).
31
32% incremental updates
33-define(INC_MAX_TMP_FILE_SIZE, 31457280).
34-define(MIN_BATCH_SIZE_PER_VIEW, 65536).
35
36% For file sorter and file merger commands.
37-define(PORT_OPTS,
38        [exit_status, use_stdio, stderr_to_stdout, {line, 4096}, binary]).
39
40-record(writer_acc, {
41    parent,
42    owner,
43    group,
44    last_seqs = orddict:new(),
45    part_versions = orddict:new(),
46    compactor_running,
47    write_queue,
48    initial_build,
49    view_empty_kvs,
50    kvs = [],
51    kvs_size = 0,
52    state = updating_active,
53    final_batch = false,
54    max_seqs,
55    stats = #set_view_updater_stats{},
56    tmp_dir = nil,
57    initial_seqs,
58    max_insert_batch_size,
59    tmp_files = dict:new(),
60    throttle = 0,
61    force_flush = false
62}).
63
64
65-spec update(pid(), #set_view_group{},
66             partition_seqs(), boolean(), string(), [term()]) -> no_return().
67update(Owner, Group, CurSeqs, CompactorRunning, TmpDir, Options) ->
68    #set_view_group{
69        set_name = SetName,
70        type = Type,
71        name = DDocId
72    } = Group,
73
74    ActiveParts = couch_set_view_util:decode_bitmask(?set_abitmask(Group)),
75    PassiveParts = couch_set_view_util:decode_bitmask(?set_pbitmask(Group)),
76    NumChanges = couch_set_view_util:missing_changes_count(CurSeqs, ?set_seqs(Group)),
77
78    process_flag(trap_exit, true),
79
80    BeforeEnterTs = os:timestamp(),
81    Parent = self(),
82    BarrierEntryPid = spawn_link(fun() ->
83        DDocIds = couch_set_view_util:get_ddoc_ids_with_sig(SetName, Group),
84        couch_task_status:add_task([
85            {type, blocked_indexer},
86            {set, SetName},
87            {signature, ?l2b(couch_util:to_hex(Group#set_view_group.sig))},
88            {design_documents, DDocIds},
89            {indexer_type, Type}
90        ]),
91        case Type of
92        main ->
93            ok = couch_index_barrier:enter(couch_main_index_barrier, Parent);
94        replica ->
95            ok = couch_index_barrier:enter(couch_replica_index_barrier, Parent)
96        end,
97        Parent ! {done, self(), (timer:now_diff(os:timestamp(), BeforeEnterTs) / 1000000)},
98        receive shutdown -> ok end
99    end),
100
101    BlockedTime = receive
102    {done, BarrierEntryPid, Duration} ->
103        Duration;
104    {'EXIT', _, Reason} ->
105        exit({updater_error, Reason})
106    end,
107
108    CleanupParts = couch_set_view_util:decode_bitmask(?set_cbitmask(Group)),
109    InitialBuild = couch_set_view_util:is_initial_build(Group),
110    ?LOG_INFO("Updater for set view `~s`, ~s group `~s` started~n"
111              "Active partitions:    ~w~n"
112              "Passive partitions:   ~w~n"
113              "Cleanup partitions:   ~w~n"
114              "Replicas to transfer: ~w~n"
115              "Pending transition:   ~n"
116              "    active:           ~w~n"
117              "    passive:          ~w~n"
118              "    unindexable:      ~w~n"
119              "Initial build:        ~s~n"
120              "Compactor running:    ~s~n"
121              "Min # changes:        ~p~n"
122              "Partition versions:   ~w~n",
123              [SetName, Type, DDocId,
124               ActiveParts,
125               PassiveParts,
126               CleanupParts,
127               ?set_replicas_on_transfer(Group),
128               ?pending_transition_active(?set_pending_transition(Group)),
129               ?pending_transition_passive(?set_pending_transition(Group)),
130               ?pending_transition_unindexable(?set_pending_transition(Group)),
131               InitialBuild,
132               CompactorRunning,
133               NumChanges,
134               ?set_partition_versions(Group)
135              ]),
136
137    WriterAcc0 = #writer_acc{
138        parent = self(),
139        owner = Owner,
140        group = Group,
141        initial_build = InitialBuild,
142        max_seqs = CurSeqs,
143        part_versions = ?set_partition_versions(Group),
144        tmp_dir = TmpDir,
145        max_insert_batch_size = list_to_integer(
146            couch_config:get("set_views", "indexer_max_insert_batch_size", "1048576"))
147    },
148    update(WriterAcc0, ActiveParts, PassiveParts,
149            BlockedTime, BarrierEntryPid, NumChanges, CompactorRunning, Options).
150
151
152update(WriterAcc, ActiveParts, PassiveParts, BlockedTime,
153       BarrierEntryPid, NumChanges, CompactorRunning, Options) ->
154    #writer_acc{
155        owner = Owner,
156        group = Group
157    } = WriterAcc,
158    #set_view_group{
159        set_name = SetName,
160        type = Type,
161        name = DDocId,
162        sig = GroupSig
163    } = Group,
164
165    StartTime = os:timestamp(),
166
167    MapQueueOptions = [{max_size, ?MAP_QUEUE_SIZE}, {max_items, infinity}],
168    WriteQueueOptions = [{max_size, ?WRITE_QUEUE_SIZE}, {max_items, infinity}],
169    {ok, MapQueue} = couch_work_queue:new(MapQueueOptions),
170    {ok, WriteQueue} = couch_work_queue:new(WriteQueueOptions),
171
172    Parent = self(),
173
174    Mapper = spawn_link(fun() ->
175        try
176            Parent ! couch_set_view_mapreduce:is_doc_used(Group),
177            do_maps(Group, MapQueue, WriteQueue)
178        catch _:Error ->
179            Stacktrace = erlang:get_stacktrace(),
180            ?LOG_ERROR("Set view `~s`, ~s group `~s`, mapper error~n"
181                "error:      ~p~n"
182                "stacktrace: ~p~n",
183                [SetName, Type, DDocId, Error, Stacktrace]),
184            exit(Error)
185        end
186    end),
187
188    receive {ok, IsDocUsed} -> IsDocUsed end,
189
190    Writer = spawn_link(fun() ->
191        BarrierEntryPid ! shutdown,
192        ViewEmptyKVs = [{View, []} || View <- Group#set_view_group.views],
193        WriterAcc2 = init_tmp_files(WriterAcc#writer_acc{
194            parent = Parent,
195            group = Group,
196            write_queue = WriteQueue,
197            view_empty_kvs = ViewEmptyKVs,
198            compactor_running = CompactorRunning,
199            initial_seqs = ?set_seqs(Group),
200            throttle = case lists:member(throttle, Options) of
201                true ->
202                    list_to_integer(
203                        couch_config:get("set_views", "throttle_period", "100"));
204                false ->
205                    0
206                end
207        }),
208        ok = couch_set_view_util:open_raw_read_fd(Group),
209        try
210            WriterAcc3 = do_writes(WriterAcc2),
211            receive
212            {doc_loader_finished, {PartVersions0, RealMaxSeqs}} ->
213                WriterAccStats = WriterAcc3#writer_acc.stats,
214                WriterAccGroup = WriterAcc3#writer_acc.group,
215                WriterAccHeader = WriterAccGroup#set_view_group.index_header,
216                PartVersions = lists:ukeymerge(1, PartVersions0,
217                    WriterAccHeader#set_view_index_header.partition_versions),
218                case WriterAcc3#writer_acc.initial_build of
219                true ->
220                    % The doc loader might not load the mutations up to the
221                    % most recent one, but only to a lower one. Update the
222                    % group header and stats with the correct information.
223                    MaxSeqs = lists:ukeymerge(
224                        1, RealMaxSeqs, WriterAccHeader#set_view_index_header.seqs),
225                    Stats = WriterAccStats#set_view_updater_stats{
226                        seqs = lists:sum([S || {_, S} <- RealMaxSeqs])
227                    };
228                false ->
229                    MaxSeqs = WriterAccHeader#set_view_index_header.seqs,
230                    Stats = WriterAcc3#writer_acc.stats
231                end,
232                FinalWriterAcc = WriterAcc3#writer_acc{
233                    stats = Stats,
234                    group = WriterAccGroup#set_view_group{
235                        index_header = WriterAccHeader#set_view_index_header{
236                            partition_versions = PartVersions,
237                            seqs = MaxSeqs
238                        }
239                    }
240                }
241            end,
242            Parent ! {writer_finished, FinalWriterAcc}
243        catch _:Error ->
244            Stacktrace = erlang:get_stacktrace(),
245            ?LOG_ERROR("Set view `~s`, ~s group `~s`, writer error~n"
246                "error:      ~p~n"
247                "stacktrace: ~p~n",
248                [SetName, Type, DDocId, Error, Stacktrace]),
249            exit(Error)
250        after
251            ok = couch_set_view_util:close_raw_read_fd(Group)
252        end
253    end),
254
255    InitialBuild = WriterAcc#writer_acc.initial_build,
256    NumChanges2 = case InitialBuild of
257        true ->
258            couch_set_view_updater_helper:count_items_from_set(Group, ActiveParts ++ PassiveParts);
259        false ->
260            NumChanges
261    end,
262
263    DocLoader = spawn_link(fun() ->
264        DDocIds = couch_set_view_util:get_ddoc_ids_with_sig(SetName, Group),
265        couch_task_status:add_task([
266            {type, indexer},
267            {set, SetName},
268            {signature, ?l2b(couch_util:to_hex(GroupSig))},
269            {design_documents, DDocIds},
270            {indexer_type, Type},
271            {progress, 0},
272            {changes_done, 0},
273            {initial_build, InitialBuild},
274            {total_changes, NumChanges2}
275        ]),
276        couch_task_status:set_update_frequency(5000),
277        case lists:member(pause, Options) of
278        true ->
279            % For reliable unit testing, to verify that adding new partitions
280            % to the passive state doesn't restart the updater and the updater
281            % can be aware of it and index these new partitions in the same run.
282            receive continue -> ok end;
283        false ->
284            ok
285        end,
286        try
287            {PartVersions, MaxSeqs} = load_changes(
288                Owner, Parent, Group, MapQueue, ActiveParts, PassiveParts,
289                WriterAcc#writer_acc.max_seqs,
290                WriterAcc#writer_acc.initial_build, IsDocUsed),
291            Parent ! {doc_loader_finished, {PartVersions, MaxSeqs}}
292        catch
293        throw:purge ->
294            exit(purge);
295        throw:{rollback, RollbackSeqs} ->
296            exit({rollback, RollbackSeqs});
297        _:Error ->
298            Stacktrace = erlang:get_stacktrace(),
299            ?LOG_ERROR("Set view `~s`, ~s group `~s`, doc loader error~n"
300                "error:      ~p~n"
301                "stacktrace: ~p~n",
302                [SetName, Type, DDocId, Error, Stacktrace]),
303            exit(Error)
304        end,
305        % Since updater progress stats is added from docloader,
306        % this process has to stay till updater has completed.
307        receive
308        updater_finished ->
309            ok
310        end
311    end),
312
313    Result = wait_result_loop(StartTime, DocLoader, Mapper, Writer, BlockedTime, Group),
314    case Type of
315    main ->
316        ok = couch_index_barrier:leave(couch_main_index_barrier);
317    replica ->
318        ok = couch_index_barrier:leave(couch_replica_index_barrier)
319    end,
320    case Result of
321    {updater_finished, #set_view_updater_result{group = NewGroup}} ->
322        ?LOG_DEBUG("Updater for ~s set view group `~s`, set `~s`, writer finished:~n"
323                   "  start seqs: ~w~n"
324                   "  end seqs:   ~w~n",
325                   [Type, DDocId, SetName, ?set_seqs(Group), ?set_seqs(NewGroup)]);
326    _ ->
327        ok
328    end,
329    DocLoader ! updater_finished,
330    exit(Result).
331
332
333wait_result_loop(StartTime, DocLoader, Mapper, Writer, BlockedTime, OldGroup) ->
334    #set_view_group{set_name = SetName, name = DDocId, type = Type} = OldGroup,
335    receive
336    {new_passive_partitions, _} = NewPassivePartitions ->
337        Writer ! NewPassivePartitions,
338        DocLoader ! NewPassivePartitions,
339        wait_result_loop(StartTime, DocLoader, Mapper, Writer, BlockedTime, OldGroup);
340    continue ->
341        % Used by unit tests.
342        DocLoader ! continue,
343        wait_result_loop(StartTime, DocLoader, Mapper, Writer, BlockedTime, OldGroup);
344    {doc_loader_finished, {PartVersions, MaxSeqs}} ->
345        Writer ! {doc_loader_finished, {PartVersions, MaxSeqs}},
346        wait_result_loop(StartTime, DocLoader, Mapper, Writer, BlockedTime, OldGroup);
347    {writer_finished, WriterAcc} ->
348        Stats0 = WriterAcc#writer_acc.stats,
349        Result = #set_view_updater_result{
350            group = WriterAcc#writer_acc.group,
351            state = WriterAcc#writer_acc.state,
352            stats = Stats0#set_view_updater_stats{
353                indexing_time = timer:now_diff(os:timestamp(), StartTime) / 1000000,
354                blocked_time = BlockedTime
355            },
356            tmp_file = case WriterAcc#writer_acc.initial_build of
357                true ->
358                    dict:fetch(build_file, WriterAcc#writer_acc.tmp_files);
359                false ->
360                    ""
361                end
362        },
363        {updater_finished, Result};
364    {compactor_started, Pid, Ref} ->
365        ?LOG_INFO("Set view `~s`, ~s group `~s`, updater received "
366                  "compactor ~p notification, ref ~p, writer ~p",
367                   [SetName, Type, DDocId, Pid, Ref, Writer]),
368        Writer ! {compactor_started, self()},
369        erlang:put(compactor_pid, {Pid, Ref}),
370        wait_result_loop(StartTime, DocLoader, Mapper, Writer, BlockedTime, OldGroup);
371    {compactor_started_ack, Writer, GroupSnapshot} ->
372        ?LOG_INFO("Set view `~s`, ~s group `~s`, updater received compaction ack"
373                  " from writer ~p", [SetName, Type, DDocId, Writer]),
374        case erlang:erase(compactor_pid) of
375        {Pid, Ref} ->
376            Pid ! {Ref, {ok, GroupSnapshot}};
377        undefined ->
378            ok
379        end,
380        wait_result_loop(StartTime, DocLoader, Mapper, Writer, BlockedTime, OldGroup);
381    {'EXIT', _, Reason} when Reason =/= normal ->
382        couch_util:shutdown_sync(DocLoader),
383        couch_util:shutdown_sync(Mapper),
384        couch_util:shutdown_sync(Writer),
385        {updater_error, Reason};
386    {native_updater_start, Writer} ->
387        % We need control over spawning native updater process
388        % This helps to terminate native os processes correctly
389        Writer ! {ok, native_updater_start},
390        receive
391        {native_updater_pid, NativeUpdater} ->
392            erlang:put(native_updater, NativeUpdater)
393        end,
394        wait_result_loop(StartTime, DocLoader, Mapper, Writer, BlockedTime, OldGroup);
395    stop ->
396        case erlang:erase(native_updater) of
397        undefined ->
398            couch_util:shutdown_sync(DocLoader),
399            couch_util:shutdown_sync(Mapper),
400            couch_util:shutdown_sync(Writer);
401        NativeUpdater ->
402            MRef = erlang:monitor(process, NativeUpdater),
403            NativeUpdater ! stop,
404            receive
405            {'DOWN', MRef, process, NativeUpdater, _} ->
406                couch_util:shutdown_sync(DocLoader),
407                couch_util:shutdown_sync(Mapper),
408                couch_util:shutdown_sync(Writer)
409            end
410        end,
411        exit({updater_error, shutdown})
412    end.
413
414
415dcp_marker_to_string(Type) ->
416    case Type band ?DCP_SNAPSHOT_TYPE_MASK of
417    ?DCP_SNAPSHOT_TYPE_DISK ->
418        "on-disk";
419    ?DCP_SNAPSHOT_TYPE_MEMORY ->
420        "in-memory";
421    _ ->
422        io_lib:format("unknown (~w)", [Type])
423    end.
424
425
426load_changes(Owner, Updater, Group, MapQueue, ActiveParts, PassiveParts,
427        EndSeqs, InitialBuild, IsDocUsed) ->
428    #set_view_group{
429        set_name = SetName,
430        name = DDocId,
431        type = GroupType,
432        index_header = #set_view_index_header{
433            seqs = SinceSeqs,
434            partition_versions = PartVersions0
435        },
436        dcp_pid = DcpPid,
437        category = Category
438    } = Group,
439
440    MaxDocSize = list_to_integer(
441        couch_config:get("set_views", "indexer_max_doc_size", "0")),
442    FoldFun = fun({PartId, EndSeq}, {AccCount, AccSeqs, AccVersions, AccRollbacks}) ->
443        case couch_set_view_util:has_part_seq(PartId, ?set_unindexable_seqs(Group))
444            andalso not lists:member(PartId, ?set_replicas_on_transfer(Group)) of
445        true ->
446            {AccCount, AccSeqs, AccVersions, AccRollbacks};
447        false ->
448            Since = couch_util:get_value(PartId, SinceSeqs, 0),
449            PartVersions = couch_util:get_value(PartId, AccVersions),
450            Flags = case InitialBuild of
451            true ->
452                ?DCP_FLAG_DISKONLY;
453            false ->
454                ?DCP_FLAG_NOFLAG
455            end,
456            % For stream request from 0, If a vbucket got reset in the window
457            % of time between seqno was obtained from stats and stream request
458            % was made, the end_seqno may be higher than current vb high seqno.
459            % Use a special flag to tell server to set end_seqno.
460            Flags2 = case PartVersions of
461            [{0, 0}] ->
462                Flags bor ?DCP_FLAG_USELATEST_ENDSEQNO;
463            _ ->
464                Flags
465            end,
466            Flags3 = case IsDocUsed of
467            doc_fields_unused ->
468                Flags2 bor ?DCP_FLAG_NOVALUE;
469            doc_fields_used ->
470                Flags2
471            end,
472            case AccRollbacks of
473            [] ->
474                case EndSeq =:= Since of
475                true ->
476                    {AccCount, AccSeqs, AccVersions, AccRollbacks};
477                false ->
478                    ChangesWrapper = fun
479                        ({part_versions, _} = NewVersions, Acc) ->
480                            queue_doc(
481                                NewVersions, MapQueue, Group,
482                                MaxDocSize, InitialBuild),
483                            Acc;
484                        ({snapshot_marker, {MarkerStartSeq, MarkerEndSeq, MarkerType}}, {Count, _})
485                            when MarkerType band ?DCP_SNAPSHOT_TYPE_MASK =/= 0 ->
486                            ?LOG_INFO(
487                                "set view `~s`, ~s (~s) group `~s`: received "
488                                "a snapshot marker (~s) for partition ~p from "
489                                "sequence ~p to ~p",
490                                [SetName, GroupType, Category, DDocId,
491                                    dcp_marker_to_string(MarkerType),
492                                    PartId, MarkerStartSeq, MarkerEndSeq]),
493                            case Count of
494                            % Ignore the snapshot marker that is at the
495                            % beginning of the stream.
496                            % If it wasn't ignored, it would lead to an
497                            % additional forced flush which isn't needed. A
498                            % flush is needed if there are several mutations
499                            % of the same document within one batch. As two
500                            % different partitions can't contain the same
501                            % document ID, we are safe to not force flushing
502                            % between two partitions.
503                            0 ->
504                                {Count, MarkerEndSeq};
505                            _ ->
506                                queue_doc(
507                                    snapshot_marker, MapQueue, Group,
508                                    MaxDocSize, InitialBuild),
509                                {Count, MarkerEndSeq}
510                            end;
511                        ({snapshot_marker, {MarkerStartSeq, MarkerEndSeq, MarkerType}}, Acc) ->
512                            ?LOG_ERROR(
513                                "set view `~s`, ~s (~s) group `~s`: received "
514                                "a snapshot marker (~s) for partition ~p from "
515                                "sequence ~p to ~p",
516                                [SetName, GroupType, Category, DDocId,
517                                    dcp_marker_to_string(MarkerType),
518                                    PartId, MarkerStartSeq, MarkerEndSeq]),
519                            throw({error, unknown_snapshot_marker, MarkerType}),
520                            Acc;
521                        (#dcp_doc{} = Item, {Count, AccEndSeq}) ->
522                            queue_doc(
523                                Item, MapQueue, Group,
524                                MaxDocSize, InitialBuild),
525                            {Count + 1, AccEndSeq}
526                        end,
527                    Result = couch_dcp_client:enum_docs_since(
528                        DcpPid, PartId, PartVersions, Since, EndSeq, Flags3,
529                        ChangesWrapper, {0, 0}),
530                    case Result of
531                    {ok, {AccCount2, AccEndSeq}, NewPartVersions} ->
532                        AccSeqs2 = orddict:store(PartId, AccEndSeq, AccSeqs),
533                        AccVersions2 = lists:ukeymerge(
534                            1, [{PartId, NewPartVersions}], AccVersions),
535                        AccRollbacks2 = AccRollbacks;
536                    {rollback, RollbackSeq} ->
537                        AccCount2 = AccCount,
538                        AccSeqs2 = AccSeqs,
539                        AccVersions2 = AccVersions,
540                        AccRollbacks2 = ordsets:add_element(
541                            {PartId, RollbackSeq}, AccRollbacks);
542                    Error ->
543                        AccCount2 = AccCount,
544                        AccSeqs2 = AccSeqs,
545                        AccVersions2 = AccVersions,
546                        AccRollbacks2 = AccRollbacks,
547                        ?LOG_ERROR("set view `~s`, ~s (~s) group `~s` error"
548                            "while loading changes for partition ~p:~n~p~n",
549                            [SetName, GroupType, Category, DDocId, PartId,
550                                Error]),
551                        throw(Error)
552                    end,
553                    {AccCount2, AccSeqs2, AccVersions2, AccRollbacks2}
554                end;
555            _ ->
556                % If there is a rollback needed, don't store any new documents
557                % in the index, but just check for a rollback of another
558                % partition (i.e. a request with start seq == end seq)
559                ChangesWrapper = fun(_, _) -> ok end,
560                Result = couch_dcp_client:enum_docs_since(
561                    DcpPid, PartId, PartVersions, Since, Since, Flags3,
562                    ChangesWrapper, ok),
563                case Result of
564                {ok, _, _} ->
565                    AccRollbacks2 = AccRollbacks;
566                {rollback, RollbackSeq} ->
567                    AccRollbacks2 = ordsets:add_element(
568                        {PartId, RollbackSeq}, AccRollbacks)
569                end,
570                {AccCount, AccSeqs, AccVersions, AccRollbacks2}
571            end
572        end
573    end,
574
575    notify_owner(Owner, {state, updating_active}, Updater),
576    case ActiveParts of
577    [] ->
578        ActiveChangesCount = 0,
579        MaxSeqs = orddict:new(),
580        PartVersions = PartVersions0,
581        Rollbacks = [];
582    _ ->
583        ?LOG_INFO("Updater reading changes from active partitions to "
584                  "update ~s set view group `~s` from set `~s`",
585                  [GroupType, DDocId, SetName]),
586        {ActiveChangesCount, MaxSeqs, PartVersions, Rollbacks} = lists:foldl(
587            FoldFun, {0, orddict:new(), PartVersions0, ordsets:new()},
588            couch_set_view_util:filter_seqs(ActiveParts, EndSeqs))
589    end,
590    case PassiveParts of
591    [] ->
592        FinalChangesCount = ActiveChangesCount,
593        MaxSeqs2 = MaxSeqs,
594        PartVersions2 = PartVersions,
595        Rollbacks2 = Rollbacks;
596    _ ->
597        ?LOG_INFO("Updater reading changes from passive partitions to "
598                  "update ~s set view group `~s` from set `~s`",
599                  [GroupType, DDocId, SetName]),
600        {FinalChangesCount, MaxSeqs2, PartVersions2, Rollbacks2} = lists:foldl(
601            FoldFun, {ActiveChangesCount, MaxSeqs, PartVersions, Rollbacks},
602            couch_set_view_util:filter_seqs(PassiveParts, EndSeqs))
603    end,
604    {FinalChangesCount3, MaxSeqs3, PartVersions3, Rollbacks3} =
605        load_changes_from_passive_parts_in_mailbox(DcpPid,
606            Group, FoldFun, FinalChangesCount, MaxSeqs2, PartVersions2, Rollbacks2),
607
608    case Rollbacks3 of
609    [] ->
610        ok;
611    _ ->
612        throw({rollback, Rollbacks3})
613    end,
614
615    couch_work_queue:close(MapQueue),
616    ?LOG_INFO("Updater for ~s set view group `~s`, set `~s` (~s), read a total
617              of ~p changes and ~p",
618              [GroupType, DDocId, SetName, Category, FinalChangesCount3, IsDocUsed]),
619    ?LOG_DEBUG("Updater for ~s set view group `~s`, set `~s`, max partition seqs found:~n~w",
620               [GroupType, DDocId, SetName, MaxSeqs3]),
621    {PartVersions3, MaxSeqs3}.
622
623
624load_changes_from_passive_parts_in_mailbox(DcpPid,
625        Group, FoldFun, ChangesCount, MaxSeqs0, PartVersions0, Rollbacks) ->
626    #set_view_group{
627        set_name = SetName,
628        name = DDocId,
629        type = GroupType
630    } = Group,
631    receive
632    {new_passive_partitions, Parts0} ->
633        Parts = get_more_passive_partitions(Parts0),
634        AddPartVersions = [{P, [{0, 0}]} || P <- Parts],
635        {ok, AddMaxSeqs} = couch_dcp_client:get_seqs(DcpPid, Parts),
636        PartVersions = lists:ukeymerge(1, AddPartVersions, PartVersions0),
637
638        MaxSeqs = lists:ukeymerge(1, AddMaxSeqs, MaxSeqs0),
639        ?LOG_INFO("Updater reading changes from new passive partitions ~w to "
640                  "update ~s set view group `~s` from set `~s`",
641                  [Parts, GroupType, DDocId, SetName]),
642        {ChangesCount2, MaxSeqs2, PartVersions2, Rollbacks2} = lists:foldl(
643            FoldFun, {ChangesCount, MaxSeqs, PartVersions, Rollbacks}, AddMaxSeqs),
644        load_changes_from_passive_parts_in_mailbox(DcpPid,
645            Group, FoldFun, ChangesCount2, MaxSeqs2, PartVersions2, Rollbacks2)
646    after 0 ->
647        {ChangesCount, MaxSeqs0, PartVersions0, Rollbacks}
648    end.
649
650
651get_more_passive_partitions(Parts) ->
652    receive
653    {new_passive_partitions, Parts2} ->
654        get_more_passive_partitions(Parts ++ Parts2)
655    after 0 ->
656        lists:sort(Parts)
657    end.
658
659
660notify_owner(Owner, Msg, UpdaterPid) ->
661    Owner ! {updater_info, UpdaterPid, Msg}.
662
663
664queue_doc(snapshot_marker, MapQueue, _Group, _MaxDocSize, _InitialBuild) ->
665    couch_work_queue:queue(MapQueue, snapshot_marker);
666queue_doc({part_versions, _} = PartVersions, MapQueue, _Group, _MaxDocSize,
667    _InitialBuild) ->
668    couch_work_queue:queue(MapQueue, PartVersions);
669queue_doc(Doc, MapQueue, Group, MaxDocSize, InitialBuild) ->
670    case Doc#dcp_doc.deleted of
671    true when InitialBuild ->
672        Entry = nil;
673    true ->
674        Entry = Doc;
675    false ->
676        #set_view_group{
677           set_name = SetName,
678           name = DDocId,
679           type = GroupType
680        } = Group,
681        case couch_util:validate_utf8(Doc#dcp_doc.id) of
682        true ->
683            case (MaxDocSize > 0) andalso
684                (iolist_size(Doc#dcp_doc.body) > MaxDocSize) of
685            true ->
686                ?LOG_MAPREDUCE_ERROR("Bucket `~s`, ~s group `~s`, skipping "
687                    "document with ID `~s`: too large body (~p bytes)",
688                    [SetName, GroupType, DDocId,
689                     ?b2l(Doc#dcp_doc.id), iolist_size(Doc#dcp_doc.body)]),
690                Entry = Doc#dcp_doc{deleted = true};
691            false ->
692                Entry = Doc
693            end;
694        false ->
695            % If the id isn't utf8 (memcached allows it), then log an error
696            % message and skip the doc. Send it through the queue anyway
697            % so we record the high seq num in case there are a bunch of
698            % these at the end, we want to keep track of the high seq and
699            % not reprocess again.
700            ?LOG_MAPREDUCE_ERROR("Bucket `~s`, ~s group `~s`, skipping "
701                "document with non-utf8 id. Doc id bytes: ~w",
702                [SetName, GroupType, DDocId, ?b2l(Doc#dcp_doc.id)]),
703            Entry = Doc#dcp_doc{deleted = true}
704        end
705    end,
706    case Entry of
707    nil ->
708        ok;
709    _ ->
710        couch_work_queue:queue(MapQueue, Entry),
711        update_task(1)
712    end.
713
714
715do_maps(Group, MapQueue, WriteQueue) ->
716    #set_view_group{
717        set_name = SetName,
718        name = DDocId,
719        type = Type,
720        mod = Mod
721    } = Group,
722    case couch_work_queue:dequeue(MapQueue) of
723    closed ->
724        couch_work_queue:close(WriteQueue);
725    {ok, Queue, _QueueSize} ->
726        ViewCount = length(Group#set_view_group.views),
727        {Items, _} = lists:foldl(
728            fun(#dcp_doc{deleted = true} = DcpDoc, {Acc, Size}) ->
729                #dcp_doc{
730                    id = Id,
731                    partition = PartId,
732                    seq = Seq
733                } = DcpDoc,
734                Item = {Seq, Id, PartId, []},
735                {[Item | Acc], Size};
736            (#dcp_doc{deleted = false} = DcpDoc, {Acc0, Size0}) ->
737                % When there are a lot of emits per document the memory can
738                % grow almost indefinitely as the queue size is only limited
739                % by the number of documents and not their emits.
740                % In case the accumulator grows huge, queue the items early
741                % into the writer queue. Only take emits into account, the
742                % other cases in this `foldl` won't ever have an significant
743                % size.
744                {Acc, Size} = case Size0 > ?QUEUE_ACC_BATCH_SIZE of
745                true ->
746                    couch_work_queue:queue(WriteQueue, lists:reverse(Acc0)),
747                    {[], 0};
748                false ->
749                    {Acc0, Size0}
750                end,
751                #dcp_doc{
752                    id = Id,
753                    body = Body,
754                    partition = PartId,
755                    rev_seq = RevSeq,
756                    seq = Seq,
757                    cas = Cas,
758                    expiration = Expiration,
759                    flags = Flags,
760                    data_type = DcpDataType
761                } = DcpDoc,
762                DataType = case DcpDataType of
763                ?DCP_DATA_TYPE_RAW ->
764                    ?CONTENT_META_NON_JSON_MODE;
765                ?DCP_DATA_TYPE_JSON ->
766                    ?CONTENT_META_JSON
767                end,
768                Doc = #doc{
769                    id = Id,
770                    rev = {RevSeq, <<Cas:64, Expiration:32, Flags:32>>},
771                    body = Body,
772                    content_meta = DataType,
773                    deleted = false
774                },
775                try
776                    {ok, Result, LogList} = couch_set_view_mapreduce:map(
777                        Doc, PartId, Seq, Group),
778                    {Result2, _} = lists:foldr(
779                        fun({error, Reason}, {AccRes, Pos}) ->
780                            ErrorMsg = "Bucket `~s`, ~s group `~s`, error mapping"
781                                    " document `~s` for view `~s`: ~s",
782                            ViewName = Mod:view_name(Group, Pos),
783                            Args = [SetName, Type, DDocId, Id, ViewName,
784                                    couch_util:to_binary(Reason)],
785                            ?LOG_MAPREDUCE_ERROR(ErrorMsg, Args),
786                            {[[] | AccRes], Pos - 1};
787                        (KVs, {AccRes, Pos}) ->
788                            {[KVs | AccRes], Pos - 1}
789                        end,
790                        {[], ViewCount}, Result),
791                    lists:foreach(
792                        fun(Msg) ->
793                            DebugMsg = "Bucket `~s`, ~s group `~s`, map function"
794                                " log for document `~s`: ~s",
795                            Args = [SetName, Type, DDocId, Id, binary_to_list(Msg)],
796                            ?LOG_MAPREDUCE_ERROR(DebugMsg, Args)
797                        end, LogList),
798                    Item = {Seq, Id, PartId, Result2},
799                    {[Item | Acc], Size + erlang:external_size(Result2)}
800                catch _:{error, Reason} ->
801                    ErrorMsg = "Bucket `~s`, ~s group `~s`, error mapping document `~s`: ~s",
802                    Args = [SetName, Type, DDocId, Id, couch_util:to_binary(Reason)],
803                    ?LOG_MAPREDUCE_ERROR(ErrorMsg, Args),
804                    {[{Seq, Id, PartId, []} | Acc], Size}
805                end;
806            (snapshot_marker, {Acc, Size}) ->
807                {[snapshot_marker | Acc], Size};
808            ({part_versions, _} = PartVersions, {Acc, Size}) ->
809                {[PartVersions | Acc], Size}
810            end,
811            {[], 0}, Queue),
812        ok = couch_work_queue:queue(WriteQueue, lists:reverse(Items)),
813        do_maps(Group, MapQueue, WriteQueue)
814    end.
815
816
817do_writes(Acc) ->
818    #writer_acc{
819        kvs = Kvs,
820        kvs_size = KvsSize,
821        write_queue = WriteQueue,
822        throttle = Throttle
823    } = Acc,
824    ok = timer:sleep(Throttle),
825    case couch_work_queue:dequeue(WriteQueue) of
826    closed ->
827        flush_writes(Acc#writer_acc{final_batch = true});
828    {ok, Queue0, QueueSize} ->
829        Queue = lists:flatten(Queue0),
830        Kvs2 = Kvs ++ Queue,
831        KvsSize2 = KvsSize + QueueSize,
832
833        Acc2 = Acc#writer_acc{
834            kvs = Kvs2,
835            kvs_size = KvsSize2
836        },
837        case should_flush_writes(Acc2) of
838        true ->
839            Acc3 = flush_writes(Acc2),
840            Acc4 = Acc3#writer_acc{kvs = [], kvs_size = 0};
841        false ->
842            Acc4 = Acc2
843        end,
844        do_writes(Acc4)
845    end.
846
847should_flush_writes(Acc) ->
848    #writer_acc{
849        view_empty_kvs = ViewEmptyKvs,
850        kvs_size = KvsSize
851    } = Acc,
852    KvsSize >= (?MIN_BATCH_SIZE_PER_VIEW * length(ViewEmptyKvs)).
853
854
855flush_writes(#writer_acc{kvs = [], initial_build = false} = Acc) ->
856    Acc2 = maybe_update_btrees(Acc),
857    checkpoint(Acc2);
858
859flush_writes(#writer_acc{initial_build = false} = Acc0) ->
860    #writer_acc{
861        kvs = Kvs,
862        view_empty_kvs = ViewEmptyKVs,
863        group = Group,
864        parent = Parent,
865        owner = Owner,
866        final_batch = IsFinalBatch,
867        part_versions = PartVersions
868    } = Acc0,
869    Mod = Group#set_view_group.mod,
870    % Only incremental updates can contain multiple snapshots
871    {MultipleSnapshots, Kvs2} = merge_snapshots(Kvs),
872    Acc1 = case MultipleSnapshots of
873    true ->
874        Acc2 = maybe_update_btrees(Acc0#writer_acc{force_flush = true}),
875        checkpoint(Acc2);
876    false ->
877        Acc0
878    end,
879    #writer_acc{last_seqs = LastSeqs} = Acc1,
880    {ViewKVs, DocIdViewIdKeys, NewLastSeqs, NewPartVersions} =
881        process_map_results(Mod, Kvs2, ViewEmptyKVs, LastSeqs, PartVersions),
882    Acc3 = Acc1#writer_acc{last_seqs = NewLastSeqs, part_versions = NewPartVersions},
883    Acc4 = write_to_tmp_batch_files(ViewKVs, DocIdViewIdKeys, Acc3),
884    #writer_acc{group = NewGroup} = Acc4,
885    case ?set_seqs(NewGroup) =/= ?set_seqs(Group) of
886    true ->
887        Acc5 = checkpoint(Acc4),
888        case (Acc4#writer_acc.state =:= updating_active) andalso
889            lists:any(fun({PartId, _}) ->
890                ((1 bsl PartId) band ?set_pbitmask(Group) =/= 0)
891            end, NewLastSeqs) of
892        true ->
893            notify_owner(Owner, {state, updating_passive}, Parent),
894            Acc5#writer_acc{state = updating_passive};
895        false ->
896            Acc5
897        end;
898    false ->
899        case IsFinalBatch of
900        true ->
901            checkpoint(Acc4);
902        false ->
903            Acc4
904        end
905    end;
906
907flush_writes(#writer_acc{initial_build = true} = WriterAcc) ->
908    #writer_acc{
909        kvs = Kvs,
910        view_empty_kvs = ViewEmptyKVs,
911        tmp_files = TmpFiles,
912        tmp_dir = TmpDir,
913        group = Group,
914        final_batch = IsFinalBatch,
915        max_seqs = MaxSeqs,
916        part_versions = PartVersions,
917        stats = Stats
918    } = WriterAcc,
919    #set_view_group{
920        set_name = SetName,
921        type = Type,
922        name = DDocId,
923        mod = Mod
924    } = Group,
925    {ViewKVs, DocIdViewIdKeys, MaxSeqs2, PartVersions2} = process_map_results(
926        Mod, Kvs, ViewEmptyKVs, MaxSeqs, PartVersions),
927
928    IdRecords = lists:foldr(
929        fun({_DocId, {_PartId, []}}, Acc) ->
930                Acc;
931            (Kv, Acc) ->
932                [{KeyBin, ValBin}] = Mod:convert_back_index_kvs_to_binary([Kv], []),
933                KvBin = [<<(byte_size(KeyBin)):16>>, KeyBin, ValBin],
934                [[<<(iolist_size(KvBin)):32/native>>, KvBin] | Acc]
935        end,
936        [], DocIdViewIdKeys),
937    #set_view_tmp_file_info{fd = IdFd} = dict:fetch(ids_index, TmpFiles),
938    ok = file:write(IdFd, IdRecords),
939
940    {InsertKVCount, TmpFiles2} = Mod:write_kvs(Group, TmpFiles, ViewKVs),
941
942    Stats2 = Stats#set_view_updater_stats{
943        inserted_kvs = Stats#set_view_updater_stats.inserted_kvs + InsertKVCount,
944        inserted_ids = Stats#set_view_updater_stats.inserted_ids + length(DocIdViewIdKeys)
945    },
946    case IsFinalBatch of
947    false ->
948        WriterAcc#writer_acc{
949            max_seqs = MaxSeqs2,
950            stats = Stats2,
951            tmp_files = TmpFiles2
952        };
953    true ->
954        ?LOG_INFO("Updater for set view `~s`, ~s group `~s`, starting btree "
955                  "build phase" , [SetName, Type, DDocId]),
956        {Group2, BuildFd} = Mod:finish_build(Group, TmpFiles2, TmpDir),
957        WriterAcc#writer_acc{
958            tmp_files = dict:store(build_file, BuildFd, TmpFiles2),
959            max_seqs = MaxSeqs2,
960            part_versions = PartVersions2,
961            stats = Stats2,
962            group = Group2
963        }
964    end.
965
966
967process_map_results(Mod, Kvs, ViewEmptyKVs, PartSeqs, PartVersions) ->
968    lists:foldl(
969        fun({Seq, DocId, PartId, []}, {ViewKVsAcc, DocIdViewIdKeysAcc, PartIdSeqs, PartIdVersions}) ->
970            PartIdSeqs2 = update_part_seq(Seq, PartId, PartIdSeqs),
971            {ViewKVsAcc, [{DocId, {PartId, []}} | DocIdViewIdKeysAcc], PartIdSeqs2, PartIdVersions};
972        ({Seq, DocId, PartId, QueryResults}, {ViewKVsAcc, DocIdViewIdKeysAcc, PartIdSeqs, PartIdVersions}) ->
973            {NewViewKVs, NewViewIdKeys} = Mod:view_insert_doc_query_results(
974                    DocId, PartId, QueryResults, ViewKVsAcc, [], []),
975            PartIdSeqs2 = update_part_seq(Seq, PartId, PartIdSeqs),
976            {NewViewKVs, [{DocId, {PartId, NewViewIdKeys}} | DocIdViewIdKeysAcc], PartIdSeqs2, PartIdVersions};
977        (snapshot_marker, _Acc) ->
978            throw({error,
979                <<"The multiple snapshots should have been merged, "
980                  "but theu weren't">>});
981        ({part_versions, {PartId, NewVersions}}, {ViewKVsAcc, DocIdViewIdKeysAcc, PartIdSeqs, PartIdVersions}) ->
982            PartIdVersions2 = update_part_versions(NewVersions, PartId, PartIdVersions),
983            {ViewKVsAcc, DocIdViewIdKeysAcc, PartIdSeqs, PartIdVersions2}
984        end,
985        {ViewEmptyKVs, [], PartSeqs, PartVersions}, Kvs).
986
987
988-spec update_transferred_replicas(#set_view_group{},
989                                  partition_seqs(),
990                                  partition_seqs()) -> #set_view_group{}.
991update_transferred_replicas(Group, _MaxSeqs, _PartIdSeqs) when ?set_replicas_on_transfer(Group) =:= [] ->
992    Group;
993update_transferred_replicas(Group, MaxSeqs, PartIdSeqs) ->
994    #set_view_group{index_header = Header} = Group,
995    RepsTransferred = lists:foldl(
996        fun({PartId, Seq}, A) ->
997            case lists:member(PartId, ?set_replicas_on_transfer(Group))
998                andalso (Seq >= couch_set_view_util:get_part_seq(PartId, MaxSeqs)) of
999            true ->
1000                ordsets:add_element(PartId, A);
1001            false ->
1002                A
1003            end
1004        end,
1005        ordsets:new(), PartIdSeqs),
1006    ReplicasOnTransfer2 = ordsets:subtract(?set_replicas_on_transfer(Group), RepsTransferred),
1007    {Abitmask2, Pbitmask2} = lists:foldl(
1008        fun(Id, {A, P}) ->
1009            Mask = 1 bsl Id,
1010            Mask = ?set_pbitmask(Group) band Mask,
1011            0 = ?set_abitmask(Group) band Mask,
1012            {A bor Mask, P bxor Mask}
1013        end,
1014        {?set_abitmask(Group), ?set_pbitmask(Group)},
1015        RepsTransferred),
1016    Group#set_view_group{
1017        index_header = Header#set_view_index_header{
1018            abitmask = Abitmask2,
1019            pbitmask = Pbitmask2,
1020            replicas_on_transfer = ReplicasOnTransfer2
1021        }
1022    }.
1023
1024
1025-spec update_part_seq(update_seq(), partition_id(), partition_seqs()) -> partition_seqs().
1026update_part_seq(Seq, PartId, Acc) ->
1027    case couch_set_view_util:find_part_seq(PartId, Acc) of
1028    {ok, Max} when Max >= Seq ->
1029        Acc;
1030    _ ->
1031        orddict:store(PartId, Seq, Acc)
1032    end.
1033
1034
1035-spec update_part_versions(partition_version(), partition_id(), partition_versions()) ->
1036    partition_versions().
1037update_part_versions(NewVersions, PartId, PartVersions) ->
1038    orddict:store(PartId, NewVersions, PartVersions).
1039
1040
1041% Incremental updates.
1042write_to_tmp_batch_files(ViewKeyValuesToAdd, DocIdViewIdKeys, WriterAcc) ->
1043    #writer_acc{
1044        tmp_files = TmpFiles,
1045        group = #set_view_group{
1046            id_btree = IdBtree,
1047            mod = Mod
1048        }
1049    } = WriterAcc,
1050
1051    {AddDocIdViewIdKeys0, RemoveDocIds, LookupDocIds} = lists:foldr(
1052        fun({DocId, {PartId, [] = _ViewIdKeys}}, {A, B, C}) ->
1053                BackKey = make_back_index_key(DocId, PartId),
1054                case is_new_partition(PartId, WriterAcc) of
1055                true ->
1056                    {A, [BackKey | B], C};
1057                false ->
1058                    {A, [BackKey | B], [BackKey | C]}
1059                end;
1060            ({DocId, {PartId, _ViewIdKeys}} = KvPairs, {A, B, C}) ->
1061                BackKey = make_back_index_key(DocId, PartId),
1062                case is_new_partition(PartId, WriterAcc) of
1063                true ->
1064                    {[KvPairs | A], B, C};
1065                false ->
1066                    {[KvPairs | A], B, [BackKey | C]}
1067                end
1068        end,
1069        {[], [], []}, DocIdViewIdKeys),
1070
1071    AddDocIdViewIdKeys = Mod:convert_back_index_kvs_to_binary(
1072        AddDocIdViewIdKeys0, []),
1073
1074    IdsData1 = lists:map(
1075        fun(K) -> couch_set_view_updater_helper:encode_op(remove, K) end,
1076        RemoveDocIds),
1077
1078    IdsData2 = lists:foldl(
1079        fun({K, V}, Acc) ->
1080            Bin = couch_set_view_updater_helper:encode_op(insert, K, V),
1081            [Bin | Acc]
1082        end,
1083        IdsData1,
1084        AddDocIdViewIdKeys),
1085
1086    IdTmpFileInfo = dict:fetch(ids_index, TmpFiles),
1087    case IdTmpFileInfo of
1088    #set_view_tmp_file_info{fd = nil} ->
1089        0 = IdTmpFileInfo#set_view_tmp_file_info.size,
1090        IdTmpFilePath = new_sort_file_name(WriterAcc),
1091        {ok, IdTmpFileFd} = file2:open(IdTmpFilePath, [raw, append, binary]),
1092        IdTmpFileSize = 0;
1093    #set_view_tmp_file_info{
1094            fd = IdTmpFileFd, name = IdTmpFilePath, size = IdTmpFileSize} ->
1095        ok
1096    end,
1097
1098    ok = file:write(IdTmpFileFd, IdsData2),
1099
1100    IdTmpFileInfo2 = IdTmpFileInfo#set_view_tmp_file_info{
1101        fd = IdTmpFileFd,
1102        name = IdTmpFilePath,
1103        size = IdTmpFileSize + iolist_size(IdsData2)
1104    },
1105    TmpFiles2 = dict:store(ids_index, IdTmpFileInfo2, TmpFiles),
1106
1107    case LookupDocIds of
1108    [] ->
1109        LookupResults = [];
1110    _ ->
1111        {ok, LookupResults, IdBtree} =
1112            couch_btree:query_modify(IdBtree, LookupDocIds, [], [])
1113    end,
1114    KeysToRemoveByView = lists:foldl(
1115        fun(LookupResult, KeysToRemoveByViewAcc) ->
1116            case LookupResult of
1117            {ok, {<<_Part:16, DocId/binary>>, <<_Part:16, ViewIdKeys/binary>>}} ->
1118                lists:foldl(
1119                    fun({ViewId, Keys}, KeysToRemoveByViewAcc2) ->
1120                        RemoveKeysDict = lists:foldl(
1121                            fun(Key, RemoveKeysDictAcc) ->
1122                                EncodedKey = Mod:encode_key_docid(Key, DocId),
1123                                dict:store(EncodedKey, nil, RemoveKeysDictAcc)
1124                            end,
1125                        couch_util:dict_find(ViewId, KeysToRemoveByViewAcc2, dict:new()), Keys),
1126                        dict:store(ViewId, RemoveKeysDict, KeysToRemoveByViewAcc2)
1127                    end,
1128                    KeysToRemoveByViewAcc, couch_set_view_util:parse_view_id_keys(ViewIdKeys));
1129            {not_found, _} ->
1130                KeysToRemoveByViewAcc
1131            end
1132        end,
1133        dict:new(), LookupResults),
1134
1135    WriterAcc2 = update_tmp_files(
1136        Mod, WriterAcc#writer_acc{tmp_files = TmpFiles2}, ViewKeyValuesToAdd,
1137        KeysToRemoveByView),
1138    maybe_update_btrees(WriterAcc2).
1139
1140
1141% Update the temporary files with the key-values from the indexer. Return
1142% the updated writer accumulator.
1143-spec update_tmp_files(atom(), #writer_acc{},
1144                       [{#set_view{}, [set_view_key_value()]}],
1145                       dict:dict(non_neg_integer(), dict:dict(binary(), nil)))
1146                      -> #writer_acc{}.
1147update_tmp_files(Mod, WriterAcc, ViewKeyValues, KeysToRemoveByView) ->
1148    #writer_acc{
1149       group = Group,
1150       tmp_files = TmpFiles
1151    } = WriterAcc,
1152    TmpFiles2 = lists:foldl(
1153        fun({#set_view{id_num = ViewId}, AddKeyValues}, AccTmpFiles) ->
1154            AddKeyValuesBinaries = Mod:convert_primary_index_kvs_to_binary(
1155                AddKeyValues, Group, []),
1156            KeysToRemoveDict = couch_util:dict_find(
1157                ViewId, KeysToRemoveByView, dict:new()),
1158
1159            case Mod of
1160            % The b-tree replaces nodes with the same key, hence we don't
1161            % need to delete a node that gets updated anyway
1162            mapreduce_view ->
1163                {KeysToRemoveDict2, BatchData} = lists:foldl(
1164                    fun({K, V}, {KeysToRemoveAcc, BinOpAcc}) ->
1165                        Bin = couch_set_view_updater_helper:encode_op(
1166                            insert, K, V),
1167                        BinOpAcc2 = [Bin | BinOpAcc],
1168                        case dict:find(K, KeysToRemoveAcc) of
1169                        {ok, _} ->
1170                            {dict:erase(K, KeysToRemoveAcc), BinOpAcc2};
1171                        _ ->
1172                            {KeysToRemoveAcc, BinOpAcc2}
1173                        end
1174                    end,
1175                    {KeysToRemoveDict, []}, AddKeyValuesBinaries);
1176            % In r-trees there are multiple possible paths to a key. Hence it's
1177            % not easily possible to replace an existing node with a new one,
1178            % as the insertion could happen in a subtree that is different
1179            % from the subtree of the old value.
1180            spatial_view ->
1181                BatchData = [
1182                    couch_set_view_updater_helper:encode_op(insert, K, V) ||
1183                        {K, V} <- AddKeyValuesBinaries],
1184                KeysToRemoveDict2 = KeysToRemoveDict
1185            end,
1186
1187            BatchData2 = dict:fold(
1188                fun(K, _V, BatchAcc) ->
1189                    Bin = couch_set_view_updater_helper:encode_op(remove, K),
1190                    [Bin | BatchAcc]
1191                end,
1192                BatchData, KeysToRemoveDict2),
1193
1194            ViewTmpFileInfo = dict:fetch(ViewId, TmpFiles),
1195            case ViewTmpFileInfo of
1196            #set_view_tmp_file_info{fd = nil} ->
1197                0 = ViewTmpFileInfo#set_view_tmp_file_info.size,
1198                ViewTmpFilePath = new_sort_file_name(WriterAcc),
1199                {ok, ViewTmpFileFd} = file2:open(
1200                    ViewTmpFilePath, [raw, append, binary]),
1201                ViewTmpFileSize = 0;
1202            #set_view_tmp_file_info{fd = ViewTmpFileFd,
1203                                    size = ViewTmpFileSize,
1204                                    name = ViewTmpFilePath} ->
1205                ok
1206            end,
1207            ok = file:write(ViewTmpFileFd, BatchData2),
1208            ViewTmpFileInfo2 = ViewTmpFileInfo#set_view_tmp_file_info{
1209                fd = ViewTmpFileFd,
1210                name = ViewTmpFilePath,
1211                size = ViewTmpFileSize + iolist_size(BatchData2)
1212            },
1213            dict:store(ViewId, ViewTmpFileInfo2, AccTmpFiles)
1214        end,
1215    TmpFiles, ViewKeyValues),
1216    WriterAcc#writer_acc{
1217        tmp_files = TmpFiles2
1218    }.
1219
1220
1221is_new_partition(PartId, #writer_acc{initial_seqs = InitialSeqs}) ->
1222    couch_util:get_value(PartId, InitialSeqs, 0) == 0.
1223
1224
1225% For incremental index updates.
1226maybe_update_btrees(WriterAcc0) ->
1227    #writer_acc{
1228        view_empty_kvs = ViewEmptyKVs,
1229        tmp_files = TmpFiles,
1230        group = Group0,
1231        final_batch = IsFinalBatch,
1232        owner = Owner,
1233        last_seqs = LastSeqs,
1234        part_versions = PartVersions,
1235        force_flush = ForceFlush
1236    } = WriterAcc0,
1237    IdTmpFileInfo = dict:fetch(ids_index, TmpFiles),
1238    ShouldFlushViews =
1239        lists:any(
1240            fun({#set_view{id_num = Id}, _}) ->
1241                ViewTmpFileInfo = dict:fetch(Id, TmpFiles),
1242                ViewTmpFileInfo#set_view_tmp_file_info.size >= ?INC_MAX_TMP_FILE_SIZE
1243            end, ViewEmptyKVs),
1244    ShouldFlush = IsFinalBatch orelse
1245        ForceFlush orelse
1246        ((IdTmpFileInfo#set_view_tmp_file_info.size >= ?INC_MAX_TMP_FILE_SIZE) andalso
1247        ShouldFlushViews),
1248    case ShouldFlush of
1249    false ->
1250        NewLastSeqs1 = LastSeqs,
1251        case erlang:get(updater_worker) of
1252        undefined ->
1253            WriterAcc = WriterAcc0;
1254        UpdaterWorker when is_reference(UpdaterWorker) ->
1255            receive
1256            {UpdaterWorker, UpGroup, UpStats, CompactFiles} ->
1257                send_log_compact_files(Owner, CompactFiles, ?set_seqs(UpGroup),
1258                    ?set_partition_versions(UpGroup)),
1259                erlang:erase(updater_worker),
1260                WriterAcc = check_if_compactor_started(
1261                    WriterAcc0#writer_acc{group = UpGroup, stats = UpStats})
1262            after 0 ->
1263                WriterAcc = WriterAcc0
1264            end
1265        end;
1266    true ->
1267        ok = close_tmp_fd(IdTmpFileInfo),
1268        ok = lists:foreach(
1269            fun(#set_view{id_num = Id}) ->
1270                ViewTmpFileInfo = dict:fetch(Id, TmpFiles),
1271                ok = close_tmp_fd(ViewTmpFileInfo)
1272            end,
1273            Group0#set_view_group.views),
1274        case erlang:erase(updater_worker) of
1275        undefined ->
1276            WriterAcc1 = WriterAcc0;
1277        UpdaterWorker when is_reference(UpdaterWorker) ->
1278            receive
1279            {UpdaterWorker, UpGroup2, UpStats2, CompactFiles2} ->
1280                send_log_compact_files(Owner, CompactFiles2, ?set_seqs(UpGroup2),
1281                    ?set_partition_versions(UpGroup2)),
1282                WriterAcc1 = check_if_compactor_started(
1283                    WriterAcc0#writer_acc{
1284                        group = UpGroup2,
1285                        stats = UpStats2
1286                    })
1287            end
1288        end,
1289
1290
1291        % MB-11472: There is no id sortfile present and hence this is the final batch
1292        % and nothing left to be updated to the btree
        case IdTmpFileInfo#set_view_tmp_file_info.name of
        nil ->
            WriterAcc = WriterAcc1;
        _ ->
            WriterAcc2 = check_if_compactor_started(WriterAcc1),
            NewUpdaterWorker = spawn_updater_worker(WriterAcc2, LastSeqs, PartVersions),
            erlang:put(updater_worker, NewUpdaterWorker),

            TmpFiles2 = dict:map(
                fun(_, _) -> #set_view_tmp_file_info{} end, TmpFiles),
            WriterAcc = WriterAcc2#writer_acc{tmp_files = TmpFiles2}
        end,
        NewLastSeqs1 = orddict:new()
    end,
    #writer_acc{
        stats = NewStats0,
        group = NewGroup0
    } = WriterAcc,
    case IsFinalBatch of
    true ->
        case erlang:erase(updater_worker) of
        undefined ->
            NewGroup = NewGroup0,
            NewStats = NewStats0;
        UpdaterWorker2 when is_reference(UpdaterWorker2) ->
            receive
            {UpdaterWorker2, NewGroup, NewStats, CompactFiles3} ->
                send_log_compact_files(Owner, CompactFiles3, ?set_seqs(NewGroup),
                    ?set_partition_versions(NewGroup))
            end
        end,
        NewLastSeqs = orddict:new();
    false ->
        NewGroup = NewGroup0,
        NewStats = NewStats0,
        NewLastSeqs = NewLastSeqs1
    end,
    NewWriterAcc = WriterAcc#writer_acc{
        stats = NewStats,
        group = NewGroup,
        last_seqs = NewLastSeqs
    },
    NewWriterAcc.


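% Notify the group owner about the sorted operation-log files so that a
% running compactor can catch up with them (a gen_server cast of
% {compact_log_files, ...}). The Init flag is true when a new compactor was
% detected since the last notification (see check_if_compactor_started/1).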
send_log_compact_files(_Owner, [], _Seqs, _PartVersions) ->
    ok;
send_log_compact_files(Owner, Files, Seqs, PartVersions) ->
    Init = case erlang:erase(new_compactor) of
    true ->
        true;
    undefined ->
        false
    end,
    ok = gen_server:cast(Owner, {compact_log_files, Files, Seqs, PartVersions, Init}).


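% Spawn a linked worker process that applies the accumulated operation logs to
% the btrees (update_btrees/1) and reports back to the caller with a
% {Ref, NewGroup, NewStats, CompactFiles} message. The main updater process is
% first asked to ack the start (native_updater_start) and is then told the
% worker's pid (native_updater_pid).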
-spec spawn_updater_worker(#writer_acc{}, partition_seqs(),
                           partition_versions()) -> reference().
spawn_updater_worker(WriterAcc, PartIdSeqs, PartVersions) ->
    Parent = self(),
    Ref = make_ref(),
    #writer_acc{
        group = Group,
        parent = UpdaterPid,
        max_seqs = MaxSeqs
    } = WriterAcc,
    % Tell the main updater process that a native updater is about to start
    % and wait for its acknowledgement before spawning the worker.
    UpdaterPid ! {native_updater_start, self()},
    receive
    {ok, native_updater_start} ->
        ok
    end,
    Pid = spawn_link(fun() ->
        case ?set_cbitmask(Group) of
        0 ->
            CleanupStart = 0;
        _ ->
            CleanupStart = os:timestamp()
        end,
        {ok, NewGroup0, CleanupCount, NewStats, NewCompactFiles} = update_btrees(WriterAcc),
        case ?set_cbitmask(Group) of
        0 ->
            CleanupTime = 0.0;
        _ ->
            CleanupTime = timer:now_diff(os:timestamp(), CleanupStart) / 1000000,
            #set_view_group{
                set_name = SetName,
                name = DDocId,
                type = GroupType
            } = Group,
            ?LOG_INFO("Updater for set view `~s`, ~s group `~s`, performed cleanup "
                      "of ~p key/value pairs in ~.3f seconds",
                      [SetName, GroupType, DDocId, CleanupCount, CleanupTime])
        end,
        NewSeqs = update_seqs(PartIdSeqs, ?set_seqs(Group)),
        NewPartVersions = update_versions(PartVersions, ?set_partition_versions(Group)),
        Header = NewGroup0#set_view_group.index_header,
        NewHeader = Header#set_view_index_header{
            seqs = NewSeqs,
            partition_versions = NewPartVersions
        },
        NewGroup = NewGroup0#set_view_group{
            index_header = NewHeader
        },
        NewGroup2 = update_transferred_replicas(NewGroup, MaxSeqs, PartIdSeqs),
        NumChanges = count_seqs_done(Group, NewSeqs),
        NewStats2 = NewStats#set_view_updater_stats{
           seqs = NewStats#set_view_updater_stats.seqs + NumChanges,
           cleanup_time = NewStats#set_view_updater_stats.cleanup_time + CleanupTime,
           cleanup_kv_count = NewStats#set_view_updater_stats.cleanup_kv_count + CleanupCount
        },
        Parent ! {Ref, NewGroup2, NewStats2, NewCompactFiles}
    end),
    UpdaterPid ! {native_updater_pid, Pid},
    Ref.

% Update id btree and view btrees with current batch of changes
update_btrees(WriterAcc) ->
    #writer_acc{
        stats = Stats,
        group = Group0,
        tmp_dir = TmpDir,
        tmp_files = TmpFiles,
        compactor_running = CompactorRunning,
        max_insert_batch_size = MaxBatchSize
    } = WriterAcc,

    % Prepare list of operation logs for each btree
    #set_view_tmp_file_info{name = IdFile} = dict:fetch(ids_index, TmpFiles),
    ViewFiles = lists:map(
        fun(#set_view{id_num = Id}) ->
            #set_view_tmp_file_info{
                name = ViewFile
            } = dict:fetch(Id, TmpFiles),
            ViewFile
        end, Group0#set_view_group.views),
    % `LogFiles` is supplied to the native updater. For spatial views only the
    % ID b-tree is updated.
    LogFiles = case Group0#set_view_group.mod of
    mapreduce_view ->
        [IdFile | ViewFiles];
    spatial_view ->
        [IdFile]
    end,

    % Remove the spatial views from the group. The native updater currently
    % handles mapreduce views only, but it does update the ID b-tree of
    % spatial views.
    Group = couch_set_view_util:remove_group_views(Group0, spatial_view),

    {ok, NewGroup0, Stats2} = couch_set_view_updater_helper:update_btrees(
        Group, TmpDir, LogFiles, MaxBatchSize, false),
    {IdsInserted, IdsDeleted, KVsInserted, KVsDeleted, CleanupCount} = Stats2,

    % Add back spatial views
    NewGroup = couch_set_view_util:update_group_views(
        NewGroup0, Group0, spatial_view),

    % The native updater has already updated the ID b-tree; now run the Erlang
    % updater for the spatial views.
    NewGroup2 = case NewGroup#set_view_group.mod of
    mapreduce_view ->
        NewGroup;
    spatial_view = Mod ->
        ok = couch_file:refresh_eof(NewGroup#set_view_group.fd),
        Views = Mod:update_spatial(NewGroup#set_view_group.views, ViewFiles,
            MaxBatchSize),
        NewGroup#set_view_group{
            views = Views,
            index_header = (NewGroup#set_view_group.index_header)#set_view_index_header{
                view_states = [Mod:get_state(V#set_view.indexer) || V <- Views]
            }
        }
    end,

    NewStats = Stats#set_view_updater_stats{
     inserted_ids = Stats#set_view_updater_stats.inserted_ids + IdsInserted,
     deleted_ids = Stats#set_view_updater_stats.deleted_ids + IdsDeleted,
     inserted_kvs = Stats#set_view_updater_stats.inserted_kvs + KVsInserted,
     deleted_kvs = Stats#set_view_updater_stats.deleted_kvs + KVsDeleted
    },

    % If no compactor is running, the sort files can simply be deleted.
    % Otherwise hand them over to the compactor so it can apply the deltas.
    CompactFiles = lists:foldr(
        fun(SortedFile, AccCompactFiles) ->
            case CompactorRunning of
            true ->
                case filename:extension(SortedFile) of
                ".compact" ->
                    [SortedFile | AccCompactFiles];
                _ ->
                    SortedFile2 = new_sort_file_name(TmpDir, true),
                    ok = file2:rename(SortedFile, SortedFile2),
                    [SortedFile2 | AccCompactFiles]
                end;
            false ->
                ok = file2:delete(SortedFile),
                AccCompactFiles
            end
        end, [], [IdFile | ViewFiles]),
    {ok, NewGroup2, CleanupCount, NewStats, CompactFiles}.


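% Fold the partition sequence numbers reached in this batch into the group's
% seqs. Every new seq must be strictly greater than the one currently stored,
% otherwise the updater exits with an error.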
update_seqs(PartIdSeqs, Seqs) ->
    orddict:fold(
        fun(PartId, NewSeq, Acc) ->
            OldSeq = couch_util:get_value(PartId, Acc, 0),
            case NewSeq > OldSeq of
            true ->
                ok;
            false ->
                exit({error, <<"New seq is smaller than or equal to old seq.">>, PartId, OldSeq, NewSeq})
            end,
            orddict:store(PartId, NewSeq, Acc)
        end,
        Seqs, PartIdSeqs).

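% Merge the partition versions seen in this batch into the full list. Both
% lists are orddicts keyed by partition id; on a key collision
% lists:ukeymerge/3 keeps the entry from the first list, so the batch's
% versions win.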
update_versions(PartVersions, AllPartVersions) ->
    lists:ukeymerge(1, PartVersions, AllPartVersions).


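% Advance the couch_task_status progress of this updater by NumChanges
% processed changes. total_changes is raised when changes_done would otherwise
% exceed it.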
update_task(NumChanges) ->
    [Changes, Total] = couch_task_status:get([changes_done, total_changes]),
    Changes2 = Changes + NumChanges,
    Total2 = erlang:max(Total, Changes2),
    Progress = (Changes2 * 100) div Total2,
    couch_task_status:update([
        {progress, Progress},
        {changes_done, Changes2},
        {total_changes, Total2}
    ]).


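% Checkpoint the current state of the index: send a partial_update message
% with the current group to the group owner and wait until it acknowledges it
% (or tells the writer to stop).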
checkpoint(#writer_acc{owner = Owner, parent = Parent, group = Group} = Acc) ->
    #set_view_group{
        set_name = SetName,
        name = DDocId,
        type = Type
    } = Group,
    ?LOG_INFO("Updater checkpointing set view `~s` update for ~s group `~s`",
              [SetName, Type, DDocId]),
    NewGroup = maybe_fix_group(Group),
    ok = couch_file:refresh_eof(NewGroup#set_view_group.fd),
    Owner ! {partial_update, Parent, self(), NewGroup},
    receive
    update_processed ->
        ok;
    stop ->
        exit(shutdown)
    end,
    Acc#writer_acc{group = NewGroup}.


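% Pick up a pending {new_passive_partitions, Parts} message, if there is one,
% and fold those partitions into the group header: they start at seq 0 with an
% initial partition version of [{0, 0}] and are added to the passive bitmask.
% If no such message is pending, the group is returned unchanged.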
maybe_fix_group(#set_view_group{index_header = Header} = Group) ->
    receive
    {new_passive_partitions, Parts} ->
        Bitmask = couch_set_view_util:build_bitmask(Parts),
        {Seqs, PartVersions} = lists:foldl(
            fun(PartId, {SeqAcc, PartVersionsAcc} = Acc) ->
                case couch_set_view_util:has_part_seq(PartId, SeqAcc) of
                true ->
                    Acc;
                false ->
                    {ordsets:add_element({PartId, 0}, SeqAcc),
                        ordsets:add_element({PartId, [{0, 0}]},
                            PartVersionsAcc)}
                end
            end,
            {?set_seqs(Group), ?set_partition_versions(Group)}, Parts),
        Group#set_view_group{
            index_header = Header#set_view_index_header{
                seqs = Seqs,
                pbitmask = ?set_pbitmask(Group) bor Bitmask,
                partition_versions = PartVersions
            }
        }
    after 0 ->
        Group
    end.


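% Non-blocking check for a {compactor_started, Pid} notification. If a
% compactor just started, remember that fact (the new_compactor flag consumed
% by send_log_compact_files/4), ack the compactor with the current group and
% mark compactor_running in the writer accumulator.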
check_if_compactor_started(#writer_acc{group = Group0} = Acc) ->
    receive
    {compactor_started, Pid} ->
        erlang:put(new_compactor, true),
        Group = maybe_fix_group(Group0),
        Pid ! {compactor_started_ack, self(), Group},
        Acc#writer_acc{compactor_running = true, group = Group}
    after 0 ->
        Acc
    end.


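% Set up the per-btree temp (sort) file entries. Stale sort files from a
% previous run are deleted first (only the updater's own files while a
% compactor is running, all of them otherwise). For an initial index build the
% files are opened right away; for incremental updates empty placeholders are
% stored instead.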
init_tmp_files(WriterAcc) ->
    #writer_acc{
        group = Group, initial_build = Init, tmp_dir = TmpDir
    } = WriterAcc,
    case WriterAcc#writer_acc.compactor_running of
    true ->
        ok = couch_set_view_util:delete_sort_files(TmpDir, updater);
    false ->
        ok = couch_set_view_util:delete_sort_files(TmpDir, all)
    end,
    Ids = [ids_index | [V#set_view.id_num || V <- Group#set_view_group.views]],
    Files = case Init of
    true ->
        [begin
             FileName = new_sort_file_name(WriterAcc),
             {ok, Fd} = file2:open(FileName, [raw, append, binary]),
             {Id, #set_view_tmp_file_info{fd = Fd, name = FileName}}
         end || Id <- Ids];
    false ->
         [{Id, #set_view_tmp_file_info{}} || Id <- Ids]
    end,
    WriterAcc#writer_acc{tmp_files = dict:from_list(Files)}.


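% Allocate a fresh sort file path in the temp dir. While a compactor is
% running the path is allocated for the compactor, otherwise for the updater
% (second argument of couch_set_view_util:new_sort_file_path/2).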
new_sort_file_name(#writer_acc{tmp_dir = TmpDir, compactor_running = Cr}) ->
    new_sort_file_name(TmpDir, Cr).

new_sort_file_name(TmpDir, true) ->
    couch_set_view_util:new_sort_file_path(TmpDir, compactor);
new_sort_file_name(TmpDir, false) ->
    couch_set_view_util:new_sort_file_path(TmpDir, updater).


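% Key layout of the back (id) index: a 16-bit partition id followed by the
% raw document id.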
make_back_index_key(DocId, PartId) ->
    <<PartId:16, DocId/binary>>.


count_seqs_done(Group, NewSeqs) ->
    % NewSeqs might contain new passive partitions that the group's seqs don't
    % have yet (they will be added after a checkpoint period).
    lists:foldl(
        fun({PartId, SeqDone}, Acc) ->
            SeqBefore = couch_util:get_value(PartId, ?set_seqs(Group), 0),
            Acc + (SeqDone - SeqBefore)
        end,
        0, NewSeqs).


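% Close a temp file descriptor, if the file was ever opened (fd = nil means it
% never was).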
close_tmp_fd(#set_view_tmp_file_info{fd = nil}) ->
    ok;
close_tmp_fd(#set_view_tmp_file_info{fd = Fd}) ->
    ok = file:close(Fd).


% DCP introduces the concept of snapshots, where a document mutation is only
% guaranteed to be unique within a single snapshot. But the flusher expects
% unique mutations within the full batch. Merge multiple snapshots (if there
% are any) into a single one. The latest mutation wins.
merge_snapshots(KVs) ->
    merge_snapshots(KVs, false, []).

merge_snapshots([], true, Acc) ->
    {true, Acc};
merge_snapshots([], false, Acc) ->
    % The order of the KVs doesn't really matter, but having them sorted the
    % same way will make life easier when debugging
    {false, lists:reverse(Acc)};
merge_snapshots([snapshot_marker | KVs], _MultipleSnapshots, Acc) ->
    merge_snapshots(KVs, true, Acc);
merge_snapshots([{part_versions, _} = PartVersions | KVs], MultipleSnapshots, Acc) ->
    merge_snapshots(KVs, MultipleSnapshots, [PartVersions | Acc]);
merge_snapshots([KV | KVs], true, Acc0) ->
    {_Seq, DocId, _PartId, _QueryResults} = KV,
    Acc = lists:keystore(DocId, 2, Acc0, KV),
    merge_snapshots(KVs, true, Acc);
merge_snapshots([KV | KVs], false, Acc) ->
    merge_snapshots(KVs, false, [KV | Acc]).