xref: /6.6.0/kv_engine/daemon/cookie.cc (revision ee5212ee)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2017 Couchbase, Inc.
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include "cookie.h"
19
20#include "buckets.h"
21#include "connection.h"
22#include "cookie_trace_context.h"
23#include "mcbp.h"
24#include "mcbp_executors.h"
25#include "settings.h"
26
27#include <logger/logger.h>
28#include <mcbp/mcbp.h>
29#include <mcbp/protocol/framebuilder.h>
30#include <nlohmann/json.hpp>
31#include <phosphor/phosphor.h>
32#include <platform/checked_snprintf.h>
33#include <platform/string_hex.h>
34#include <platform/timeutils.h>
35#include <platform/uuid.h>
36#include <utilities/logtags.h>
37#include <chrono>
38
39nlohmann::json Cookie::toJSON() const {
40    nlohmann::json ret;
41
42    if (packet.empty()) {
43        ret["packet"] = nlohmann::json();
44    } else {
45        const auto& header = getHeader();
46        ret["packet"] = header.toJSON(validated);
47    }
48
49    if (!event_id.empty()) {
50        ret["event_id"] = event_id;
51    }
52
53    if (!error_context.empty()) {
54        ret["error_context"] = error_context;
55    }
56
57    if (cas != 0) {
58        ret["cas"] = std::to_string(cas);
59    }
60
61    ret["connection"] = connection.getDescription();
62    ret["ewouldblock"] = ewouldblock;
63    ret["aiostat"] = to_string(cb::engine_errc(aiostat));
64    ret["refcount"] = uint32_t(refcount);
65    ret["engine_storage"] = cb::to_hex(uint64_t(engine_storage));
66    return ret;
67}
68
69const std::string& Cookie::getEventId() const {
70    if (event_id.empty()) {
71        event_id = to_string(cb::uuid::random());
72    }
73
74    return event_id;
75}
76
77void Cookie::setErrorJsonExtras(const nlohmann::json& json) {
78    if (json.find("error") != json.end()) {
79        throw std::invalid_argument(
80                "Cookie::setErrorJsonExtras: cannot use \"error\" as a key, "
81                "json:" +
82                json.dump());
83    }
84
85    error_extra_json = json;
86}
87
88const std::string& Cookie::getErrorJson() {
89    json_message.clear();
90    if (error_context.empty() && event_id.empty() && error_extra_json.empty()) {
91        return json_message;
92    }
93
94    nlohmann::json error;
95    if (!error_context.empty()) {
96        error["context"] = error_context;
97    }
98    if (!event_id.empty()) {
99        error["ref"] = event_id;
100    }
101
102    nlohmann::json root;
103
104    if (!error.empty()) {
105        root["error"] = error;
106    }
107
108    if (!error_extra_json.empty()) {
109        root.update(error_extra_json);
110    }
111    json_message = root.dump();
112    return json_message;
113}
114
115bool Cookie::execute() {
116    // Reset ewouldblock state!
117    setEwouldblock(false);
118    const auto& header = getHeader();
119    if (header.isResponse()) {
120        execute_response_packet(*this, header.getResponse());
121    } else {
122        // We've already verified that the packet is a legal packet
123        // so it must be a request
124        execute_request_packet(*this, header.getRequest());
125    }
126
127    return !isEwouldblock();
128}
129
130void Cookie::setPacket(PacketContent content,
131                       cb::const_byte_buffer buffer,
132                       bool copy) {
133    if (buffer.size() < sizeof(cb::mcbp::Request)) {
134        // we don't have the header, so we can't even look at the body
135        // length
136        throw std::invalid_argument(
137                "Cookie::setPacket(): packet must contain header");
138    }
139
140    switch (content) {
141    case PacketContent::Header:
142        if (buffer.size() != sizeof(cb::mcbp::Request)) {
143            throw std::invalid_argument(
144                    "Cookie::setPacket(): Incorrect packet size");
145        }
146
147        if (copy) {
148            throw std::logic_error(
149                    "Cookie::setPacket(): copy should only be set for full "
150                    "content");
151        }
152        packet = buffer;
153        return;
154    case PacketContent::Full:
155        const auto* req =
156                reinterpret_cast<const cb::mcbp::Request*>(buffer.data());
157        const size_t packetsize = sizeof(cb::mcbp::Request) + req->getBodylen();
158
159        if (buffer.size() != packetsize) {
160            throw std::logic_error("Cookie::setPacket(): Body not available");
161        }
162
163        if (copy) {
164            received_packet.reset(new uint8_t[buffer.size()]);
165            std::copy(buffer.begin(), buffer.end(), received_packet.get());
166            packet = {received_packet.get(), buffer.size()};
167            return;
168        }
169
170        packet = buffer;
171        return;
172    }
173    throw std::logic_error("Cookie::setPacket(): Invalid content provided");
174}
175
176cb::const_byte_buffer Cookie::getPacket(PacketContent content) const {
177    if (packet.empty()) {
178        throw std::logic_error("Cookie::getPacket(): packet not available");
179    }
180
181    switch (content) {
182    case PacketContent::Header:
183        return cb::const_byte_buffer{packet.data(), sizeof(cb::mcbp::Request)};
184    case PacketContent::Full:
185        const auto* req =
186                reinterpret_cast<const cb::mcbp::Request*>(packet.data());
187        const size_t packetsize = sizeof(cb::mcbp::Request) + req->getBodylen();
188
189        if (packet.size() != packetsize) {
190            throw std::logic_error("Cookie::getPacket(): Body not available");
191        }
192
193        return packet;
194    }
195
196    throw std::invalid_argument(
197            "Cookie::getPacket(): Invalid content requested");
198}
199
200const cb::mcbp::Header& Cookie::getHeader() const {
201    const auto packet = getPacket(PacketContent::Header);
202    return *reinterpret_cast<const cb::mcbp::Header*>(packet.data());
203}
204
205const cb::mcbp::Request& Cookie::getRequest(PacketContent content) const {
206    cb::const_byte_buffer packet = getPacket(content);
207    const auto* ret = reinterpret_cast<const cb::mcbp::Header*>(packet.data());
208    return ret->getRequest();
209}
210
211const cb::mcbp::Response& Cookie::getResponse(PacketContent content) const {
212    const auto* ret = reinterpret_cast<const cb::mcbp::Header*>(packet.data());
213    return ret->getResponse();
214}
215
216ENGINE_ERROR_CODE Cookie::swapAiostat(ENGINE_ERROR_CODE value) {
217    auto ret = getAiostat();
218    setAiostat(value);
219    return ret;
220}
221
222ENGINE_ERROR_CODE Cookie::getAiostat() const {
223    return aiostat;
224}
225
226void Cookie::setAiostat(ENGINE_ERROR_CODE aiostat) {
227    Cookie::aiostat = aiostat;
228}
229
230void Cookie::setEwouldblock(bool ewouldblock) {
231    if (ewouldblock && !connection.isDCP()) {
232        setAiostat(ENGINE_EWOULDBLOCK);
233    }
234
235    Cookie::ewouldblock = ewouldblock;
236}
237
238void Cookie::sendDynamicBuffer() {
239    if (dynamicBuffer.getRoot() == nullptr) {
240        throw std::logic_error(
241                "Cookie::sendDynamicBuffer(): Dynamic buffer not created");
242    } else {
243        connection.addIov(dynamicBuffer.getRoot(), dynamicBuffer.getOffset());
244        connection.setState(StateMachine::State::send_data);
245        connection.setWriteAndGo(StateMachine::State::new_cmd);
246        connection.pushTempAlloc(dynamicBuffer.getRoot());
247        dynamicBuffer.takeOwnership();
248    }
249}
250
251void Cookie::sendNotMyVBucket() {
252    auto pair = connection.getBucket().clusterConfiguration.getConfiguration();
253    if (pair.first == -1 || (pair.first == connection.getClustermapRevno() &&
254                             Settings::instance().isDedupeNmvbMaps())) {
255        // We don't have a vbucket map, or we've already sent it to the
256        // client
257        mcbp_add_header(*this,
258                        cb::mcbp::Status::NotMyVbucket,
259                        0,
260                        0,
261                        0,
262                        PROTOCOL_BINARY_RAW_BYTES);
263        connection.setState(StateMachine::State::send_data);
264        connection.setWriteAndGo(StateMachine::State::new_cmd);
265        return;
266    }
267
268    const size_t needed = sizeof(cb::mcbp::Response) + pair.second->size();
269    if (!growDynamicBuffer(needed)) {
270        throw std::bad_alloc();
271    }
272    auto& buffer = getDynamicBuffer();
273    auto* buf = reinterpret_cast<uint8_t*>(buffer.getCurrent());
274    const auto& header = getHeader();
275    cb::mcbp::ResponseBuilder builder({buf, needed});
276    builder.setMagic(cb::mcbp::Magic::ClientResponse);
277    builder.setOpcode(header.getRequest().getClientOpcode());
278    builder.setStatus(cb::mcbp::Status::NotMyVbucket);
279    builder.setOpaque(header.getOpaque());
280    builder.setValue({reinterpret_cast<const uint8_t*>(pair.second->data()),
281                      pair.second->size()});
282    builder.validate();
283
284    buffer.moveOffset(needed);
285    sendDynamicBuffer();
286    connection.setClustermapRevno(pair.first);
287}
288
289void Cookie::sendResponse(cb::mcbp::Status status) {
290    if (status == cb::mcbp::Status::Success) {
291        const auto& request = getHeader().getRequest();
292        const auto quiet = request.isQuiet();
293        if (quiet) {
294            // The responseCounter is updated here as this is non-responding
295            // code hence mcbp_add_header will not be called (which is what
296            // normally updates the responseCounters).
297            auto& bucket = connection.getBucket();
298            ++bucket.responseCounters[int(cb::mcbp::Status::Success)];
299            connection.setState(StateMachine::State::new_cmd);
300            return;
301        }
302
303        mcbp_add_header(*this, status, 0, 0, 0, PROTOCOL_BINARY_RAW_BYTES);
304        connection.setState(StateMachine::State::send_data);
305        connection.setWriteAndGo(StateMachine::State::new_cmd);
306        return;
307    }
308
309    if (status == cb::mcbp::Status::NotMyVbucket) {
310        sendNotMyVBucket();
311        return;
312    }
313
314    // fall back sending the error message (and include the JSON payload etc)
315    sendResponse(status, {}, {}, {}, cb::mcbp::Datatype::Raw, cas);
316}
317
318void Cookie::sendResponse(cb::engine_errc code) {
319    sendResponse(cb::mcbp::to_status(code));
320}
321
322void Cookie::sendResponse(cb::mcbp::Status status,
323                          cb::const_char_buffer extras,
324                          cb::const_char_buffer key,
325                          cb::const_char_buffer value,
326                          cb::mcbp::Datatype datatype,
327                          uint64_t cas) {
328    if (!connection.write->empty()) {
329        // We can't continue as we might already have references
330        // in the IOvector stack pointing into the existing buffer!
331        throw std::logic_error(
332                "Cookie::sendResponse: No data should have been inserted "
333                "in the write buffer!");
334    }
335
336    if (status == cb::mcbp::Status::NotMyVbucket) {
337        sendNotMyVBucket();
338        return;
339    }
340
341    const auto& error_json = getErrorJson();
342
343    if (cb::mcbp::isStatusSuccess(status)) {
344        setCas(cas);
345    } else {
346        // This is an error message.. Inject the error JSON!
347        extras = {};
348        key = {};
349        value = {error_json.data(), error_json.size()};
350        datatype = value.empty() ? cb::mcbp::Datatype::Raw
351                                 : cb::mcbp::Datatype::JSON;
352    }
353
354    size_t needed = sizeof(cb::mcbp::Header) + value.size() + key.size() +
355                    extras.size();
356    if (isTracingEnabled()) {
357        needed += MCBP_TRACING_RESPONSE_SIZE;
358    }
359    connection.write->ensureCapacity(needed);
360
361    mcbp_add_header(*this,
362                    status,
363                    uint8_t(extras.size()),
364                    uint16_t(key.size()),
365                    uint32_t(value.size() + key.size() + extras.size()),
366                    connection.getEnabledDatatypes(
367                            protocol_binary_datatype_t(datatype)));
368
369    if (!extras.empty()) {
370        auto wdata = connection.write->wdata();
371        std::copy(extras.begin(), extras.end(), wdata.begin());
372        connection.write->produced(extras.size());
373        connection.addIov(wdata.data(), extras.size());
374    }
375
376    if (!key.empty()) {
377        auto wdata = connection.write->wdata();
378        std::copy(key.begin(), key.end(), wdata.begin());
379        connection.write->produced(key.size());
380        connection.addIov(wdata.data(), key.size());
381    }
382
383    if (!value.empty()) {
384        auto wdata = connection.write->wdata();
385        std::copy(value.begin(), value.end(), wdata.begin());
386        connection.write->produced(value.size());
387        connection.addIov(wdata.data(), value.size());
388    }
389
390    connection.setState(StateMachine::State::send_data);
391    connection.setWriteAndGo(StateMachine::State::new_cmd);
392}
393
394const DocKey Cookie::getRequestKey() const {
395    return connection.makeDocKey(getRequest().getKey());
396}
397
398std::string Cookie::getPrintableRequestKey() const {
399    const auto key = getRequest().getKey();
400
401    std::string buffer{reinterpret_cast<const char*>(key.data()), key.size()};
402    for (auto& ii : buffer) {
403        if (!std::isgraph(ii)) {
404            ii = '.';
405        }
406    }
407
408    return cb::tagUserData(buffer);
409}
410
411void Cookie::logCommand() const {
412    if (Settings::instance().getVerbose() == 0) {
413        // Info is not enabled.. we don't want to try to format
414        // output
415        return;
416    }
417
418    const auto opcode = getRequest().getClientOpcode();
419    LOG_DEBUG("{}> {} {}",
420              connection.getId(),
421              to_string(opcode),
422              getPrintableRequestKey());
423}
424
425void Cookie::logResponse(const char* reason) const {
426    const auto opcode = getRequest().getClientOpcode();
427    LOG_DEBUG("{}< {} {} - {}",
428              connection.getId(),
429              to_string(opcode),
430              getPrintableRequestKey(),
431              reason);
432}
433
434void Cookie::logResponse(ENGINE_ERROR_CODE code) const {
435    if (Settings::instance().getVerbose() == 0) {
436        // Info is not enabled.. we don't want to try to format
437        // output
438        return;
439    }
440
441    if (code == ENGINE_EWOULDBLOCK) {
442        // This is a temporary state
443        return;
444    }
445
446    logResponse(cb::to_string(cb::engine_errc(code)).c_str());
447}
448
449void Cookie::setCommandContext(CommandContext* ctx) {
450    commandContext.reset(ctx);
451}
452
453void Cookie::maybeLogSlowCommand(
454        std::chrono::steady_clock::duration elapsed) const {
455    const auto opcode = getRequest().getClientOpcode();
456    const auto limit = cb::mcbp::sla::getSlowOpThreshold(opcode);
457
458    if (elapsed > limit) {
459        const auto& header = getHeader();
460        std::chrono::nanoseconds timings(elapsed);
461        std::string command;
462        try {
463            command = to_string(opcode);
464        } catch (const std::exception&) {
465            char opcode_s[16];
466            checked_snprintf(
467                    opcode_s, sizeof(opcode_s), "0x%X", header.getOpcode());
468            command.assign(opcode_s);
469        }
470
471        auto& c = getConnection();
472
473        TRACE_COMPLETE2("memcached/slow",
474                        "Slow cmd",
475                        getStart(),
476                        getStart() + elapsed,
477                        "opcode",
478                        getHeader().getOpcode(),
479                        "connection_id",
480                        c.getId());
481
482        LOG_WARNING(
483                R"({}: Slow operation. {{"cid":"{}/{:x}","duration":"{}","trace":"{}","command":"{}","peer":"{}","bucket":"{}","packet":{}}})",
484                c.getId(),
485                c.getConnectionId().data(),
486                ntohl(getHeader().getOpaque()),
487                cb::time2text(timings),
488                tracer.to_string(),
489                command,
490                c.getPeername(),
491                c.getBucket().name,
492                getHeader().toJSON(validated));
493    }
494}
495
496Cookie::Cookie(Connection& conn) : connection(conn) {
497}
498
499void Cookie::initialize(cb::const_byte_buffer header, bool tracing_enabled) {
500    reset();
501    setTracingEnabled(tracing_enabled ||
502                      Settings::instance().alwaysCollectTraceInfo());
503    setPacket(Cookie::PacketContent::Header, header);
504    start = std::chrono::steady_clock::now();
505    tracer.begin(cb::tracing::Code::Request, start);
506}
507
508void Cookie::reset() {
509    event_id.clear();
510    error_context.clear();
511    json_message.clear();
512    packet = {};
513    validated = false;
514    cas = 0;
515    commandContext.reset();
516    dynamicBuffer.clear();
517    tracer.clear();
518    ewouldblock = false;
519    openTracingContext.clear();
520    authorized = false;
521}
522
523void Cookie::setOpenTracingContext(cb::const_byte_buffer context) {
524    try {
525        openTracingContext.assign(reinterpret_cast<const char*>(context.data()),
526                                  context.size());
527    } catch (const std::bad_alloc&) {
528        // Drop tracing if we run out of memory
529    }
530}
531
532CookieTraceContext Cookie::extractTraceContext() {
533    if (openTracingContext.empty()) {
534        throw std::logic_error(
535                "Cookie::extractTraceContext should only be called if we have "
536                "a context");
537    }
538
539    auto& header = getHeader();
540    return CookieTraceContext{cb::mcbp::Magic(header.getMagic()),
541                              header.getOpcode(),
542                              header.getOpaque(),
543                              header.getKey(),
544                              std::move(openTracingContext),
545                              tracer.extractDurations()};
546}
547