1//  Copyright (c) 2018 Couchbase, Inc.
2//  Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
3//  except in compliance with the License. You may obtain a copy of the License at
4//    http://www.apache.org/licenses/LICENSE-2.0
5//  Unless required by applicable law or agreed to in writing, software distributed under the
6//  License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
7//  either express or implied. See the License for the specific language governing permissions
8//  and limitations under the License.
9
10package prepareds
11
12import (
13	"bytes"
14	"compress/gzip"
15	"encoding/base64"
16	"fmt"
17	"io/ioutil"
18	"math"
19	"sync"
20	"time"
21
22	atomic "github.com/couchbase/go-couchbase/platform"
23	json "github.com/couchbase/go_json"
24	"github.com/couchbase/query/algebra"
25	"github.com/couchbase/query/datastore"
26	"github.com/couchbase/query/distributed"
27	"github.com/couchbase/query/errors"
28	"github.com/couchbase/query/logging"
29	"github.com/couchbase/query/parser/n1ql"
30	"github.com/couchbase/query/plan"
31	"github.com/couchbase/query/planner"
32	"github.com/couchbase/query/util"
33	"github.com/couchbase/query/value"
34)
35
36// prepared statements cache retrieval options
37const (
38	OPT_TRACK  = 1 << iota // track statement in cache
39	OPT_REMOTE             // check with remote node, if available
40	OPT_VERIFY             // verify that the plan is still valid
41)
42
43type preparedCache struct {
44	cache *util.GenCache
45}
46
47type CacheEntry struct {
48	Prepared       *plan.Prepared
49	LastUse        time.Time
50	Uses           int32
51	ServiceTime    atomic.AlignedUint64
52	RequestTime    atomic.AlignedUint64
53	MinServiceTime atomic.AlignedUint64
54	MinRequestTime atomic.AlignedUint64
55	MaxServiceTime atomic.AlignedUint64
56	MaxRequestTime atomic.AlignedUint64
57	// FIXME add moving averages, latency
58	// This requires the use of metrics
59
60	sync.Mutex // for concurrent checking
61	populated  bool
62}
63
64var prepareds = &preparedCache{}
65var store datastore.Datastore
66var systemstore datastore.Datastore
67var namespace string
68
69// init prepareds cache
70
71func PreparedsInit(limit int) {
72	prepareds.cache = util.NewGenCache(limit)
73}
74
75func PreparedsReprepareInit(ds, sy datastore.Datastore, ns string) {
76	store = ds
77	systemstore = sy
78	namespace = ns
79}
80
81// configure prepareds cache
82
83func PreparedsLimit() int {
84	return prepareds.cache.Limit()
85}
86
87func PreparedsSetLimit(limit int) {
88	prepareds.cache.SetLimit(limit)
89}
90
91func (this *preparedCache) get(name value.Value, track bool) *CacheEntry {
92	var cv interface{}
93
94	if name.Type() != value.STRING || !name.Truth() {
95		return nil
96	}
97
98	n := name.Actual().(string)
99	if track {
100		cv = prepareds.cache.Use(n, nil)
101	} else {
102		cv = prepareds.cache.Get(n, nil)
103	}
104	rv, ok := cv.(*CacheEntry)
105	if ok {
106		if track {
107			atomic.AddInt32(&rv.Uses, 1)
108
109			// this is not exactly accurate, but since the MRU queue is
110			// managed properly, we'd rather be inaccurate and make the
111			// change outside of the lock than take a performance hit
112			rv.LastUse = time.Now()
113		}
114		return rv
115	}
116	return nil
117}
118
119func (this *preparedCache) add(prepared *plan.Prepared, populated bool, process func(*CacheEntry) bool) {
120
121	// prepare a new entry, if statement does not exist
122	ce := &CacheEntry{
123		Prepared:       prepared,
124		MinServiceTime: math.MaxUint64,
125		MinRequestTime: math.MaxUint64,
126		populated:      populated,
127	}
128	prepareds.cache.Add(ce, prepared.Name(), func(entry interface{}) util.Operation {
129		var op util.Operation = util.AMEND
130		var cont bool = true
131
132		// check existing entry, amend if all good, ignore otherwise
133		oldEntry := entry.(*CacheEntry)
134		if process != nil {
135			cont = process(oldEntry)
136		}
137		if cont {
138			oldEntry.Prepared = prepared
139			oldEntry.populated = false
140		} else {
141			op = util.IGNORE
142		}
143		return op
144	})
145}
146
147func CountPrepareds() int {
148	return prepareds.cache.Size()
149}
150
151func NamePrepareds() []string {
152	return prepareds.cache.Names()
153}
154
155func PreparedsForeach(nonBlocking func(string, *CacheEntry) bool,
156	blocking func() bool) {
157	dummyF := func(id string, r interface{}) bool {
158		return nonBlocking(id, r.(*CacheEntry))
159	}
160	prepareds.cache.ForEach(dummyF, blocking)
161}
162
163func PreparedDo(name string, f func(*CacheEntry)) {
164	var process func(interface{}) = nil
165
166	if f != nil {
167		process = func(entry interface{}) {
168			ce := entry.(*CacheEntry)
169			f(ce)
170		}
171	}
172	_ = prepareds.cache.Get(name, process)
173}
174
175func AddPrepared(prepared *plan.Prepared) errors.Error {
176	added := true
177
178	prepareds.add(prepared, false, func(ce *CacheEntry) bool {
179		if ce.Prepared.Text() != prepared.Text() {
180			added = false
181		}
182		return added
183	})
184	if !added {
185		return errors.NewPreparedNameError(
186			fmt.Sprintf("duplicate name: %s", prepared.Name()))
187	} else {
188		distributePrepared(prepared.Name(), prepared.EncodedPlan())
189		return nil
190	}
191}
192
193func DeletePrepared(name string) errors.Error {
194	if prepareds.cache.Delete(name, nil) {
195		return nil
196	}
197	return errors.NewNoSuchPreparedError(name)
198}
199
200func GetPrepared(prepared_stmt value.Value, options uint32, phaseTime *time.Duration) (*plan.Prepared, errors.Error) {
201	var err errors.Error
202
203	track := (options & OPT_TRACK) != 0
204	remote := (options & OPT_REMOTE) != 0
205	verify := (options & OPT_VERIFY) != 0
206	switch prepared_stmt.Type() {
207	case value.STRING:
208		var prepared *plan.Prepared
209
210		host, name := distributed.RemoteAccess().SplitKey(prepared_stmt.Actual().(string))
211		ce := prepareds.get(value.NewValue(name), track)
212		if ce != nil {
213			prepared = ce.Prepared
214		}
215		if prepared == nil && remote && host != "" && host != distributed.RemoteAccess().WhoAmI() {
216			distributed.RemoteAccess().GetRemoteDoc(host, name, "prepareds", "GET",
217				func(doc map[string]interface{}) {
218					encoded_plan, ok := doc["encoded_plan"].(string)
219					if ok {
220						prepared, err = DecodePrepared(name, encoded_plan, false, false, phaseTime)
221					}
222				},
223				func(warn errors.Error) {
224				}, distributed.NO_CREDS, "")
225		} else if prepared != nil && verify {
226			var good bool
227
228			// things have already been set up
229			// take the short way home
230			if ce.populated {
231
232				// note that it's fine to check and repopulate without a lock
233				// since the structure of the plan tree won't change, nor the
234				// keyspaces and indexers, the worse that is going to happen is
235				// two requests amending the same counter
236				good = prepared.MetadataCheck()
237
238				// counters have changed. fetch new values
239				if !good {
240					good = prepared.Verify()
241				}
242			} else {
243
244				// we have to proceed under a lock to avoid multiple
245				// requests populating metadata counters at the same time
246				ce.Lock()
247
248				// check again, somebody might have done it in the interim
249				if ce.populated {
250					good = true
251				} else {
252
253					// nada - have to go the long way
254					good = prepared.Verify()
255					if good {
256						ce.populated = true
257					}
258				}
259				ce.Unlock()
260			}
261
262			// after all this, it did not work out!
263			// here we are going to accept multiple requests creating a new
264			// plan concurrently as we don't have a good way to serialize
265			// without blocking the whole prepared cacheline
266			// locking will occur at adding time: both requests will insert,
267			// the last wins
268			if !good {
269				prepared, err = reprepare(prepared, phaseTime)
270				if err == nil {
271					err = AddPrepared(prepared)
272				}
273			}
274		}
275		if err != nil {
276			return nil, err
277		}
278		if prepared == nil {
279			return nil, errors.NewNoSuchPreparedError(name)
280		}
281		return prepared, nil
282	case value.OBJECT:
283		name_value, has_name := prepared_stmt.Field("name")
284		if has_name {
285			if ce := prepareds.get(name_value, track); ce != nil {
286				return ce.Prepared, nil
287			}
288		}
289		prepared_bytes, err := prepared_stmt.MarshalJSON()
290		if err != nil {
291			return nil, errors.NewUnrecognizedPreparedError(err)
292		}
293		return unmarshalPrepared(prepared_bytes, phaseTime)
294	default:
295		return nil, errors.NewUnrecognizedPreparedError(fmt.Errorf("Invalid prepared stmt %v", prepared_stmt))
296	}
297}
298
299func RecordPreparedMetrics(prepared *plan.Prepared, requestTime, serviceTime time.Duration) {
300	if prepared == nil {
301		return
302	}
303	name := prepared.Name()
304	if name == "" {
305		return
306	}
307
308	// cache get had already moved this entry to the top of the LRU
309	// no need to do it again
310	_ = prepareds.cache.Get(name, func(entry interface{}) {
311		ce := entry.(*CacheEntry)
312		atomic.AddUint64(&ce.ServiceTime, uint64(serviceTime))
313		util.TestAndSetUint64(&ce.MinServiceTime, uint64(serviceTime),
314			func(old, new uint64) bool { return old > new }, 0)
315		util.TestAndSetUint64(&ce.MaxServiceTime, uint64(serviceTime),
316			func(old, new uint64) bool { return old < new }, 0)
317		atomic.AddUint64(&ce.RequestTime, uint64(requestTime))
318		util.TestAndSetUint64(&ce.MinRequestTime, uint64(requestTime),
319			func(old, new uint64) bool { return old > new }, 0)
320		util.TestAndSetUint64(&ce.MaxRequestTime, uint64(requestTime),
321			func(old, new uint64) bool { return old < new }, 0)
322	})
323}
324
325func DecodePrepared(prepared_name string, prepared_stmt string, track bool, distribute bool, phaseTime *time.Duration) (*plan.Prepared, errors.Error) {
326	added := true
327
328	decoded, err := base64.StdEncoding.DecodeString(prepared_stmt)
329	if err != nil {
330		return nil, errors.NewPreparedDecodingError(err)
331	}
332	var buf bytes.Buffer
333	buf.Write(decoded)
334	reader, err := gzip.NewReader(&buf)
335	if err != nil {
336		return nil, errors.NewPreparedDecodingError(err)
337	}
338	prepared_bytes, err := ioutil.ReadAll(reader)
339	if err != nil {
340		return nil, errors.NewPreparedDecodingError(err)
341	}
342	prepared, err := unmarshalPrepared(prepared_bytes, phaseTime)
343	if err != nil {
344		return nil, errors.NewPreparedDecodingError(err)
345	}
346
347	prepared.SetEncodedPlan(prepared_stmt)
348
349	// MB-19509 we now have to check that the encoded plan matches
350	// the prepared statement named in the rest API
351	_, prepared_key := distributed.RemoteAccess().SplitKey(prepared_name)
352	if prepared.Name() != "" && prepared_name != "" &&
353		prepared_key != prepared.Name() {
354		return nil, errors.NewEncodingNameMismatchError(prepared_name)
355	}
356
357	if prepared.Name() == "" {
358		return prepared, nil
359	}
360
361	// we don't trust anything strangers give us.
362	// check the plan and populate metadata counters
363	// reprepare if no good
364	good := prepared.Verify()
365	if !good {
366		newPrepared, prepErr := reprepare(prepared, phaseTime)
367		if prepErr == nil {
368			prepared = newPrepared
369		} else {
370			return nil, prepErr
371		}
372	}
373
374	when := time.Now()
375	prepareds.add(prepared, good,
376		func(oldEntry *CacheEntry) bool {
377
378			// MB-19509: if the entry exists already, the new plan must
379			// also be for the same statement as we have in the cache
380			if oldEntry.Prepared != prepared &&
381				oldEntry.Prepared.Text() != prepared.Text() {
382				added = false
383				return added
384			}
385
386			// track the entry if required, whether we amend the plan or
387			// not, as at the end of the statement we will record the
388			// metrics anyway
389			if track {
390				atomic.AddInt32(&oldEntry.Uses, 1)
391				oldEntry.LastUse = when
392			}
393
394			// MB-19659: this is where we decide plan conflict.
395			// the current behaviour is to always use the new plan
396			// and amend the cache
397			// This is still to be finalized
398			return added
399		})
400
401	if added {
402		if distribute {
403			distributePrepared(prepared.Name(), prepared_stmt)
404		}
405		return prepared, nil
406	} else {
407		return nil, errors.NewPreparedEncodingMismatchError(prepared_name)
408	}
409}
410
411func unmarshalPrepared(bytes []byte, phaseTime *time.Duration) (*plan.Prepared, errors.Error) {
412	prepared := plan.NewPrepared(nil, nil)
413	err := prepared.UnmarshalJSON(bytes)
414	if err != nil {
415
416		// if we failed to unmarshall, we find  the statement
417		// and try preparing from scratch
418		text, err1 := json.FirstFind(bytes, "text")
419		if text != nil && err1 == nil {
420			var stmt string
421
422			err1 = json.Unmarshal(text, &stmt)
423			if err1 == nil {
424				prepared.SetText(stmt)
425				pl, _ := reprepare(prepared, phaseTime)
426				if pl != nil {
427					return pl, nil
428				}
429			}
430		}
431		return nil, errors.NewUnrecognizedPreparedError(fmt.Errorf("JSON unmarshalling error: %v", err))
432	}
433	return prepared, nil
434}
435
436func distributePrepared(name, plan string) {
437	go distributed.RemoteAccess().DoRemoteOps([]string{}, "prepareds", "PUT", name, plan,
438		func(warn errors.Error) {
439			if warn != nil {
440				logging.Infof("failed to distribute statement <ud>%v</ud>: %v", name, warn)
441			}
442		}, distributed.NO_CREDS, "")
443}
444
445func reprepare(prepared *plan.Prepared, phaseTime *time.Duration) (*plan.Prepared, errors.Error) {
446	parse := time.Now()
447	stmt, err := n1ql.ParseStatement(prepared.Text())
448	if phaseTime != nil {
449		*phaseTime += time.Since(parse)
450	}
451	if err != nil {
452
453		// this should never happen: the statement parsed to start with
454		return nil, errors.NewReprepareError(err)
455	}
456	prep := time.Now()
457	pl, err := planner.BuildPrepared(stmt.(*algebra.Prepare).Statement(), store, systemstore, namespace, false,
458
459		// building prepared statements should not depend on args
460		nil, nil, prepared.IndexApiVersion(), prepared.FeatureControls())
461	if phaseTime != nil {
462		*phaseTime += time.Since(prep)
463	}
464	if err != nil {
465		return nil, errors.NewReprepareError(err)
466	}
467
468	pl.SetName(prepared.Name())
469	pl.SetText(prepared.Text())
470	pl.SetType(prepared.Type())
471	pl.SetIndexApiVersion(prepared.IndexApiVersion())
472	pl.SetFeatureControls(prepared.FeatureControls())
473
474	json_bytes, err := pl.MarshalJSON()
475	if err != nil {
476		return nil, errors.NewReprepareError(err)
477	}
478	pl.BuildEncodedPlan(json_bytes)
479	return pl, nil
480}
481