1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ 2/* 3 * Copyright 2014 Couchbase, Inc 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17#include "config.h" 18 19#include "atomic.h" 20#include "failover-table.h" 21#include "locks.h" 22#define STATWRITER_NAMESPACE failovers 23#include "statwriter.h" 24#undef STATWRITER_NAMESPACE 25 26FailoverTable::FailoverTable(size_t capacity) 27 : max_entries(capacity), provider(true) { 28 createEntry(0); 29 cacheTableJSON(); 30} 31 32FailoverTable::FailoverTable(const std::string& json, size_t capacity) 33 : max_entries(capacity), provider(true) { 34 loadFromJSON(json); 35 cb_assert(table.size() > 0); 36} 37 38FailoverTable::~FailoverTable() { } 39 40failover_entry_t FailoverTable::getLatestEntry() { 41 LockHolder lh(lock); 42 return table.front(); 43} 44 45void FailoverTable::createEntry(uint64_t high_seqno) { 46 LockHolder lh(lock); 47 // Our failover table represents only *our* branch of history. 48 // We must remove branches we've diverged from. 49 table_t::iterator itr = table.begin(); 50 for (; itr != table.end(); ++itr) { 51 if (itr->by_seqno > high_seqno) { 52 itr = table.erase(itr); 53 } 54 } 55 56 failover_entry_t entry; 57 entry.vb_uuid = (provider.next() >> 16); 58 entry.by_seqno = high_seqno; 59 table.push_front(entry); 60 61 // Cap the size of the table 62 while (table.size() > max_entries) { 63 table.pop_back(); 64 } 65 cacheTableJSON(); 66} 67 68bool FailoverTable::needsRollback(uint64_t start_seqno, 69 uint64_t cur_seqno, 70 uint64_t vb_uuid, 71 uint64_t snap_start_seqno, 72 uint64_t snap_end_seqno, 73 uint64_t purge_seqno, 74 uint64_t* rollback_seqno) { 75 LockHolder lh(lock); 76 if (start_seqno == 0) { 77 return false; 78 } 79 80 *rollback_seqno = 0; 81 /* There may be items that are purged during compaction. We need 82 to rollback to seq no 0 in that case */ 83 if (snap_start_seqno < purge_seqno) { 84 return true; 85 } 86 table_t::reverse_iterator itr; 87 for (itr = table.rbegin(); itr != table.rend(); ++itr) { 88 if (itr->vb_uuid == vb_uuid) { 89 uint64_t upper = cur_seqno; 90 91 ++itr; 92 if (itr != table.rend()) { 93 upper = itr->by_seqno; 94 } 95 96 if (snap_end_seqno <= upper) { 97 return false; 98 } 99 100 if (snap_start_seqno == start_seqno && upper == start_seqno) { 101 return false; 102 } 103 104 if (snap_start_seqno >= upper) { 105 *rollback_seqno = upper; 106 return true; 107 } 108 109 if (snap_start_seqno == start_seqno) { 110 return false; 111 } 112 113 *rollback_seqno = snap_start_seqno; 114 return true; 115 } 116 } 117 118 return true; 119} 120 121void FailoverTable::pruneEntries(uint64_t seqno) { 122 LockHolder lh(lock); 123 table_t::iterator it = table.begin(); 124 for (; it != table.end(); ++it) { 125 if (it->by_seqno > seqno) { 126 it = table.erase(it); 127 } 128 } 129 130 cacheTableJSON(); 131} 132 133std::string FailoverTable::toJSON() { 134 LockHolder lh(lock); 135 return cachedTableJSON; 136} 137 138void FailoverTable::cacheTableJSON() { 139 cJSON* list = cJSON_CreateArray(); 140 table_t::iterator it; 141 for(it = table.begin(); it != table.end(); it++) { 142 cJSON* obj = cJSON_CreateObject(); 143 cJSON_AddNumberToObject(obj, "id", (*it).vb_uuid); 144 cJSON_AddNumberToObject(obj, "seq", (*it).by_seqno); 145 cJSON_AddItemToArray(list, obj); 146 } 147 char* json = cJSON_PrintUnformatted(list); 148 cachedTableJSON = json; 149 free(json); 150 cJSON_Delete(list); 151} 152 153void FailoverTable::addStats(const void* cookie, uint16_t vbid, 154 ADD_STAT add_stat) { 155 LockHolder lh(lock); 156 char statname[80] = {0}; 157 snprintf(statname, 80, "vb_%d:num_entries", vbid); 158 add_casted_stat(statname, table.size(), add_stat, cookie); 159 160 table_t::iterator it; 161 int entrycounter = 0; 162 for(it = table.begin(); it != table.end(); ++it) { 163 snprintf(statname, 80, "vb_%d:%d:id", vbid, entrycounter); 164 add_casted_stat(statname, it->vb_uuid, add_stat, cookie); 165 snprintf(statname, 80, "vb_%d:%d:seq", vbid, entrycounter); 166 add_casted_stat(statname, it->by_seqno, add_stat, cookie); 167 entrycounter++; 168 } 169} 170 171ENGINE_ERROR_CODE FailoverTable::addFailoverLog(const void* cookie, 172 dcp_add_failover_log callback) { 173 LockHolder lh(lock); 174 ENGINE_ERROR_CODE rv = ENGINE_SUCCESS; 175 size_t logsize = table.size(); 176 177 cb_assert(logsize > 0); 178 vbucket_failover_t *logentries = new vbucket_failover_t[logsize]; 179 vbucket_failover_t *logentry = logentries; 180 181 table_t::iterator itr; 182 for(itr = table.begin(); itr != table.end(); ++itr) { 183 logentry->uuid = itr->vb_uuid; 184 logentry->seqno = itr->by_seqno; 185 logentry++; 186 } 187 EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true); 188 rv = callback(logentries, logsize, cookie); 189 ObjectRegistry::onSwitchThread(epe); 190 delete[] logentries; 191 192 return rv; 193} 194 195bool FailoverTable::loadFromJSON(cJSON *json) { 196 if (json->type != cJSON_Array) { 197 return false; 198 } 199 200 for (cJSON* it = json->child; it != NULL; it = it->next) { 201 if (it->type != cJSON_Object) { 202 return false; 203 } 204 205 cJSON* jid = cJSON_GetObjectItem(it, "id"); 206 cJSON* jseq = cJSON_GetObjectItem(it, "seq"); 207 208 if (jid && jid->type != cJSON_Number) { 209 return false; 210 } 211 if (jseq && jseq->type != cJSON_Number){ 212 return false; 213 } 214 215 failover_entry_t entry; 216 entry.vb_uuid = (uint64_t) jid->valuedouble; 217 entry.by_seqno = (uint64_t) jseq->valuedouble; 218 table.push_back(entry); 219 } 220 221 return true; 222} 223 224bool FailoverTable::loadFromJSON(const std::string& json) { 225 cJSON* parsed = cJSON_Parse(json.c_str()); 226 bool ret = true; 227 228 if (parsed) { 229 ret = loadFromJSON(parsed); 230 cachedTableJSON = json; 231 cJSON_Delete(parsed); 232 } 233 234 return ret; 235} 236 237void FailoverTable::replaceFailoverLog(uint8_t* bytes, uint32_t length) { 238 LockHolder lh(lock); 239 cb_assert((length % 16) == 0 && length != 0); 240 table.clear(); 241 242 for (; length > 0; length -=16) { 243 failover_entry_t entry; 244 memcpy(&entry.by_seqno, bytes + length - 8, sizeof(uint64_t)); 245 memcpy(&entry.vb_uuid, bytes + length - 16, sizeof(uint64_t)); 246 entry.by_seqno = ntohll(entry.by_seqno); 247 entry.vb_uuid = ntohll(entry.vb_uuid); 248 table.push_front(entry); 249 } 250 cacheTableJSON(); 251} 252