1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2014 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
#include "config.h"

#include <vector>

#include "atomic.h"
#include "failover-table.h"
#include "locks.h"
#define STATWRITER_NAMESPACE failovers
#include "statwriter.h"
#undef STATWRITER_NAMESPACE
25 
FailoverTable(size_t capacity)26 FailoverTable::FailoverTable(size_t capacity)
27     : max_entries(capacity), provider(true) {
28     createEntry(0);
29     cacheTableJSON();
30 }
31 
/**
 * Construct a failover table from a previously serialized JSON string.
 *
 * @param json JSON array of {"id", "seq"} objects, as produced by toJSON()
 * @param capacity maximum number of entries the table may hold
 *
 * NOTE(review): the return value of loadFromJSON() is ignored; a malformed
 * JSON string only trips the size assertion below rather than being
 * reported — confirm callers always pass previously-serialized data.
 */
FailoverTable::FailoverTable(const std::string& json, size_t capacity)
    : max_entries(capacity), provider(true) {
    loadFromJSON(json);
    cb_assert(table.size() > 0);
}
37 
// Default destructor: all members clean up via their own destructors.
FailoverTable::~FailoverTable() { }
39 
getLatestEntry()40 failover_entry_t FailoverTable::getLatestEntry() {
41     LockHolder lh(lock);
42     return table.front();
43 }
44 
createEntry(uint64_t high_seqno)45 void FailoverTable::createEntry(uint64_t high_seqno) {
46     LockHolder lh(lock);
47     // Our failover table represents only *our* branch of history.
48     // We must remove branches we've diverged from.
49     table_t::iterator itr = table.begin();
50     for (; itr != table.end(); ++itr) {
51         if (itr->by_seqno > high_seqno) {
52             itr = table.erase(itr);
53         }
54     }
55 
56     failover_entry_t entry;
57     entry.vb_uuid = (provider.next() >> 16);
58     entry.by_seqno = high_seqno;
59     table.push_front(entry);
60 
61     // Cap the size of the table
62     while (table.size() > max_entries) {
63         table.pop_back();
64     }
65     cacheTableJSON();
66 }
67 
/**
 * Decide whether a DCP stream request requires the client to roll back.
 *
 * @param start_seqno seqno the client wants to resume from
 * @param cur_seqno current high seqno of the vbucket
 * @param vb_uuid the vbucket UUID the client last knew
 * @param snap_start_seqno start of the client's last known snapshot
 * @param snap_end_seqno end of the client's last known snapshot
 * @param purge_seqno highest seqno removed by compaction purging
 * @param rollback_seqno [out] seqno the client must roll back to; only
 *        meaningful when this function returns true
 * @return true if the client must roll back, false if it may resume
 */
bool FailoverTable::needsRollback(uint64_t start_seqno,
                                  uint64_t cur_seqno,
                                  uint64_t vb_uuid,
                                  uint64_t snap_start_seqno,
                                  uint64_t snap_end_seqno,
                                  uint64_t purge_seqno,
                                  uint64_t* rollback_seqno) {
    LockHolder lh(lock);
    // A stream starting from seqno 0 has no history to diverge from.
    // NOTE(review): *rollback_seqno is not written on this path — callers
    // presumably only read it when we return true; confirm.
    if (start_seqno == 0) {
        return false;
    }

    *rollback_seqno = 0;
    /* There may be items that are purged during compaction. We need
     to rollback to seq no 0 in that case */
    if (snap_start_seqno < purge_seqno) {
        return true;
    }
    // Walk from the oldest entry towards the newest looking for the
    // client's known branch (matching vb_uuid).
    table_t::reverse_iterator itr;
    for (itr = table.rbegin(); itr != table.rend(); ++itr) {
        if (itr->vb_uuid == vb_uuid) {
            // "upper" is the last seqno that is still valid on the
            // client's branch: either the seqno where the *next* (newer)
            // failover entry branched off, or the current high seqno if
            // the matched entry is the newest one.
            uint64_t upper = cur_seqno;

            ++itr;
            if (itr != table.rend()) {
                upper = itr->by_seqno;
            }

            // The client's entire snapshot lies within valid history.
            if (snap_end_seqno <= upper) {
                return false;
            }

            // The client is exactly at a snapshot boundary that coincides
            // with the branch point — nothing to roll back.
            if (snap_start_seqno == start_seqno && upper == start_seqno) {
                return false;
            }

            // The snapshot begins beyond valid history: roll back to the
            // branch point.
            if (snap_start_seqno >= upper) {
                *rollback_seqno = upper;
                return true;
            }

            // Snapshot straddles the branch point but the client has not
            // consumed anything past its own snapshot start — safe.
            if (snap_start_seqno == start_seqno) {
                return false;
            }

            // Otherwise roll back to the start of the client's snapshot.
            *rollback_seqno = snap_start_seqno;
            return true;
        }
    }

    // Unknown vb_uuid: the client's history is not ours at all.
    return true;
}
120 
pruneEntries(uint64_t seqno)121 void FailoverTable::pruneEntries(uint64_t seqno) {
122     LockHolder lh(lock);
123     table_t::iterator it = table.begin();
124     for (; it != table.end(); ++it) {
125         if (it->by_seqno > seqno) {
126             it = table.erase(it);
127         }
128     }
129 
130     cacheTableJSON();
131 }
132 
toJSON()133 std::string FailoverTable::toJSON() {
134     LockHolder lh(lock);
135     return cachedTableJSON;
136 }
137 
cacheTableJSON()138 void FailoverTable::cacheTableJSON() {
139     cJSON* list = cJSON_CreateArray();
140     table_t::iterator it;
141     for(it = table.begin(); it != table.end(); it++) {
142         cJSON* obj = cJSON_CreateObject();
143         cJSON_AddNumberToObject(obj, "id", (*it).vb_uuid);
144         cJSON_AddNumberToObject(obj, "seq", (*it).by_seqno);
145         cJSON_AddItemToArray(list, obj);
146     }
147     char* json = cJSON_PrintUnformatted(list);
148     cachedTableJSON = json;
149     free(json);
150     cJSON_Delete(list);
151 }
152 
addStats(const void* cookie, uint16_t vbid, ADD_STAT add_stat)153 void FailoverTable::addStats(const void* cookie, uint16_t vbid,
154                              ADD_STAT add_stat) {
155     LockHolder lh(lock);
156     char statname[80] = {0};
157     snprintf(statname, 80, "vb_%d:num_entries", vbid);
158     add_casted_stat(statname, table.size(), add_stat, cookie);
159 
160     table_t::iterator it;
161     int entrycounter = 0;
162     for(it = table.begin(); it != table.end(); ++it) {
163         snprintf(statname, 80, "vb_%d:%d:id", vbid, entrycounter);
164         add_casted_stat(statname, it->vb_uuid, add_stat, cookie);
165         snprintf(statname, 80, "vb_%d:%d:seq", vbid, entrycounter);
166         add_casted_stat(statname, it->by_seqno, add_stat, cookie);
167         entrycounter++;
168     }
169 }
170 
addFailoverLog(const void* cookie, dcp_add_failover_log callback)171 ENGINE_ERROR_CODE FailoverTable::addFailoverLog(const void* cookie,
172                                                 dcp_add_failover_log callback) {
173     LockHolder lh(lock);
174     ENGINE_ERROR_CODE rv = ENGINE_SUCCESS;
175     size_t logsize = table.size();
176 
177     cb_assert(logsize > 0);
178     vbucket_failover_t *logentries = new vbucket_failover_t[logsize];
179     vbucket_failover_t *logentry = logentries;
180 
181     table_t::iterator itr;
182     for(itr = table.begin(); itr != table.end(); ++itr) {
183         logentry->uuid = itr->vb_uuid;
184         logentry->seqno = itr->by_seqno;
185         logentry++;
186     }
187     EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
188     rv = callback(logentries, logsize, cookie);
189     ObjectRegistry::onSwitchThread(epe);
190     delete[] logentries;
191 
192     return rv;
193 }
194 
loadFromJSON(cJSON *json)195 bool FailoverTable::loadFromJSON(cJSON *json) {
196     if (json->type != cJSON_Array) {
197         return false;
198     }
199 
200     for (cJSON* it = json->child; it != NULL; it = it->next) {
201         if (it->type != cJSON_Object) {
202             return false;
203         }
204 
205         cJSON* jid = cJSON_GetObjectItem(it, "id");
206         cJSON* jseq = cJSON_GetObjectItem(it, "seq");
207 
208         if (jid && jid->type != cJSON_Number) {
209             return false;
210         }
211         if (jseq && jseq->type != cJSON_Number){
212                 return false;
213         }
214 
215         failover_entry_t entry;
216         entry.vb_uuid = (uint64_t) jid->valuedouble;
217         entry.by_seqno = (uint64_t) jseq->valuedouble;
218         table.push_back(entry);
219     }
220 
221     return true;
222 }
223 
loadFromJSON(const std::string& json)224 bool FailoverTable::loadFromJSON(const std::string& json) {
225     cJSON* parsed = cJSON_Parse(json.c_str());
226     bool ret = true;
227 
228     if (parsed) {
229         ret = loadFromJSON(parsed);
230         cachedTableJSON = json;
231         cJSON_Delete(parsed);
232     }
233 
234     return ret;
235 }
236 
replaceFailoverLog(uint8_t* bytes, uint32_t length)237 void FailoverTable::replaceFailoverLog(uint8_t* bytes, uint32_t length) {
238     LockHolder lh(lock);
239     cb_assert((length % 16) == 0 && length != 0);
240     table.clear();
241 
242     for (; length > 0; length -=16) {
243         failover_entry_t entry;
244         memcpy(&entry.by_seqno, bytes + length - 8, sizeof(uint64_t));
245         memcpy(&entry.vb_uuid, bytes + length - 16, sizeof(uint64_t));
246         entry.by_seqno = ntohll(entry.by_seqno);
247         entry.vb_uuid = ntohll(entry.vb_uuid);
248         table.push_front(entry);
249     }
250     cacheTableJSON();
251 }
252