1/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 * Hash table
4 *
5 */
6#include "config.h"
7#include <errno.h>
8#include <fcntl.h>
9#include <logger/logger.h>
10#include <platform/crc32c.h>
11#include <platform/platform.h>
12#include <platform/strerror.h>
13#include <stdio.h>
14#include <stdlib.h>
15#include <string.h>
16#include <mutex>
17#include <vector>
18
19#include "default_engine_internal.h"
20
/* Number of buckets in a table whose size exponent is n, i.e. 2^n. */
#define hashsize(n) (static_cast<size_t>(1) << (n))
/* Bit mask that reduces a hash value to a bucket index for exponent n. */
#define hashmask(n) (hashsize(n) - 1)
23
24struct Assoc {
25    Assoc(unsigned int hp) : hashpower(hp) {
26        primary_hashtable.resize(hashsize(hashpower));
27    }
28
29    /* how many powers of 2's worth of buckets we use */
30    unsigned int hashpower;
31
32
33    /* Main hash table. This is where we look except during expansion. */
34    std::vector<hash_item*> primary_hashtable;
35
36    /*
37     * Previous hash table. During expansion, we look here for keys that haven't
38     * been moved over to the primary yet.
39     */
40    std::vector<hash_item*> old_hashtable;
41
42    /* Number of items in the hash table. */
43    unsigned int hash_items{0};
44
45    /* Flag: Are we in the middle of expanding now? */
46    bool expanding{false};
47
48    /*
49     * During expansion we migrate values with bucket granularity; this is how
50     * far we've gotten so far. Ranges from 0 .. hashsize(hashpower - 1) - 1.
51     */
52    unsigned int expand_bucket{0};
53
54    /*
55     * serialise access to the hashtable
56     */
57    std::mutex mutex;
58};
59
60/* One hashtable for all */
61static struct Assoc* global_assoc = nullptr;
62
63/* assoc factory. returns one new assoc or NULL if out-of-memory */
64static struct Assoc* assoc_consruct(int hashpower) {
65    try {
66        return new Assoc(hashpower);
67    } catch (const std::bad_alloc&) {
68        return nullptr;
69    }
70}
71
72ENGINE_ERROR_CODE assoc_init(struct default_engine *engine) {
73    /*
74        construct and save away one assoc for use by all buckets.
75    */
76    if (global_assoc == nullptr) {
77        global_assoc = assoc_consruct(16);
78    }
79    return (global_assoc != NULL) ? ENGINE_SUCCESS : ENGINE_ENOMEM;
80}
81
82void assoc_destroy() {
83    if (global_assoc != nullptr) {
84        while (global_assoc->expanding) {
85            usleep(250);
86        }
87        delete global_assoc;
88        global_assoc = nullptr;
89    }
90}
91
92hash_item *assoc_find(uint32_t hash, const hash_key *key) {
93    hash_item *it;
94    unsigned int oldbucket;
95    hash_item *ret = NULL;
96    int depth = 0;
97    std::lock_guard<std::mutex> guard(global_assoc->mutex);
98    if (global_assoc->expanding &&
99        (oldbucket = (hash & hashmask(global_assoc->hashpower - 1))) >= global_assoc->expand_bucket)
100    {
101        it = global_assoc->old_hashtable[oldbucket];
102    } else {
103        it = global_assoc->primary_hashtable[hash & hashmask(global_assoc->hashpower)];
104    }
105
106    while (it) {
107        const hash_key* it_key = item_get_key(it);
108        if ((hash_key_get_key_len(key) == hash_key_get_key_len(it_key)) &&
109            (memcmp(hash_key_get_key(key),
110                    hash_key_get_key(it_key),
111                    hash_key_get_key_len(key)) == 0)) {
112            ret = it;
113            break;
114        }
115        it = it->h_next;
116        ++depth;
117    }
118    MEMCACHED_ASSOC_FIND(hash_key_get_key(key), hash_key_get_key_len(key), depth);
119    return ret;
120}
121
122/*
123    returns the address of the item pointer before the key.  if *item == 0,
124    the item wasn't found
125    assoc->lock is assumed to be held by the caller.
126*/
127static hash_item** _hashitem_before(uint32_t hash, const hash_key* key) {
128    hash_item **pos;
129    unsigned int oldbucket;
130
131    if (global_assoc->expanding &&
132        (oldbucket = (hash & hashmask(global_assoc->hashpower - 1))) >= global_assoc->expand_bucket)
133    {
134        pos = &global_assoc->old_hashtable[oldbucket];
135    } else {
136        pos = &global_assoc->primary_hashtable[hash & hashmask(global_assoc->hashpower)];
137    }
138
139    while (*pos) {
140        const hash_key* pos_key = item_get_key(*pos);
141        if ((hash_key_get_key_len(key) != hash_key_get_key_len(pos_key)) ||
142            (memcmp(hash_key_get_key(key),
143                    hash_key_get_key(pos_key),
144                    hash_key_get_key_len(key)))) {
145             pos = &(*pos)->h_next;
146        } else {
147            break;
148        }
149    }
150
151    return pos;
152}
153
154static void assoc_maintenance_thread(void *arg);
155
156/*
157    grows the hashtable to the next power of 2.
158    assoc->lock is assumed to be held by the caller.
159*/
160static void assoc_expand() {
161    global_assoc->old_hashtable.swap(global_assoc->primary_hashtable);
162
163    try {
164        global_assoc->primary_hashtable.resize(hashsize(global_assoc->hashpower + 1));
165    } catch (const std::bad_alloc&) {
166        global_assoc->primary_hashtable.swap(global_assoc->old_hashtable);
167        /* Bad news, but we can keep running. */
168        return;
169    }
170
171    int ret = 0;
172    cb_thread_t tid;
173
174    global_assoc->hashpower++;
175    global_assoc->expanding = true;
176    global_assoc->expand_bucket = 0;
177
178    /* start a thread to do the expansion */
179    if ((ret = cb_create_named_thread(&tid, assoc_maintenance_thread,
180                                      nullptr, 1, "mc:assoc_maint")) != 0)
181    {
182        LOG_ERROR("Can't create thread for rebalance assoc table: {}",
183                  cb_strerror());
184        global_assoc->hashpower--;
185        global_assoc->expanding = false;
186        global_assoc->primary_hashtable.swap(global_assoc->old_hashtable);
187        global_assoc->old_hashtable.resize(0);
188        global_assoc->old_hashtable.shrink_to_fit();
189    }
190}
191
192/* Note: this isn't an assoc_update.  The key must not already exist to call this */
193int assoc_insert(uint32_t hash, hash_item *it) {
194    unsigned int oldbucket;
195
196    cb_assert(assoc_find(hash, item_get_key(it)) == 0);  /* shouldn't have duplicately named things defined */
197
198    std::lock_guard<std::mutex> guard(global_assoc->mutex);
199    if (global_assoc->expanding &&
200        (oldbucket = (hash & hashmask(global_assoc->hashpower - 1))) >= global_assoc->expand_bucket)
201    {
202        it->h_next = global_assoc->old_hashtable[oldbucket];
203        global_assoc->old_hashtable[oldbucket] = it;
204    } else {
205        it->h_next = global_assoc->primary_hashtable[hash & hashmask(global_assoc->hashpower)];
206        global_assoc->primary_hashtable[hash & hashmask(global_assoc->hashpower)] = it;
207    }
208
209    global_assoc->hash_items++;
210    if (! global_assoc->expanding && global_assoc->hash_items > (hashsize(global_assoc->hashpower) * 3) / 2) {
211        assoc_expand();
212    }
213    MEMCACHED_ASSOC_INSERT(hash_key_get_key(item_get_key(it)), hash_key_get_key_len(item_get_key(it)), global_assoc->hash_items);
214    return 1;
215}
216
217void assoc_delete(uint32_t hash, const hash_key *key) {
218    std::lock_guard<std::mutex> guard(global_assoc->mutex);
219    hash_item **before = _hashitem_before(hash, key);
220
221    if (*before) {
222        hash_item *nxt;
223        global_assoc->hash_items--;
224        /* The DTrace probe cannot be triggered as the last instruction
225         * due to possible tail-optimization by the compiler
226         */
227        MEMCACHED_ASSOC_DELETE(hash_key_get_key(key),
228                               hash_key_get_key_len(key),
229                               global_assoc->hash_items);
230        nxt = (*before)->h_next;
231        (*before)->h_next = 0;   /* probably pointless, but whatever. */
232        *before = nxt;
233        return;
234    }
235    /* Note:  we never actually get here.  the callers don't delete things
236       they can't find. */
237    cb_assert(*before != 0);
238}
239
240
241
/* Default value for hash_bulk_move. */
#define DEFAULT_HASH_BULK_MOVE 1
/*
 * Upper bound on the number of old-table buckets that
 * assoc_maintenance_thread() migrates each time it takes the table mutex.
 */
int hash_bulk_move{DEFAULT_HASH_BULK_MOVE};
244
245static void assoc_maintenance_thread(void *arg) {
246    bool done = false;
247    do {
248        int ii;
249        std::lock_guard<std::mutex> guard(global_assoc->mutex);
250
251        for (ii = 0; ii < hash_bulk_move && global_assoc->expanding; ++ii) {
252            hash_item *it, *next;
253            int bucket;
254
255            for (it = global_assoc->old_hashtable[global_assoc->expand_bucket];
256                 NULL != it; it = next) {
257                next = it->h_next;
258                const hash_key* key = item_get_key(it);
259                bucket = crc32c(hash_key_get_key(key),
260                                hash_key_get_key_len(key),
261                                0) & hashmask(global_assoc->hashpower);
262                it->h_next = global_assoc->primary_hashtable[bucket];
263                global_assoc->primary_hashtable[bucket] = it;
264            }
265
266            global_assoc->old_hashtable[global_assoc->expand_bucket] = NULL;
267            global_assoc->expand_bucket++;
268            if (global_assoc->expand_bucket == hashsize(global_assoc->hashpower - 1)) {
269                global_assoc->expanding = false;
270                global_assoc->old_hashtable.resize(0);
271                global_assoc->old_hashtable.shrink_to_fit();
272                LOG_INFO("Hash table expansion done");
273            }
274        }
275        if (!global_assoc->expanding) {
276            done = true;
277        }
278    } while (!done);
279}
280
281bool assoc_expanding() {
282    std::lock_guard<std::mutex> guard(global_assoc->mutex);
283    return global_assoc->expanding;
284}
285