1//  Copyright (c) 2014 Couchbase, Inc.
2//  Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
3//  except in compliance with the License. You may obtain a copy of the License at
4//    http://www.apache.org/licenses/LICENSE-2.0
5//  Unless required by applicable law or agreed to in writing, software distributed under the
6//  License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
7//  either express or implied. See the License for the specific language governing permissions
8//  and limitations under the License.
9
10package execution
11
12import (
13	"container/heap"
14	"encoding/json"
15
16	"github.com/couchbase/query/plan"
17	"github.com/couchbase/query/value"
18)
19
// OrderLimit is an ordering operator that knows about the LIMIT (and optional
// OFFSET) of the query. When offset+limit is small enough it keeps only the
// rows that can still be returned in a bounded maximum heap; otherwise it
// falls back to the processing of the embedded Order.
type OrderLimit struct {
	*Order
	// offset is optional; it is nil when the plan has no offset.
	offset *Offset
	// limit must be present; it is always created by NewOrderLimit.
	limit *Limit
	// numReturnedRows is the heap bound computed in beforeItems: the offset
	// plus the limit when neither forces a fallback.
	numReturnedRows int
	// fallbackNum comes from plan.FallbackNum(); an offset, or offset+limit,
	// above it switches the operator to fallback mode.
	fallbackNum int
	// ignoreInput is set when the limit or numReturnedRows is not positive;
	// in heap mode incoming items are then dropped.
	ignoreInput bool
	// fallback makes processItem delegate to Order.processItem.
	fallback bool
	// numProcessedRows counts the items seen by processItem and is reported
	// as the sort count in afterItems.
	numProcessedRows uint64
}
30
31func NewOrderLimit(plan *plan.Order, context *Context) *OrderLimit {
32	var rv *OrderLimit
33	if plan.Offset() == nil {
34		rv = &OrderLimit{
35			Order:            NewOrder(plan, context),
36			offset:           nil,
37			limit:            NewLimit(plan.Limit(), context),
38			numReturnedRows:  0,
39			fallbackNum:      plan.FallbackNum(),
40			ignoreInput:      false,
41			fallback:         false,
42			numProcessedRows: 0,
43		}
44	} else {
45		rv = &OrderLimit{
46			Order:            NewOrder(plan, context),
47			offset:           NewOffset(plan.Offset(), context),
48			limit:            NewLimit(plan.Limit(), context),
49			numReturnedRows:  0,
50			fallbackNum:      plan.FallbackNum(),
51			ignoreInput:      false,
52			fallback:         false,
53			numProcessedRows: 0,
54		}
55	}
56
57	rv.output = rv
58	return rv
59}
60
61func (this *OrderLimit) Copy() Operator {
62	var rv *OrderLimit
63
64	if this.offset == nil {
65		rv = &OrderLimit{
66			Order:            this.Order.Copy().(*Order),
67			offset:           nil,
68			limit:            this.limit.Copy().(*Limit),
69			numReturnedRows:  this.numReturnedRows,
70			ignoreInput:      this.ignoreInput,
71			fallback:         this.fallback,
72			numProcessedRows: this.numProcessedRows,
73		}
74	} else {
75		rv = &OrderLimit{
76			Order:            this.Order.Copy().(*Order),
77			offset:           this.offset.Copy().(*Offset),
78			limit:            this.limit.Copy().(*Limit),
79			numReturnedRows:  this.numReturnedRows,
80			ignoreInput:      this.ignoreInput,
81			fallback:         this.fallback,
82			numProcessedRows: this.numProcessedRows,
83		}
84	}
85	return rv
86}
87
// RunOnce drives the operator through runConsumer, passing the operator
// itself as the consumer, and releases the buffered values when it returns.
func (this *OrderLimit) RunOnce(context *Context, parent value.Value) {
	// The receiver of the deferred call is evaluated here, at the defer
	// statement, not when the function returns.
	defer this.releaseValues()
	this.runConsumer(this, context, parent)
}
92
93func (this *OrderLimit) beforeItems(context *Context, parent value.Value) bool {
94	context.AddPhaseOperator(SORT)
95	this.numReturnedRows = 0
96	this.fallback = false
97	this.numProcessedRows = 0
98	this.setupTerms(context)
99	res := true
100
101	if this.offset != nil {
102		// There is an offset in the query.
103		res = this.offset.beforeItems(context, parent)
104		if !res {
105			return res
106		}
107		offset := this.offset.offset
108		if offset > int64(this.fallbackNum) {
109			// Fall back to the standard sort.
110			this.fallback = true
111		} else {
112			this.numReturnedRows += int(offset)
113		}
114	}
115
116	res = res && this.limit.beforeItems(context, parent)
117	if !res {
118		return res
119	}
120	limit := this.limit.limit
121	this.ignoreInput = limit <= 0
122	if !this.ignoreInput && !this.fallback && limit > int64(this.fallbackNum-this.numReturnedRows) {
123		// Fallback to the standard sort.
124		this.fallback = true
125	}
126
127	if !this.ignoreInput && !this.fallback {
128		this.numReturnedRows += int(limit)
129	}
130
131	// Will ignore input rows if numReturnedRows is not positive.
132	this.ignoreInput = this.ignoreInput || this.numReturnedRows <= 0
133
134	// Allocate more space if necessary.
135	if this.numReturnedRows > cap(this.values) {
136		values := make(value.AnnotatedValues, len(this.values), this.numReturnedRows)
137		copy(values, this.values)
138		this.releaseValues()
139		this.values = values
140	}
141	return res
142}
143
144func (this *OrderLimit) processItem(item value.AnnotatedValue, context *Context) bool {
145	this.numProcessedRows++
146	if this.fallback {
147		return this.Order.processItem(item, context)
148	}
149	if this.ignoreInput {
150		return true
151	}
152
153	// Prune the item that does not need to enter the heap.
154	if len(this.values) == this.numReturnedRows && !this.lessThan(item, this.values[0]) {
155		return true
156	}
157
158	// Push the current item into the maximum heap.
159	heap.Push(this, item)
160	if len(this.values) > this.numReturnedRows {
161		// Pop and discard the largest item out of the maximum heap.
162		heap.Pop(this)
163	}
164	return true
165}
166
167func (this *OrderLimit) afterItems(context *Context) {
168	defer func() {
169		if this.offset != nil {
170			this.offset.afterItems(context)
171		}
172		this.limit.afterItems(context)
173	}()
174
175	// Deal with the case no data item is needed at all:
176	// when offset is too large.
177	len := len(this.values)
178	offset := int64(0)
179	if this.offset != nil {
180		offset = this.offset.offset
181	}
182	if offset >= int64(len) {
183		this.values = this.values[0:0]
184	}
185
186	this.Order.afterItems(context)
187
188	// Set the sort count to the number of processed rows.
189	context.AddPhaseCount(SORT, this.numProcessedRows)
190	context.SetSortCount(this.numProcessedRows)
191}
192
193func (this *OrderLimit) Less(i, j int) bool {
194	// Since the heap is a maximum heap, it needs to returns the reversal of Less in Order.
195	return this.Order.Less(j, i)
196}
197
198func (this *OrderLimit) Push(item interface{}) {
199	this.values = append(this.values, item.(value.AnnotatedValue))
200}
201
202func (this *OrderLimit) Pop() interface{} {
203	index := len(this.values) - 1
204	item := this.values[index]
205	this.values = this.values[0:index:cap(this.values)]
206	return item
207}
208
209func (this *OrderLimit) MarshalJSON() ([]byte, error) {
210	r := this.plan.MarshalBase(func(r map[string]interface{}) {
211		this.marshalTimes(r)
212	})
213	return json.Marshal(r)
214}
215
216func (this *OrderLimit) reopen(context *Context) {
217	this.Order.reopen(context)
218	this.limit.baseReopen(context)
219}
220
221func (this *OrderLimit) Done() {
222	this.Order.Done()
223	this.Order = nil
224	if this.limit != nil {
225		this.limit.Done()
226		this.limit = nil
227	}
228	if this.offset != nil {
229		this.offset.Done()
230		this.offset = nil
231	}
232}
233