1// Copyright (c) 2014 Couchbase, Inc. 2// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file 3// except in compliance with the License. You may obtain a copy of the License at 4// http://www.apache.org/licenses/LICENSE-2.0 5// Unless required by applicable law or agreed to in writing, software distributed under the 6// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 7// either express or implied. See the License for the specific language governing permissions 8// and limitations under the License. 9 10package execution 11 12import ( 13 "container/heap" 14 "encoding/json" 15 16 "github.com/couchbase/query/plan" 17 "github.com/couchbase/query/value" 18) 19 20type OrderLimit struct { 21 *Order 22 offset *Offset // offset is optional 23 limit *Limit // limit must present 24 numReturnedRows int 25 fallbackNum int 26 ignoreInput bool 27 fallback bool 28 numProcessedRows uint64 29} 30 31func NewOrderLimit(plan *plan.Order, context *Context) *OrderLimit { 32 var rv *OrderLimit 33 if plan.Offset() == nil { 34 rv = &OrderLimit{ 35 Order: NewOrder(plan, context), 36 offset: nil, 37 limit: NewLimit(plan.Limit(), context), 38 numReturnedRows: 0, 39 fallbackNum: plan.FallbackNum(), 40 ignoreInput: false, 41 fallback: false, 42 numProcessedRows: 0, 43 } 44 } else { 45 rv = &OrderLimit{ 46 Order: NewOrder(plan, context), 47 offset: NewOffset(plan.Offset(), context), 48 limit: NewLimit(plan.Limit(), context), 49 numReturnedRows: 0, 50 fallbackNum: plan.FallbackNum(), 51 ignoreInput: false, 52 fallback: false, 53 numProcessedRows: 0, 54 } 55 } 56 57 rv.output = rv 58 return rv 59} 60 61func (this *OrderLimit) Copy() Operator { 62 var rv *OrderLimit 63 64 if this.offset == nil { 65 rv = &OrderLimit{ 66 Order: this.Order.Copy().(*Order), 67 offset: nil, 68 limit: this.limit.Copy().(*Limit), 69 numReturnedRows: this.numReturnedRows, 70 ignoreInput: this.ignoreInput, 71 fallback: this.fallback, 72 numProcessedRows: this.numProcessedRows, 73 } 74 } else { 75 rv = &OrderLimit{ 76 Order: this.Order.Copy().(*Order), 77 offset: this.offset.Copy().(*Offset), 78 limit: this.limit.Copy().(*Limit), 79 numReturnedRows: this.numReturnedRows, 80 ignoreInput: this.ignoreInput, 81 fallback: this.fallback, 82 numProcessedRows: this.numProcessedRows, 83 } 84 } 85 return rv 86} 87 88func (this *OrderLimit) RunOnce(context *Context, parent value.Value) { 89 defer this.releaseValues() 90 this.runConsumer(this, context, parent) 91} 92 93func (this *OrderLimit) beforeItems(context *Context, parent value.Value) bool { 94 context.AddPhaseOperator(SORT) 95 this.numReturnedRows = 0 96 this.fallback = false 97 this.numProcessedRows = 0 98 this.setupTerms(context) 99 res := true 100 101 if this.offset != nil { 102 // There is an offset in the query. 103 res = this.offset.beforeItems(context, parent) 104 if !res { 105 return res 106 } 107 offset := this.offset.offset 108 if offset > int64(this.fallbackNum) { 109 // Fall back to the standard sort. 110 this.fallback = true 111 } else { 112 this.numReturnedRows += int(offset) 113 } 114 } 115 116 res = res && this.limit.beforeItems(context, parent) 117 if !res { 118 return res 119 } 120 limit := this.limit.limit 121 this.ignoreInput = limit <= 0 122 if !this.ignoreInput && !this.fallback && limit > int64(this.fallbackNum-this.numReturnedRows) { 123 // Fallback to the standard sort. 124 this.fallback = true 125 } 126 127 if !this.ignoreInput && !this.fallback { 128 this.numReturnedRows += int(limit) 129 } 130 131 // Will ignore input rows if numReturnedRows is not positive. 132 this.ignoreInput = this.ignoreInput || this.numReturnedRows <= 0 133 134 // Allocate more space if necessary. 135 if this.numReturnedRows > cap(this.values) { 136 values := make(value.AnnotatedValues, len(this.values), this.numReturnedRows) 137 copy(values, this.values) 138 this.releaseValues() 139 this.values = values 140 } 141 return res 142} 143 144func (this *OrderLimit) processItem(item value.AnnotatedValue, context *Context) bool { 145 this.numProcessedRows++ 146 if this.fallback { 147 return this.Order.processItem(item, context) 148 } 149 if this.ignoreInput { 150 return true 151 } 152 153 // Prune the item that does not need to enter the heap. 154 if len(this.values) == this.numReturnedRows && !this.lessThan(item, this.values[0]) { 155 return true 156 } 157 158 // Push the current item into the maximum heap. 159 heap.Push(this, item) 160 if len(this.values) > this.numReturnedRows { 161 // Pop and discard the largest item out of the maximum heap. 162 heap.Pop(this) 163 } 164 return true 165} 166 167func (this *OrderLimit) afterItems(context *Context) { 168 defer func() { 169 if this.offset != nil { 170 this.offset.afterItems(context) 171 } 172 this.limit.afterItems(context) 173 }() 174 175 // Deal with the case no data item is needed at all: 176 // when offset is too large. 177 len := len(this.values) 178 offset := int64(0) 179 if this.offset != nil { 180 offset = this.offset.offset 181 } 182 if offset >= int64(len) { 183 this.values = this.values[0:0] 184 } 185 186 this.Order.afterItems(context) 187 188 // Set the sort count to the number of processed rows. 189 context.AddPhaseCount(SORT, this.numProcessedRows) 190 context.SetSortCount(this.numProcessedRows) 191} 192 193func (this *OrderLimit) Less(i, j int) bool { 194 // Since the heap is a maximum heap, it needs to returns the reversal of Less in Order. 195 return this.Order.Less(j, i) 196} 197 198func (this *OrderLimit) Push(item interface{}) { 199 this.values = append(this.values, item.(value.AnnotatedValue)) 200} 201 202func (this *OrderLimit) Pop() interface{} { 203 index := len(this.values) - 1 204 item := this.values[index] 205 this.values = this.values[0:index:cap(this.values)] 206 return item 207} 208 209func (this *OrderLimit) MarshalJSON() ([]byte, error) { 210 r := this.plan.MarshalBase(func(r map[string]interface{}) { 211 this.marshalTimes(r) 212 }) 213 return json.Marshal(r) 214} 215 216func (this *OrderLimit) reopen(context *Context) { 217 this.Order.reopen(context) 218 this.limit.baseReopen(context) 219} 220 221func (this *OrderLimit) Done() { 222 this.Order.Done() 223 this.Order = nil 224 if this.limit != nil { 225 this.limit.Done() 226 this.limit = nil 227 } 228 if this.offset != nil { 229 this.offset.Done() 230 this.offset = nil 231 } 232} 233