1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2014 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
#include "config.h"

#include <vector>

#include "atomic.h"
#include "failover-table.h"
#include "locks.h"
#define STATWRITER_NAMESPACE failovers
#include "statwriter.h"
#undef STATWRITER_NAMESPACE
25
// Create a failover table for a brand-new vbucket: seed it with a
// single entry at seqno 0 (uuid drawn from the provider) and build
// the cached JSON representation.
FailoverTable::FailoverTable(size_t capacity)
    : max_entries(capacity), provider(true) {
    createEntry(0);
    cacheTableJSON();
}
31
// Restore a failover table from its persisted JSON form.
// NOTE(review): the return value of loadFromJSON is ignored here; on a
// malformed or unparseable document the cb_assert below fires instead
// of a cleaner error path — confirm this is the intended behavior.
FailoverTable::FailoverTable(const std::string& json, size_t capacity)
    : max_entries(capacity), provider(true) {
    loadFromJSON(json);
    cb_assert(table.size() > 0);
}
37
38FailoverTable::~FailoverTable() { }
39
// Return a copy of the newest failover entry. Entries are kept
// newest-first (createEntry push_front's), so this is the list front.
failover_entry_t FailoverTable::getLatestEntry() {
    LockHolder lh(lock);
    return table.front();
}
44
45void FailoverTable::createEntry(uint64_t high_seqno) {
46    LockHolder lh(lock);
47    // Our failover table represents only *our* branch of history.
48    // We must remove branches we've diverged from.
49    table_t::iterator itr = table.begin();
50    for (; itr != table.end(); ++itr) {
51        if (itr->by_seqno > high_seqno) {
52            itr = table.erase(itr);
53        }
54    }
55
56    failover_entry_t entry;
57    entry.vb_uuid = (provider.next() >> 16);
58    entry.by_seqno = high_seqno;
59    table.push_front(entry);
60
61    // Cap the size of the table
62    while (table.size() > max_entries) {
63        table.pop_back();
64    }
65    cacheTableJSON();
66}
67
68bool FailoverTable::needsRollback(uint64_t start_seqno,
69                                  uint64_t cur_seqno,
70                                  uint64_t vb_uuid,
71                                  uint64_t snap_start_seqno,
72                                  uint64_t snap_end_seqno,
73                                  uint64_t purge_seqno,
74                                  uint64_t* rollback_seqno) {
75    LockHolder lh(lock);
76    if (start_seqno == 0) {
77        return false;
78    }
79
80    *rollback_seqno = 0;
81    /* There may be items that are purged during compaction. We need
82     to rollback to seq no 0 in that case */
83    if (snap_start_seqno < purge_seqno) {
84        return true;
85    }
86    table_t::reverse_iterator itr;
87    for (itr = table.rbegin(); itr != table.rend(); ++itr) {
88        if (itr->vb_uuid == vb_uuid) {
89            uint64_t upper = cur_seqno;
90
91            ++itr;
92            if (itr != table.rend()) {
93                upper = itr->by_seqno;
94            }
95
96            if (snap_end_seqno <= upper) {
97                return false;
98            }
99
100            if (snap_start_seqno == start_seqno && upper == start_seqno) {
101                return false;
102            }
103
104            if (snap_start_seqno >= upper) {
105                *rollback_seqno = upper;
106                return true;
107            }
108
109            if (snap_start_seqno == start_seqno) {
110                return false;
111            }
112
113            *rollback_seqno = snap_start_seqno;
114            return true;
115        }
116    }
117
118    return true;
119}
120
121void FailoverTable::pruneEntries(uint64_t seqno) {
122    LockHolder lh(lock);
123    table_t::iterator it = table.begin();
124    for (; it != table.end(); ++it) {
125        if (it->by_seqno > seqno) {
126            it = table.erase(it);
127        }
128    }
129
130    cacheTableJSON();
131}
132
// Return the cached JSON serialization of the table. Mutating
// operations (createEntry, pruneEntries, replaceFailoverLog, load)
// refresh the cache, so no re-serialization is needed here.
std::string FailoverTable::toJSON() {
    LockHolder lh(lock);
    return cachedTableJSON;
}
137
138void FailoverTable::cacheTableJSON() {
139    cJSON* list = cJSON_CreateArray();
140    table_t::iterator it;
141    for(it = table.begin(); it != table.end(); it++) {
142        cJSON* obj = cJSON_CreateObject();
143        cJSON_AddNumberToObject(obj, "id", (*it).vb_uuid);
144        cJSON_AddNumberToObject(obj, "seq", (*it).by_seqno);
145        cJSON_AddItemToArray(list, obj);
146    }
147    char* json = cJSON_PrintUnformatted(list);
148    cachedTableJSON = json;
149    free(json);
150    cJSON_Delete(list);
151}
152
153void FailoverTable::addStats(const void* cookie, uint16_t vbid,
154                             ADD_STAT add_stat) {
155    LockHolder lh(lock);
156    char statname[80] = {0};
157    snprintf(statname, 80, "vb_%d:num_entries", vbid);
158    add_casted_stat(statname, table.size(), add_stat, cookie);
159
160    table_t::iterator it;
161    int entrycounter = 0;
162    for(it = table.begin(); it != table.end(); ++it) {
163        snprintf(statname, 80, "vb_%d:%d:id", vbid, entrycounter);
164        add_casted_stat(statname, it->vb_uuid, add_stat, cookie);
165        snprintf(statname, 80, "vb_%d:%d:seq", vbid, entrycounter);
166        add_casted_stat(statname, it->by_seqno, add_stat, cookie);
167        entrycounter++;
168    }
169}
170
171ENGINE_ERROR_CODE FailoverTable::addFailoverLog(const void* cookie,
172                                                dcp_add_failover_log callback) {
173    LockHolder lh(lock);
174    ENGINE_ERROR_CODE rv = ENGINE_SUCCESS;
175    size_t logsize = table.size();
176
177    cb_assert(logsize > 0);
178    vbucket_failover_t *logentries = new vbucket_failover_t[logsize];
179    vbucket_failover_t *logentry = logentries;
180
181    table_t::iterator itr;
182    for(itr = table.begin(); itr != table.end(); ++itr) {
183        logentry->uuid = itr->vb_uuid;
184        logentry->seqno = itr->by_seqno;
185        logentry++;
186    }
187    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
188    rv = callback(logentries, logsize, cookie);
189    ObjectRegistry::onSwitchThread(epe);
190    delete[] logentries;
191
192    return rv;
193}
194
195bool FailoverTable::loadFromJSON(cJSON *json) {
196    if (json->type != cJSON_Array) {
197        return false;
198    }
199
200    for (cJSON* it = json->child; it != NULL; it = it->next) {
201        if (it->type != cJSON_Object) {
202            return false;
203        }
204
205        cJSON* jid = cJSON_GetObjectItem(it, "id");
206        cJSON* jseq = cJSON_GetObjectItem(it, "seq");
207
208        if (jid && jid->type != cJSON_Number) {
209            return false;
210        }
211        if (jseq && jseq->type != cJSON_Number){
212                return false;
213        }
214
215        failover_entry_t entry;
216        entry.vb_uuid = (uint64_t) jid->valuedouble;
217        entry.by_seqno = (uint64_t) jseq->valuedouble;
218        table.push_back(entry);
219    }
220
221    return true;
222}
223
224bool FailoverTable::loadFromJSON(const std::string& json) {
225    cJSON* parsed = cJSON_Parse(json.c_str());
226    bool ret = true;
227
228    if (parsed) {
229        ret = loadFromJSON(parsed);
230        cachedTableJSON = json;
231        cJSON_Delete(parsed);
232    }
233
234    return ret;
235}
236
// Replace the entire table with a failover log received over the wire.
// The buffer is a sequence of 16-byte records, each 8 bytes of uuid
// followed by 8 bytes of seqno, both in network (big-endian) order.
void FailoverTable::replaceFailoverLog(uint8_t* bytes, uint32_t length) {
    LockHolder lh(lock);
    // Must be a non-empty whole number of 16-byte records.
    cb_assert((length % 16) == 0 && length != 0);
    table.clear();

    // Walk the buffer from the last record to the first; push_front
    // then restores the records' original order in the list.
    for (; length > 0; length -=16) {
        failover_entry_t entry;
        memcpy(&entry.by_seqno, bytes + length - 8, sizeof(uint64_t));
        memcpy(&entry.vb_uuid, bytes + length - 16, sizeof(uint64_t));
        // Convert from network byte order to host order.
        entry.by_seqno = ntohll(entry.by_seqno);
        entry.vb_uuid = ntohll(entry.vb_uuid);
        table.push_front(entry);
    }
    cacheTableJSON();
}
252