1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2016 Couchbase, Inc.
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 #include "remove_context.h"
18 #include "engine_wrapper.h"
19 
20 #include <daemon/buckets.h>
21 #include <daemon/mcbp.h>
22 #include <xattr/blob.h>
23 #include <xattr/utils.h>
24 
step()25 ENGINE_ERROR_CODE RemoveCommandContext::step() {
26     auto ret = ENGINE_SUCCESS;
27     do {
28         switch (state) {
29         case State::GetItem:
30             ret = getItem();
31             break;
32         case State::RebuildXattr:
33             ret = rebuildXattr();
34             break;
35         case State::AllocateDeletedItem:
36             ret = allocateDeletedItem();
37             break;
38         case State::StoreItem:
39             ret = storeItem();
40             break;
41         case State::RemoveItem:
42             ret = removeItem();
43             break;
44         case State::SendResponse:
45             ret = sendResponse();
46             break;
47         case State::Reset:
48             ret = reset();
49             break;
50         case State::Done:
51             SLAB_INCR(&connection, delete_hits);
52             update_topkeys(cookie);
53             return ENGINE_SUCCESS;
54         }
55     } while (ret == ENGINE_SUCCESS);
56 
57 
58     if (ret == ENGINE_KEY_ENOENT) {
59         STATS_INCR(&connection, delete_misses);
60     }
61 
62     return ret;
63 }
64 
getItem()65 ENGINE_ERROR_CODE RemoveCommandContext::getItem() {
66     auto ret = bucket_get(cookie, key, vbucket);
67     if (ret.first == cb::engine_errc::success) {
68         existing = std::move(ret.second);
69         if (!bucket_get_item_info(cookie, existing.get(), &existing_info)) {
70             return ENGINE_FAILED;
71         }
72 
73         if (input_cas != 0) {
74             if (existing_info.cas == uint64_t(-1)) {
75                 // The object in the cache is locked... lets try to use
76                 // the cas provided by the user to override this
77                 existing_info.cas = input_cas;
78             } else if (input_cas != existing_info.cas) {
79                 return ENGINE_KEY_EEXISTS;
80             }
81         } else if (existing_info.cas == uint64_t(-1)) {
82             return ENGINE_LOCKED;
83         }
84 
85         if (mcbp::datatype::is_xattr(existing_info.datatype)) {
86             state = State::RebuildXattr;
87         } else {
88             state = State::RemoveItem;
89         }
90     }
91     return ENGINE_ERROR_CODE(ret.first);
92 }
93 
allocateDeletedItem()94 ENGINE_ERROR_CODE RemoveCommandContext::allocateDeletedItem() {
95     protocol_binary_datatype_t datatype;
96     if (xattr.size() == 0) {
97         datatype = PROTOCOL_BINARY_RAW_BYTES;
98     } else {
99         datatype = PROTOCOL_BINARY_DATATYPE_XATTR;
100     }
101     auto pair =
102             bucket_allocate_ex(cookie,
103                                key,
104                                xattr.size(),
105                                xattr.size(), // Only system xattrs
106                                0, // MB-25273: 0 flags when deleting the body
107                                0, // Tombstone item, reset expiry time as will
108                                   // be removed during purge
109                                datatype,
110                                vbucket);
111 
112     deleted = std::move(pair.first);
113     if (input_cas == 0) {
114         bucket_item_set_cas(cookie, deleted.get(), existing_info.cas);
115     } else {
116         bucket_item_set_cas(cookie, deleted.get(), input_cas);
117     }
118 
119     if (xattr.size() > 0) {
120         std::memcpy(pair.second.value[0].iov_base, xattr.buf, xattr.size());
121     }
122 
123     state = State::StoreItem;
124     return ENGINE_SUCCESS;
125 }
126 
storeItem()127 ENGINE_ERROR_CODE RemoveCommandContext::storeItem() {
128     uint64_t new_cas;
129     auto ret = bucket_store(cookie,
130                             deleted.get(),
131                             new_cas,
132                             OPERATION_CAS,
133                             DocumentState::Deleted);
134 
135     if (ret == ENGINE_SUCCESS) {
136 
137         item_info info;
138         if (!bucket_get_item_info(cookie, deleted.get(), &info)) {
139             return ENGINE_FAILED;
140         }
141 
142         // Response includes vbucket UUID and sequence number
143         mutation_descr.vbucket_uuid = info.vbucket_uuid;
144         mutation_descr.seqno = info.seqno;
145         cookie.setCas(info.cas);
146 
147         state = State::SendResponse;
148     } else if (ret == ENGINE_KEY_EEXISTS && input_cas == 0) {
149         // Cas collision and the caller specified the CAS wildcard.. retry
150         state = State::Reset;
151         ret = ENGINE_SUCCESS;
152     }
153 
154     return ret;
155 }
156 
removeItem()157 ENGINE_ERROR_CODE RemoveCommandContext::removeItem() {
158     uint64_t new_cas = input_cas;
159     auto ret = bucket_remove(cookie, key, new_cas, vbucket, mutation_descr);
160 
161     if (ret == ENGINE_SUCCESS) {
162         cookie.setCas(new_cas);
163         state = State::SendResponse;
164     } else if (ret == ENGINE_KEY_EEXISTS && input_cas == 0) {
165         // Cas collision and the caller specified the CAS wildcard.. retry
166         state = State::Reset;
167         ret = ENGINE_SUCCESS;
168     }
169 
170     return ret;
171 }
172 
reset()173 ENGINE_ERROR_CODE RemoveCommandContext::reset() {
174     deleted.reset();
175     existing.reset();
176 
177     xattr = {nullptr, 0};
178 
179     state = State::GetItem;
180     return ENGINE_SUCCESS;
181 }
182 
sendResponse()183 ENGINE_ERROR_CODE RemoveCommandContext::sendResponse() {
184     state = State::Done;
185 
186     if (cookie.getRequest().isQuiet()) {
187         ++connection.getBucket()
188                   .responseCounters[PROTOCOL_BINARY_RESPONSE_SUCCESS];
189         connection.setState(McbpStateMachine::State::new_cmd);
190         return ENGINE_SUCCESS;
191     }
192 
193     if (connection.isSupportsMutationExtras()) {
194         // Response includes vbucket UUID and sequence number
195         // Make the byte ordering in the mutation descriptor
196         mutation_descr.vbucket_uuid = htonll(mutation_descr.vbucket_uuid);
197         mutation_descr.seqno = htonll(mutation_descr.seqno);
198 
199         cb::const_char_buffer extras = {
200                 reinterpret_cast<const char*>(&mutation_descr),
201                 sizeof(mutation_descr_t)};
202 
203         cookie.sendResponse(cb::mcbp::Status::Success,
204                             extras,
205                             {},
206                             {},
207                             cb::mcbp::Datatype::Raw,
208                             cookie.getCas());
209     } else {
210         cookie.sendResponse(cb::mcbp::Status::Success);
211     }
212 
213     return ENGINE_SUCCESS;
214 }
215 
rebuildXattr()216 ENGINE_ERROR_CODE RemoveCommandContext::rebuildXattr() {
217     if (mcbp::datatype::is_xattr(existing_info.datatype)) {
218         // Create a const blob of the incoming data, which may decompress it
219         // Note when writing back the xattrs (if any remain) the snappy bit is
220         // never reset, so no need to remember if we did decompress.
221         const cb::xattr::Blob existingData(
222                 {static_cast<char*>(existing_info.value[0].iov_base),
223                  existing_info.value[0].iov_len},
224                 mcbp::datatype::is_snappy(existing_info.datatype));
225 
226         // We can't modify the item as when we try to replace the item it
227         // may fail due to a race condition (writing back into the existing
228         // item). Create a temporary copy of the current value and prune that.
229         // Given that we're only going to (potentially) remove data in the xattr
230         // blob, it will only _shrink_ in size so we  don't need to pass on the
231         // allocator to the blob
232         xattr_buffer.reset(new char[existingData.size()]);
233         std::copy_n(
234                 existingData.data(), existingData.size(), xattr_buffer.get());
235 
236         // Now prune the copy
237         cb::xattr::Blob blob({xattr_buffer.get(), existingData.size()},
238                              false /* data is not compressed*/);
239         blob.prune_user_keys();
240         xattr = blob.finalize();
241         if (xattr.data() != xattr_buffer.get()) {
242             throw std::logic_error(
243                     "RemoveCommandContext::rebuildXattr: Internal error. No "
244                     "reallocations should happend when pruning user "
245                     "attributes");
246         }
247     }
248 
249     if (xattr.size() > 0) {
250         state = State::AllocateDeletedItem;
251     } else {
252         // All xattrs should be nuked
253         state = State::RemoveItem;
254     }
255 
256     return ENGINE_SUCCESS;
257 }
258