1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2016 Couchbase, Inc.
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 #include "appendprepend_context.h"
18 #include "engine_wrapper.h"
19 
20 #include <daemon/cookie.h>
21 #include <xattr/blob.h>
22 #include <xattr/utils.h>
23 
AppendPrependCommandContext( Cookie& cookie, const cb::mcbp::Request& req)24 AppendPrependCommandContext::AppendPrependCommandContext(
25         Cookie& cookie, const cb::mcbp::Request& req)
26     : SteppableCommandContext(cookie),
27       mode((req.opcode == PROTOCOL_BINARY_CMD_APPEND ||
28             req.opcode == PROTOCOL_BINARY_CMD_APPENDQ)
29                    ? Mode::Append
30                    : Mode::Prepend),
31       key(cookie.getRequestKey()),
32       value(reinterpret_cast<const char*>(req.getValue().buf),
33             req.getValue().len),
34       vbucket(req.getVBucket()),
35       cas(req.getCas()),
36       state(State::ValidateInput),
37       datatype(req.datatype) {
38 }
39 
step()40 ENGINE_ERROR_CODE AppendPrependCommandContext::step() {
41     auto ret = ENGINE_SUCCESS;
42     do {
43         switch (state) {
44         case State::ValidateInput:
45             ret = validateInput();
46             break;
47         case State::InflateInputData:
48             ret = inflateInputData();
49             break;
50         case State::GetItem:
51             ret = getItem();
52             break;
53         case State::AllocateNewItem:
54             ret = allocateNewItem();
55             break;
56         case State::StoreItem:
57             ret = storeItem();
58             break;
59         case State::Reset:
60             ret = reset();
61             break;
62         case State::Done:
63 	    SLAB_INCR(&connection, cmd_set);
64             return ENGINE_SUCCESS;
65         }
66     } while (ret == ENGINE_SUCCESS);
67 
68     if (ret == ENGINE_KEY_ENOENT) {
69         // for some reason the return code for no key is not stored so we need
70         // to remap that error code..
71         ret = ENGINE_NOT_STORED;
72     }
73 
74     if (ret != ENGINE_EWOULDBLOCK) {
75         SLAB_INCR(&connection, cmd_set);
76     }
77 
78     return ret;
79 }
80 
validateInput()81 ENGINE_ERROR_CODE AppendPrependCommandContext::validateInput() {
82     if (!connection.isDatatypeEnabled(datatype)) {
83         return ENGINE_EINVAL;
84     }
85 
86     if (mcbp::datatype::is_snappy(datatype)) {
87         state = State::InflateInputData;
88     } else {
89         state = State::GetItem;
90     }
91     return ENGINE_SUCCESS;
92 }
93 
inflateInputData()94 ENGINE_ERROR_CODE AppendPrependCommandContext::inflateInputData() {
95     try {
96         if (!cb::compression::inflate(cb::compression::Algorithm::Snappy,
97                                       value,
98                                       inputbuffer)) {
99             return ENGINE_EINVAL;
100         }
101         value = inputbuffer;
102         state = State::GetItem;
103     } catch (const std::bad_alloc&) {
104         return ENGINE_ENOMEM;
105     }
106 
107     return ENGINE_SUCCESS;
108 }
109 
getItem()110 ENGINE_ERROR_CODE AppendPrependCommandContext::getItem() {
111     auto ret = bucket_get(cookie, key, vbucket);
112     if (ret.first == cb::engine_errc::success) {
113         olditem = std::move(ret.second);
114         if (!bucket_get_item_info(cookie, olditem.get(), &oldItemInfo)) {
115             return ENGINE_FAILED;
116         }
117 
118         if (cas != 0) {
119             if (oldItemInfo.cas == uint64_t(-1)) {
120                 // The object in the cache is locked... lets try to use
121                 // the cas provided by the user to override this
122                 oldItemInfo.cas = cas;
123             } else if (cas != oldItemInfo.cas) {
124                 return ENGINE_KEY_EEXISTS;
125             }
126         } else if (oldItemInfo.cas == uint64_t(-1)) {
127             return ENGINE_LOCKED;
128         }
129 
130         if (mcbp::datatype::is_snappy(oldItemInfo.datatype)) {
131             try {
132                 cb::const_char_buffer payload(static_cast<const char*>(
133                                               oldItemInfo.value[0].iov_base),
134                                               oldItemInfo.value[0].iov_len);
135                 if (!cb::compression::inflate(cb::compression::Algorithm::Snappy,
136                                               payload,
137                                               buffer)) {
138                     return ENGINE_FAILED;
139                 }
140             } catch (const std::bad_alloc&) {
141                 return ENGINE_ENOMEM;
142             }
143         }
144 
145         // Move on to the next state
146         state = State::AllocateNewItem;
147     }
148 
149     return ENGINE_ERROR_CODE(ret.first);
150 }
151 
allocateNewItem()152 ENGINE_ERROR_CODE AppendPrependCommandContext::allocateNewItem() {
153     cb::char_buffer old{static_cast<char*>(oldItemInfo.value[0].iov_base),
154                         oldItemInfo.nbytes};
155 
156     if (buffer.size() != 0) {
157         old = {buffer.data(), buffer.size()};
158     }
159 
160     // If we're operating on a document containing xattr's we need to
161     // tell the underlying engine about how much of the data which
162     // should be accounted for in the privileged segment.
163     size_t priv_size = 0;
164 
165     // The offset into the old item where the actual body start.
166     size_t body_offset = 0;
167 
168     // If the existing item had XATTRs we need to preserve the xattrs
169     protocol_binary_datatype_t datatype = PROTOCOL_BINARY_RAW_BYTES;
170     if (mcbp::datatype::is_xattr(oldItemInfo.datatype)) {
171         datatype |= PROTOCOL_BINARY_DATATYPE_XATTR;
172 
173         // Calculate the size of the system xattr's. We know they arn't
174         // compressed as we are already using the decompression buffer as
175         // input (see head of function).
176         cb::xattr::Blob blob(old, false);
177         body_offset = blob.size();
178         priv_size = blob.get_system_size();
179     }
180 
181     auto pair = bucket_allocate_ex(cookie,
182                                    key,
183                                    old.len + value.len,
184                                    priv_size,
185                                    oldItemInfo.flags,
186                                    (rel_time_t)oldItemInfo.exptime,
187                                    datatype,
188                                    vbucket);
189 
190     newitem = std::move(pair.first);
191     cb::byte_buffer body{static_cast<uint8_t*>(pair.second.value[0].iov_base),
192                          pair.second.value[0].iov_len};
193 
194     // copy the data over..
195     if (mode == Mode::Append) {
196         memcpy(body.buf, old.buf, old.len);
197         memcpy(body.buf + old.len, value.buf, value.len);
198     } else {
199         // The xattrs should go first (body_offset == 0 if the object
200         // don't have any xattrs)
201         memcpy(body.buf, old.buf, body_offset);
202         memcpy(body.buf + body_offset, value.buf, value.len);
203         memcpy(body.buf + body_offset + value.len, old.buf + body_offset,
204                old.len - body_offset);
205     }
206     // If the resulting document's data is valid JSON, set the datatype flag
207     // to reflect this.
208     cb::const_byte_buffer buf{
209             reinterpret_cast<const uint8_t*>(body.buf + body_offset),
210             old.len + value.len};
211     // Update the documents's datatype and CAS values
212     setDatatypeJSONFromValue(buf, datatype);
213     bucket_item_set_datatype(cookie, newitem.get(), datatype);
214     bucket_item_set_cas(cookie, newitem.get(), oldItemInfo.cas);
215 
216     state = State::StoreItem;
217 
218     return ENGINE_SUCCESS;
219 }
220 
storeItem()221 ENGINE_ERROR_CODE AppendPrependCommandContext::storeItem() {
222     uint64_t ncas = cas;
223     auto ret = bucket_store(cookie, newitem.get(), ncas, OPERATION_CAS);
224 
225     if (ret == ENGINE_SUCCESS) {
226         update_topkeys(cookie);
227         cookie.setCas(ncas);
228         if (connection.isSupportsMutationExtras()) {
229             item_info newItemInfo;
230             if (!bucket_get_item_info(cookie, newitem.get(), &newItemInfo)) {
231                 return ENGINE_FAILED;
232             }
233             extras.vbucket_uuid = htonll(newItemInfo.vbucket_uuid);
234             extras.seqno = htonll(newItemInfo.seqno);
235             cookie.sendResponse(
236                     cb::mcbp::Status::Success,
237                     {reinterpret_cast<const char*>(&extras), sizeof(extras)},
238                     {},
239                     {},
240                     cb::mcbp::Datatype::Raw,
241                     ncas);
242         } else {
243             cookie.sendResponse(cb::mcbp::Status::Success);
244         }
245         state = State::Done;
246     } else if (ret == ENGINE_KEY_EEXISTS && cas == 0) {
247         state = State::Reset;
248         // We need to return ENGINE_SUCCESS in order to continue processing
249         ret = ENGINE_SUCCESS;
250     }
251 
252     return ret;
253 }
254 
reset()255 ENGINE_ERROR_CODE AppendPrependCommandContext::reset() {
256     olditem.reset();
257     newitem.reset();
258     buffer.reset();
259 
260     state = State::GetItem;
261     return ENGINE_SUCCESS;
262 }
263