1// Copyright (c) 2018 Couchbase, Inc. 2// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file 3// except in compliance with the License. You may obtain a copy of the License at 4// http://www.apache.org/licenses/LICENSE-2.0 5// Unless required by applicable law or agreed to in writing, software distributed under the 6// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 7// either express or implied. See the License for the specific language governing permissions 8// and limitations under the License. 9 10package prepareds 11 12import ( 13 "bytes" 14 "compress/gzip" 15 "encoding/base64" 16 "fmt" 17 "io/ioutil" 18 "math" 19 "sync" 20 "time" 21 22 atomic "github.com/couchbase/go-couchbase/platform" 23 json "github.com/couchbase/go_json" 24 "github.com/couchbase/query/algebra" 25 "github.com/couchbase/query/datastore" 26 "github.com/couchbase/query/distributed" 27 "github.com/couchbase/query/errors" 28 "github.com/couchbase/query/logging" 29 "github.com/couchbase/query/parser/n1ql" 30 "github.com/couchbase/query/plan" 31 "github.com/couchbase/query/planner" 32 "github.com/couchbase/query/util" 33 "github.com/couchbase/query/value" 34) 35 36// prepared statements cache retrieval options 37const ( 38 OPT_TRACK = 1 << iota // track statement in cache 39 OPT_REMOTE // check with remote node, if available 40 OPT_VERIFY // verify that the plan is still valid 41) 42 43type preparedCache struct { 44 cache *util.GenCache 45} 46 47type CacheEntry struct { 48 Prepared *plan.Prepared 49 LastUse time.Time 50 Uses int32 51 ServiceTime atomic.AlignedUint64 52 RequestTime atomic.AlignedUint64 53 MinServiceTime atomic.AlignedUint64 54 MinRequestTime atomic.AlignedUint64 55 MaxServiceTime atomic.AlignedUint64 56 MaxRequestTime atomic.AlignedUint64 57 // FIXME add moving averages, latency 58 // This requires the use of metrics 59 60 sync.Mutex // for concurrent checking 61 populated bool 62} 63 64var prepareds = &preparedCache{} 65var store datastore.Datastore 66var systemstore datastore.Datastore 67var namespace string 68 69// init prepareds cache 70 71func PreparedsInit(limit int) { 72 prepareds.cache = util.NewGenCache(limit) 73} 74 75func PreparedsReprepareInit(ds, sy datastore.Datastore, ns string) { 76 store = ds 77 systemstore = sy 78 namespace = ns 79} 80 81// configure prepareds cache 82 83func PreparedsLimit() int { 84 return prepareds.cache.Limit() 85} 86 87func PreparedsSetLimit(limit int) { 88 prepareds.cache.SetLimit(limit) 89} 90 91func (this *preparedCache) get(name value.Value, track bool) *CacheEntry { 92 var cv interface{} 93 94 if name.Type() != value.STRING || !name.Truth() { 95 return nil 96 } 97 98 n := name.Actual().(string) 99 if track { 100 cv = prepareds.cache.Use(n, nil) 101 } else { 102 cv = prepareds.cache.Get(n, nil) 103 } 104 rv, ok := cv.(*CacheEntry) 105 if ok { 106 if track { 107 atomic.AddInt32(&rv.Uses, 1) 108 109 // this is not exactly accurate, but since the MRU queue is 110 // managed properly, we'd rather be inaccurate and make the 111 // change outside of the lock than take a performance hit 112 rv.LastUse = time.Now() 113 } 114 return rv 115 } 116 return nil 117} 118 119func (this *preparedCache) add(prepared *plan.Prepared, populated bool, process func(*CacheEntry) bool) { 120 121 // prepare a new entry, if statement does not exist 122 ce := &CacheEntry{ 123 Prepared: prepared, 124 MinServiceTime: math.MaxUint64, 125 MinRequestTime: math.MaxUint64, 126 populated: populated, 127 } 128 prepareds.cache.Add(ce, prepared.Name(), func(entry interface{}) util.Operation { 129 var op util.Operation = util.AMEND 130 var cont bool = true 131 132 // check existing entry, amend if all good, ignore otherwise 133 oldEntry := entry.(*CacheEntry) 134 if process != nil { 135 cont = process(oldEntry) 136 } 137 if cont { 138 oldEntry.Prepared = prepared 139 oldEntry.populated = false 140 } else { 141 op = util.IGNORE 142 } 143 return op 144 }) 145} 146 147func CountPrepareds() int { 148 return prepareds.cache.Size() 149} 150 151func NamePrepareds() []string { 152 return prepareds.cache.Names() 153} 154 155func PreparedsForeach(nonBlocking func(string, *CacheEntry) bool, 156 blocking func() bool) { 157 dummyF := func(id string, r interface{}) bool { 158 return nonBlocking(id, r.(*CacheEntry)) 159 } 160 prepareds.cache.ForEach(dummyF, blocking) 161} 162 163func PreparedDo(name string, f func(*CacheEntry)) { 164 var process func(interface{}) = nil 165 166 if f != nil { 167 process = func(entry interface{}) { 168 ce := entry.(*CacheEntry) 169 f(ce) 170 } 171 } 172 _ = prepareds.cache.Get(name, process) 173} 174 175func AddPrepared(prepared *plan.Prepared) errors.Error { 176 added := true 177 178 prepareds.add(prepared, false, func(ce *CacheEntry) bool { 179 if ce.Prepared.Text() != prepared.Text() { 180 added = false 181 } 182 return added 183 }) 184 if !added { 185 return errors.NewPreparedNameError( 186 fmt.Sprintf("duplicate name: %s", prepared.Name())) 187 } else { 188 distributePrepared(prepared.Name(), prepared.EncodedPlan()) 189 return nil 190 } 191} 192 193func DeletePrepared(name string) errors.Error { 194 if prepareds.cache.Delete(name, nil) { 195 return nil 196 } 197 return errors.NewNoSuchPreparedError(name) 198} 199 200func GetPrepared(prepared_stmt value.Value, options uint32, phaseTime *time.Duration) (*plan.Prepared, errors.Error) { 201 var err errors.Error 202 203 track := (options & OPT_TRACK) != 0 204 remote := (options & OPT_REMOTE) != 0 205 verify := (options & OPT_VERIFY) != 0 206 switch prepared_stmt.Type() { 207 case value.STRING: 208 var prepared *plan.Prepared 209 210 host, name := distributed.RemoteAccess().SplitKey(prepared_stmt.Actual().(string)) 211 ce := prepareds.get(value.NewValue(name), track) 212 if ce != nil { 213 prepared = ce.Prepared 214 } 215 if prepared == nil && remote && host != "" && host != distributed.RemoteAccess().WhoAmI() { 216 distributed.RemoteAccess().GetRemoteDoc(host, name, "prepareds", "GET", 217 func(doc map[string]interface{}) { 218 encoded_plan, ok := doc["encoded_plan"].(string) 219 if ok { 220 prepared, err = DecodePrepared(name, encoded_plan, false, false, phaseTime) 221 } 222 }, 223 func(warn errors.Error) { 224 }, distributed.NO_CREDS, "") 225 } else if prepared != nil && verify { 226 var good bool 227 228 // things have already been set up 229 // take the short way home 230 if ce.populated { 231 232 // note that it's fine to check and repopulate without a lock 233 // since the structure of the plan tree won't change, nor the 234 // keyspaces and indexers, the worse that is going to happen is 235 // two requests amending the same counter 236 good = prepared.MetadataCheck() 237 238 // counters have changed. fetch new values 239 if !good { 240 good = prepared.Verify() 241 } 242 } else { 243 244 // we have to proceed under a lock to avoid multiple 245 // requests populating metadata counters at the same time 246 ce.Lock() 247 248 // check again, somebody might have done it in the interim 249 if ce.populated { 250 good = true 251 } else { 252 253 // nada - have to go the long way 254 good = prepared.Verify() 255 if good { 256 ce.populated = true 257 } 258 } 259 ce.Unlock() 260 } 261 262 // after all this, it did not work out! 263 // here we are going to accept multiple requests creating a new 264 // plan concurrently as we don't have a good way to serialize 265 // without blocking the whole prepared cacheline 266 // locking will occur at adding time: both requests will insert, 267 // the last wins 268 if !good { 269 prepared, err = reprepare(prepared, phaseTime) 270 if err == nil { 271 err = AddPrepared(prepared) 272 } 273 } 274 } 275 if err != nil { 276 return nil, err 277 } 278 if prepared == nil { 279 return nil, errors.NewNoSuchPreparedError(name) 280 } 281 return prepared, nil 282 case value.OBJECT: 283 name_value, has_name := prepared_stmt.Field("name") 284 if has_name { 285 if ce := prepareds.get(name_value, track); ce != nil { 286 return ce.Prepared, nil 287 } 288 } 289 prepared_bytes, err := prepared_stmt.MarshalJSON() 290 if err != nil { 291 return nil, errors.NewUnrecognizedPreparedError(err) 292 } 293 return unmarshalPrepared(prepared_bytes, phaseTime) 294 default: 295 return nil, errors.NewUnrecognizedPreparedError(fmt.Errorf("Invalid prepared stmt %v", prepared_stmt)) 296 } 297} 298 299func RecordPreparedMetrics(prepared *plan.Prepared, requestTime, serviceTime time.Duration) { 300 if prepared == nil { 301 return 302 } 303 name := prepared.Name() 304 if name == "" { 305 return 306 } 307 308 // cache get had already moved this entry to the top of the LRU 309 // no need to do it again 310 _ = prepareds.cache.Get(name, func(entry interface{}) { 311 ce := entry.(*CacheEntry) 312 atomic.AddUint64(&ce.ServiceTime, uint64(serviceTime)) 313 util.TestAndSetUint64(&ce.MinServiceTime, uint64(serviceTime), 314 func(old, new uint64) bool { return old > new }, 0) 315 util.TestAndSetUint64(&ce.MaxServiceTime, uint64(serviceTime), 316 func(old, new uint64) bool { return old < new }, 0) 317 atomic.AddUint64(&ce.RequestTime, uint64(requestTime)) 318 util.TestAndSetUint64(&ce.MinRequestTime, uint64(requestTime), 319 func(old, new uint64) bool { return old > new }, 0) 320 util.TestAndSetUint64(&ce.MaxRequestTime, uint64(requestTime), 321 func(old, new uint64) bool { return old < new }, 0) 322 }) 323} 324 325func DecodePrepared(prepared_name string, prepared_stmt string, track bool, distribute bool, phaseTime *time.Duration) (*plan.Prepared, errors.Error) { 326 added := true 327 328 decoded, err := base64.StdEncoding.DecodeString(prepared_stmt) 329 if err != nil { 330 return nil, errors.NewPreparedDecodingError(err) 331 } 332 var buf bytes.Buffer 333 buf.Write(decoded) 334 reader, err := gzip.NewReader(&buf) 335 if err != nil { 336 return nil, errors.NewPreparedDecodingError(err) 337 } 338 prepared_bytes, err := ioutil.ReadAll(reader) 339 if err != nil { 340 return nil, errors.NewPreparedDecodingError(err) 341 } 342 prepared, err := unmarshalPrepared(prepared_bytes, phaseTime) 343 if err != nil { 344 return nil, errors.NewPreparedDecodingError(err) 345 } 346 347 prepared.SetEncodedPlan(prepared_stmt) 348 349 // MB-19509 we now have to check that the encoded plan matches 350 // the prepared statement named in the rest API 351 _, prepared_key := distributed.RemoteAccess().SplitKey(prepared_name) 352 if prepared.Name() != "" && prepared_name != "" && 353 prepared_key != prepared.Name() { 354 return nil, errors.NewEncodingNameMismatchError(prepared_name) 355 } 356 357 if prepared.Name() == "" { 358 return prepared, nil 359 } 360 361 // we don't trust anything strangers give us. 362 // check the plan and populate metadata counters 363 // reprepare if no good 364 good := prepared.Verify() 365 if !good { 366 newPrepared, prepErr := reprepare(prepared, phaseTime) 367 if prepErr == nil { 368 prepared = newPrepared 369 } else { 370 return nil, prepErr 371 } 372 } 373 374 when := time.Now() 375 prepareds.add(prepared, good, 376 func(oldEntry *CacheEntry) bool { 377 378 // MB-19509: if the entry exists already, the new plan must 379 // also be for the same statement as we have in the cache 380 if oldEntry.Prepared != prepared && 381 oldEntry.Prepared.Text() != prepared.Text() { 382 added = false 383 return added 384 } 385 386 // track the entry if required, whether we amend the plan or 387 // not, as at the end of the statement we will record the 388 // metrics anyway 389 if track { 390 atomic.AddInt32(&oldEntry.Uses, 1) 391 oldEntry.LastUse = when 392 } 393 394 // MB-19659: this is where we decide plan conflict. 395 // the current behaviour is to always use the new plan 396 // and amend the cache 397 // This is still to be finalized 398 return added 399 }) 400 401 if added { 402 if distribute { 403 distributePrepared(prepared.Name(), prepared_stmt) 404 } 405 return prepared, nil 406 } else { 407 return nil, errors.NewPreparedEncodingMismatchError(prepared_name) 408 } 409} 410 411func unmarshalPrepared(bytes []byte, phaseTime *time.Duration) (*plan.Prepared, errors.Error) { 412 prepared := plan.NewPrepared(nil, nil) 413 err := prepared.UnmarshalJSON(bytes) 414 if err != nil { 415 416 // if we failed to unmarshall, we find the statement 417 // and try preparing from scratch 418 text, err1 := json.FirstFind(bytes, "text") 419 if text != nil && err1 == nil { 420 var stmt string 421 422 err1 = json.Unmarshal(text, &stmt) 423 if err1 == nil { 424 prepared.SetText(stmt) 425 pl, _ := reprepare(prepared, phaseTime) 426 if pl != nil { 427 return pl, nil 428 } 429 } 430 } 431 return nil, errors.NewUnrecognizedPreparedError(fmt.Errorf("JSON unmarshalling error: %v", err)) 432 } 433 return prepared, nil 434} 435 436func distributePrepared(name, plan string) { 437 go distributed.RemoteAccess().DoRemoteOps([]string{}, "prepareds", "PUT", name, plan, 438 func(warn errors.Error) { 439 if warn != nil { 440 logging.Infof("failed to distribute statement <ud>%v</ud>: %v", name, warn) 441 } 442 }, distributed.NO_CREDS, "") 443} 444 445func reprepare(prepared *plan.Prepared, phaseTime *time.Duration) (*plan.Prepared, errors.Error) { 446 parse := time.Now() 447 stmt, err := n1ql.ParseStatement(prepared.Text()) 448 if phaseTime != nil { 449 *phaseTime += time.Since(parse) 450 } 451 if err != nil { 452 453 // this should never happen: the statement parsed to start with 454 return nil, errors.NewReprepareError(err) 455 } 456 prep := time.Now() 457 pl, err := planner.BuildPrepared(stmt.(*algebra.Prepare).Statement(), store, systemstore, namespace, false, 458 459 // building prepared statements should not depend on args 460 nil, nil, prepared.IndexApiVersion(), prepared.FeatureControls()) 461 if phaseTime != nil { 462 *phaseTime += time.Since(prep) 463 } 464 if err != nil { 465 return nil, errors.NewReprepareError(err) 466 } 467 468 pl.SetName(prepared.Name()) 469 pl.SetText(prepared.Text()) 470 pl.SetType(prepared.Type()) 471 pl.SetIndexApiVersion(prepared.IndexApiVersion()) 472 pl.SetFeatureControls(prepared.FeatureControls()) 473 474 json_bytes, err := pl.MarshalJSON() 475 if err != nil { 476 return nil, errors.NewReprepareError(err) 477 } 478 pl.BuildEncodedPlan(json_bytes) 479 return pl, nil 480} 481