xref: /5.5.2/kv_engine/daemon/mcbp.cc (revision 1aae4b16)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2016 Couchbase, Inc.
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17#include "mcbp.h"
18
19#include "buckets.h"
20#include "debug_helpers.h"
21#include "memcached.h"
22#include "utilities/logtags.h"
23#include "xattr/utils.h"
24
25#include <mcbp/protocol/framebuilder.h>
26#include <platform/compress.h>
27
28static cb::const_byte_buffer mcbp_add_header(Cookie& cookie,
29                                             cb::Pipe& pipe,
30                                             uint8_t opcode,
31                                             uint16_t err,
32                                             uint8_t ext_len,
33                                             uint16_t key_len,
34                                             uint32_t body_len,
35                                             uint8_t datatype,
36                                             uint32_t opaque,
37                                             uint64_t cas) {
38    auto wbuf = pipe.wdata();
39    auto* header = (protocol_binary_response_header*)wbuf.data();
40
41    header->response.opcode = opcode;
42    header->response.extlen = ext_len;
43    header->response.datatype = datatype;
44    header->response.status = (uint16_t)htons(err);
45    header->response.opaque = opaque;
46    header->response.cas = htonll(cas);
47
48    if (cookie.isTracingEnabled()) {
49        // When tracing is enabled we'll be using the alternative
50        // response header where we inject the framing header.
51        // For now we'll just hard-code the adding of the bytes
52        // for the tracing info.
53        //
54        // Moving forward we should get a builder for encoding the
55        // framing header (but do that the next time we need to add
56        // something so that we have a better understanding on how
57        // we need to do that (it could be that we need to modify
58        // an already existing section etc).
59        header->response.magic = uint8_t(cb::mcbp::Magic::AltClientResponse);
60        // The framing extras when we just include the tracing information
61        // is 3 bytes. 1 byte with id and length, then the 2 bytes
62        // containing the actual data.
63        const uint8_t framing_extras_size = MCBP_TRACING_RESPONSE_SIZE;
64        const uint8_t tracing_framing_id = 0x02;
65
66        header->bytes[2] = framing_extras_size; // framing header extras 3 bytes
67        header->bytes[3] = uint8_t(key_len);
68        header->response.setBodylen(body_len + framing_extras_size);
69
70        auto& tracer = cookie.getTracer();
71        const auto val = htons(tracer.getEncodedMicros());
72        auto* ptr = header->bytes + sizeof(header->bytes);
73        *ptr = tracing_framing_id;
74        ptr++;
75        memcpy(ptr, &val, sizeof(val));
76        pipe.produced(sizeof(header->bytes) + framing_extras_size);
77        return {wbuf.data(), sizeof(header->bytes) + framing_extras_size};
78    } else {
79        header->response.magic = (uint8_t)PROTOCOL_BINARY_RES;
80        header->response.setKeylen(key_len);
81        header->response.setBodylen(body_len);
82    }
83
84    pipe.produced(sizeof(header->bytes));
85    return {wbuf.data(), sizeof(header->bytes)};
86}
87
88void mcbp_add_header(Cookie& cookie,
89                     uint16_t err,
90                     uint8_t ext_len,
91                     uint16_t key_len,
92                     uint32_t body_len,
93                     uint8_t datatype) {
94    auto& connection = cookie.getConnection();
95    connection.addMsgHdr(true);
96    const auto& header = cookie.getHeader();
97
98    const auto wbuf = mcbp_add_header(cookie,
99                                      *connection.write,
100                                      header.getOpcode(),
101                                      err,
102                                      ext_len,
103                                      key_len,
104                                      body_len,
105                                      datatype,
106                                      header.getOpaque(),
107                                      cookie.getCas());
108
109    if (settings.getVerbose() > 1) {
110        auto* header = reinterpret_cast<const cb::mcbp::Header*>(wbuf.data());
111        try {
112            LOG_DEBUG("<{} Sending: {}",
113                      connection.getId(),
114                      to_string(header->toJSON(), false));
115        } catch (const std::exception&) {
116            // Failed.. do a raw dump instead
117            char buffer[1024];
118            if (bytes_to_output_string(
119                        buffer,
120                        sizeof(buffer),
121                        connection.getId(),
122                        false,
123                        "Writing bin response:",
124                        reinterpret_cast<const char*>(wbuf.data()),
125                        wbuf.size()) != -1) {
126                LOG_DEBUG("{}", buffer);
127            }
128        }
129    }
130
131    ++connection.getBucket().responseCounters[err];
132    connection.addIov(wbuf.data(), wbuf.size());
133}
134
135bool mcbp_response_handler(const void* key, uint16_t keylen,
136                           const void* ext, uint8_t extlen,
137                           const void* body, uint32_t bodylen,
138                           protocol_binary_datatype_t datatype, uint16_t status,
139                           uint64_t cas, const void* void_cookie)
140{
141    auto* ccookie = reinterpret_cast<const Cookie*>(void_cookie);
142    auto* cookie = const_cast<Cookie*>(ccookie);
143
144    Connection* c = &cookie->getConnection();
145    cb::compression::Buffer buffer;
146    cb::const_char_buffer payload(static_cast<const char*>(body), bodylen);
147
148    if ((!c->isSnappyEnabled() && mcbp::datatype::is_snappy(datatype)) ||
149        (mcbp::datatype::is_snappy(datatype) &&
150         mcbp::datatype::is_xattr(datatype))) {
151        // The client is not snappy-aware, and the content contains
152        // snappy encoded data. Or it's xattr compressed. We need to inflate it!
153        if (!cb::compression::inflate(cb::compression::Algorithm::Snappy,
154                                      payload, buffer)) {
155            std::string mykey(reinterpret_cast<const char*>(key), keylen);
156            LOG_WARNING(
157                    "<{} ERROR: Failed to inflate body, "
158                    "Key: {} may have an incorrect datatype, "
159                    "Datatype indicates that document is {}",
160                    c->getId(),
161                    cb::logtags::tagUserData(mykey),
162                    mcbp::datatype::to_string(datatype));
163            return false;
164        }
165        payload = buffer;
166        datatype &= ~(PROTOCOL_BINARY_DATATYPE_SNAPPY);
167    }
168
169    if (mcbp::datatype::is_xattr(datatype)) {
170        // We need to strip off the xattrs
171        payload = cb::xattr::get_body(payload);
172        datatype &= ~(PROTOCOL_BINARY_DATATYPE_XATTR);
173    }
174
175    datatype = c->getEnabledDatatypes(datatype);
176    auto& error_json = cookie->getErrorJson();
177
178    switch (status) {
179    case PROTOCOL_BINARY_RESPONSE_SUCCESS:
180    case PROTOCOL_BINARY_RESPONSE_SUBDOC_SUCCESS_DELETED:
181    case PROTOCOL_BINARY_RESPONSE_SUBDOC_MULTI_PATH_FAILURE:
182    case PROTOCOL_BINARY_RESPONSE_ROLLBACK:
183        break;
184    case PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET:
185        cookie->sendNotMyVBucket();
186        return true;
187    default:
188        //
189        payload = {error_json.data(), error_json.size()};
190        keylen = 0;
191        extlen = 0;
192        datatype = payload.empty() ? PROTOCOL_BINARY_RAW_BYTES
193                                   : PROTOCOL_BINARY_DATATYPE_JSON;
194    }
195
196    const size_t needed = payload.len + keylen + extlen +
197                          sizeof(protocol_binary_response_header);
198
199    auto& dbuf = cookie->getDynamicBuffer();
200    if (!dbuf.grow(needed)) {
201        LOG_WARNING("<{} ERROR: Failed to allocate memory for response",
202                    c->getId());
203        return false;
204    }
205
206    auto* buf = reinterpret_cast<uint8_t*>(dbuf.getCurrent());
207    const auto& header = cookie->getHeader();
208
209    cb::mcbp::ResponseBuilder builder({buf, needed});
210    builder.setMagic(cb::mcbp::Magic::ClientResponse);
211    builder.setOpcode(header.getRequest().getClientOpcode());
212    builder.setDatatype(cb::mcbp::Datatype(datatype));
213    builder.setStatus(cb::mcbp::Status(status));
214    builder.setExtras({static_cast<const uint8_t*>(ext), extlen});
215    builder.setKey({static_cast<const uint8_t*>(key), keylen});
216    builder.setValue(
217            {reinterpret_cast<const uint8_t*>(payload.data()), payload.size()});
218    builder.setOpaque(header.getOpaque());
219    builder.setCas(cas);
220    builder.validate();
221
222    ++c->getBucket().responseCounters[status];
223    dbuf.moveOffset(needed);
224    return true;
225}
226
227void mcbp_collect_timings(Cookie& cookie) {
228    auto* c = &cookie.getConnection();
229    if (c->isDCP()) {
230        // The state machinery works differently for the DCP connections
231        // so these timings isn't accurate!
232        //
233        // For now disable the timings, and add them back once they're
234        // correct
235        return;
236    }
237    const auto opcode = cookie.getHeader().getOpcode();
238    const auto endTime = ProcessClock::now();
239    const auto elapsed = endTime - cookie.getStart();
240    cookie.getTracer().end(cb::tracing::TraceCode::REQUEST, endTime);
241
242    // aggregated timing for all buckets
243    all_buckets[0].timings.collect(opcode, elapsed);
244
245    // timing for current bucket
246    const auto bucketid = c->getBucketIndex();
247    /* bucketid will be zero initially before you run sasl auth
248     * (unless there is a default bucket), or if someone tries
249     * to delete the bucket you're associated with and your're idle.
250     */
251    if (bucketid != 0) {
252        all_buckets[bucketid].timings.collect(opcode, elapsed);
253    }
254
255    // Log operations taking longer than the "slow" threshold for the opcode.
256    cookie.maybeLogSlowCommand(elapsed);
257}
258