xref: /6.6.0/kv_engine/daemon/mcbp.cc (revision ee5212ee)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2016 Couchbase, Inc.
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
#include "mcbp.h"

#include "buckets.h"
#include "cookie.h"
#include "cookie_trace_context.h"
#include "memcached.h"
#include "opentracing.h"
#include "settings.h"
#include "utilities/logtags.h"
#include "xattr/utils.h"
#include <logger/logger.h>
#include <mcbp/protocol/framebuilder.h>
#include <mcbp/protocol/header.h>
#include <memcached/protocol_binary.h>
#include <nlohmann/json.hpp>
#include <platform/compress.h>
#include <platform/string_hex.h>

#include <cstring>
#include <stdexcept>
34
35static cb::const_byte_buffer mcbp_add_header(Cookie& cookie,
36                                             cb::Pipe& pipe,
37                                             uint8_t opcode,
38                                             cb::mcbp::Status status,
39                                             uint8_t ext_len,
40                                             uint16_t key_len,
41                                             uint32_t body_len,
42                                             uint8_t datatype,
43                                             uint32_t opaque,
44                                             uint64_t cas) {
45    auto wbuf = pipe.wdata();
46    auto* header = (protocol_binary_response_header*)wbuf.data();
47
48    header->response.setOpcode(cb::mcbp::ClientOpcode(opcode));
49    header->response.setExtlen(ext_len);
50    header->response.setDatatype(cb::mcbp::Datatype(datatype));
51    header->response.setStatus(status);
52    header->response.setOpaque(opaque);
53    header->response.setCas(cas);
54
55    if (cookie.isTracingEnabled()) {
56        // When tracing is enabled we'll be using the alternative
57        // response header where we inject the framing header.
58        // For now we'll just hard-code the adding of the bytes
59        // for the tracing info.
60        //
61        // Moving forward we should get a builder for encoding the
62        // framing header (but do that the next time we need to add
63        // something so that we have a better understanding on how
64        // we need to do that (it could be that we need to modify
65        // an already existing section etc).
66        header->response.setMagic(cb::mcbp::Magic::AltClientResponse);
67        // The framing extras when we just include the tracing information
68        // is 3 bytes. 1 byte with id and length, then the 2 bytes
69        // containing the actual data.
70        const uint8_t framing_extras_size = MCBP_TRACING_RESPONSE_SIZE;
71        const uint8_t tracing_framing_id = 0x02;
72
73        header->bytes[2] = framing_extras_size; // framing header extras 3 bytes
74        header->bytes[3] = uint8_t(key_len);
75        header->response.setBodylen(body_len + framing_extras_size);
76
77        auto& tracer = cookie.getTracer();
78        const auto val = htons(tracer.getEncodedMicros());
79        auto* ptr = header->bytes + sizeof(header->bytes);
80        *ptr = tracing_framing_id;
81        ptr++;
82        memcpy(ptr, &val, sizeof(val));
83        pipe.produced(sizeof(header->bytes) + framing_extras_size);
84        return {wbuf.data(), sizeof(header->bytes) + framing_extras_size};
85    } else {
86        header->response.setMagic(cb::mcbp::Magic::ClientResponse);
87        header->response.setKeylen(key_len);
88        header->response.setFramingExtraslen(0);
89        header->response.setBodylen(body_len);
90    }
91
92    pipe.produced(sizeof(header->bytes));
93    return {wbuf.data(), sizeof(header->bytes)};
94}
95
96void mcbp_add_header(Cookie& cookie,
97                     cb::mcbp::Status status,
98                     uint8_t ext_len,
99                     uint16_t key_len,
100                     uint32_t body_len,
101                     uint8_t datatype) {
102    auto& connection = cookie.getConnection();
103    connection.addMsgHdr(true);
104    const auto& header = cookie.getHeader();
105
106    const auto wbuf = mcbp_add_header(cookie,
107                                      *connection.write,
108                                      header.getOpcode(),
109                                      status,
110                                      ext_len,
111                                      key_len,
112                                      body_len,
113                                      datatype,
114                                      header.getOpaque(),
115                                      cookie.getCas());
116
117    if (Settings::instance().getVerbose() > 1) {
118        auto* header = reinterpret_cast<const cb::mcbp::Header*>(wbuf.data());
119        try {
120            LOG_TRACE("<{} Sending: {}",
121                      connection.getId(),
122                      header->toJSON(true).dump());
123        } catch (const std::exception&) {
124            // Failed.. do a raw dump instead
125            LOG_TRACE("<{} Sending: {}",
126                      connection.getId(),
127                      cb::to_hex({wbuf.data(), sizeof(cb::mcbp::Header)}));
128        }
129    }
130
131    ++connection.getBucket().responseCounters[uint16_t(status)];
132    connection.addIov(wbuf.data(), wbuf.size());
133}
134
135static bool mcbp_response_handler(const void* key,
136                                  uint16_t keylen,
137                                  const void* ext,
138                                  uint8_t extlen,
139                                  const void* body,
140                                  uint32_t bodylen,
141                                  protocol_binary_datatype_t datatype,
142                                  cb::mcbp::Status status,
143                                  uint64_t cas,
144                                  const void* void_cookie) {
145    auto* ccookie = reinterpret_cast<const Cookie*>(void_cookie);
146    auto* cookie = const_cast<Cookie*>(ccookie);
147
148    Connection* c = &cookie->getConnection();
149    cb::compression::Buffer buffer;
150    cb::const_char_buffer payload(static_cast<const char*>(body), bodylen);
151
152    if ((!c->isSnappyEnabled() && mcbp::datatype::is_snappy(datatype)) ||
153        (mcbp::datatype::is_snappy(datatype) &&
154         mcbp::datatype::is_xattr(datatype))) {
155        // The client is not snappy-aware, and the content contains
156        // snappy encoded data. Or it's xattr compressed. We need to inflate it!
157        if (!cb::compression::inflate(cb::compression::Algorithm::Snappy,
158                                      payload, buffer)) {
159            std::string mykey(reinterpret_cast<const char*>(key), keylen);
160            LOG_WARNING(
161                    "<{} ERROR: Failed to inflate body, "
162                    "Key: {} may have an incorrect datatype, "
163                    "Datatype indicates that document is {}",
164                    c->getId(),
165                    cb::UserDataView(mykey),
166                    mcbp::datatype::to_string(datatype));
167            return false;
168        }
169        payload = buffer;
170        datatype &= ~(PROTOCOL_BINARY_DATATYPE_SNAPPY);
171    }
172
173    if (mcbp::datatype::is_xattr(datatype)) {
174        // We need to strip off the xattrs
175        payload = cb::xattr::get_body(payload);
176        datatype &= ~(PROTOCOL_BINARY_DATATYPE_XATTR);
177    }
178
179    datatype = c->getEnabledDatatypes(datatype);
180    auto& error_json = cookie->getErrorJson();
181
182    switch (status) {
183    case cb::mcbp::Status::Success:
184    case cb::mcbp::Status::SubdocSuccessDeleted:
185    case cb::mcbp::Status::SubdocMultiPathFailure:
186    case cb::mcbp::Status::Rollback:
187        break;
188    case cb::mcbp::Status::NotMyVbucket:
189        cookie->sendNotMyVBucket();
190        return true;
191    default:
192        //
193        payload = {error_json.data(), error_json.size()};
194        keylen = 0;
195        extlen = 0;
196        datatype = payload.empty() ? PROTOCOL_BINARY_RAW_BYTES
197                                   : PROTOCOL_BINARY_DATATYPE_JSON;
198    }
199
200    const size_t needed = payload.len + keylen + extlen +
201                          sizeof(protocol_binary_response_header);
202
203    auto& dbuf = cookie->getDynamicBuffer();
204    if (!dbuf.grow(needed)) {
205        LOG_WARNING("<{} ERROR: Failed to allocate memory for response",
206                    c->getId());
207        return false;
208    }
209
210    auto* buf = reinterpret_cast<uint8_t*>(dbuf.getCurrent());
211    const auto& header = cookie->getHeader();
212
213    cb::mcbp::ResponseBuilder builder({buf, needed});
214    builder.setMagic(cb::mcbp::Magic::ClientResponse);
215    builder.setOpcode(header.getRequest().getClientOpcode());
216    builder.setDatatype(cb::mcbp::Datatype(datatype));
217    builder.setStatus(status);
218    builder.setExtras({static_cast<const uint8_t*>(ext), extlen});
219    builder.setKey({static_cast<const uint8_t*>(key), keylen});
220    builder.setValue(
221            {reinterpret_cast<const uint8_t*>(payload.data()), payload.size()});
222    builder.setOpaque(header.getOpaque());
223    builder.setCas(cas);
224    builder.validate();
225
226    ++c->getBucket().responseCounters[uint16_t(status)];
227    dbuf.moveOffset(needed);
228    return true;
229}
230
231// Expose a static std::function to wrap mcbp_response_handler, instead of
232// creating a temporary object every time we need to call into an engine.
233// This also avoids problems where a stack-allocated AddResponseFn could go
234// out of scope if someone (e.g. ep-engine's FetchAllKeysTask) needs to take
235// a copy of it and run it on another thread.
236//
237AddResponseFn mcbpResponseHandlerFn = mcbp_response_handler;
238
239void mcbp_collect_timings(Cookie& cookie) {
240    // The state machinery cause this method to be called for all kinds
241    // of packets, but the header musts be a client request for the timings
242    // to make sense (and not when we handled a ServerResponse message etc ;)
243    const auto& header = cookie.getHeader();
244    if (!header.isRequest()) {
245        return;
246    }
247
248    auto* c = &cookie.getConnection();
249    if (c->isDCP()) {
250        // The state machinery works differently for the DCP connections
251        // so these timings isn't accurate!
252        //
253        // For now disable the timings, and add them back once they're
254        // correct
255        return;
256    }
257    const auto opcode = header.getRequest().getClientOpcode();
258    const auto endTime = std::chrono::steady_clock::now();
259    const auto elapsed = endTime - cookie.getStart();
260    cookie.getTracer().end(cb::tracing::SpanId{0}, endTime);
261
262    // aggregated timing for all buckets
263    all_buckets[0].timings.collect(opcode, elapsed);
264
265    // timing for current bucket
266    const auto bucketid = c->getBucketIndex();
267    /* bucketid will be zero initially before you run sasl auth
268     * (unless there is a default bucket), or if someone tries
269     * to delete the bucket you're associated with and your're idle.
270     */
271    if (bucketid != 0) {
272        all_buckets[bucketid].timings.collect(opcode, elapsed);
273    }
274
275    // Log operations taking longer than the "slow" threshold for the opcode.
276    cookie.maybeLogSlowCommand(elapsed);
277
278    if (cookie.isOpenTracingEnabled()) {
279        OpenTracing::pushTraceLog(cookie.extractTraceContext());
280    }
281}
282