xref: /5.5.2/kv_engine/daemon/cookie.cc (revision 1aae4b16)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2017 Couchbase, Inc.
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include "cookie.h"
19#include "buckets.h"
20#include "connection.h"
21#include "mcbp.h"
22
23#include <mcbp/mcbp.h>
24#include <mcbp/protocol/framebuilder.h>
25#include <phosphor/phosphor.h>
26#include <platform/checked_snprintf.h>
27#include <platform/string.h>
28#include <platform/timeutils.h>
29#include <utilities/logtags.h>
30#include <chrono>
31
32unique_cJSON_ptr Cookie::toJSON() const {
33    unique_cJSON_ptr ret(cJSON_CreateObject());
34
35    if (packet.empty()) {
36        cJSON_AddItemToObject(ret.get(), "packet", cJSON_CreateObject());
37    } else {
38        const auto& header = getHeader();
39        cJSON_AddItemToObject(ret.get(), "packet", header.toJSON().release());
40    }
41
42    if (!event_id.empty()) {
43        cJSON_AddStringToObject(ret.get(), "event_id", event_id.c_str());
44    }
45
46    if (!error_context.empty()) {
47        cJSON_AddStringToObject(
48                ret.get(), "error_context", error_context.c_str());
49    }
50
51    if (cas != 0) {
52        std::string str = std::to_string(cas);
53        cJSON_AddStringToObject(ret.get(), "cas", str.c_str());
54    }
55
56    cJSON_AddStringToObject(
57            ret.get(), "connection", connection.getDescription());
58
59    return ret;
60}
61
62const std::string& Cookie::getErrorJson() {
63    json_message.clear();
64    if (error_context.empty() && event_id.empty()) {
65        return json_message;
66    }
67
68    unique_cJSON_ptr root(cJSON_CreateObject());
69    unique_cJSON_ptr error(cJSON_CreateObject());
70    if (!error_context.empty()) {
71        cJSON_AddStringToObject(error.get(), "context", error_context.c_str());
72    }
73    if (!event_id.empty()) {
74        cJSON_AddStringToObject(error.get(), "ref", event_id.c_str());
75    }
76    cJSON_AddItemToObject(root.get(), "error", error.release());
77    json_message = to_string(root, false);
78    return json_message;
79}
80
81void Cookie::setPacket(PacketContent content,
82                       cb::const_byte_buffer buffer,
83                       bool copy) {
84    if (buffer.size() < sizeof(cb::mcbp::Request)) {
85        // we don't have the header, so we can't even look at the body
86        // length
87        throw std::invalid_argument(
88                "Cookie::setPacket(): packet must contain header");
89    }
90
91    switch (content) {
92    case PacketContent::Header:
93        if (buffer.size() != sizeof(cb::mcbp::Request)) {
94            throw std::invalid_argument(
95                    "Cookie::setPacket(): Incorrect packet size");
96        }
97
98        if (copy) {
99            throw std::logic_error(
100                    "Cookie::setPacket(): copy should only be set for full "
101                    "content");
102        }
103        packet = buffer;
104        return;
105    case PacketContent::Full:
106        const auto* req =
107                reinterpret_cast<const cb::mcbp::Request*>(buffer.data());
108        const size_t packetsize = sizeof(cb::mcbp::Request) + req->getBodylen();
109
110        if (buffer.size() != packetsize) {
111            throw std::logic_error("Cookie::setPacket(): Body not available");
112        }
113
114        if (copy) {
115            received_packet.reset(new uint8_t[buffer.size()]);
116            std::copy(buffer.begin(), buffer.end(), received_packet.get());
117            packet = {received_packet.get(), buffer.size()};
118            return;
119        }
120
121        packet = buffer;
122        return;
123    }
124    throw std::logic_error("Cookie::setPacket(): Invalid content provided");
125}
126
127cb::const_byte_buffer Cookie::getPacket(PacketContent content) const {
128    if (packet.empty()) {
129        throw std::logic_error("Cookie::getPacket(): packet not available");
130    }
131
132    switch (content) {
133    case PacketContent::Header:
134        return cb::const_byte_buffer{packet.data(), sizeof(cb::mcbp::Request)};
135    case PacketContent::Full:
136        const auto* req =
137                reinterpret_cast<const cb::mcbp::Request*>(packet.data());
138        const size_t packetsize = sizeof(cb::mcbp::Request) + req->getBodylen();
139
140        if (packet.size() != packetsize) {
141            throw std::logic_error("Cookie::getPacket(): Body not available");
142        }
143
144        return packet;
145    }
146
147    throw std::invalid_argument(
148            "Cookie::getPacket(): Invalid content requested");
149}
150
151const cb::mcbp::Header& Cookie::getHeader() const {
152    const auto packet = getPacket(PacketContent::Header);
153    return *reinterpret_cast<const cb::mcbp::Header*>(packet.data());
154}
155
156const cb::mcbp::Request& Cookie::getRequest(PacketContent content) const {
157    cb::const_byte_buffer packet = getPacket(content);
158    const auto* ret = reinterpret_cast<const cb::mcbp::Request*>(packet.data());
159    switch (ret->getMagic()) {
160    case cb::mcbp::Magic::ClientRequest:
161    case cb::mcbp::Magic::ServerRequest:
162        return *ret;
163    case cb::mcbp::Magic::AltClientResponse:
164    case cb::mcbp::Magic::ClientResponse:
165    case cb::mcbp::Magic::ServerResponse:
166        throw std::logic_error("Cookie::getRequest(): Packet is response");
167    }
168
169    throw std::invalid_argument("Cookie::getRequest(): Invalid packet type");
170}
171
172const cb::mcbp::Response& Cookie::getResponse(PacketContent content) const {
173    cb::const_byte_buffer packet = getPacket(content);
174    const auto* ret =
175            reinterpret_cast<const cb::mcbp::Response*>(packet.data());
176    switch (ret->getMagic()) {
177    case cb::mcbp::Magic::ClientRequest:
178    case cb::mcbp::Magic::ServerRequest:
179        throw std::logic_error("Cookie::getRequest(): Packet is resquest");
180    case cb::mcbp::Magic::AltClientResponse:
181    case cb::mcbp::Magic::ClientResponse:
182    case cb::mcbp::Magic::ServerResponse:
183        return *ret;
184    }
185
186    throw std::invalid_argument("Cookie::getResponse(): Invalid packet type");
187}
188
189ENGINE_ERROR_CODE Cookie::swapAiostat(ENGINE_ERROR_CODE value) {
190    auto ret = getAiostat();
191    setAiostat(value);
192    return ret;
193}
194
195ENGINE_ERROR_CODE Cookie::getAiostat() const {
196    return connection.getAiostat();
197}
198
199void Cookie::setAiostat(ENGINE_ERROR_CODE aiostat) {
200    connection.setAiostat(aiostat);
201}
202
203bool Cookie::isEwouldblock() const {
204    return connection.isEwouldblock();
205}
206
207void Cookie::setEwouldblock(bool ewouldblock) {
208    if (ewouldblock && !connection.isDCP()) {
209        setAiostat(ENGINE_EWOULDBLOCK);
210    }
211
212    connection.setEwouldblock(ewouldblock);
213}
214
215void Cookie::sendDynamicBuffer() {
216    if (dynamicBuffer.getRoot() == nullptr) {
217        throw std::logic_error(
218                "Cookie::sendDynamicBuffer(): Dynamic buffer not created");
219    } else {
220        connection.addIov(dynamicBuffer.getRoot(), dynamicBuffer.getOffset());
221        connection.setState(McbpStateMachine::State::send_data);
222        connection.setWriteAndGo(McbpStateMachine::State::new_cmd);
223        connection.pushTempAlloc(dynamicBuffer.getRoot());
224        dynamicBuffer.takeOwnership();
225    }
226}
227
228void Cookie::sendNotMyVBucket() {
229    auto pair = connection.getBucket().clusterConfiguration.getConfiguration();
230    if (pair.first == -1 || (pair.first == connection.getClustermapRevno() &&
231                             settings.isDedupeNmvbMaps())) {
232        // We don't have a vbucket map, or we've already sent it to the
233        // client
234        mcbp_add_header(*this,
235                        PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET,
236                        0,
237                        0,
238                        0,
239                        PROTOCOL_BINARY_RAW_BYTES);
240        connection.setState(McbpStateMachine::State::send_data);
241        connection.setWriteAndGo(McbpStateMachine::State::new_cmd);
242        return;
243    }
244
245    const size_t needed = sizeof(cb::mcbp::Response) + pair.second->size();
246    if (!growDynamicBuffer(needed)) {
247        throw std::bad_alloc();
248    }
249    auto& buffer = getDynamicBuffer();
250    auto* buf = reinterpret_cast<uint8_t*>(buffer.getCurrent());
251    const auto& header = getHeader();
252    cb::mcbp::ResponseBuilder builder({buf, needed});
253    builder.setMagic(cb::mcbp::Magic::ClientResponse);
254    builder.setOpcode(header.getRequest().getClientOpcode());
255    builder.setStatus(cb::mcbp::Status::NotMyVbucket);
256    builder.setOpaque(header.getOpaque());
257    builder.setValue({reinterpret_cast<const uint8_t*>(pair.second->data()),
258                      pair.second->size()});
259    builder.validate();
260
261    buffer.moveOffset(needed);
262    sendDynamicBuffer();
263    connection.setClustermapRevno(pair.first);
264}
265
266void Cookie::sendResponse(cb::mcbp::Status status) {
267    if (status == cb::mcbp::Status::Success) {
268        const auto& request = getHeader().getRequest();
269        const auto quiet = request.isQuiet();
270        if (quiet) {
271            // The responseCounter is updated here as this is non-responding
272            // code hence mcbp_add_header will not be called (which is what
273            // normally updates the responseCounters).
274            auto& bucket = connection.getBucket();
275            ++bucket.responseCounters[PROTOCOL_BINARY_RESPONSE_SUCCESS];
276            connection.setState(McbpStateMachine::State::new_cmd);
277            return;
278        }
279
280        mcbp_add_header(
281                *this, uint16_t(status), 0, 0, 0, PROTOCOL_BINARY_RAW_BYTES);
282        connection.setState(McbpStateMachine::State::send_data);
283        connection.setWriteAndGo(McbpStateMachine::State::new_cmd);
284        return;
285    }
286
287    if (status == cb::mcbp::Status::NotMyVbucket) {
288        sendNotMyVBucket();
289        return;
290    }
291
292    // fall back sending the error message (and include the JSON payload etc)
293    sendResponse(status, {}, {}, {}, cb::mcbp::Datatype::Raw, cas);
294}
295
296void Cookie::sendResponse(cb::engine_errc code) {
297    sendResponse(cb::mcbp::to_status(code));
298}
299
300void Cookie::sendResponse(cb::mcbp::Status status,
301                          cb::const_char_buffer extras,
302                          cb::const_char_buffer key,
303                          cb::const_char_buffer value,
304                          cb::mcbp::Datatype datatype,
305                          uint64_t cas) {
306    if (!connection.write->empty()) {
307        // We can't continue as we might already have references
308        // in the IOvector stack pointing into the existing buffer!
309        throw std::logic_error(
310                "Cookie::sendResponse: No data should have been inserted "
311                "in the write buffer!");
312    }
313
314    if (datatype != cb::mcbp::Datatype::Raw &&
315        datatype != cb::mcbp::Datatype::JSON) {
316        throw std::runtime_error("Cookie::sendResponse: Unsupported datatype");
317    }
318
319    if (status == cb::mcbp::Status::NotMyVbucket) {
320        sendNotMyVBucket();
321        return;
322    }
323
324    const auto& error_json = getErrorJson();
325
326    if (cb::mcbp::isStatusSuccess(status)) {
327        setCas(cas);
328    } else {
329        // This is an error message.. Inject the error JSON!
330        extras = {};
331        key = {};
332        value = {error_json.data(), error_json.size()};
333        datatype = value.empty() ? cb::mcbp::Datatype::Raw
334                                 : cb::mcbp::Datatype::JSON;
335    }
336
337    size_t needed = sizeof(cb::mcbp::Header) + value.size() + key.size() +
338                    extras.size();
339    if (isTracingEnabled()) {
340        needed += MCBP_TRACING_RESPONSE_SIZE;
341    }
342    connection.write->ensureCapacity(needed);
343
344    mcbp_add_header(*this,
345                    uint16_t(status),
346                    uint8_t(extras.size()),
347                    uint16_t(key.size()),
348                    uint32_t(value.size() + key.size() + extras.size()),
349                    connection.getEnabledDatatypes(
350                            protocol_binary_datatype_t(datatype)));
351
352
353    if (!extras.empty()) {
354        auto wdata = connection.write->wdata();
355        std::copy(extras.begin(), extras.end(), wdata.begin());
356        connection.write->produced(extras.size());
357        connection.addIov(wdata.data(), extras.size());
358    }
359
360    if (!key.empty()) {
361        auto wdata = connection.write->wdata();
362        std::copy(key.begin(), key.end(), wdata.begin());
363        connection.write->produced(key.size());
364        connection.addIov(wdata.data(), key.size());
365    }
366
367    if (!value.empty()) {
368        auto wdata = connection.write->wdata();
369        std::copy(value.begin(), value.end(), wdata.begin());
370        connection.write->produced(value.size());
371        connection.addIov(wdata.data(), value.size());
372    }
373
374    connection.setState(McbpStateMachine::State::send_data);
375    connection.setWriteAndGo(McbpStateMachine::State::new_cmd);
376}
377
378const DocKey Cookie::getRequestKey() const {
379    auto key = getRequest().getKey();
380    return DocKey{key.data(), key.size(), connection.getDocNamespace()};
381}
382
383std::string Cookie::getPrintableRequestKey() const {
384    const auto key = getRequest().getKey();
385
386    std::string buffer{reinterpret_cast<const char*>(key.data()), key.size()};
387    for (auto& ii : buffer) {
388        if (!std::isgraph(ii)) {
389            ii = '.';
390        }
391    }
392
393    return cb::logtags::tagUserData(buffer);
394}
395
396void Cookie::logCommand() const {
397    if (settings.getVerbose() == 0) {
398        // Info is not enabled.. we don't want to try to format
399        // output
400        return;
401    }
402
403    const auto opcode = getRequest().getClientOpcode();
404    LOG_DEBUG("{}> {} {}",
405              connection.getId(),
406              to_string(opcode),
407              getPrintableRequestKey());
408}
409
410void Cookie::logResponse(const char* reason) const {
411    const auto opcode = getRequest().getClientOpcode();
412    LOG_DEBUG("{}< {} {} - {}",
413              connection.getId(),
414              to_string(opcode),
415              getPrintableRequestKey(),
416              reason);
417}
418
419void Cookie::logResponse(ENGINE_ERROR_CODE code) const {
420    if (settings.getVerbose() == 0) {
421        // Info is not enabled.. we don't want to try to format
422        // output
423        return;
424    }
425
426    if (code == ENGINE_EWOULDBLOCK || code == ENGINE_WANT_MORE) {
427        // These are temporary states
428        return;
429    }
430
431    logResponse(cb::to_string(cb::engine_errc(code)).c_str());
432}
433
434void Cookie::maybeLogSlowCommand(ProcessClock::duration elapsed) const {
435    const auto opcode = getRequest().getClientOpcode();
436    const auto limit = cb::mcbp::sla::getSlowOpThreshold(opcode);
437
438    if (elapsed > limit) {
439        const auto& header = getHeader();
440        std::chrono::nanoseconds timings(elapsed);
441        std::string command;
442        try {
443            command = to_string(opcode);
444        } catch (const std::exception&) {
445            char opcode_s[16];
446            checked_snprintf(
447                    opcode_s, sizeof(opcode_s), "0x%X", header.getOpcode());
448            command.assign(opcode_s);
449        }
450
451        if (opcode == cb::mcbp::ClientOpcode::Stat) {
452            // Log which stat command took a long time
453            const auto key = getPrintableRequestKey();
454
455            if (key.find("key ") == 0) {
456                // stat key username1324423e; truncate the actual item key
457                command.append("key <TRUNCATED>");
458            } else if (!key.empty()) {
459                command.append(key);
460            }
461        }
462
463        auto& c = getConnection();
464
465        TRACE_COMPLETE2("memcached/slow",
466                        "Slow cmd",
467                        getStart(),
468                        getStart() + elapsed,
469                        "opcode",
470                        getHeader().getOpcode(),
471                        "connection_id",
472                        c.getId());
473
474        const std::string traceData = to_string(tracer);
475        LOG_WARNING(
476                R"({}: Slow operation. {{"cid":"{}/{:x}","duration":"{}","trace":"{}","command":"{}","peer":"{}"}})",
477                c.getId(),
478                c.getConnectionId().data(),
479                ntohl(getHeader().getOpaque()),
480                cb::time2text(timings),
481                traceData,
482                command,
483                c.getPeername());
484    }
485}
486
487void Cookie::initialize(cb::const_byte_buffer header, bool tracing_enabled) {
488    reset();
489    enableTracing = tracing_enabled;
490    setPacket(Cookie::PacketContent::Header, header);
491    setCas(0);
492    start = ProcessClock::now();
493    tracer.begin(cb::tracing::TraceCode::REQUEST, start);
494}
495