% -*- Mode: Erlang; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
2
3% Licensed under the Apache License, Version 2.0 (the "License"); you may not
4% use this file except in compliance with the License. You may obtain a copy of
5% the License at
6%
7%   http://www.apache.org/licenses/LICENSE-2.0
8%
9% Unless required by applicable law or agreed to in writing, software
10% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12% License for the specific language governing permissions and limitations under
13% the License.
14
15-module(couch_index_merger).
16
17-export([query_index/2, query_index/3]).
18
19% Only needed for indexer implementation. Those functions should perhaps go into
20% a utils module.
21% The functions dec_counter/1, should_check_rev/2 are also needed by this file
22-export([collect_rows/4, collect_row_count/6,
23    merge_indexes_no_acc/2, merge_indexes_no_limit/1, handle_skip/1,
24    dec_counter/1, void_event/1, should_check_rev/2,
25    ddoc_unchanged/2, ddoc_not_found_msg/2]).
26
27-include("couch_db.hrl").
28-include_lib("couch_index_merger/include/couch_index_merger.hrl").
29-include_lib("couch_index_merger/include/couch_view_merger.hrl").
30-include_lib("couch_set_view/include/couch_set_view.hrl").
31
32-import(couch_util, [
33    get_value/2,
34    to_binary/1
35]).
36
37-define(LOCAL, <<"local">>).
38
39-define(RETRY_INTERVAL, 1000).
40-define(MAX_RETRIES, 30).
41% Default timeout for the internal HTTP requests (during scatter phase)
42-define(DEFAULT_INTERNAL_HTTP_TIMEOUT, 60000).
43
44
% Entry point used when the caller has already filled in the HTTP params
% and the user context (i.e. the call does not originate directly from a
% mochiweb request). Resolves the first local design document and enters
% the revision-sync retry loop.
query_index(Mod, #index_merge{http_params = HttpParams,
                              user_ctx = UserCtx,
                              indexes = Indexes} = IndexMergeParams)
        when HttpParams =/= nil, UserCtx =/= nil ->
    {ok, DDoc, IndexName} = get_first_ddoc(Indexes, UserCtx),
    query_index_loop(Mod, IndexMergeParams, DDoc, IndexName, ?MAX_RETRIES).
52
% Query merge entry point driven by an incoming #httpd{} request.
% First clause: a single local set view spec - trigger a lighter and
% faster code path that bypasses the generic scatter/gather machinery.
query_index(Mod, #index_merge{indexes = [#set_view_spec{}]} = Params0, Req) ->
    #index_merge{
        indexes = Indexes,
        ddoc_revision = WantedRev
    } = Params0,
    % Only GET requests are timed; POSTs keep whatever timer state they had.
    Params = case Req#httpd.method of
    'GET' ->
        Params0#index_merge{start_timer = os:timestamp()};
    'POST' ->
        Params0
    end,
    {ok, DDoc, _IndexName} = get_first_ddoc(Indexes, Req#httpd.user_ctx),
    case should_check_rev(Params, DDoc) of
    true ->
        ok = check_ddoc_revision(WantedRev, DDoc);
    false ->
        ok
    end,
    Mod:simple_set_view_query(Params, DDoc, Req);

% General case: parse the HTTP parameters out of the request and enter the
% revision-sync retry loop.
query_index(Mod, IndexMergeParams0, #httpd{user_ctx = UserCtx} = Req) ->
    #index_merge{
        indexes = Indexes,
        extra = Extra
    } = IndexMergeParams0,
    {ok, DDoc, IndexName} = get_first_ddoc(Indexes, UserCtx),
    IndexMergeParams = IndexMergeParams0#index_merge{
        start_timer = os:timestamp(),
        user_ctx = UserCtx,
        http_params = Mod:parse_http_params(Req, DDoc, IndexName, Extra)
    },
    query_index_loop(Mod, IndexMergeParams, DDoc, IndexName, ?MAX_RETRIES).

% Throws {error, revision_mismatch} unless the design document's revision
% matches the one requested by the caller ('auto' matches any revision).
check_ddoc_revision(auto, _DDoc) ->
    ok;
check_ddoc_revision(WantedRev, DDoc) ->
    case ddoc_rev(DDoc) of
    WantedRev ->
        ok;
    LocalRev ->
        ?LOG_ERROR("View merger, revision mismatch for design document `~s',"
                   " revision on local node ~s, revision on remote node ~s",
                   [?LOG_USERDATA(DDoc#doc.id),
                    rev_str(WantedRev),
                    rev_str(LocalRev)]),
        throw({error, revision_mismatch})
    end.
101
102
% Run do_query_index/4, retrying (up to the given number of attempts) when
% it throws 'retry' - which happens when a remote node reports a design
% document revision mismatch while ddoc_revision is 'auto'. Each retry
% re-fetches the design document so the nodes can converge on a revision.
query_index_loop(_Mod, _IndexMergeParams, _DDoc, _IndexName, 0) ->
    throw({error, revision_sync_failed});
query_index_loop(Mod, IndexMergeParams, DDoc, IndexName, RetriesLeft) ->
    try
        do_query_index(Mod, IndexMergeParams, DDoc, IndexName)
    catch
    throw:retry ->
        timer:sleep(?RETRY_INTERVAL),
        #index_merge{
            indexes = Indexes,
            user_ctx = UserCtx
        } = IndexMergeParams,
        % IndexName is already bound here: the re-fetch is asserted to
        % resolve to the same index name as before.
        {ok, FreshDDoc, IndexName} = get_first_ddoc(Indexes, UserCtx),
        query_index_loop(Mod, IndexMergeParams, FreshDDoc, IndexName, RetriesLeft - 1)
    end.
118
119
% Core of a merged index query. Spawns one linked folder process per index
% spec, streams their output through a couch_view_merger_queue, and feeds
% the merged stream into the collector built by Mod:make_funs/3. Exits are
% trapped for the whole merge so a crashing folder cannot kill the calling
% mochiweb worker; the 'after' clause shuts the queue and folders down and
% restores the previous trap_exit flag. May throw 'retry' (remote reported
% a revision mismatch while ddoc_revision is 'auto'),
% {error, revision_mismatch} or {error, set_view_outdated}.
do_query_index(Mod, IndexMergeParams, DDoc, IndexName) ->
    #index_merge{
       indexes = Indexes, callback = Callback, user_acc = UserAcc,
       ddoc_revision = DesiredDDocRevision, user_ctx = UserCtx,
       start_timer = StartTimer
    } = IndexMergeParams,

    DDocRev = ddoc_rev(DDoc),
    % Fail fast if the caller pinned a specific design document revision
    % and the local one differs ('auto' matches any revision).
    case should_check_rev(IndexMergeParams, DDoc) of
    true ->
        case DesiredDDocRevision of
        auto ->
            ok;
        DDocRev ->
            ok;
        _ ->
            ?LOG_ERROR("View merger, revision mismatch for design document `~s',"
                       " revision on local node ~s, revision on remote node ~s",
                       [?LOG_USERDATA(DDoc#doc.id),
                        rev_str(DesiredDDocRevision),
                        rev_str(DDocRev)]),
            throw({error, revision_mismatch})
        end;
    false ->
        ok
    end,

    {LessFun, FoldFun, MergeFun, CollectorFun, Extra2} = Mod:make_funs(
        DDoc, IndexName, IndexMergeParams),
    NumFolders = length(Indexes),
    % Ordering function for the merge queue: control items (outdated and
    % mismatch notifications, debug info, row counts, errors) sort before
    % any data row so they are handled as early as possible; data rows are
    % ordered by LessFun when one exists.
    QueueLessFun = fun
        (set_view_outdated, _) ->
            true;
        (_, set_view_outdated) ->
            false;
        (revision_mismatch, _) ->
            true;
        (_, revision_mismatch) ->
            false;
        ({debug_info, _Url, _Info}, _) ->
            true;
        (_, {debug_info, _Url, _Info}) ->
            false;
        ({row_count, _}, _) ->
            true;
        (_, {row_count, _}) ->
            false;
        ({error, _Url, _Reason}, _) ->
            true;
        (_, {error, _Url, _Reason}) ->
            false;
        (RowA, RowB) ->
            case LessFun of
            nil ->
                % That's where the actual less fun is. But as bounding box
                % requests don't return a sorted order, we just return true
                true;
             _ ->
                LessFun(RowA, RowB)
            end
    end,
    % We want to trap exits to avoid this process (mochiweb worker) to die.
    % If the mochiweb worker dies, the client will not get a response back.
    % Link the queue to the folders, so that if one folder dies, all the others
    % will be killed and not hang forever (mochiweb reuses workers for different
    % requests).
    TrapExitBefore = process_flag(trap_exit, true),
    {ok, Queue} = couch_view_merger_queue:start_link(NumFolders, QueueLessFun),
    Folders = lists:foldr(
        fun(Index, Acc) ->
            Pid = spawn_link(fun() ->
                link(Queue),
                index_folder(Mod, Index, IndexMergeParams, UserCtx, DDoc, Queue, FoldFun)
            end),
            [Pid | Acc]
        end,
        [], Indexes),
    Collector = CollectorFun(NumFolders, Callback, UserAcc),
    {Skip, Limit} = Mod:get_skip_and_limit(IndexMergeParams#index_merge.http_params),
    MergeParams = #merge_params{
        index_name = IndexName,
        queue = Queue,
        collector = Collector,
        skip = Skip,
        limit = Limit,
        extra = Extra2
    },
    try
        case MergeFun(MergeParams) of
        set_view_outdated ->
            throw({error, set_view_outdated});
        revision_mismatch ->
            % A remote node reported a different ddoc revision. With 'auto'
            % the caller (query_index_loop/5) retries; otherwise it is an
            % error.
            case DesiredDDocRevision of
            auto ->
                throw(retry);
            _ ->
                ?LOG_ERROR("View merger, revision mismatch for design document `~s',"
                           " revision on local node ~s, revision on remote node ~s",
                           [?LOG_USERDATA(DDoc#doc.id),
                            rev_str(DesiredDDocRevision),
                            rev_str(DDocRev)]),
                throw({error, revision_mismatch})
            end;
        {ok, Resp} ->
            Resp;
        {stop, Resp} ->
            Resp
        end
    after
        DDocId = DDoc#doc.id,
        % Record how long the whole merge took, but only when a timer was
        % started (GET requests - see query_index/3).
        case StartTimer of
        nil ->
            start_timer_not_set;
        _ ->
            TimeElapsed = timer:now_diff(
                os:timestamp(), StartTimer) / 1000,
            couch_view_merger:update_timing_stat(
                DDocId, IndexName, TimeElapsed)
        end,
        unlink(Queue),
        erlang:erase(reduce_context),
        lists:foreach(fun erlang:unlink/1, Folders),
        % Important, shutdown the queue first. This ensures any blocked
        % HTTP folders (blocked by queue calls) will get an error/exit and
        % then stream all the remaining data from the socket, otherwise
        % the socket can't be reused for future requests.
        QRef = erlang:monitor(process, Queue),
        exit(Queue, shutdown),
        FolderRefs = lists:map(fun(Pid) ->
                Ref = erlang:monitor(process, Pid),
                exit(Pid, shutdown),
                Ref
            end, Folders),
        lists:foreach(fun(Ref) ->
                receive {'DOWN', Ref, _, _, _} -> ok end
            end, [QRef | FolderRefs]),
        Reason = clean_exit_messages(normal),
        process_flag(trap_exit, TrapExitBefore),
        % Re-raise any abnormal folder exit collected while trapping.
        case Reason of
        normal ->
            ok;
        shutdown ->
            ok;
        _ ->
            exit(Reason)
        end
    end.
267
268
% Drain all pending {'EXIT', ...} messages from the mailbox (we ran with
% trap_exit enabled). Returns the reason of the last abnormal exit seen,
% or FinalReason if every exit was normal/shutdown (or none was pending).
clean_exit_messages(FinalReason) ->
    receive
    {'EXIT', _From, Why} when Why =:= normal; Why =:= shutdown ->
        clean_exit_messages(FinalReason);
    {'EXIT', _From, Why} ->
        clean_exit_messages(Why)
    after 0 ->
        FinalReason
    end.
280
281
% Walk the list of index specs and return {ok, DDoc, IndexName} for the
% first local (#set_view_spec{}) entry; remote merge specs are skipped.
% Throws when the list contains no local spec, or when the master
% database / design document cannot be opened.
get_first_ddoc([], _UserCtx) ->
    throw({error, <<"A view spec can not consist of merges exclusively.">>});
get_first_ddoc([#set_view_spec{name = SetName, ddoc_id = Id,
        view_name = ViewName} | _Rest], _UserCtx) ->
    case couch_set_view_ddoc_cache:get_ddoc(SetName, Id) of
    {ok, DDoc} ->
        {ok, DDoc, ViewName};
    {db_open_error, {not_found, _}} ->
        throw({not_found, db_not_found_msg(?master_dbname(SetName))});
    {db_open_error, OpenError} ->
        throw(OpenError);
    {doc_open_error, {not_found, _}} ->
        throw({not_found, ddoc_not_found_msg(?master_dbname(SetName), Id)})
    end;
get_first_ddoc([_MergeSpec | Rest], UserCtx) ->
    get_first_ddoc(Rest, UserCtx).
303
304
% Open a database handle. Remote (http/https) URLs yield an #httpdb{}
% record configured for lhttpc; anything else is opened as a local couch
% database under the given user context. Throws on open failures.
open_db(<<"http://", _/binary>> = DbName, _UserCtx, Timeout) ->
    open_remote_db(DbName, Timeout);
open_db(<<"https://", _/binary>> = DbName, _UserCtx, Timeout) ->
    open_remote_db(DbName, Timeout);
open_db(DbName, UserCtx, _Timeout) ->
    case couch_db:open(DbName, [{user_ctx, UserCtx}]) of
    {ok, _Db} = Ok ->
        Ok;
    {not_found, _} ->
        throw({not_found, db_not_found_msg(DbName)});
    OpenError ->
        throw(OpenError)
    end.

% Build the #httpdb{} handle for a remote database URL.
open_remote_db(Url, Timeout) ->
    HttpDb = #httpdb{
        url = maybe_add_trailing_slash(Url),
        timeout = Timeout
    },
    {ok, HttpDb#httpdb{lhttpc_options = lhttpc_options(HttpDb)}}.
326
% Ensure Url (binary or list) ends with a "/" so path segments can be
% appended directly. Always returns a list (string).
maybe_add_trailing_slash(Url) when is_binary(Url) ->
    maybe_add_trailing_slash(?b2l(Url));
maybe_add_trailing_slash([]) ->
    % Guard the empty string: lists:last/1 raises badarg on [].
    "/";
maybe_add_trailing_slash(Url) ->
    case lists:last(Url) of
    $/ ->
        Url;
    _ ->
        Url ++ "/"
    end.
336
% Fetch a design document by id. The #httpdb{} clause GETs it over HTTP
% and rebuilds a #doc{} from the JSON body; the second clause reads it
% from a local database. Throws {not_found, Msg} when the document (or
% database) is missing, and {error, Msg} on any other HTTP failure.
get_ddoc(#httpdb{} = HttpDb, Id) ->
    #httpdb{
        url = BaseUrl,
        headers = Headers,
        timeout = Timeout,
        lhttpc_options = Options
    } = HttpDb,
    Url = BaseUrl ++ ?b2l(Id),
    case lhttpc:request(Url, "GET", Headers, [], Timeout, Options) of
    {ok, {{200, _}, _RespHeaders, Body}} ->
        Doc = couch_doc:from_json_obj({[{<<"meta">>, {[{<<"id">>,Id}]}},
                {<<"json">>,?JSON_DECODE(Body)}]}),
        {ok, couch_doc:with_ejson_body(Doc)};
    {ok, {{_Code, _}, _RespHeaders, Body}} ->
        {Props} = ?JSON_DECODE(Body),
        case {get_value(<<"error">>, Props), get_value(<<"reason">>, Props)} of
        {<<"not_found">>, _} ->
            % The JSON decoder yields the error field as a binary, so it
            % must be matched as <<"not_found">> (the atom 'not_found'
            % could never match here - the old clause was dead code).
            throw({not_found, ddoc_not_found_msg(HttpDb, Id)});
        Error ->
            % Error is an {ErrorField, ReasonField} tuple; convert it to a
            % binary so the ~s format directive can render it (a raw tuple
            % would make io_lib:format/2 raise badarg).
            Msg = io_lib:format("Error getting design document `~s` from "
                "database `~s`: ~s", [Id, db_uri(HttpDb), to_binary(Error)]),
            throw({error, iolist_to_binary(Msg)})
        end;
    {error, Error} ->
        Msg = io_lib:format("Error getting design document `~s` from database "
            "`~s`: ~s", [Id, db_uri(HttpDb), to_binary(Error)]),
        throw({error, iolist_to_binary(Msg)})
    end;
get_ddoc(Db, Id) ->
    case couch_db:open_doc(Db, Id, [ejson_body]) of
    {ok, _Doc} = Ok ->
        Ok;
    {not_found, _} ->
        throw({not_found, ddoc_not_found_msg(Db#db.name, Id)})
    end.
372
373
% Return a printable identifier for a database reference: the URL (with
% any password stripped) for remote databases, or the name for local ones.
db_uri(#httpdb{url = Url}) ->
    db_uri(Url);
db_uri(#db{name = Name}) ->
    Name;
db_uri(Url) when is_binary(Url) ->
    ?l2b(couch_util:url_strip_password(Url));
db_uri(Url) when is_list(Url) ->
    % #httpdb.url is built as a list (see open_db/3), so a list clause is
    % needed or db_uri(#httpdb{}) would fail with function_clause.
    % NOTE(review): assumes couch_util:url_strip_password/1 accepts lists
    % as well as binaries - confirm against its implementation.
    ?l2b(couch_util:url_strip_password(Url)).
380
381
% Human-readable message for a missing database.
db_not_found_msg(DbName) ->
    Msg = io_lib:format("Database `~s` doesn't exist.", [db_uri(DbName)]),
    iolist_to_binary(Msg).
385
% Human-readable message for a design document missing from a database.
ddoc_not_found_msg(DbName, DDocId) ->
    iolist_to_binary(io_lib:format(
        "Design document `~s` missing in database `~s`.",
        [DDocId, db_uri(DbName)])).
391
392
% Build the lhttpc connection options for a remote database request: the
% connect timeout mirrors the request timeout and connections are drawn
% from the shared merger connection pool (pool may be 'undefined' if the
% pool process is not running - whereis/1 result is passed through as-is).
lhttpc_options(#httpdb{timeout = T}) ->
    % TODO: add SSL options like verify and cacertfile, which should
    % be configurable somewhere.
    [
        {connect_timeout, T},
        {connect_options, [{keepalive, true}, {nodelay, true}]},
        {pool, whereis(couch_index_merger_connection_pool)}
    ].
401
402
% First phase of result collection: gather one {row_count, _} from each of
% the RecvCount folders, summing them into AccCount. Once all counts (or
% errors standing in for them) have arrived, {start, Total} is emitted to
% the Callback and collection switches to collect_rows/4. Returns
% {ok, Continuation} while more items are expected, {stop, Acc} otherwise.
collect_row_count(RecvCount, AccCount, PreprocessFun, Callback, UserAcc,
        {error, _DbUrl, _Reason} = Error) ->
    case Callback(Error, UserAcc) of
    {stop, Resp} ->
        {stop, Resp};
    {ok, UserAcc2} when RecvCount > 1 ->
        % Still waiting for other folders; this one won't send a count.
        {ok, fun (NextItem) ->
            collect_row_count(RecvCount - 1, AccCount, PreprocessFun,
                Callback, UserAcc2, NextItem)
        end};
    {ok, UserAcc2} ->
        {ok, UserAcc3} = Callback({start, AccCount}, UserAcc2),
        {ok, fun (NextItem) ->
            collect_rows(PreprocessFun, Callback, UserAcc3, NextItem)
        end}
    end;
collect_row_count(RecvCount, AccCount, PreprocessFun, Callback, UserAcc,
        {row_count, Count}) ->
    NewAccCount = AccCount + Count,
    case RecvCount > 1 of
    true ->
        {ok, fun (NextItem) ->
            collect_row_count(RecvCount - 1, NewAccCount, PreprocessFun,
                Callback, UserAcc, NextItem)
        end};
    false ->
        % Every folder reported; emit the merged total and start rows.
        % TODO: what about offset and update_seq?
        % TODO: maybe add etag like for regular views? How to
        %       compute them?
        {ok, UserAcc2} = Callback({start, NewAccCount}, UserAcc),
        {ok, fun (NextItem) ->
            collect_rows(PreprocessFun, Callback, UserAcc2, NextItem)
        end}
    end;
collect_row_count(RecvCount, AccCount, PreprocessFun, Callback, UserAcc,
        {debug_info, _From, _Info} = DebugInfo) ->
    % Debug info does not count towards RecvCount; just forward it.
    {ok, UserAcc2} = Callback(DebugInfo, UserAcc),
    {ok, fun (NextItem) ->
        collect_row_count(RecvCount, AccCount, PreprocessFun, Callback,
            UserAcc2, NextItem)
    end};
collect_row_count(_RecvCount, _AccCount, _PreprocessFun, Callback, UserAcc, stop) ->
    {_, UserAcc2} = Callback(stop, UserAcc),
    {stop, UserAcc2}.
451
% Second phase of result collection. PreprocessFun is applied to every row
% (coming from the fold function of the underlying data structure) before
% the row is handed to Callback. Returns {ok, Continuation} while more
% items are expected, or {stop, UserAcc} once 'stop' has been received.
collect_rows(PreprocessFun, Callback, UserAcc, {error, _DbUrl, _Reason} = Error) ->
    case Callback(Error, UserAcc) of
    {stop, Resp} ->
        {stop, Resp};
    {ok, UserAcc2} ->
        {ok, fun (NextItem) ->
            collect_rows(PreprocessFun, Callback, UserAcc2, NextItem)
        end}
    end;
collect_rows(PreprocessFun, Callback, UserAcc, {row, Row}) ->
    {ok, UserAcc2} = Callback({row, PreprocessFun(Row)}, UserAcc),
    {ok, fun (NextItem) ->
        collect_rows(PreprocessFun, Callback, UserAcc2, NextItem)
    end};
collect_rows(PreprocessFun, Callback, UserAcc, {debug_info, _From, _Info} = DebugInfo) ->
    {ok, UserAcc2} = Callback(DebugInfo, UserAcc),
    {ok, fun (NextItem) ->
        collect_rows(PreprocessFun, Callback, UserAcc2, NextItem)
    end};
collect_rows(_PreprocessFun, Callback, UserAcc, stop) ->
    {ok, UserAcc2} = Callback(stop, UserAcc),
    {stop, UserAcc2}.
481
% Shared merge loop driver: pops the next item from the merge queue and
% dispatches on its kind. Control items (debug info, row counts, errors)
% are forwarded to the collector and the queue is flushed so the folders
% can make progress; revision_mismatch / set_view_outdated abort the loop
% and bubble up to do_query_index/4; an actual data row is handed to
% RowFun, which decides how the loop continues.
merge_indexes_common(Params, RowFun) ->
    #merge_params{
        queue = Queue, collector = Col
    } = Params,
    case couch_view_merger_queue:pop(Queue) of
    closed ->
        % All folders are done; let the collector finish the response.
        {stop, Resp} = Col(stop),
        {ok, Resp};
    {ok, {debug_info, _From, _Info} = DebugInfo} ->
        ok = couch_view_merger_queue:flush(Queue),
        {ok, Col2} = Col(DebugInfo),
        merge_indexes_common(Params#merge_params{collector = Col2}, RowFun);
    {ok, revision_mismatch} ->
        revision_mismatch;
    {ok, set_view_outdated} ->
        set_view_outdated;
    {ok, {error, _Url, _Reason} = Error} ->
        ok = couch_view_merger_queue:flush(Queue),
        % The collector decides whether an error is fatal for the request.
        case Col(Error) of
        {ok, Col2} ->
            merge_indexes_common(Params#merge_params{collector = Col2}, RowFun);
        {stop, Resp} ->
            {stop, Resp}
        end;
    {ok, {row_count, _} = RowCount} ->
        ok = couch_view_merger_queue:flush(Queue),
        {ok, Col2} = Col(RowCount),
        merge_indexes_common(Params#merge_params{collector = Col2}, RowFun);
    {ok, MinRow} ->
        RowFun(Params, MinRow)
    end.
513
% Merge variant used when no rows are wanted: as soon as the first real
% row pops out of the queue, the collector is told to stop.
merge_indexes_no_limit(Params) ->
    OnRow = fun (#merge_params{collector = Col}, _MinRow) ->
        Col(stop)
    end,
    merge_indexes_common(Params, OnRow).
520
% Simple case when there are no (or we don't care about) accumulated rows.
% MinRowFun is called whenever couch_view_merger_queue pops an item that
% is neither an error nor a row count; it returns updated merge params,
% which are wrapped as {params, _} for the caller of merge_indexes_common.
merge_indexes_no_acc(Params, MinRowFun) ->
    OnRow = fun (AccParams, MinRow) ->
        {params, MinRowFun(AccParams, MinRow)}
    end,
    merge_indexes_common(Params, OnRow).
531
% Emit (or skip) the first row held in row_acc, decrementing the skip or
% limit counter as appropriate, and return the updated merge params.
handle_skip(Params) ->
    #merge_params{
        limit = Limit, skip = Skip, collector = Col,
        row_acc = [RowToSend | RestRows]
    } = Params,
    {Limit2, Col2} = case Skip > 0 of
    true ->
        % Still skipping: drop the row, leave the limit untouched.
        {Limit, Col};
    false ->
        {ok, NewCol} = Col({row, RowToSend}),
        {dec_counter(Limit), NewCol}
    end,
    Params#merge_params{
        skip = dec_counter(Skip), limit = Limit2, row_acc = RestRows,
        collector = Col2
    }.
549
% Decrement a counter, saturating at zero.
dec_counter(N) when N =/= 0 -> N - 1;
dec_counter(_) -> 0.
552
553
% Body of each (linked) folder process. Remote merge specs are folded by
% streaming over HTTP; local set view specs with the module's FoldFun.
index_folder(Mod, #merged_index_spec{} = IndexSpec,
        MergeParams0, _UserCtx, DDoc, Queue, _FoldFun) ->
    % Fall back to the default scatter-phase HTTP timeout when unset.
    MergeParams = case MergeParams0#index_merge.conn_timeout of
    nil ->
        MergeParams0#index_merge{conn_timeout = ?DEFAULT_INTERNAL_HTTP_TIMEOUT};
    _Timeout ->
        MergeParams0
    end,
    http_index_folder(Mod, IndexSpec, MergeParams, DDoc, Queue);
index_folder(_Mod, #set_view_spec{} = ViewSpec, MergeParams,
        UserCtx, DDoc, Queue, FoldFun) ->
    FoldFun(nil, ViewSpec, MergeParams, UserCtx, DDoc, Queue).
569
570
% Fold function for remote indexes. Exits are trapped so that when this
% process gets a shutdown from the parent, or an error/exit while queueing
% an item, the remaining data is still read off the socket - required so
% lhttpc can hand the connection back to its pool for reuse.
http_index_folder(Mod, IndexSpec, MergeParams, DDoc, Queue) ->
    process_flag(trap_exit, true),
    try
        run_http_index_folder(Mod, IndexSpec, MergeParams, DDoc, Queue)
    catch
    throw:queue_shutdown ->
        ok
    after
        drain_streamer(get(streamer_pid), MergeParams#index_merge.conn_timeout)
    end.

% Best-effort drain of a still-alive lhttpc streamer process; anything
% else (no streamer started, already dead) is a no-op.
drain_streamer(Streamer, Timeout) when is_pid(Streamer) ->
    case is_process_alive(Streamer) of
    true ->
        catch empty_socket(Streamer, Timeout);
    false ->
        ok
    end;
drain_streamer(_NotAPid, _Timeout) ->
    ok.
593
% Extract the "host:port" part of Url, dropping any "user:password@"
% userinfo prefix from the network location.
get_node(Url) ->
    {_Scheme, Location, _Path, _Query, _Fragment} = mochiweb_util:urlsplit(Url),
    case string:tokens(Location, "@") of
    [_UserInfo, Node] ->
        Node;
    [Node] ->
        Node
    end.
602
% Perform the HTTP request against a remote merge endpoint and stream the
% response into Queue. A 200 response is parsed row by row (with either
% the generic JSON event parser or the optimized view streamer); any other
% status code is read fully and its JSON error object is mapped onto the
% queue's control items. The queue is always marked 'done' for this folder
% before returning.
run_http_index_folder(Mod, IndexSpec, MergeParams, DDoc, Queue) ->
    {Url, Method, Headers, Body, BaseOptions} =
        http_index_folder_req_details(Mod, IndexSpec, MergeParams, DDoc),
    #index_merge{
        conn_timeout = Timeout
    } = MergeParams,
    % partial_download makes lhttpc deliver the body in parts (at most 3
    % outstanding) instead of buffering the whole response.
    LhttpcOptions = [{partial_download, [{window_size, 3}]} | BaseOptions],

    case lhttpc:request(Url, Method, Headers, Body, Timeout, LhttpcOptions) of
    {ok, {{200, _}, _RespHeaders, Pid}} when is_pid(Pid) ->
        % Remember the streamer so http_index_folder/5 can drain it later.
        put(streamer_pid, Pid),
        try
            case (element(1, os:type()) =:= win32) orelse
                    (Mod =/= couch_view_merger) of
            true ->
                % TODO: make couch_view_parser build and run on Windows
                % TODO: make couch_view_parser work with spatial views
                EventFun = Mod:make_event_fun(MergeParams#index_merge.http_params, Queue),
                DataFun = fun() -> stream_data(Pid, Timeout) end,
                json_stream_parse:events(DataFun, EventFun);
            false ->
                DataFun = fun() -> next_chunk(Pid, Timeout) end,
                ok = couch_http_view_streamer:parse(DataFun, Queue, get(from_url))
            end
        catch throw:{error, Error} ->
            ok = couch_view_merger_queue:queue(Queue, {error, get_node(Url), Error})
        after
            ok = couch_view_merger_queue:done(Queue)
        end;
    {ok, {{Code, _}, _RespHeaders, Pid}} when is_pid(Pid) ->
        % Non-200: read the (small) error body and classify it.
        put(streamer_pid, Pid),
        Error = try
            stream_all(Pid, Timeout, [])
        catch throw:{error, _Error} ->
            <<"Error code ", (?l2b(integer_to_list(Code)))/binary>>
        end,
        case (catch ?JSON_DECODE(Error)) of
        {Props} when is_list(Props) ->
            case {get_value(<<"error">>, Props), get_value(<<"reason">>, Props)} of
            {<<"not_found">>, Reason} when Reason =/= <<"missing">>, Reason =/= <<"deleted">> ->
                ok = couch_view_merger_queue:queue(Queue, {error, get_node(Url), Reason});
            {<<"not_found">>, _} ->
                ok = couch_view_merger_queue:queue(Queue, {error, get_node(Url), <<"not_found">>});
            {<<"error">>, <<"revision_mismatch">>} ->
                % Special control items rather than plain errors: these
                % abort / retry the whole merge (see do_query_index/4).
                ok = couch_view_merger_queue:queue(Queue, revision_mismatch);
            {<<"error">>, <<"set_view_outdated">>} ->
                ok = couch_view_merger_queue:queue(Queue, set_view_outdated);
            {<<"error">>, Reason} when is_binary(Reason) ->
                ok = couch_view_merger_queue:queue(Queue, {error, get_node(Url), Reason});
            ErrorTuple ->
                ok = couch_view_merger_queue:queue(Queue, {error, get_node(Url), to_binary(ErrorTuple)})
            end;
        _ ->
            % Body was not a JSON object; forward it verbatim.
            ok = couch_view_merger_queue:queue(Queue, {error, get_node(Url), to_binary(Error)})
        end,
        ok = couch_view_merger_queue:done(Queue);
    {error, Error} ->
        ok = couch_view_merger_queue:queue(Queue, {error, get_node(Url), Error}),
        ok = couch_view_merger_queue:done(Queue)
    end.
663
664
% Build the request tuple {Url, Method, Headers, Body, LhttpcOptions} for
% querying a remote merge endpoint. The JSON merge spec is re-POSTed to
% the remote node, after being extended by Mod:process_extra_params/2 and
% - when a revision check is requested - with the local design document
% revision so the remote side can detect mismatches.
http_index_folder_req_details(Mod, IndexSpec, MergeParams, DDoc) ->
    #merged_index_spec{
        url = MergeUrl0,
        ejson_spec = {EJson}
    } = IndexSpec,
    #index_merge{
        conn_timeout = Timeout,
        http_params = ViewArgs,
        extra = Extra
    } = MergeParams,
    {ok, HttpDb} = open_db(MergeUrl0, nil, Timeout),
    #httpdb{
        url = Url,
        lhttpc_options = Options,
        headers = Headers
    } = HttpDb,

    MergeUrl = Url ++ Mod:view_qs(ViewArgs, MergeParams),
    EJson1 = Mod:process_extra_params(Extra, EJson),

    EJson2 = case couch_index_merger:should_check_rev(MergeParams, DDoc) of
    true ->
        % Replace any pre-existing ddoc_revision entry with the local one.
        P = fun (Tuple) -> element(1, Tuple) =/= <<"ddoc_revision">> end,
        [{<<"ddoc_revision">>, ddoc_rev_str(DDoc)} |
            lists:filter(P, EJson1)];
    false ->
        EJson1
    end,

    Body = {EJson2},
    % Remember the request origin in the process dictionary; it is read
    % back by run_http_index_folder/5 when tagging parsed rows.
    put(from_url, ?l2b(Url)),
    {MergeUrl, "POST", Headers, ?JSON_ENCODE(Body), Options}.
697
698
% Pull the next body chunk from an lhttpc streaming request. Returns
% {Chunk, NextFun}; at end-of-body the chunk is <<>> and invoking the
% continuation throws, since callers always expect more view data.
% Throws {error, Reason} on transport errors.
stream_data(Pid, Timeout) ->
    case lhttpc:get_body_part(Pid, Timeout) of
    {error, _} = Error ->
        throw(Error);
    {ok, {http_eob, _Trailers}} ->
        {<<>>, fun() -> throw({error, <<"more view data expected">>}) end};
    {ok, Chunk} ->
        {Chunk, fun() -> stream_data(Pid, Timeout) end}
    end.
708
709
% Like stream_data/2, but for couch_http_view_streamer: signals the end of
% the body with 'eof' instead of an empty chunk.
next_chunk(Pid, Timeout) ->
    case lhttpc:get_body_part(Pid, Timeout) of
    {error, _} = Error ->
        throw(Error);
    {ok, {http_eob, _Trailers}} ->
        eof;
    {ok, _Chunk} = Ok ->
        Ok
    end.
719
720
% Read the entire remaining response body into one binary (used for
% non-200 responses, whose bodies are small error documents).
stream_all(Pid, Timeout, Acc) ->
    case stream_data(Pid, Timeout) of
    {<<>>, _NextFun} ->
        iolist_to_binary(lists:reverse(Acc));
    {Chunk, _NextFun} ->
        stream_all(Pid, Timeout, [Chunk | Acc])
    end.
728
729
% Drain and discard whatever is left on the streamer socket so lhttpc can
% return the connection to its pool for reuse.
empty_socket(Pid, Timeout) ->
    case stream_data(Pid, Timeout) of
    {<<>>, _NextFun} ->
        ok;
    {_Chunk, _NextFun} ->
        empty_socket(Pid, Timeout)
    end.
737
738
% JSON stream event handler that discards every event; it returns itself
% so it can serve as the continuation for json_stream_parse:events/2.
void_event(_Ev) ->
    fun void_event/1.
741
% Revision of a design document; nil when there is no document.
ddoc_rev(nil) ->
    nil;
ddoc_rev(#doc{rev = Rev}) ->
    Rev.
746
% Printable revision string of a design document (see rev_str/1).
ddoc_rev_str(DDoc) ->
    rev_str(ddoc_rev(DDoc)).
749
% A revision check is only meaningful when the caller pinned a revision
% (ddoc_revision =/= nil) and an actual design document is present.
should_check_rev(#index_merge{ddoc_revision = nil}, _DDoc) ->
    false;
should_check_rev(_IndexMergeParams, nil) ->
    false;
should_check_rev(_IndexMergeParams, _DDoc) ->
    true.
752
% Printable form of a revision: the placeholder atoms 'nil' and 'auto'
% map to fixed strings, anything else is formatted by couch_doc.
rev_str(Rev) ->
    case Rev of
    nil ->
        "nil";
    auto ->
        "auto";
    _ ->
        couch_doc:rev_to_str(Rev)
    end.
759
% True iff DDoc is still the current revision of the design document in
% the given database. The first clause re-opens the database by name; the
% second compares update sequences first so the document read is skipped
% when nothing in the database changed at all. Throws ddoc_db_not_found
% when the database no longer exists.
ddoc_unchanged(DbName, DDoc) when is_binary(DbName) ->
    case couch_db:open_int(DbName, []) of
    {ok, Db} ->
        try
            same_ddoc_rev(Db, DDoc)
        after
            couch_db:close(Db)
        end;
    {not_found, _} ->
        throw(ddoc_db_not_found)
    end;
ddoc_unchanged(Db, DDoc) ->
    case couch_db:open_int(couch_db:name(Db), []) of
    {ok, CurrentDb} ->
        try
            % Identical update seqs mean nothing changed, so the design
            % document cannot have changed either.
            couch_db:get_update_seq(Db) =:= couch_db:get_update_seq(CurrentDb)
                orelse same_ddoc_rev(CurrentDb, DDoc)
        after
            couch_db:close(CurrentDb)
        end;
    {not_found, _} ->
        throw(ddoc_db_not_found)
    end.

% True when the design document currently stored in Db carries the same
% revision as DDoc.
same_ddoc_rev(Db, DDoc) ->
    {ok, CurrentDDoc} = get_ddoc(Db, DDoc#doc.id),
    ddoc_rev(DDoc) =:= ddoc_rev(CurrentDDoc).
794