/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 * Copyright 2014 Couchbase, Inc
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "config.h"

#include "atomic.h"
#include "failover-table.h"
#include "locks.h"
#define STATWRITER_NAMESPACE failovers
#include "statwriter.h"
#undef STATWRITER_NAMESPACE

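/**
 * Construct an empty failover table: a single entry with a freshly
 * generated vbucket UUID starting at seqno 0.
 */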
FailoverTable::FailoverTable(size_t capacity)
    : max_entries(capacity), provider(true) {
    createEntry(0);
    cacheTableJSON();
}

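/**
 * Restore a failover table from its persisted JSON representation;
 * the document must deserialize to at least one entry.
 */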
FailoverTable::FailoverTable(const std::string& json, size_t capacity)
    : max_entries(capacity), provider(true) {
    loadFromJSON(json);
    cb_assert(table.size() > 0);
}

FailoverTable::~FailoverTable() { }

failover_entry_t FailoverTable::getLatestEntry() {
    LockHolder lh(lock);
    return table.front();
}

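/**
 * Start a new branch of history: add an entry with a fresh vbucket UUID
 * at high_seqno, dropping any entries that start beyond high_seqno
 * (history we have diverged from) and trimming the oldest entries once
 * the table exceeds max_entries.
 */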
void FailoverTable::createEntry(uint64_t high_seqno) {
    LockHolder lh(lock);
    // Our failover table represents only *our* branch of history.
    // We must remove branches we've diverged from.
    table_t::iterator itr = table.begin();
    while (itr != table.end()) {
        if (itr->by_seqno > high_seqno) {
            itr = table.erase(itr);
        } else {
            ++itr;
        }
    }

    failover_entry_t entry;
    entry.vb_uuid = (provider.next() >> 16);
    entry.by_seqno = high_seqno;
    table.push_front(entry);

    // Cap the size of the table
    while (table.size() > max_entries) {
        table.pop_back();
    }
    cacheTableJSON();
}

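/**
 * Decide whether a client (e.g. a DCP consumer) resuming a stream must
 * roll back, and if so set *rollback_seqno to the seqno it must roll
 * back to.
 *
 * The table is scanned from the oldest entry to the newest for the
 * client's vb_uuid. The branch identified by that entry is valid up to
 * the start seqno of the next (newer) entry, or up to cur_seqno for the
 * newest entry; a snapshot that fits within that range may continue.
 * A snapshot starting below the purge seqno, or an unknown vb_uuid,
 * forces a full rollback to seqno 0.
 *
 * Worked example (hypothetical values): with table entries
 * {uuid=200, seq=500} (newest) and {uuid=100, seq=0}, a client holding
 * uuid=100 with snap_start_seqno=600 matches the older entry, whose
 * branch only extends to seqno 500, so it is told to roll back to 500.
 */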
bool FailoverTable::needsRollback(uint64_t start_seqno,
                                  uint64_t cur_seqno,
                                  uint64_t vb_uuid,
                                  uint64_t snap_start_seqno,
                                  uint64_t snap_end_seqno,
                                  uint64_t purge_seqno,
                                  uint64_t* rollback_seqno) {
    LockHolder lh(lock);
    if (start_seqno == 0) {
        return false;
    }

    *rollback_seqno = 0;
    /* Items below the purge seqno may have been purged by compaction;
       in that case the client must roll back to seqno 0. */
    if (snap_start_seqno < purge_seqno) {
        return true;
    }

    table_t::reverse_iterator itr;
    for (itr = table.rbegin(); itr != table.rend(); ++itr) {
        if (itr->vb_uuid == vb_uuid) {
            // The branch identified by vb_uuid is valid up to the start
            // seqno of the next (newer) entry, or up to the current high
            // seqno if this is the newest entry.
            uint64_t upper = cur_seqno;

            ++itr;
            if (itr != table.rend()) {
                upper = itr->by_seqno;
            }

            if (snap_end_seqno <= upper) {
                return false;
            }

            if (snap_start_seqno == start_seqno && upper == start_seqno) {
                return false;
            }

            if (snap_start_seqno >= upper) {
                *rollback_seqno = upper;
                return true;
            }

            if (snap_start_seqno == start_seqno) {
                return false;
            }

            *rollback_seqno = snap_start_seqno;
            return true;
        }
    }

    return true;
}

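/**
 * Remove every entry whose start seqno is greater than the given seqno,
 * then refresh the cached JSON representation.
 */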
void FailoverTable::pruneEntries(uint64_t seqno) {
    LockHolder lh(lock);
    table_t::iterator it = table.begin();
    while (it != table.end()) {
        if (it->by_seqno > seqno) {
            it = table.erase(it);
        } else {
            ++it;
        }
    }

    cacheTableJSON();
}

std::string FailoverTable::toJSON() {
    LockHolder lh(lock);
    return cachedTableJSON;
}

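/*
 * Re-serialize the table into cachedTableJSON. The result is an array
 * ordered newest entry first, e.g. (values are illustrative only):
 *
 *   [{"id":15686049207,"seq":1000},{"id":9513867021,"seq":0}]
 */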
void FailoverTable::cacheTableJSON() {
    cJSON* list = cJSON_CreateArray();
    table_t::iterator it;
    for (it = table.begin(); it != table.end(); ++it) {
        cJSON* obj = cJSON_CreateObject();
        cJSON_AddNumberToObject(obj, "id", (*it).vb_uuid);
        cJSON_AddNumberToObject(obj, "seq", (*it).by_seqno);
        cJSON_AddItemToArray(list, obj);
    }
    char* json = cJSON_PrintUnformatted(list);
    if (json != NULL) {
        // cJSON_PrintUnformatted returns NULL on allocation failure
        cachedTableJSON = json;
        free(json);
    }
    cJSON_Delete(list);
}

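/*
 * Emit the table through the stats interface as vb_<vbid>:num_entries
 * plus vb_<vbid>:<index>:id and vb_<vbid>:<index>:seq per entry,
 * newest entry first.
 */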
void FailoverTable::addStats(const void* cookie, uint16_t vbid,
                             ADD_STAT add_stat) {
    LockHolder lh(lock);
    char statname[80] = {0};
    snprintf(statname, sizeof(statname), "vb_%d:num_entries", vbid);
    add_casted_stat(statname, table.size(), add_stat, cookie);

    table_t::iterator it;
    int entrycounter = 0;
    for (it = table.begin(); it != table.end(); ++it) {
        snprintf(statname, sizeof(statname), "vb_%d:%d:id", vbid, entrycounter);
        add_casted_stat(statname, it->vb_uuid, add_stat, cookie);
        snprintf(statname, sizeof(statname), "vb_%d:%d:seq", vbid, entrycounter);
        add_casted_stat(statname, it->by_seqno, add_stat, cookie);
        entrycounter++;
    }
}

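/*
 * Hand the current failover log to a DCP connection via the supplied
 * callback, as a newest-first array of (uuid, seqno) pairs.
 */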
ENGINE_ERROR_CODE FailoverTable::addFailoverLog(const void* cookie,
                                                dcp_add_failover_log callback) {
    LockHolder lh(lock);
    ENGINE_ERROR_CODE rv = ENGINE_SUCCESS;
    size_t logsize = table.size();

    cb_assert(logsize > 0);
    vbucket_failover_t *logentries = new vbucket_failover_t[logsize];
    vbucket_failover_t *logentry = logentries;

    table_t::iterator itr;
    for (itr = table.begin(); itr != table.end(); ++itr) {
        logentry->uuid = itr->vb_uuid;
        logentry->seqno = itr->by_seqno;
        logentry++;
    }
    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
    rv = callback(logentries, logsize, cookie);
    ObjectRegistry::onSwitchThread(epe);
    delete[] logentries;

    return rv;
}

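/*
 * Populate the table from a parsed JSON array of {"id", "seq"} objects.
 * Returns false if the document is not an array or if any entry lacks a
 * numeric "id" or "seq" field.
 */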
bool FailoverTable::loadFromJSON(cJSON *json) {
    if (json->type != cJSON_Array) {
        return false;
    }

    for (cJSON* it = json->child; it != NULL; it = it->next) {
        if (it->type != cJSON_Object) {
            return false;
        }

        cJSON* jid = cJSON_GetObjectItem(it, "id");
        cJSON* jseq = cJSON_GetObjectItem(it, "seq");

        // Both fields must be present and numeric before dereferencing
        if (!jid || jid->type != cJSON_Number) {
            return false;
        }
        if (!jseq || jseq->type != cJSON_Number) {
            return false;
        }

        failover_entry_t entry;
        entry.vb_uuid = (uint64_t) jid->valuedouble;
        entry.by_seqno = (uint64_t) jseq->valuedouble;
        table.push_back(entry);
    }

    return true;
}

bool FailoverTable::loadFromJSON(const std::string& json) {
    cJSON* parsed = cJSON_Parse(json.c_str());
    bool ret = false;

    if (parsed) {
        ret = loadFromJSON(parsed);
        cachedTableJSON = json;
        cJSON_Delete(parsed);
    }

    return ret;
}

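/*
 * Replace the table with a failover log received over the wire. The
 * buffer holds 16-byte records, each a network-order uint64_t uuid
 * followed by a network-order uint64_t seqno; the record order in the
 * buffer is preserved in the rebuilt table.
 */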
void FailoverTable::replaceFailoverLog(uint8_t* bytes, uint32_t length) {
    LockHolder lh(lock);
    cb_assert((length % 16) == 0 && length != 0);
    table.clear();

    for (; length > 0; length -= 16) {
        failover_entry_t entry;
        memcpy(&entry.by_seqno, bytes + length - 8, sizeof(uint64_t));
        memcpy(&entry.vb_uuid, bytes + length - 16, sizeof(uint64_t));
        entry.by_seqno = ntohll(entry.by_seqno);
        entry.vb_uuid = ntohll(entry.vb_uuid);
        table.push_front(entry);
    }
    cacheTableJSON();
}