1% -*- Mode: Erlang; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2
3% Licensed under the Apache License, Version 2.0 (the "License"); you may not
4% use this file except in compliance with the License. You may obtain a copy of
5% the License at
6%
7%   http://www.apache.org/licenses/LICENSE-2.0
8%
9% Unless required by applicable law or agreed to in writing, software
10% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12% License for the specific language governing permissions and limitations under
13% the License.
14
15-module(couch_index_merger).
16
17-export([query_index/2, query_index/3]).
18
19% Only needed for indexer implementation. Those functions should perhaps go into
20% a utils module.
21% The functions dec_counter/1, should_check_rev/2 are also needed by this file
22-export([collect_rows/4, collect_row_count/6,
23    merge_indexes_no_acc/2, merge_indexes_no_limit/1, handle_skip/1,
24    dec_counter/1, void_event/1, should_check_rev/2,
25    ddoc_unchanged/2, ddoc_not_found_msg/2]).
26
27-include("couch_db.hrl").
28-include_lib("couch_index_merger/include/couch_index_merger.hrl").
29-include_lib("couch_index_merger/include/couch_view_merger.hrl").
30-include_lib("couch_set_view/include/couch_set_view.hrl").
31
32-import(couch_util, [
33    get_value/2,
34    to_binary/1
35]).
36
37-define(LOCAL, <<"local">>).
38
39-define(RETRY_INTERVAL, 1000).
40-define(MAX_RETRIES, 30).
41% Default timeout for the internal HTTP requests (during scatter phase)
42-define(DEFAULT_INTERNAL_HTTP_TIMEOUT, 60000).
43
44
% Entry point for callers that already supply parsed HTTP parameters and a
% user context (i.e. not driven directly by an incoming HTTP request).
% Resolves the first design document among the index specs, then runs the
% merge loop with the standard retry budget.
query_index(Mod, #index_merge{http_params = HttpParams,
                              user_ctx = UserCtx,
                              indexes = Indexes} = IndexMergeParams)
        when HttpParams =/= nil, UserCtx =/= nil ->
    % The first non-merged spec provides the design doc and index name.
    {ok, DDoc, IndexName} = get_first_ddoc(Indexes, UserCtx),
    query_index_loop(Mod, IndexMergeParams, DDoc, IndexName, ?MAX_RETRIES).
52
% Special and simpler case (exactly one local set view spec): triggers a
% lighter and faster code path that bypasses the merge queue machinery.
query_index(Mod, #index_merge{indexes = [#set_view_spec{}]} = Params0, Req) ->
    #index_merge{
        indexes = Indexes,
        ddoc_revision = DesiredDDocRevision,
        conn_timeout = Timeout
    } = Params0,
    {ok, DDoc, _} = get_first_ddoc(Indexes, Req#httpd.user_ctx),
    case Req#httpd.method of
    'GET' ->
        Params = Params0#index_merge{
            start_timer = os:timestamp()
        };
    'POST' ->
        Params = Params0,
        % Force close the socket conservatively if we
        % do not reply in the stipulated time period
        try
            case Timeout =/= nil of
            true ->
                % timer:kill_after/1 sends an exit(kill) to this process
                % once Timeout milliseconds elapse.
                {ok, TRef} = timer:kill_after(Timeout),
                % The query executes in the same process; the timer ref is
                % stashed in the process dictionary (cancellation happens
                % elsewhere — not visible in this module).
                put(tref, TRef);
            false ->
                ok
            end
        catch
            Error ->
                % Failing to arm the watchdog is non-fatal; log and proceed.
                ?LOG_ERROR("Could not enable socket watchdog "
                           "for design document `~s` err:  ~p",
                           [?LOG_USERDATA(DDoc#doc.id), ?LOG_USERDATA(Error)])
        end
    end,
    DDocRev = ddoc_rev(DDoc),
    case should_check_rev(Params, DDoc) of
    true ->
        case DesiredDDocRevision of
        auto ->
            ok;
        DDocRev ->
            % DDocRev is already bound above, so this clause matches only
            % when the desired revision equals the local one.
            ok;
        _ ->
            ?LOG_ERROR("View merger, revision mismatch for design document `~s',"
                       " revision on local node ~s, revision on remote node ~s",
                       [?LOG_USERDATA(DDoc#doc.id),
                        rev_str(DesiredDDocRevision),
                        rev_str(DDocRev)]),
            throw({error, revision_mismatch})
        end;
    false ->
        ok
    end,
    Mod:simple_set_view_query(Params, DDoc, Req);
106
% General case: driven by an incoming HTTP request. Parses the module's
% HTTP parameters, fills in the user context and start time, and enters
% the retry loop.
query_index(Mod, IndexMergeParams0, #httpd{user_ctx = UserCtx} = Req) ->
    #index_merge{indexes = Indexes, extra = Extra} = IndexMergeParams0,
    {ok, DDoc, IndexName} = get_first_ddoc(Indexes, UserCtx),
    HttpParams = Mod:parse_http_params(Req, DDoc, IndexName, Extra),
    IndexMergeParams = IndexMergeParams0#index_merge{
        start_timer = os:timestamp(),
        user_ctx = UserCtx,
        http_params = HttpParams
    },
    query_index_loop(Mod, IndexMergeParams, DDoc, IndexName, ?MAX_RETRIES).
119
120
% Runs do_query_index/4, retrying whenever 'retry' is thrown (a
% recoverable design-document revision mismatch). The design document is
% re-fetched before every retry. Gives up with revision_sync_failed once
% the retry budget is exhausted.
query_index_loop(_Mod, _IndexMergeParams, _DDoc, _IndexName, 0) ->
    throw({error, revision_sync_failed});
query_index_loop(Mod, IndexMergeParams, DDoc, IndexName, RetriesLeft) ->
    try
        do_query_index(Mod, IndexMergeParams, DDoc, IndexName)
    catch
    throw:retry ->
        timer:sleep(?RETRY_INTERVAL),
        #index_merge{indexes = Indexes, user_ctx = UserCtx} = IndexMergeParams,
        % IndexName is already bound here, so the fresh lookup must yield
        % the same index name or this match fails.
        {ok, FreshDDoc, IndexName} = get_first_ddoc(Indexes, UserCtx),
        query_index_loop(Mod, IndexMergeParams, FreshDDoc, IndexName,
            RetriesLeft - 1)
    end.
136
137
% Runs one complete merge attempt: verifies the design-document revision,
% spawns one folder process per index spec, and streams merged results
% through the module-specific merge/collector functions. Throws 'retry'
% (caught by query_index_loop/5) on a recoverable revision mismatch, or
% {error, revision_mismatch} on a fatal one.
do_query_index(Mod, IndexMergeParams, DDoc, IndexName) ->
    #index_merge{
       indexes = Indexes, callback = Callback, user_acc = UserAcc,
       ddoc_revision = DesiredDDocRevision, user_ctx = UserCtx,
       start_timer = StartTimer
    } = IndexMergeParams,

    DDocRev = ddoc_rev(DDoc),
    case should_check_rev(IndexMergeParams, DDoc) of
    true ->
        case DesiredDDocRevision of
        auto ->
            ok;
        DDocRev ->
            % DDocRev is already bound: matches only on equal revisions.
            ok;
        _ ->
            ?LOG_ERROR("View merger, revision mismatch for design document `~s',"
                       " revision on local node ~s, revision on remote node ~s",
                       [?LOG_USERDATA(DDoc#doc.id),
                        rev_str(DesiredDDocRevision),
                        rev_str(DDocRev)]),
            throw({error, revision_mismatch})
        end;
    false ->
        ok
    end,

    {LessFun, FoldFun, MergeFun, CollectorFun, Extra2} = Mod:make_funs(
        DDoc, IndexName, IndexMergeParams),
    NumFolders = length(Indexes),
    % Ordering function for the merge queue: control items (outdated,
    % mismatch, debug info, row counts, errors) sort ahead of data rows so
    % they are handled first; data rows are ordered by the module's LessFun.
    QueueLessFun = fun
        (set_view_outdated, _) ->
            true;
        (_, set_view_outdated) ->
            false;
        (revision_mismatch, _) ->
            true;
        (_, revision_mismatch) ->
            false;
        ({debug_info, _Url, _Info}, _) ->
            true;
        (_, {debug_info, _Url, _Info}) ->
            false;
        ({row_count, _}, _) ->
            true;
        (_, {row_count, _}) ->
            false;
        ({error, _Url, _Reason}, _) ->
            true;
        (_, {error, _Url, _Reason}) ->
            false;
        (RowA, RowB) ->
            case LessFun of
            nil ->
                % That's where the actual less fun is. But as bounding box
                % requests don't return a sorted order, we just return true
                true;
            _ ->
                LessFun(RowA, RowB)
            end
    end,
    % We want to trap exits to avoid this process (mochiweb worker) to die.
    % If the mochiweb worker dies, the client will not get a response back.
    % Link the queue to the folders, so that if one folder dies, all the others
    % will be killed and not hang forever (mochiweb reuses workers for different
    % requests).
    TrapExitBefore = process_flag(trap_exit, true),
    {ok, Queue} = couch_view_merger_queue:start_link(NumFolders, QueueLessFun),
    % One linked folder process per index spec; each pushes its results
    % onto the shared queue.
    Folders = lists:foldr(
        fun(Index, Acc) ->
            Pid = spawn_link(fun() ->
                link(Queue),
                index_folder(Mod, Index, IndexMergeParams, UserCtx, DDoc, Queue, FoldFun)
            end),
            [Pid | Acc]
        end,
        [], Indexes),
    Collector = CollectorFun(NumFolders, Callback, UserAcc),
    {Skip, Limit} = Mod:get_skip_and_limit(IndexMergeParams#index_merge.http_params),
    MergeParams = #merge_params{
        index_name = IndexName,
        queue = Queue,
        collector = Collector,
        skip = Skip,
        limit = Limit,
        extra = Extra2
    },
    try
        case MergeFun(MergeParams) of
        set_view_outdated ->
            throw({error, set_view_outdated});
        revision_mismatch ->
            case DesiredDDocRevision of
            auto ->
                % Recoverable: let query_index_loop re-fetch and retry.
                throw(retry);
            _ ->
                ?LOG_ERROR("View merger, revision mismatch for design document `~s',"
                           " revision on local node ~s, revision on remote node ~s",
                           [?LOG_USERDATA(DDoc#doc.id),
                            rev_str(DesiredDDocRevision),
                            rev_str(DDocRev)]),
                throw({error, revision_mismatch})
            end;
        {ok, Resp} ->
            Resp;
        {stop, Resp} ->
            Resp
        end
    after
        % Cleanup runs on every exit path: record timing stats, then tear
        % down the queue and folder processes in a controlled order.
        DDocId = DDoc#doc.id,
        case StartTimer of
        nil ->
            start_timer_not_set;
        _ ->
            TimeElapsed = timer:now_diff(
                os:timestamp(), StartTimer) / 1000,
            couch_view_merger:update_timing_stat(
                DDocId, IndexName, TimeElapsed)
        end,
        unlink(Queue),
        erlang:erase(reduce_context),
        lists:foreach(fun erlang:unlink/1, Folders),
        % Important, shutdown the queue first. This ensures any blocked
        % HTTP folders (blocked by queue calls) will get an error/exit and
        % then stream all the remaining data from the socket, otherwise
        % the socket can't be reused for future requests.
        QRef = erlang:monitor(process, Queue),
        exit(Queue, shutdown),
        FolderRefs = lists:map(fun(Pid) ->
                Ref = erlang:monitor(process, Pid),
                exit(Pid, shutdown),
                Ref
            end, Folders),
        % Wait for the queue and every folder to actually terminate.
        lists:foreach(fun(Ref) ->
                receive {'DOWN', Ref, _, _, _} -> ok end
            end, [QRef | FolderRefs]),
        % Drain leftover 'EXIT' messages; remember the first abnormal
        % reason, restore the trap_exit flag, then re-raise if abnormal.
        Reason = clean_exit_messages(normal),
        process_flag(trap_exit, TrapExitBefore),
        case Reason of
        normal ->
            ok;
        shutdown ->
            ok;
        _ ->
            exit(Reason)
        end
    end.
285
286
% Drains all pending {'EXIT', _, _} messages from the mailbox without
% blocking. normal and shutdown exits are ignored; any other reason
% replaces the accumulated one, so the last abnormal reason wins.
clean_exit_messages(FinalReason) ->
    receive
    {'EXIT', _Pid, Reason} when Reason =:= normal; Reason =:= shutdown ->
        clean_exit_messages(FinalReason);
    {'EXIT', _Pid, Abnormal} ->
        clean_exit_messages(Abnormal)
    after 0 ->
        FinalReason
    end.
298
299
% Finds the first local set-view spec in the list and returns its design
% document and view name as {ok, DDoc, ViewName}. Merged (remote) specs
% are skipped. Throws if only merged specs are present, or if the master
% database / design document cannot be opened.
get_first_ddoc([], _UserCtx) ->
    throw({error, <<"A view spec can not consist of merges exclusively.">>});

get_first_ddoc([#set_view_spec{name = SetName, ddoc_id = Id,
        view_name = ViewName} | _], _UserCtx) ->
    case couch_set_view_ddoc_cache:get_ddoc(SetName, Id) of
    {ok, DDoc} ->
        {ok, DDoc, ViewName};
    {db_open_error, {not_found, _}} ->
        throw({not_found, db_not_found_msg(?master_dbname(SetName))});
    {db_open_error, OpenError} ->
        throw(OpenError);
    {doc_open_error, {not_found, _}} ->
        throw({not_found, ddoc_not_found_msg(?master_dbname(SetName), Id)})
    end;

get_first_ddoc([_MergeSpec | Rest], UserCtx) ->
    % Not a local set view spec; keep scanning.
    get_first_ddoc(Rest, UserCtx).
321
322
% Opens either a remote database (http/https URL -> {ok, #httpdb{}}) or a
% local couch database ({ok, Db}). Throws {not_found, Msg} when a local
% database does not exist and rethrows any other open error.
open_db(<<"http://", _/binary>> = DbName, _UserCtx, Timeout) ->
    open_http_db(DbName, Timeout);
open_db(<<"https://", _/binary>> = DbName, _UserCtx, Timeout) ->
    open_http_db(DbName, Timeout);
open_db(DbName, UserCtx, _Timeout) ->
    case couch_db:open(DbName, [{user_ctx, UserCtx}]) of
    {ok, _} = Ok ->
        Ok;
    {not_found, _} ->
        throw({not_found, db_not_found_msg(DbName)});
    Error ->
        throw(Error)
    end.

% Builds the #httpdb{} handle shared by the http and https clauses above.
% The lhttpc options are derived after the record is built because they
% depend on the timeout stored in it.
open_http_db(DbName, Timeout) ->
    HttpDb = #httpdb{
        url = maybe_add_trailing_slash(DbName),
        timeout = Timeout
    },
    {ok, HttpDb#httpdb{lhttpc_options = lhttpc_options(HttpDb)}}.
344
% Normalizes a URL (binary or string) to a string ending in "/", so that
% path components can be appended directly.
maybe_add_trailing_slash(Url) when is_binary(Url) ->
    maybe_add_trailing_slash(?b2l(Url));
maybe_add_trailing_slash(Url) ->
    case lists:last(Url) =:= $/ of
    true ->
        Url;
    false ->
        Url ++ "/"
    end.
354
% Fetches a design document either over HTTP (remote #httpdb{}) or from a
% local database handle. Returns {ok, #doc{}} with an EJSON body, or
% throws {not_found, Msg} / {error, Msg}.
get_ddoc(#httpdb{} = HttpDb, Id) ->
    #httpdb{
        url = BaseUrl,
        headers = Headers,
        timeout = Timeout,
        lhttpc_options = Options
    } = HttpDb,
    Url = BaseUrl ++ ?b2l(Id),
    case lhttpc:request(Url, "GET", Headers, [], Timeout, Options) of
    {ok, {{200, _}, _RespHeaders, Body}} ->
        % Wrap the raw document JSON in the meta/json envelope expected
        % by couch_doc:from_json_obj/1.
        Doc = couch_doc:from_json_obj({[{<<"meta">>, {[{<<"id">>,Id}]}},
                {<<"json">>,?JSON_DECODE(Body)}]}),
        {ok, couch_doc:with_ejson_body(Doc)};
    {ok, {{_Code, _}, _RespHeaders, Body}} ->
        {Props} = ?JSON_DECODE(Body),
        case {get_value(<<"error">>, Props), get_value(<<"reason">>, Props)} of
        % The JSON decoder yields binaries, so match the binary form; the
        % previous atom pattern {not_found, _} could never match.
        {<<"not_found">>, _} ->
            throw({not_found, ddoc_not_found_msg(HttpDb, Id)});
        Error ->
            % Error is an {ErrorValue, Reason} tuple; convert it with
            % to_binary/1 first, since ~s would badarg on a tuple.
            Msg = io_lib:format("Error getting design document `~s` from "
                "database `~s`: ~s", [Id, db_uri(HttpDb), to_binary(Error)]),
            throw({error, iolist_to_binary(Msg)})
        end;
    {error, Error} ->
        Msg = io_lib:format("Error getting design document `~s` from database "
            "`~s`: ~s", [Id, db_uri(HttpDb), to_binary(Error)]),
        throw({error, iolist_to_binary(Msg)})
    end;
get_ddoc(Db, Id) ->
    case couch_db:open_doc(Db, Id, [ejson_body]) of
    {ok, _} = Ok ->
        Ok;
    {not_found, _} ->
        throw({not_found, ddoc_not_found_msg(Db#db.name, Id)})
    end.
390
391
% Returns a displayable identifier for a database handle: for HTTP(S)
% databases, the URL with any password stripped; for local databases, the
% database name.
db_uri(#httpdb{url = Url}) ->
    db_uri(Url);
db_uri(#db{name = Name}) ->
    Name;
db_uri(Url) when is_binary(Url) ->
    ?l2b(couch_util:url_strip_password(Url)).
398
399
% User-facing message for a missing database.
db_not_found_msg(DbName) ->
    Msg = io_lib:format("Database `~s` doesn't exist.", [db_uri(DbName)]),
    iolist_to_binary(Msg).
403
% User-facing message for a missing design document.
ddoc_not_found_msg(DbName, DDocId) ->
    iolist_to_binary(io_lib:format(
        "Design document `~s` missing in database `~s`.",
        [DDocId, db_uri(DbName)])).
409
410
% lhttpc client options used for internal scatter-phase HTTP requests.
% The connect timeout mirrors the request timeout, and connections come
% from the merger's dedicated connection pool.
lhttpc_options(#httpdb{timeout = T}) ->
    % TODO: add SSL options like verify and cacertfile, which should be
    % configurable somewhere.
    [
        {connect_timeout, T},
        {connect_options, [{keepalive, true}, {nodelay, true}]},
        {pool, whereis(couch_index_merger_connection_pool)}
    ].
419
420
% First phase of the collector state machine: sums the {row_count, N}
% items expected from each of the RecvCount folders. Once the last count
% (or error) arrives, it emits {start, TotalCount} to the user Callback
% and hands over to collect_rows/4 for the data rows. Returns
% {ok, NextCollectorFun} to continue, or {stop, Resp} when the Callback
% aborts the stream.
collect_row_count(RecvCount, AccCount, PreprocessFun, Callback, UserAcc, Item) ->
    case Item of
    {error, _DbUrl, _Reason} = Error ->
        % A folder failed; forward the error and treat that folder's row
        % count as received.
        case Callback(Error, UserAcc) of
        {stop, Resp} ->
            {stop, Resp};
        {ok, UserAcc2} ->
            case RecvCount > 1 of
            false ->
                % Last pending folder: emit the accumulated count and
                % switch to the row-collecting phase.
                {ok, UserAcc3} = Callback({start, AccCount}, UserAcc2),
                {ok, fun (Item2) ->
                    collect_rows(
                        PreprocessFun, Callback, UserAcc3, Item2)
                end};
            true ->
                {ok, fun (Item2) ->
                    collect_row_count(
                        RecvCount - 1, AccCount, PreprocessFun, Callback,
                        UserAcc2, Item2)
                end}
            end
        end;
    {row_count, Count} ->
        AccCount2 = AccCount + Count,
        case RecvCount > 1 of
        false ->
            % TODO: what about offset and update_seq?
            % TODO: maybe add etag like for regular views? How to
            %       compute them?
            {ok, UserAcc2} = Callback({start, AccCount2}, UserAcc),
            {ok, fun (Item2) ->
                collect_rows(PreprocessFun, Callback, UserAcc2, Item2)
            end};
        true ->
            {ok, fun (Item2) ->
                collect_row_count(
                    RecvCount - 1, AccCount2, PreprocessFun, Callback, UserAcc, Item2)
            end}
        end;
    {debug_info, _From, _Info} = DebugInfo ->
        % Debug info is passed through without affecting the count
        % bookkeeping.
        {ok, UserAcc2} = Callback(DebugInfo, UserAcc),
        {ok, fun (Item2) ->
            collect_row_count(RecvCount, AccCount, PreprocessFun, Callback, UserAcc2, Item2)
        end};
    stop ->
        {_, UserAcc2} = Callback(stop, UserAcc),
        {stop, UserAcc2}
    end.
469
% Second phase of the collector state machine: handles data rows.
% PreprocessFun is applied to every row (coming from the fold function of
% the underlying data structure) before the row is handed to the Callback
% function. Returns {ok, NextCollectorFun} to receive the next item, or
% {stop, Acc} when the stream ends or the Callback aborts.
collect_rows(PreprocessFun, Callback, UserAcc, Item) ->
    % Builds the continuation that collects the next item with an updated
    % user accumulator.
    Continue = fun (UserAcc2) ->
        {ok, fun (NextItem) ->
            collect_rows(PreprocessFun, Callback, UserAcc2, NextItem)
        end}
    end,
    case Item of
    {error, _DbUrl, _Reason} = Error ->
        case Callback(Error, UserAcc) of
        {stop, Resp} ->
            {stop, Resp};
        {ok, UserAcc2} ->
            Continue(UserAcc2)
        end;
    {row, Row} ->
        {ok, UserAcc2} = Callback({row, PreprocessFun(Row)}, UserAcc),
        Continue(UserAcc2);
    {debug_info, _From, _Info} = DebugInfo ->
        {ok, UserAcc2} = Callback(DebugInfo, UserAcc),
        Continue(UserAcc2);
    stop ->
        {ok, UserAcc2} = Callback(stop, UserAcc),
        {stop, UserAcc2}
    end.
499
% Shared merge loop driver: pops the next item from the merge queue and
% dispatches on it. Control items (debug info, errors, row counts) are
% flushed from the queue and fed to the collector; revision_mismatch and
% set_view_outdated abort the merge by returning the corresponding atom
% to the caller. Ordinary (minimum) rows are delegated to RowFun. A
% closed queue ends the merge and finalizes the collector.
merge_indexes_common(Params, RowFun) ->
    #merge_params{
        queue = Queue, collector = Col
    } = Params,
    case couch_view_merger_queue:pop(Queue) of
    closed ->
        % All folders are done; tell the collector to finish up.
        {stop, Resp} = Col(stop),
        {ok, Resp};
    {ok, {debug_info, _From, _Info} = DebugInfo} ->
        ok = couch_view_merger_queue:flush(Queue),
        {ok, Col2} = Col(DebugInfo),
        merge_indexes_common(Params#merge_params{collector = Col2}, RowFun);
    {ok, revision_mismatch} ->
        revision_mismatch;
    {ok, set_view_outdated} ->
        set_view_outdated;
    {ok, {error, _Url, _Reason} = Error} ->
        ok = couch_view_merger_queue:flush(Queue),
        case Col(Error) of
        {ok, Col2} ->
            merge_indexes_common(Params#merge_params{collector = Col2}, RowFun);
        {stop, Resp} ->
            % The collector decided to stop streaming after the error.
            {stop, Resp}
        end;
    {ok, {row_count, _} = RowCount} ->
        ok = couch_view_merger_queue:flush(Queue),
        {ok, Col2} = Col(RowCount),
        merge_indexes_common(Params#merge_params{collector = Col2}, RowFun);
    {ok, MinRow} ->
        % A real data row (the smallest per the queue's less function);
        % RowFun decides how to consume it.
        RowFun(Params, MinRow)
    end.
531
% Merge variant used when no row limit applies: the first data row stops
% the collector immediately.
merge_indexes_no_limit(Params) ->
    RowFun = fun (#merge_params{collector = Col}, _MinRow) ->
        Col(stop)
    end,
    merge_indexes_common(Params, RowFun).
538
% Simple case when there are no (or we don't care about) accumulated rows.
% MinRowFun is invoked whenever couch_view_merger_queue yields an item
% that is neither an error nor a row count; it must return updated merge
% params, which are passed back to the caller as {params, Params2}.
merge_indexes_no_acc(Params, MinRowFun) ->
    RowFun = fun (AccParams, MinRow) ->
        {params, MinRowFun(AccParams, MinRow)}
    end,
    merge_indexes_common(Params, RowFun).
549
% Consumes the head of row_acc: while skip > 0 the row is dropped,
% otherwise it is sent to the collector and the limit is decremented.
% Both skip and (conditionally) limit saturate at zero via dec_counter/1.
handle_skip(Params) ->
    #merge_params{
        limit = Limit, skip = Skip, collector = Col,
        row_acc = [RowToSend | Rest]
    } = Params,
    {Limit2, Col2} =
        case Skip > 0 of
        true ->
            % Row skipped: collector untouched, limit unchanged.
            {Limit, Col};
        false ->
            {ok, NewCol} = Col({row, RowToSend}),
            {dec_counter(Limit), NewCol}
        end,
    Params#merge_params{
        skip = dec_counter(Skip), limit = Limit2, row_acc = Rest,
        collector = Col2
    }.
567
% Decrements a counter, saturating at zero once it reaches 0.
dec_counter(Counter) ->
    case Counter of
    0 -> 0;
    N -> N - 1
    end.
570
571
% Body of a spawned folder process: pushes one index's results onto the
% queue. Remote (merged) indexes go through the HTTP folder, getting a
% default connection timeout when none was supplied; local set views run
% the module-provided FoldFun.
index_folder(Mod, #merged_index_spec{} = IndexSpec,
        MergeParams0, _UserCtx, DDoc, Queue, _FoldFun) ->
    MergeParams =
        case MergeParams0#index_merge.conn_timeout of
        nil ->
            % No explicit timeout given; use the scatter-phase default.
            MergeParams0#index_merge{
                conn_timeout = ?DEFAULT_INTERNAL_HTTP_TIMEOUT
            };
        _Timeout ->
            MergeParams0
        end,
    http_index_folder(Mod, IndexSpec, MergeParams, DDoc, Queue);

index_folder(_Mod, #set_view_spec{} = ViewSpec, MergeParams,
        UserCtx, DDoc, Queue, FoldFun) ->
    FoldFun(nil, ViewSpec, MergeParams, UserCtx, DDoc, Queue).
587
588
% Fold function for remote indexes.
http_index_folder(Mod, IndexSpec, MergeParams, DDoc, Queue) ->
    % Trap exits, so that when we receive a shutdown message from the parent,
    % or an error/exit when queueing an item/error, we get all the remaining
    % data from the socket - this is required in order to ensure the
    % connection can be reused for other requests and for lhttpc to hand the
    % socket back to the connection pool.
    process_flag(trap_exit, true),
    try
        run_http_index_folder(Mod, IndexSpec, MergeParams, DDoc, Queue)
    catch
    throw:queue_shutdown ->
        % NOTE(review): queue_shutdown appears to be thrown by queue
        % operations when the merge queue is shut down; treated as a
        % clean stop — confirm against couch_view_merger_queue.
        ok
    after
        % run_http_index_folder stores the lhttpc streamer pid in the
        % process dictionary once a streaming response arrives; drain any
        % remaining body so the socket can be pooled again.
        Streamer = get(streamer_pid),
        case is_pid(Streamer) andalso is_process_alive(Streamer) of
        true ->
            catch empty_socket(Streamer, MergeParams#index_merge.conn_timeout);
        false ->
            ok
        end
    end.
611
% Extracts the host[:port] component of a URL, dropping any user-info
% ("user:pass@") prefix.
get_node(Url) ->
    {_Scheme, Location, _Path, _Query, _Fragment} = mochiweb_util:urlsplit(Url),
    case string:tokens(Location, "@") of
    [_UserInfo, Node] ->
        Node;
    [Node] ->
        Node
    end.
620
% Performs the HTTP request for one remote index and streams the response
% into Queue. A 200 response is parsed row-by-row (via the JSON event
% parser on Windows or for non couch_view_merger modules, otherwise via
% couch_http_view_streamer). Non-200 responses are decoded and queued as
% {error, Node, Reason}, or as the special revision_mismatch /
% set_view_outdated atoms. Every path signals completion to the queue
% with done/1.
run_http_index_folder(Mod, IndexSpec, MergeParams, DDoc, Queue) ->
    {Url, Method, Headers, Body, BaseOptions} =
        http_index_folder_req_details(Mod, IndexSpec, MergeParams, DDoc),
    #index_merge{
        conn_timeout = Timeout
    } = MergeParams,
    % partial_download makes lhttpc hand back a streamer pid instead of a
    % fully-buffered body.
    LhttpcOptions = [{partial_download, [{window_size, 3}]} | BaseOptions],

    case lhttpc:request(Url, Method, Headers, Body, Timeout, LhttpcOptions) of
    {ok, {{200, _}, _RespHeaders, Pid}} when is_pid(Pid) ->
        % Remember the streamer so http_index_folder/5 can drain the
        % socket during cleanup.
        put(streamer_pid, Pid),
        try
            case (element(1, os:type()) =:= win32) orelse
                    (Mod =/= couch_view_merger) of
            true ->
                % TODO: make couch_view_parser build and run on Windows
                % TODO: make couch_view_parser work with spatial views
                EventFun = Mod:make_event_fun(MergeParams#index_merge.http_params, Queue),
                DataFun = fun() -> stream_data(Pid, Timeout) end,
                json_stream_parse:events(DataFun, EventFun);
            false ->
                DataFun = fun() -> next_chunk(Pid, Timeout) end,
                ok = couch_http_view_streamer:parse(DataFun, Queue, get(from_url))
            end
        catch throw:{error, Error} ->
            ok = couch_view_merger_queue:queue(Queue, {error, get_node(Url), Error})
        after
            ok = couch_view_merger_queue:done(Queue)
        end;
    {ok, {{Code, _}, _RespHeaders, Pid}} when is_pid(Pid) ->
        put(streamer_pid, Pid),
        % Read the whole error body; fall back to the bare status code if
        % streaming it fails.
        Error = try
            stream_all(Pid, Timeout, [])
        catch throw:{error, _Error} ->
            <<"Error code ", (?l2b(integer_to_list(Code)))/binary>>
        end,
        case (catch ?JSON_DECODE(Error)) of
        {Props} when is_list(Props) ->
            % Map the remote error/reason pair onto the queue protocol.
            case {get_value(<<"error">>, Props), get_value(<<"reason">>, Props)} of
            {<<"not_found">>, Reason} when Reason =/= <<"missing">>, Reason =/= <<"deleted">> ->
                ok = couch_view_merger_queue:queue(Queue, {error, get_node(Url), Reason});
            {<<"not_found">>, _} ->
                ok = couch_view_merger_queue:queue(Queue, {error, get_node(Url), <<"not_found">>});
            {<<"error">>, <<"revision_mismatch">>} ->
                ok = couch_view_merger_queue:queue(Queue, revision_mismatch);
            {<<"error">>, <<"set_view_outdated">>} ->
                ok = couch_view_merger_queue:queue(Queue, set_view_outdated);
            {<<"error">>, Reason} when is_binary(Reason) ->
                ok = couch_view_merger_queue:queue(Queue, {error, get_node(Url), Reason});
            ErrorTuple ->
                ok = couch_view_merger_queue:queue(Queue, {error, get_node(Url), to_binary(ErrorTuple)})
            end;
        _ ->
            % Body was not JSON; forward it verbatim.
            ok = couch_view_merger_queue:queue(Queue, {error, get_node(Url), to_binary(Error)})
        end,
        ok = couch_view_merger_queue:done(Queue);
    {error, Error} ->
        ok = couch_view_merger_queue:queue(Queue, {error, get_node(Url), Error}),
        ok = couch_view_merger_queue:done(Queue)
    end.
681
682
% Builds the {Url, Method, Headers, Body, LhttpcOptions} request tuple
% for querying a remote merge endpoint. View parameters travel in the
% query string; the index specs travel as the JSON POST body. When
% revision checking is enabled, the local design-document revision is
% embedded as "ddoc_revision" (replacing any pre-existing entry) so the
% remote side can detect mismatches.
http_index_folder_req_details(Mod, IndexSpec, MergeParams, DDoc) ->
    #merged_index_spec{
        url = MergeUrl0,
        ejson_spec = {EJson}
    } = IndexSpec,
    #index_merge{
        conn_timeout = Timeout,
        http_params = ViewArgs,
        extra = Extra
    } = MergeParams,
    {ok, HttpDb} = open_db(MergeUrl0, nil, Timeout),
    #httpdb{
        url = Url,
        lhttpc_options = Options,
        headers = Headers
    } = HttpDb,

    MergeUrl = Url ++ Mod:view_qs(ViewArgs, MergeParams),
    EJson1 = Mod:process_extra_params(Extra, EJson),

    EJson2 = case couch_index_merger:should_check_rev(MergeParams, DDoc) of
    true ->
        % Drop any stale ddoc_revision entry before prepending ours.
        P = fun (Tuple) -> element(1, Tuple) =/= <<"ddoc_revision">> end,
        [{<<"ddoc_revision">>, ddoc_rev_str(DDoc)} |
            lists:filter(P, EJson1)];
    false ->
        EJson1
    end,

    Body = {EJson2},
    % Remember the source URL; it is read back (get(from_url)) by the
    % streaming parser in run_http_index_folder/5.
    put(from_url, ?l2b(Url)),
    {MergeUrl, "POST", Headers, ?JSON_ENCODE(Body), Options}.
715
716
% Pulls the next body chunk from an lhttpc streaming download. Returns
% {Chunk, NextFun}; end-of-body is signalled as {<<>>, Fun} where calling
% Fun throws, since callers are expected to stop at <<>>.
stream_data(Pid, Timeout) ->
    Result = lhttpc:get_body_part(Pid, Timeout),
    case Result of
    {error, _} = Error ->
        throw(Error);
    {ok, {http_eob, _Trailers}} ->
        {<<>>, fun() -> throw({error, <<"more view data expected">>}) end};
    {ok, Chunk} ->
        {Chunk, fun() -> stream_data(Pid, Timeout) end}
    end.
726
727
% Like stream_data/2 but in the shape couch_http_view_streamer expects:
% {ok, Chunk} per chunk and the atom 'eof' at end of body.
next_chunk(Pid, Timeout) ->
    case lhttpc:get_body_part(Pid, Timeout) of
    {error, _} = Error ->
        throw(Error);
    {ok, {http_eob, _Trailers}} ->
        eof;
    {ok, _Chunk} = Ok ->
        Ok
    end.
737
738
% Reads the entire remaining response body into one binary, accumulating
% chunks in reverse and flattening once at the end.
stream_all(Pid, Timeout, Acc) ->
    case stream_data(Pid, Timeout) of
    {<<>>, _Next} ->
        iolist_to_binary(lists:reverse(Acc));
    {Chunk, _Next} ->
        stream_all(Pid, Timeout, [Chunk | Acc])
    end.
746
747
% Discards any remaining body data so lhttpc can return the socket to
% the connection pool for reuse.
empty_socket(Pid, Timeout) ->
    case stream_data(Pid, Timeout) of
    {<<>>, _Next} ->
        ok;
    {_Chunk, _Next} ->
        empty_socket(Pid, Timeout)
    end.
755
756
% Event handler that ignores every event and returns itself as the next
% handler, so an entire event stream can be discarded.
void_event(_Ev) ->
    fun void_event/1.
759
% Returns the revision of a design document, or nil when there is no
% document.
ddoc_rev(nil) ->
    nil;
ddoc_rev(#doc{rev = Rev}) ->
    Rev.
764
% Human-readable revision string for a design document ("nil" when the
% document is nil).
ddoc_rev_str(DDoc) ->
    rev_str(ddoc_rev(DDoc)).
767
% True when a design-document revision check should be performed: a
% desired revision was supplied and there is a design document to check.
should_check_rev(#index_merge{ddoc_revision = nil}, _DDoc) ->
    false;
should_check_rev(#index_merge{}, nil) ->
    false;
should_check_rev(#index_merge{}, _DDoc) ->
    true.
770
% Human-readable revision string for log messages; the special markers
% nil and auto are printed literally.
rev_str(Rev) ->
    case Rev of
    nil ->
        "nil";
    auto ->
        "auto";
    _ ->
        couch_doc:rev_to_str(Rev)
    end.
777
778ddoc_unchanged(DbName, DDoc) when is_binary(DbName) ->
779    case couch_db:open_int(DbName, []) of
780    {ok, Db} ->
781        try
782            DDocId = DDoc#doc.id,
783            {ok, MaybeUpdatedDDoc} = get_ddoc(Db, DDocId),
784            ddoc_rev(DDoc) =:= ddoc_rev(MaybeUpdatedDDoc)
785        after
786            couch_db:close(Db)
787        end;
788    {not_found, _} ->
789        throw(ddoc_db_not_found)
790    end;
791ddoc_unchanged(Db, DDoc) ->
792    DbName = couch_db:name(Db),
793    case couch_db:open_int(DbName, []) of
794    {ok, Db1} ->
795        try
796            case couch_db:get_update_seq(Db) =:= couch_db:get_update_seq(Db1) of
797            true ->
798                %% nothing changed
799                true;
800            false ->
801                %% design document may have changed
802                DDocId = DDoc#doc.id,
803                {ok, MaybeUpdatedDDoc} = get_ddoc(Db1, DDocId),
804                ddoc_rev(DDoc) =:= ddoc_rev(MaybeUpdatedDDoc)
805            end
806        after
807            couch_db:close(Db1)
808        end;
809    {not_found, _} ->
810        throw(ddoc_db_not_found)
811    end.
812