1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2016 Couchbase, Inc.
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 #include "appendprepend_context.h"
18 #include "engine_wrapper.h"
19 
20 #include <daemon/cookie.h>
21 #include <daemon/stats.h>
22 #include <memcached/durability_spec.h>
23 #include <xattr/blob.h>
24 #include <xattr/utils.h>
25 
AppendPrependCommandContext( Cookie& cookie, const cb::mcbp::Request& req)26 AppendPrependCommandContext::AppendPrependCommandContext(
27         Cookie& cookie, const cb::mcbp::Request& req)
28     : SteppableCommandContext(cookie),
29       mode((req.getClientOpcode() == cb::mcbp::ClientOpcode::Append ||
30             req.getClientOpcode() == cb::mcbp::ClientOpcode::Appendq)
31                    ? Mode::Append
32                    : Mode::Prepend),
33       vbucket(req.getVBucket()),
34       cas(req.getCas()),
35       state(State::ValidateInput),
36       datatype(uint8_t(req.getDatatype())) {
37 }
38 
step()39 ENGINE_ERROR_CODE AppendPrependCommandContext::step() {
40     auto ret = ENGINE_SUCCESS;
41     do {
42         switch (state) {
43         case State::ValidateInput:
44             ret = validateInput();
45             break;
46         case State::InflateInputData:
47             ret = inflateInputData();
48             break;
49         case State::GetItem:
50             ret = getItem();
51             break;
52         case State::AllocateNewItem:
53             ret = allocateNewItem();
54             break;
55         case State::StoreItem:
56             ret = storeItem();
57             break;
58         case State::Reset:
59             ret = reset();
60             break;
61         case State::Done:
62 	    SLAB_INCR(&connection, cmd_set);
63             return ENGINE_SUCCESS;
64         }
65     } while (ret == ENGINE_SUCCESS);
66 
67     if (ret == ENGINE_KEY_ENOENT) {
68         // for some reason the return code for no key is not stored so we need
69         // to remap that error code..
70         ret = ENGINE_NOT_STORED;
71     }
72 
73     if (ret != ENGINE_EWOULDBLOCK) {
74         SLAB_INCR(&connection, cmd_set);
75     }
76 
77     return ret;
78 }
79 
validateInput()80 ENGINE_ERROR_CODE AppendPrependCommandContext::validateInput() {
81     if (!connection.isDatatypeEnabled(datatype)) {
82         return ENGINE_EINVAL;
83     }
84 
85     if (mcbp::datatype::is_snappy(datatype)) {
86         state = State::InflateInputData;
87     } else {
88         state = State::GetItem;
89     }
90     return ENGINE_SUCCESS;
91 }
92 
inflateInputData()93 ENGINE_ERROR_CODE AppendPrependCommandContext::inflateInputData() {
94     try {
95         auto value = cookie.getRequest().getValue();
96         cb::const_char_buffer buffer{
97                 reinterpret_cast<const char*>(value.data()), value.size()};
98         if (!cb::compression::inflate(
99                     cb::compression::Algorithm::Snappy, buffer, inputbuffer)) {
100             return ENGINE_EINVAL;
101         }
102         state = State::GetItem;
103     } catch (const std::bad_alloc&) {
104         return ENGINE_ENOMEM;
105     }
106 
107     return ENGINE_SUCCESS;
108 }
109 
getItem()110 ENGINE_ERROR_CODE AppendPrependCommandContext::getItem() {
111     auto ret = bucket_get(cookie, cookie.getRequestKey(), vbucket);
112     if (ret.first == cb::engine_errc::success) {
113         olditem = std::move(ret.second);
114         if (!bucket_get_item_info(connection, olditem.get(), &oldItemInfo)) {
115             return ENGINE_FAILED;
116         }
117 
118         if (cas != 0) {
119             if (oldItemInfo.cas == uint64_t(-1)) {
120                 // The object in the cache is locked... lets try to use
121                 // the cas provided by the user to override this
122                 oldItemInfo.cas = cas;
123             } else if (cas != oldItemInfo.cas) {
124                 return ENGINE_KEY_EEXISTS;
125             }
126         } else if (oldItemInfo.cas == uint64_t(-1)) {
127             return ENGINE_LOCKED;
128         }
129 
130         if (mcbp::datatype::is_snappy(oldItemInfo.datatype)) {
131             try {
132                 cb::const_char_buffer payload(static_cast<const char*>(
133                                               oldItemInfo.value[0].iov_base),
134                                               oldItemInfo.value[0].iov_len);
135                 if (!cb::compression::inflate(cb::compression::Algorithm::Snappy,
136                                               payload,
137                                               buffer)) {
138                     return ENGINE_FAILED;
139                 }
140             } catch (const std::bad_alloc&) {
141                 return ENGINE_ENOMEM;
142             }
143         }
144 
145         // Move on to the next state
146         state = State::AllocateNewItem;
147     }
148 
149     return ENGINE_ERROR_CODE(ret.first);
150 }
151 
allocateNewItem()152 ENGINE_ERROR_CODE AppendPrependCommandContext::allocateNewItem() {
153     cb::char_buffer old{static_cast<char*>(oldItemInfo.value[0].iov_base),
154                         oldItemInfo.nbytes};
155 
156     if (buffer.size() != 0) {
157         old = {buffer.data(), buffer.size()};
158     }
159 
160     // If we're operating on a document containing xattr's we need to
161     // tell the underlying engine about how much of the data which
162     // should be accounted for in the privileged segment.
163     size_t priv_size = 0;
164 
165     // The offset into the old item where the actual body start.
166     size_t body_offset = 0;
167 
168     // If the existing item had XATTRs we need to preserve the xattrs
169     protocol_binary_datatype_t datatype = PROTOCOL_BINARY_RAW_BYTES;
170     if (mcbp::datatype::is_xattr(oldItemInfo.datatype)) {
171         datatype |= PROTOCOL_BINARY_DATATYPE_XATTR;
172 
173         // Calculate the size of the system xattr's. We know they arn't
174         // compressed as we are already using the decompression buffer as
175         // input (see head of function).
176         cb::xattr::Blob blob(old, false);
177         body_offset = blob.size();
178         priv_size = blob.get_system_size();
179     }
180 
181     // If the client sent a compressed value we should use the one
182     // we inflated
183     cb::const_byte_buffer value;
184     if (inputbuffer.size() == 0) {
185         value = cookie.getRequest().getValue();
186     } else {
187         value = inputbuffer;
188     }
189 
190     auto pair = bucket_allocate_ex(cookie,
191                                    cookie.getRequestKey(),
192                                    old.size() + value.size(),
193                                    priv_size,
194                                    oldItemInfo.flags,
195                                    (rel_time_t)oldItemInfo.exptime,
196                                    datatype,
197                                    vbucket);
198 
199     newitem = std::move(pair.first);
200     cb::byte_buffer body{static_cast<uint8_t*>(pair.second.value[0].iov_base),
201                          pair.second.value[0].iov_len};
202 
203     // copy the data over..
204     if (mode == Mode::Append) {
205         memcpy(body.buf, old.buf, old.len);
206         memcpy(body.buf + old.len, value.buf, value.len);
207     } else {
208         // The xattrs should go first (body_offset == 0 if the object
209         // don't have any xattrs)
210         memcpy(body.buf, old.buf, body_offset);
211         memcpy(body.buf + body_offset, value.buf, value.len);
212         memcpy(body.buf + body_offset + value.len, old.buf + body_offset,
213                old.len - body_offset);
214     }
215     // If the resulting document's data is valid JSON, set the datatype flag
216     // to reflect this.
217     cb::const_byte_buffer buf{
218             reinterpret_cast<const uint8_t*>(body.buf + body_offset),
219             old.len + value.len};
220     // Update the documents's datatype and CAS values
221     setDatatypeJSONFromValue(buf, datatype);
222     bucket_item_set_datatype(connection, newitem.get(), datatype);
223     bucket_item_set_cas(connection, newitem.get(), oldItemInfo.cas);
224 
225     state = State::StoreItem;
226 
227     return ENGINE_SUCCESS;
228 }
229 
storeItem()230 ENGINE_ERROR_CODE AppendPrependCommandContext::storeItem() {
231     uint64_t ncas = cas;
232     auto ret = bucket_store(cookie,
233                             newitem.get(),
234                             ncas,
235                             OPERATION_CAS,
236                             cookie.getRequest(Cookie::PacketContent::Full)
237                                     .getDurabilityRequirements());
238 
239     if (ret == ENGINE_SUCCESS) {
240         update_topkeys(cookie);
241         cookie.setCas(ncas);
242         if (connection.isSupportsMutationExtras()) {
243             item_info newItemInfo;
244             if (!bucket_get_item_info(
245                         connection, newitem.get(), &newItemInfo)) {
246                 return ENGINE_FAILED;
247             }
248             extras.vbucket_uuid = htonll(newItemInfo.vbucket_uuid);
249             extras.seqno = htonll(newItemInfo.seqno);
250             cookie.sendResponse(
251                     cb::mcbp::Status::Success,
252                     {reinterpret_cast<const char*>(&extras), sizeof(extras)},
253                     {},
254                     {},
255                     cb::mcbp::Datatype::Raw,
256                     ncas);
257         } else {
258             cookie.sendResponse(cb::mcbp::Status::Success);
259         }
260         state = State::Done;
261     } else if (ret == ENGINE_KEY_EEXISTS && cas == 0) {
262         state = State::Reset;
263         // We need to return ENGINE_SUCCESS in order to continue processing
264         ret = ENGINE_SUCCESS;
265     }
266 
267     return ret;
268 }
269 
reset()270 ENGINE_ERROR_CODE AppendPrependCommandContext::reset() {
271     olditem.reset();
272     newitem.reset();
273     buffer.reset();
274 
275     state = State::GetItem;
276     return ENGINE_SUCCESS;
277 }
278