1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2016 Couchbase, Inc.
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 #include "remove_context.h"
18 #include "engine_wrapper.h"
19 
20 #include <daemon/buckets.h>
21 #include <daemon/mcbp.h>
22 #include <memcached/durability_spec.h>
23 #include <xattr/blob.h>
24 #include <xattr/utils.h>
25 
step()26 ENGINE_ERROR_CODE RemoveCommandContext::step() {
27     auto ret = ENGINE_SUCCESS;
28     do {
29         switch (state) {
30         case State::GetItem:
31             ret = getItem();
32             break;
33         case State::RebuildXattr:
34             ret = rebuildXattr();
35             break;
36         case State::AllocateDeletedItem:
37             ret = allocateDeletedItem();
38             break;
39         case State::StoreItem:
40             ret = storeItem();
41             break;
42         case State::RemoveItem:
43             ret = removeItem();
44             break;
45         case State::SendResponse:
46             ret = sendResponse();
47             break;
48         case State::Reset:
49             ret = reset();
50             break;
51         case State::Done:
52             SLAB_INCR(&connection, delete_hits);
53             update_topkeys(cookie);
54             return ENGINE_SUCCESS;
55         }
56     } while (ret == ENGINE_SUCCESS);
57 
58 
59     if (ret == ENGINE_KEY_ENOENT) {
60         STATS_INCR(&connection, delete_misses);
61     }
62 
63     return ret;
64 }
65 
getItem()66 ENGINE_ERROR_CODE RemoveCommandContext::getItem() {
67     auto ret = bucket_get(cookie, cookie.getRequestKey(), vbucket);
68     if (ret.first == cb::engine_errc::success) {
69         existing = std::move(ret.second);
70         if (!bucket_get_item_info(connection, existing.get(), &existing_info)) {
71             return ENGINE_FAILED;
72         }
73 
74         if (input_cas != 0) {
75             if (existing_info.cas == uint64_t(-1)) {
76                 // The object in the cache is locked... lets try to use
77                 // the cas provided by the user to override this
78                 existing_info.cas = input_cas;
79             } else if (input_cas != existing_info.cas) {
80                 return ENGINE_KEY_EEXISTS;
81             }
82         } else if (existing_info.cas == uint64_t(-1)) {
83             return ENGINE_LOCKED;
84         }
85 
86         if (mcbp::datatype::is_xattr(existing_info.datatype)) {
87             state = State::RebuildXattr;
88         } else {
89             state = State::RemoveItem;
90         }
91     }
92     return ENGINE_ERROR_CODE(ret.first);
93 }
94 
allocateDeletedItem()95 ENGINE_ERROR_CODE RemoveCommandContext::allocateDeletedItem() {
96     protocol_binary_datatype_t datatype;
97     if (xattr.size() == 0) {
98         datatype = PROTOCOL_BINARY_RAW_BYTES;
99     } else {
100         datatype = PROTOCOL_BINARY_DATATYPE_XATTR;
101     }
102     auto pair =
103             bucket_allocate_ex(cookie,
104                                cookie.getRequestKey(),
105                                xattr.size(),
106                                xattr.size(), // Only system xattrs
107                                0, // MB-25273: 0 flags when deleting the body
108                                0, // Tombstone item, reset expiry time as will
109                                   // be removed during purge
110                                datatype,
111                                vbucket);
112 
113     deleted = std::move(pair.first);
114     if (input_cas == 0) {
115         bucket_item_set_cas(connection, deleted.get(), existing_info.cas);
116     } else {
117         bucket_item_set_cas(connection, deleted.get(), input_cas);
118     }
119 
120     if (xattr.size() > 0) {
121         std::memcpy(pair.second.value[0].iov_base, xattr.buf, xattr.size());
122     }
123 
124     state = State::StoreItem;
125     return ENGINE_SUCCESS;
126 }
127 
storeItem()128 ENGINE_ERROR_CODE RemoveCommandContext::storeItem() {
129     uint64_t new_cas;
130     auto ret = bucket_store(cookie,
131                             deleted.get(),
132                             new_cas,
133                             OPERATION_CAS,
134                             {},
135                             DocumentState::Deleted);
136 
137     if (ret == ENGINE_SUCCESS) {
138 
139         item_info info;
140         if (!bucket_get_item_info(connection, deleted.get(), &info)) {
141             return ENGINE_FAILED;
142         }
143 
144         // Response includes vbucket UUID and sequence number
145         mutation_descr.vbucket_uuid = info.vbucket_uuid;
146         mutation_descr.seqno = info.seqno;
147         cookie.setCas(info.cas);
148 
149         state = State::SendResponse;
150     } else if (ret == ENGINE_KEY_EEXISTS && input_cas == 0) {
151         // Cas collision and the caller specified the CAS wildcard.. retry
152         state = State::Reset;
153         ret = ENGINE_SUCCESS;
154     }
155 
156     return ret;
157 }
158 
removeItem()159 ENGINE_ERROR_CODE RemoveCommandContext::removeItem() {
160     uint64_t new_cas = input_cas;
161     const auto& request = cookie.getRequest(Cookie::PacketContent::Full);
162     auto ret = bucket_remove(cookie,
163                              cookie.getRequestKey(),
164                              new_cas,
165                              vbucket,
166                              request.getDurabilityRequirements(),
167                              mutation_descr);
168 
169     if (ret == ENGINE_SUCCESS) {
170         cookie.setCas(new_cas);
171         state = State::SendResponse;
172     } else if (ret == ENGINE_KEY_EEXISTS && input_cas == 0) {
173         // Cas collision and the caller specified the CAS wildcard.. retry
174         state = State::Reset;
175         ret = ENGINE_SUCCESS;
176     }
177 
178     return ret;
179 }
180 
reset()181 ENGINE_ERROR_CODE RemoveCommandContext::reset() {
182     deleted.reset();
183     existing.reset();
184 
185     xattr = {nullptr, 0};
186 
187     state = State::GetItem;
188     return ENGINE_SUCCESS;
189 }
190 
sendResponse()191 ENGINE_ERROR_CODE RemoveCommandContext::sendResponse() {
192     state = State::Done;
193 
194     if (cookie.getRequest().isQuiet()) {
195         ++connection.getBucket()
196                   .responseCounters[int(cb::mcbp::Status::Success)];
197         connection.setState(StateMachine::State::new_cmd);
198         return ENGINE_SUCCESS;
199     }
200 
201     if (connection.isSupportsMutationExtras()) {
202         // Response includes vbucket UUID and sequence number
203         // Make the byte ordering in the mutation descriptor
204         mutation_descr.vbucket_uuid = htonll(mutation_descr.vbucket_uuid);
205         mutation_descr.seqno = htonll(mutation_descr.seqno);
206 
207         cb::const_char_buffer extras = {
208                 reinterpret_cast<const char*>(&mutation_descr),
209                 sizeof(mutation_descr_t)};
210 
211         cookie.sendResponse(cb::mcbp::Status::Success,
212                             extras,
213                             {},
214                             {},
215                             cb::mcbp::Datatype::Raw,
216                             cookie.getCas());
217     } else {
218         cookie.sendResponse(cb::mcbp::Status::Success);
219     }
220 
221     return ENGINE_SUCCESS;
222 }
223 
rebuildXattr()224 ENGINE_ERROR_CODE RemoveCommandContext::rebuildXattr() {
225     if (mcbp::datatype::is_xattr(existing_info.datatype)) {
226         // Create a const blob of the incoming data, which may decompress it
227         // Note when writing back the xattrs (if any remain) the snappy bit is
228         // never reset, so no need to remember if we did decompress.
229         const cb::xattr::Blob existingData(
230                 {static_cast<char*>(existing_info.value[0].iov_base),
231                  existing_info.value[0].iov_len},
232                 mcbp::datatype::is_snappy(existing_info.datatype));
233 
234         // We can't modify the item as when we try to replace the item it
235         // may fail due to a race condition (writing back into the existing
236         // item). Create a temporary copy of the current value and prune that.
237         // Given that we're only going to (potentially) remove data in the xattr
238         // blob, it will only _shrink_ in size so we  don't need to pass on the
239         // allocator to the blob
240         xattr_buffer.reset(new char[existingData.size()]);
241         std::copy_n(
242                 existingData.data(), existingData.size(), xattr_buffer.get());
243 
244         // Now prune the copy
245         cb::xattr::Blob blob({xattr_buffer.get(), existingData.size()},
246                              false /* data is not compressed*/);
247         blob.prune_user_keys();
248         xattr = blob.finalize();
249         if (xattr.data() != xattr_buffer.get()) {
250             throw std::logic_error(
251                     "RemoveCommandContext::rebuildXattr: Internal error. No "
252                     "reallocations should happend when pruning user "
253                     "attributes");
254         }
255     }
256 
257     if (xattr.size() > 0) {
258         state = State::AllocateDeletedItem;
259     } else {
260         // All xattrs should be nuked
261         state = State::RemoveItem;
262     }
263 
264     return ENGINE_SUCCESS;
265 }
266