1//  Copyright (c) 2016 Couchbase, Inc.
2//  Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
3//  except in compliance with the License. You may obtain a copy of the License at
4//    http://www.apache.org/licenses/LICENSE-2.0
5//  Unless required by applicable law or agreed to in writing, software distributed under the
6//  License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
7//  either express or implied. See the License for the specific language governing permissions
8//  and limitations under the License.
9
10/*
GenCache provides a highly concurrent, resizable set of structures, implemented as
an array of doubly linked lists for sequential access complemented by maps for direct
13access.
14
15The ForEach method is not meant to provide a snapshot of the current state of affairs
16but rather an almost accurate picture: deletes and inserts are allowed as the scan
17takes place.
18
19Since the cache will be maintained by LRU purging, and certain types of access to the cache
20will move elements at the top of the bucket, we do maintain two lists: LRU (for cleaning)
21and scan (for access): a single list for both operations proved to be inadequate in avoiding
22skipping whole swathes of entries or reporting an element twice, caused by entries moving
23about in the bucket as the scan occurs.
24*/
25package util
26
27import (
28	"sync"
29
30	atomic "github.com/couchbase/go-couchbase/platform"
31)
32
// genSubList is a pair of element links; the same structure doubles as the
// list header (in GenCache.lists) and as an element's per-list links.
type genSubList struct {
	next *genElem // next points to head (list), goes in the direction of head (element)
	prev *genElem // prev points to tail (list), goes in the direction of tail (element)
}
37
// listType selects which of an element's link pairs (and which per-bucket
// list header) an operation works on.
type listType int

const (
	_LRU  listType = iota // list maintained for LRU ejection
	_SCAN                 // list maintained for ForEach scans
	_LISTTYPES            // Sizer
)
45
// genElem is a cache entry: it sits on both the LRU and SCAN lists of its
// bucket and is indexed by ID in the bucket's map.
type genElem struct {
	ID       string                 // key for direct access via the bucket map
	lists    [_LISTTYPES]genSubList // links for the LRU and SCAN lists
	lastMRU  uint32                 // value of GenCache.lastMRU at the last promotion
	refcount int32                  // number of scanners currently holding this element
	deleted  bool                   // removed from the map but still referenced by a scan
	contents interface{}            // caller-supplied payload
}
54
// initial capacity of each bucket map, and number of buckets
const _CACHE_SIZE = 1 << 8
const _CACHES = 16

// Operation is the verdict an Add process callback returns for an
// existing entry with the same id.
type Operation int

const (
	IGNORE  Operation = iota // keep the existing entry, skip the add
	AMEND                    // the callback amended the existing contents in place
	REPLACE                  // swap in the new entry's contents
)
65
// GenCache is a bucketed cache: each entry hashes to one of _CACHES buckets,
// each with its own lock, lists and map, to spread lock contention.
type GenCache struct {

	// one lock per cache bucket to aid concurrency
	locks [_CACHES]sync.RWMutex

	// this shows intent to lock exclusively
	// (incremented by lock(); ForEach checks it to yield the shared lock)
	lockers [_CACHES]int32

	// doubly linked lists for scans and ejections
	lists [_CACHES][_LISTTYPES]genSubList

	// maps for direct access
	maps [_CACHES]map[string]*genElem

	// MRU operation counter
	lastMRU uint32

	// max size, for LRU lists (non positive disables ejection)
	limit   int
	curSize int32
}
87
88func NewGenCache(l int) *GenCache {
89	rv := &GenCache{
90		limit: l,
91	}
92
93	for b := 0; b < _CACHES; b++ {
94		rv.maps[b] = make(map[string]*genElem, _CACHE_SIZE)
95	}
96	return rv
97}
98
// Add (or update, if ID found) entry, eject old entry if we are controlling size
func (this *GenCache) Add(entry interface{}, id string, process func(interface{}) Operation) {
	cacheNum := HashString(id, _CACHES)
	this.lock(cacheNum)

	elem, ok := this.maps[cacheNum][id]
	if ok {
		op := REPLACE

		// If the element has been found, process the existing entry,
		// determine any conflict, and skip if required
		// The process function may alter the entry contents as
		// required rather than switching it to the new entry
		if process != nil {
			if op = process(elem.contents); op == IGNORE {
				this.locks[cacheNum].Unlock()
				return
			}
		}

		// Move to the front
		this.promote(elem, cacheNum)

		if op == REPLACE {
			elem.contents = entry
		}

		this.locks[cacheNum].Unlock()

	} else {
		ditchOther := false

		// In order not to have to acquire a different lock
		// we try to ditch the LRU entry from this hash node:
		// it makes the list a bit lopsided at the lower end
		// but it buys us performance
		elem = this.lists[cacheNum][_LRU].prev
		if this.limit > 0 && int(this.curSize) >= this.limit {
			if elem != nil {

				// one out, one in: curSize is unchanged
				this.remove(elem, cacheNum)
			} else {

				// if we had nothing locally, we'll drop
				// an entry from another bucket once we
				// have unlocked this one
				ditchOther = true
			}
		} else {

			// under the limit (or no limit): the cache just grows
			atomic.AddInt32(&this.curSize, 1)
		}
		elem = &genElem{
			contents: entry,
			ID:       id,
		}
		this.add(elem, cacheNum)
		this.maps[cacheNum][id] = elem
		this.locks[cacheNum].Unlock()

		// we needed to limit the cache, but our bucket was empty,
		// so we need to find a sacrificial victim somewhere else
		// we choose the one with the highest number of entries
		// for efficiency, we are a bit liberal with locks
		if ditchOther {
			count := 0
			newCacheNum := -1

			// unlocked scan of the bucket sizes: approximate by design
			for c := 0; c < _CACHES; c++ {
				l := len(this.maps[c])
				if l > count {
					count = l
					newCacheNum = c
				}
			}

			if newCacheNum != -1 {
				this.lock(newCacheNum)
				elem = this.lists[newCacheNum][_LRU].prev
				if elem != nil {
					this.remove(elem, newCacheNum)
					ditchOther = false
				}
				this.locks[newCacheNum].Unlock()
			}

			// after all this, we still didn't find another victim
			// (not even ourselves!), so we need to adjust the count,
			// as it's off by 1
			if ditchOther {
				atomic.AddInt32(&this.curSize, 1)
			}
		}
	}
}
192
193// Remove entry
194func (this *GenCache) Delete(id string, cleanup func(interface{})) bool {
195	cacheNum := HashString(id, _CACHES)
196	this.lock(cacheNum)
197	defer this.locks[cacheNum].Unlock()
198
199	elem, ok := this.maps[cacheNum][id]
200	if ok {
201		if cleanup != nil {
202			cleanup(elem.contents)
203		}
204		this.remove(elem, cacheNum)
205		atomic.AddInt32(&this.curSize, -1)
206		return true
207	}
208	return false
209}
210
211// Returns an element's contents by id
212func (this *GenCache) Get(id string, process func(interface{})) interface{} {
213	cacheNum := HashString(id, _CACHES)
214	this.locks[cacheNum].RLock()
215	defer this.locks[cacheNum].RUnlock()
216	elem, ok := this.maps[cacheNum][id]
217	if !ok {
218		return nil
219	} else {
220		if process != nil {
221			process(elem.contents)
222		}
223		return elem.contents
224	}
225}
226
227// Returns an element's contents by id and places it at the top of the bucket
228// Also useful to manipulate an element with an exclusive lock
229func (this *GenCache) Use(id string, process func(interface{})) interface{} {
230
231	// if no processing is involved and the cache is in no danger of being
232	// cleaned, we can can just use a shared lock for performance
233	if process == nil && !this.testMRU(0) {
234		return this.Get(id, nil)
235	}
236	cacheNum := HashString(id, _CACHES)
237	this.lock(cacheNum)
238	defer this.locks[cacheNum].Unlock()
239	elem, ok := this.maps[cacheNum][id]
240	if !ok {
241		return nil
242	} else {
243
244		// Move to the front
245		this.promote(elem, cacheNum)
246
247		if process != nil {
248			process(elem.contents)
249		}
250		return elem.contents
251	}
252}
253
// Size returns the current number of cached entries.
// NOTE(review): curSize is read without an atomic load, consistent with the
// rest of this file; the result may be slightly stale under concurrency.
func (this *GenCache) Size() int {
	return int(this.curSize)
}
258
// Limit returns the LRU cleanup limit: the maximum number of entries before
// ejection starts (non positive means unlimited).
func (this *GenCache) Limit() int {
	return this.limit
}
263
264// Set the list limit
265func (this *GenCache) SetLimit(limit int) {
266
267	// this we ought to do with a lock, however
268	// we only envisage one request to change the limit
269	// every blue moon and it's only Add that's using it
270	// to keep the list compact: in the worse case we
271	// skip ditching entries, which is done here anyhow...
272	this.limit = limit
273
274	// reign in entries a bit
275	c := 0
276	for this.limit > 0 && int(this.curSize) > this.limit {
277		this.lock(c)
278		elem := this.lists[c][_LRU].prev
279		if elem != nil {
280			this.remove(elem, c)
281			atomic.AddInt32(&this.curSize, -1)
282		}
283		this.locks[c].Unlock()
284		c = (c + 1) % _CACHES
285	}
286}
287
288// Return a slice with all the entry id's
289func (this *GenCache) Names() []string {
290	i := 0
291
292	// we have emergency extra space not to have to append
293	// if we can avoid it
294	l := int(this.curSize)
295	sz := _CACHES + l
296	n := make([]string, l, sz)
297	this.ForEach(func(id string, entry interface{}) bool {
298		if i < l {
299			n[i] = id
300		} else {
301			n = append(n, id)
302		}
303		i++
304		return true
305	}, nil)
306	return n
307}
308
// Scan the list
//
// As noted in the starting comments, this is not a consistent snapshot
// but rather a low cost, almost accurate view
//
// For each element, we cater for actions with the bucket locked (must be non blocking)
// and blocking actions with the bucket available
// Since, for blocking operations, the entry is not guaranteed to exist, any data needed by them
// must be set up in the non blocking part
// both functions should return false if processing needs to stop
func (this *GenCache) ForEach(nonBlocking func(string, interface{}) bool,
	blocking func() bool) {
	cont := true

	for b := 0; b < _CACHES; b++ {
		sharedLock := true
		this.locks[b].RLock()
		nextElem := this.lists[b][_SCAN].prev
		if nextElem == nil {
			this.locks[b].RUnlock()
			continue
		}

		// mark tail element as in use, so that they don't disappear mid scan
		atomic.AddInt32(&nextElem.refcount, 1)
		for {
			elem := nextElem
			nextElem = elem.lists[_SCAN].next

			// mark next element as in use so that it doesn't get removed from
			// the list and we get lost mid scan...
			if nextElem != nil {
				atomic.AddInt32(&nextElem.refcount, 1)
			}

			// somebody had deleted the element in the interim, so skip it
			if elem.deleted {

				// and if no longer referenced, get rid of it for real
				if elem.refcount == 1 {

					// promote the lock (remove() only marked it deleted
					// because our refcount pinned it on the SCAN list)
					this.locks[b].RUnlock()
					sharedLock = false
					this.lock(b)

					// if we are still the only referencer, remove
					if elem.refcount == 1 {
						this.lists[b][_SCAN].ditch(elem, _SCAN)
					}
				}

			} else {

				// perform the non blocking action
				if nonBlocking != nil {
					cont = nonBlocking(elem.ID, elem.contents)
				}
			}

			// release current element
			atomic.AddInt32(&elem.refcount, -1)

			// unlock the cache
			if sharedLock {

				// if we don't have waiters or blocking actions we can just continue
				if nextElem != nil && cont && blocking == nil && this.lockers[b] == 0 {
					continue
				}
				this.locks[b].RUnlock()
			} else {
				this.locks[b].Unlock()
			}

			// perform the blocking action (only with the bucket unlocked)
			if cont && !elem.deleted && blocking != nil {
				cont = blocking()
			}

			// things went wrong, or got settled early
			if !cont {
				return
			}

			// end of this bucket, onto the next
			if nextElem == nil {
				break
			}

			// restart the scan
			this.locks[b].RLock()
			sharedLock = true
		}
	}
}
405
// show intent to lock the cacheline and proceed with exclusive lock
// (ForEach checks lockers so readers yield to intending writers)
func (this *GenCache) lock(cacheNum int) {
	atomic.AddInt32(&this.lockers[cacheNum], 1)
	this.locks[cacheNum].Lock()
	atomic.AddInt32(&this.lockers[cacheNum], -1)
}
412
// mark next MRU operation id
// (the counter may wrap around; testMRU caters for that)
func (this *GenCache) nextMRU() {
	atomic.AddUint32(&this.lastMRU, 1)
}
417
// test if MRU promotion is needed
// the general idea is that MRU maintenance is expensive, so we will only bother
// to do it if an entry is in danger of being cleaned
func (this *GenCache) testMRU(MRU uint32) bool {

	// handle wraparounds
	return this.lastMRU < MRU ||

		// if we are in the bottom half, move up
		int(this.lastMRU-MRU) > this.limit/2
}
429
// in all of the following methods, the bucket is expected to be already exclusively locked

// add stamps the element with a fresh MRU id and links it into both the
// LRU and SCAN lists of its bucket
func (this *GenCache) add(elem *genElem, cacheNum int) {
	this.nextMRU()
	elem.lastMRU = this.lastMRU
	this.lists[cacheNum][_LRU].insert(elem, _LRU)
	this.lists[cacheNum][_SCAN].insert(elem, _SCAN)
}
437
// promote moves the element to the front of the LRU list, but only when
// testMRU says it is in danger of being cleaned; MRU maintenance is
// otherwise skipped for performance
// (the bucket is expected to be already exclusively locked)
func (this *GenCache) promote(elem *genElem, cacheNum int) {
	if this.testMRU(elem.lastMRU) {
		this.nextMRU()
		elem.lastMRU = this.lastMRU
		this.lists[cacheNum][_LRU].ditch(elem, _LRU)
		this.lists[cacheNum][_LRU].insert(elem, _LRU)
	}
}
446
// remove takes the element off the map and the LRU list; if a scanner still
// references it, it is only marked deleted and the scanner ditches it from
// the SCAN list later, otherwise it comes off the SCAN list right away
// (the bucket is expected to be already exclusively locked)
func (this *GenCache) remove(elem *genElem, cacheNum int) {
	delete(this.maps[cacheNum], elem.ID)
	this.lists[cacheNum][_LRU].ditch(elem, _LRU)
	if elem.refcount > 0 {
		elem.deleted = true
	} else {
		this.lists[cacheNum][_SCAN].ditch(elem, _SCAN)
	}
}
456
457func (this *genSubList) insert(elem *genElem, list listType) {
458	elem.lists[list].next = nil
459	if this.next == nil {
460		this.next = elem
461		this.prev = elem
462		elem.lists[list].prev = nil
463	} else {
464		elem.lists[list].prev = this.next
465		elem.lists[list].prev.lists[list].next = elem
466		this.next = elem
467	}
468
469}
470
// ditch unlinks elem from the given list, fixing up the list's head and tail
// pointers as needed
// (the bucket is expected to be already exclusively locked)
func (this *genSubList) ditch(elem *genElem, list listType) {

	// corner cases: head
	if elem == this.next {
		this.next = elem.lists[list].prev

		// ...and tail (single element list becomes empty)
		if elem == this.prev {
			this.prev = elem.lists[list].next
		} else {
			elem.lists[list].prev.lists[list].next = nil
		}

		// tail
	} else if elem == this.prev {
		this.prev = elem.lists[list].next
		elem.lists[list].next.lists[list].prev = nil

		// middle
	} else {
		prev := elem.lists[list].prev
		next := elem.lists[list].next
		prev.lists[list].next = next
		next.lists[list].prev = prev
	}

	// help the GC
	elem.lists[list].next = nil
	elem.lists[list].prev = nil
}
501