1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ 2/* 3 * Copyright 2013 Couchbase, Inc 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18#include <relaxed_atomic.h> 19#include <stdlib.h> 20 21#include "collections/collections_types.h" 22#include "item.h" 23#include "mock_dcp.h" 24 25#include <memcached/protocol_binary.h> 26 27static EngineIface* engine_handle = nullptr; 28static EngineIface* engine_handle_v1 = nullptr; 29 30std::vector<std::pair<uint64_t, uint64_t> > dcp_failover_log; 31 32ENGINE_ERROR_CODE mock_dcp_add_failover_log(vbucket_failover_t* entry, 33 size_t nentries, 34 gsl::not_null<const void*>) { 35 while (!dcp_failover_log.empty()) { 36 dcp_failover_log.clear(); 37 } 38 39 if(nentries > 0) { 40 for (size_t i = 0; i < nentries; i--) { 41 std::pair<uint64_t, uint64_t> curr; 42 curr.first = entry[i].uuid; 43 curr.second = entry[i].seqno; 44 dcp_failover_log.push_back(curr); 45 } 46 } 47 return ENGINE_SUCCESS; 48} 49 50ENGINE_ERROR_CODE MockDcpMessageProducers::get_failover_log(uint32_t opaque, 51 Vbid vbucket) { 52 clear_dcp_data(); 53 return ENGINE_ENOTSUP; 54} 55 56ENGINE_ERROR_CODE MockDcpMessageProducers::stream_req( 57 uint32_t opaque, 58 Vbid vbucket, 59 uint32_t flags, 60 uint64_t start_seqno, 61 uint64_t end_seqno, 62 uint64_t vbucket_uuid, 63 uint64_t snap_start_seqno, 64 uint64_t snap_end_seqno, 65 const std::string& request_value) { 66 clear_dcp_data(); 67 last_op = cb::mcbp::ClientOpcode::DcpStreamReq; 68 last_opaque = opaque; 69 last_vbucket = vbucket; 70 last_flags = flags; 71 last_start_seqno = start_seqno; 72 last_end_seqno = end_seqno; 73 last_vbucket_uuid = vbucket_uuid; 74 last_packet_size = 64; 75 last_snap_start_seqno = snap_start_seqno; 76 last_snap_end_seqno = snap_end_seqno; 77 last_collection_filter = request_value; 78 return ENGINE_SUCCESS; 79} 80 81ENGINE_ERROR_CODE MockDcpMessageProducers::add_stream_rsp( 82 uint32_t opaque, uint32_t stream_opaque, cb::mcbp::Status status) { 83 clear_dcp_data(); 84 last_op = cb::mcbp::ClientOpcode::DcpAddStream; 85 last_opaque = opaque; 86 last_stream_opaque = stream_opaque; 87 last_status = status; 88 last_packet_size = 28; 89 return ENGINE_SUCCESS; 90} 91 92ENGINE_ERROR_CODE MockDcpMessageProducers::marker_rsp(uint32_t opaque, 93 cb::mcbp::Status status) { 94 clear_dcp_data(); 95 last_op = cb::mcbp::ClientOpcode::DcpSnapshotMarker; 96 last_opaque = opaque; 97 last_status = status; 98 last_packet_size = 24; 99 return ENGINE_SUCCESS; 100} 101 102ENGINE_ERROR_CODE MockDcpMessageProducers::set_vbucket_state_rsp( 103 uint32_t opaque, cb::mcbp::Status status) { 104 clear_dcp_data(); 105 last_op = cb::mcbp::ClientOpcode::DcpSetVbucketState; 106 last_opaque = opaque; 107 last_status = status; 108 last_packet_size = 24; 109 return ENGINE_SUCCESS; 110} 111 112ENGINE_ERROR_CODE MockDcpMessageProducers::stream_end( 113 uint32_t opaque, 114 Vbid vbucket, 115 uint32_t flags, 116 cb::mcbp::DcpStreamId sid) { 117 clear_dcp_data(); 118 last_op = cb::mcbp::ClientOpcode::DcpStreamEnd; 119 last_opaque = opaque; 120 last_vbucket = vbucket; 121 last_flags = flags; 122 last_packet_size = 28; 123 last_stream_id = sid; 124 125 return ENGINE_SUCCESS; 126} 127 128ENGINE_ERROR_CODE MockDcpMessageProducers::marker( 129 uint32_t opaque, 130 Vbid vbucket, 131 uint64_t snap_start_seqno, 132 uint64_t snap_end_seqno, 133 uint32_t flags, 134 boost::optional<uint64_t> highCompletedSeqno, 135 boost::optional<uint64_t> maxVisibleSeqno, 136 cb::mcbp::DcpStreamId sid) { 137 clear_dcp_data(); 138 last_op = cb::mcbp::ClientOpcode::DcpSnapshotMarker; 139 last_opaque = opaque; 140 last_vbucket = vbucket; 141 last_packet_size = 44; 142 last_snap_start_seqno = snap_start_seqno; 143 last_snap_end_seqno = snap_end_seqno; 144 last_flags = flags; 145 last_stream_id = sid; 146 if (highCompletedSeqno) { 147 last_high_completed_seqno = *highCompletedSeqno; 148 } 149 return ENGINE_SUCCESS; 150} 151 152ENGINE_ERROR_CODE MockDcpMessageProducers::mutation(uint32_t opaque, 153 cb::unique_item_ptr itm, 154 Vbid vbucket, 155 uint64_t by_seqno, 156 uint64_t rev_seqno, 157 uint32_t lock_time, 158 uint8_t nru, 159 cb::mcbp::DcpStreamId sid) { 160 auto result = handleMutationOrPrepare(cb::mcbp::ClientOpcode::DcpMutation, 161 opaque, 162 std::move(itm), 163 vbucket, 164 by_seqno, 165 rev_seqno, 166 lock_time, 167 {}, 168 nru); 169 last_stream_id = sid; 170 return result; 171} 172 173ENGINE_ERROR_CODE MockDcpMessageProducers::handleMutationOrPrepare( 174 cb::mcbp::ClientOpcode opcode, 175 uint32_t opaque, 176 cb::unique_item_ptr itm, 177 Vbid vbucket, 178 uint64_t by_seqno, 179 uint64_t rev_seqno, 180 uint32_t lock_time, 181 cb::const_char_buffer meta, 182 uint8_t nru) { 183 clear_dcp_data(); 184 Item* item = reinterpret_cast<Item*>(itm.get()); 185 last_op = opcode; 186 last_opaque = opaque; 187 last_key.assign(item->getKey().c_str()); 188 last_vbucket = vbucket; 189 last_byseqno = by_seqno; 190 last_revseqno = rev_seqno; 191 last_locktime = lock_time; 192 last_meta = to_string(meta); 193 last_value.assign(item->getData(), item->getNBytes()); 194 last_nru = nru; 195 196 // @todo: MB-24391: We are querying the header length with collections 197 // off, which if we extended our testapp tests to do collections may not be 198 // correct. For now collections testing is done via GTEST tests and isn't 199 // reliant on last_packet_size so this doesn't cause any problems. 200 last_packet_size = sizeof(cb::mcbp::Request) + 201 sizeof(cb::mcbp::request::DcpMutationPayload); 202 last_packet_size = last_packet_size + last_key.length() + 203 item->getNBytes() + meta.size(); 204 205 last_datatype = item->getDataType(); 206 last_collection_id = item->getKey().getCollectionID(); 207 208 return mutationStatus; 209} 210 211ENGINE_ERROR_CODE MockDcpMessageProducers::deletionInner( 212 uint32_t opaque, 213 cb::unique_item_ptr itm, 214 Vbid vbucket, 215 uint64_t by_seqno, 216 uint64_t rev_seqno, 217 uint32_t deleteTime, 218 uint32_t extlen, 219 DeleteSource deleteSource, 220 cb::mcbp::DcpStreamId sid) { 221 clear_dcp_data(); 222 auto* item = reinterpret_cast<Item*>(itm.get()); 223 if (deleteSource == DeleteSource::TTL) { 224 last_op = cb::mcbp::ClientOpcode::DcpExpiration; 225 } else { 226 last_op = cb::mcbp::ClientOpcode::DcpDeletion; 227 } 228 last_opaque = opaque; 229 last_key.assign(item->getKey().c_str()); 230 last_cas = item->getCas(); 231 last_vbucket = vbucket; 232 last_byseqno = by_seqno; 233 last_revseqno = rev_seqno; 234 235 // @todo: MB-24391 as above. 236 last_packet_size = sizeof(protocol_binary_request_header) + 237 last_key.length() + item->getNBytes(); 238 last_packet_size += extlen; 239 240 last_value.assign(static_cast<const char*>(item->getData()), 241 item->getNBytes()); 242 last_delete_time = deleteTime; 243 last_collection_id = item->getKey().getCollectionID(); 244 245 last_stream_id = sid; 246 247 return ENGINE_SUCCESS; 248} 249 250ENGINE_ERROR_CODE MockDcpMessageProducers::deletion(uint32_t opaque, 251 cb::unique_item_ptr itm, 252 Vbid vbucket, 253 uint64_t by_seqno, 254 uint64_t rev_seqno, 255 cb::mcbp::DcpStreamId sid) { 256 return deletionInner(opaque, 257 std::move(itm), 258 vbucket, 259 by_seqno, 260 rev_seqno, 261 0, 262 sizeof(cb::mcbp::request::DcpDeletionV1Payload), 263 DeleteSource::Explicit, 264 sid); 265} 266 267ENGINE_ERROR_CODE MockDcpMessageProducers::deletion_v2( 268 uint32_t opaque, 269 cb::unique_item_ptr itm, 270 Vbid vbucket, 271 uint64_t by_seqno, 272 uint64_t rev_seqno, 273 uint32_t deleteTime, 274 cb::mcbp::DcpStreamId sid) { 275 return deletionInner(opaque, 276 std::move(itm), 277 vbucket, 278 by_seqno, 279 rev_seqno, 280 deleteTime, 281 sizeof(cb::mcbp::request::DcpDeletionV2Payload), 282 DeleteSource::Explicit, 283 sid); 284} 285 286ENGINE_ERROR_CODE MockDcpMessageProducers::expiration( 287 uint32_t opaque, 288 cb::unique_item_ptr itm, 289 Vbid vbucket, 290 uint64_t by_seqno, 291 uint64_t rev_seqno, 292 uint32_t deleteTime, 293 cb::mcbp::DcpStreamId sid) { 294 return deletionInner(opaque, 295 std::move(itm), 296 vbucket, 297 by_seqno, 298 rev_seqno, 299 deleteTime, 300 sizeof(cb::mcbp::request::DcpExpirationPayload), 301 DeleteSource::TTL, 302 sid); 303} 304 305ENGINE_ERROR_CODE MockDcpMessageProducers::set_vbucket_state( 306 uint32_t opaque, Vbid vbucket, vbucket_state_t state) { 307 clear_dcp_data(); 308 last_op = cb::mcbp::ClientOpcode::DcpSetVbucketState; 309 last_opaque = opaque; 310 last_vbucket = vbucket; 311 last_vbucket_state = state; 312 last_packet_size = 25; 313 return ENGINE_SUCCESS; 314} 315 316ENGINE_ERROR_CODE MockDcpMessageProducers::noop(uint32_t opaque) { 317 clear_dcp_data(); 318 last_op = cb::mcbp::ClientOpcode::DcpNoop; 319 last_opaque = opaque; 320 return ENGINE_SUCCESS; 321} 322 323ENGINE_ERROR_CODE MockDcpMessageProducers::buffer_acknowledgement( 324 uint32_t opaque, Vbid vbucket, uint32_t buffer_bytes) { 325 clear_dcp_data(); 326 last_op = cb::mcbp::ClientOpcode::DcpBufferAcknowledgement; 327 last_opaque = opaque; 328 last_vbucket = vbucket; 329 return ENGINE_SUCCESS; 330} 331 332ENGINE_ERROR_CODE MockDcpMessageProducers::control( 333 uint32_t opaque, 334 cb::const_char_buffer key, 335 cb::const_char_buffer value) { 336 clear_dcp_data(); 337 last_op = cb::mcbp::ClientOpcode::DcpControl; 338 last_opaque = opaque; 339 last_key.assign(key.data(), key.size()); 340 last_value.assign(value.data(), value.size()); 341 return ENGINE_SUCCESS; 342} 343 344ENGINE_ERROR_CODE MockDcpMessageProducers::system_event( 345 uint32_t opaque, 346 Vbid vbucket, 347 mcbp::systemevent::id event, 348 uint64_t bySeqno, 349 mcbp::systemevent::version version, 350 cb::const_byte_buffer key, 351 cb::const_byte_buffer eventData, 352 cb::mcbp::DcpStreamId sid) { 353 clear_dcp_data(); 354 last_op = cb::mcbp::ClientOpcode::DcpSystemEvent; 355 last_system_event = event; 356 last_system_event_data.insert( 357 last_system_event_data.begin(), eventData.begin(), eventData.end()); 358 last_system_event_version = version; 359 360 if (event == mcbp::systemevent::id::CreateCollection) { 361 last_collection_id = 362 reinterpret_cast<const Collections::CreateEventDcpData*>( 363 eventData.data()) 364 ->cid.to_host(); 365 last_scope_id = 366 reinterpret_cast<const Collections::CreateEventDcpData*>( 367 eventData.data()) 368 ->sid.to_host(); 369 370 last_key.assign(reinterpret_cast<const char*>(key.data()), key.size()); 371 } else if (event == mcbp::systemevent::id::DeleteCollection) { 372 last_collection_id = 373 reinterpret_cast<const Collections::DropEventDcpData*>( 374 eventData.data()) 375 ->cid.to_host(); 376 } 377 378 last_stream_id = sid; 379 return ENGINE_SUCCESS; 380} 381 382ENGINE_ERROR_CODE MockDcpMessageProducers::prepare(uint32_t opaque, 383 cb::unique_item_ptr itm, 384 Vbid vbucket, 385 uint64_t by_seqno, 386 uint64_t rev_seqno, 387 uint32_t lock_time, 388 uint8_t nru, 389 DocumentState document_state, 390 cb::durability::Level) { 391 return handleMutationOrPrepare(cb::mcbp::ClientOpcode::DcpPrepare, 392 opaque, 393 std::move(itm), 394 vbucket, 395 by_seqno, 396 rev_seqno, 397 lock_time, 398 {}, 399 nru); 400} 401 402ENGINE_ERROR_CODE MockDcpMessageProducers::seqno_acknowledged( 403 uint32_t opaque, Vbid vbucket, uint64_t prepared_seqno) { 404 last_op = cb::mcbp::ClientOpcode::DcpSeqnoAcknowledged; 405 last_opaque = opaque; 406 last_vbucket = vbucket; 407 last_prepared_seqno = prepared_seqno; 408 return ENGINE_SUCCESS; 409} 410 411ENGINE_ERROR_CODE MockDcpMessageProducers::commit(uint32_t opaque, 412 Vbid vbucket, 413 const DocKey& key, 414 uint64_t prepare_seqno, 415 uint64_t commit_seqno) { 416 last_op = cb::mcbp::ClientOpcode::DcpCommit; 417 last_opaque = opaque; 418 last_vbucket = vbucket; 419 last_prepared_seqno = prepare_seqno; 420 last_commit_seqno = commit_seqno; 421 return ENGINE_SUCCESS; 422} 423 424ENGINE_ERROR_CODE MockDcpMessageProducers::abort(uint32_t opaque, 425 Vbid vbucket, 426 const DocKey& key, 427 uint64_t prepared_seqno, 428 uint64_t abort_seqno) { 429 last_op = cb::mcbp::ClientOpcode::DcpAbort; 430 last_opaque = opaque; 431 last_vbucket = vbucket; 432 last_prepared_seqno = prepared_seqno; 433 last_abort_seqno = abort_seqno; 434 return ENGINE_SUCCESS; 435} 436 437ENGINE_ERROR_CODE MockDcpMessageProducers::get_error_map(uint32_t opaque, 438 uint16_t version) { 439 clear_dcp_data(); 440 last_op = cb::mcbp::ClientOpcode::GetErrorMap; 441 return ENGINE_SUCCESS; 442} 443 444MockDcpMessageProducers::MockDcpMessageProducers(EngineIface* engine) { 445 engine_handle = engine; 446 engine_handle_v1 = engine; 447} 448 449void MockDcpMessageProducers::setMutationStatus(ENGINE_ERROR_CODE code) { 450 mutationStatus = code; 451} 452 453void MockDcpMessageProducers::clear_dcp_data() { 454 last_op = cb::mcbp::ClientOpcode::Invalid; 455 last_status = cb::mcbp::Status::Success; 456 last_nru = 0; 457 last_vbucket = Vbid(0); 458 last_opaque = 0; 459 last_flags = 0; 460 last_stream_opaque = 0; 461 last_locktime = 0; 462 last_cas = 0; 463 last_start_seqno = 0; 464 last_end_seqno = 0; 465 last_vbucket_uuid = 0; 466 last_snap_start_seqno = 0; 467 last_snap_end_seqno = 0; 468 last_byseqno = 0; 469 last_meta.clear(); 470 last_value.clear(); 471 last_key.clear(); 472 last_vbucket_state = (vbucket_state_t)0; 473 last_delete_time = 0; 474 last_collection_id = 0; 475 last_system_event_data.clear(); 476 last_system_event_version = mcbp::systemevent::version::version0; 477 last_collection_manifest_uid = 0; 478 last_stream_id = cb::mcbp::DcpStreamId{}; 479 last_prepared_seqno = 0; 480 last_high_completed_seqno = 0; 481 last_commit_seqno = 0; 482 last_abort_seqno = 0; 483} 484