1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2015 Couchbase, Inc.
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 #include "connection.h"
18 
19 #include "buckets.h"
20 #include "connections.h"
21 #include "cookie.h"
22 #include "external_auth_manager_thread.h"
23 #include "front_end_thread.h"
24 #include "listening_port.h"
25 #include "mc_time.h"
26 #include "mcaudit.h"
27 #include "memcached.h"
28 #include "protocol/mcbp/dcp_snapshot_marker_codec.h"
29 #include "protocol/mcbp/engine_wrapper.h"
30 #include "runtime.h"
31 #include "server_event.h"
32 #include "settings.h"
33 
34 #include <logger/logger.h>
35 #include <mcbp/mcbp.h>
36 #include <mcbp/protocol/framebuilder.h>
37 #include <mcbp/protocol/header.h>
38 #include <memcached/durability_spec.h>
39 #include <nlohmann/json.hpp>
40 #include <phosphor/phosphor.h>
41 #include <platform/cbassert.h>
42 #include <platform/checked_snprintf.h>
43 #include <platform/socket.h>
44 #include <platform/strerror.h>
45 #include <platform/string_hex.h>
46 #include <platform/timeutils.h>
47 #include <utilities/logtags.h>
48 #include <gsl/gsl>
49 
50 #include <cctype>
51 #include <exception>
52 #ifndef WIN32
53 #include <netinet/tcp.h> // For TCP_NODELAY etc
54 #endif
55 
/// Layout of a single TLS record:
///   byte 0        - content type
///   bytes 1 and 2 - protocol version
///   bytes 3 and 4 - record length
///   n bytes of user payload
///   m bytes of MAC
///   o bytes of padding (for block ciphers)
/// TlsFrameSize is the maximum amount of user data we may put in a
/// single TLS frame (16 KiB).
static constexpr std::size_t TlsFrameSize{16 * 1024};
66 
to_string(Connection::Priority priority)67 std::string to_string(Connection::Priority priority) {
68     switch (priority) {
69     case Connection::Priority::High:
70         return "High";
71     case Connection::Priority::Medium:
72         return "Medium";
73     case Connection::Priority::Low:
74         return "Low";
75     }
76     throw std::invalid_argument("No such priority: " +
77                                 std::to_string(int(priority)));
78 }
79 
setTcpNoDelay(bool enable)80 bool Connection::setTcpNoDelay(bool enable) {
81     if (socketDescriptor == INVALID_SOCKET) {
82         // Our unit test run without a connected socket (and there is
83         // no point of running setsockopt on an invalid socket and
84         // get the error message from there).. But we don't want them
85         // (the unit tests) to flood the console with error messages
86         // that setsockopt failed
87         return false;
88     }
89 
90     const int flags = enable ? 1 : 0;
91     int error = cb::net::setsockopt(socketDescriptor,
92                                     IPPROTO_TCP,
93                                     TCP_NODELAY,
94                                     reinterpret_cast<const void*>(&flags),
95                                     sizeof(flags));
96 
97     if (error != 0) {
98         std::string errmsg = cb_strerror(cb::net::get_socket_error());
99         LOG_WARNING("setsockopt(TCP_NODELAY): {}", errmsg);
100         nodelay = false;
101         return false;
102     } else {
103         nodelay = enable;
104     }
105 
106     return true;
107 }
108 
109 /**
110  * Get a JSON representation of an event mask
111  *
112  * @param mask the mask to convert to JSON
113  * @return the json representation.
114  */
event_mask_to_json(const short mask)115 static nlohmann::json event_mask_to_json(const short mask) {
116     nlohmann::json ret;
117     nlohmann::json array = nlohmann::json::array();
118 
119     ret["raw"] = cb::to_hex(uint16_t(mask));
120 
121     if (mask & EV_READ) {
122         array.push_back("read");
123     }
124     if (mask & EV_WRITE) {
125         array.push_back("write");
126     }
127     if (mask & EV_PERSIST) {
128         array.push_back("persist");
129     }
130     if (mask & EV_TIMEOUT) {
131         array.push_back("timeout");
132     }
133 
134     ret["decoded"] = array;
135     return ret;
136 }
137 
toJSON() const138 nlohmann::json Connection::toJSON() const {
139     nlohmann::json ret;
140 
141     ret["connection"] = cb::to_hex(uint64_t(this));
142 
143     if (socketDescriptor == INVALID_SOCKET) {
144         ret["socket"] = "disconnected";
145         return ret;
146     }
147 
148     ret["socket"] = socketDescriptor;
149     ret["yields"] = yields.load();
150     ret["protocol"] = "memcached";
151     ret["peername"] = getPeername().c_str();
152     ret["sockname"] = getSockname().c_str();
153     ret["parent_port"] = parent_port;
154     ret["bucket_index"] = getBucketIndex();
155     ret["internal"] = isInternal();
156 
157     if (authenticated) {
158         if (internal) {
159             // We want to be able to map these connections, and given
160             // that it is internal we don't reveal any user data
161             ret["username"] = username;
162         } else {
163             ret["username"] = cb::tagUserData(username);
164         }
165     }
166 
167     ret["refcount"] = refcount;
168 
169     nlohmann::json features = nlohmann::json::array();
170     if (isSupportsMutationExtras()) {
171         features.push_back("mutation extras");
172     }
173     if (isXerrorSupport()) {
174         features.push_back("xerror");
175     }
176     if (nodelay) {
177         features.push_back("tcp nodelay");
178     }
179     if (allowUnorderedExecution()) {
180         features.push_back("unordered execution");
181     }
182     if (tracingEnabled) {
183         features.push_back("tracing");
184     }
185 
186     if (isCollectionsSupported()) {
187         features.push_back("collections");
188     }
189 
190     if (isDuplexSupported()) {
191         features.push_back("duplex");
192     }
193 
194     if (isClustermapChangeNotificationSupported()) {
195         features.push_back("CCN");
196     }
197 
198     ret["features"] = features;
199 
200     ret["thread"] = getThread().index;
201     ret["priority"] = to_string(priority);
202 
203     if (clustermap_revno == -2) {
204         ret["clustermap_revno"] = "unknown";
205     } else {
206         ret["clustermap_revno"] = clustermap_revno;
207     }
208 
209     ret["total_cpu_time"] = std::to_string(total_cpu_time.count());
210     ret["min_sched_time"] = std::to_string(min_sched_time.count());
211     ret["max_sched_time"] = std::to_string(max_sched_time.count());
212 
213     nlohmann::json arr = nlohmann::json::array();
214     for (const auto& c : cookies) {
215         arr.push_back(c->toJSON());
216     }
217     ret["cookies"] = arr;
218 
219     if (agentName.front() != '\0') {
220         ret["agent_name"] = std::string(agentName.data());
221     }
222     if (connectionId.front() != '\0') {
223         ret["connection_id"] = std::string(connectionId.data());
224     }
225 
226     ret["sasl_enabled"] = saslAuthEnabled;
227     ret["dcp"] = isDCP();
228     ret["dcp_xattr_aware"] = isDcpXattrAware();
229     ret["dcp_deleted_user_xattr"] = isDcpDeletedUserXattr();
230     ret["dcp_no_value"] = isDcpNoValue();
231     ret["max_reqs_per_event"] = max_reqs_per_event;
232     ret["nevents"] = numEvents;
233     ret["state"] = getStateName();
234 
235     nlohmann::json libevt;
236     libevt["registered"] = isRegisteredInLibevent();
237     libevt["ev_flags"] = event_mask_to_json(ev_flags);
238     libevt["which"] = event_mask_to_json(currentEvent);
239     ret["libevent"] = libevt;
240 
241     if (read) {
242         ret["read"] = read->to_json();
243     }
244 
245     if (write) {
246         ret["write"] = write->to_json();
247     }
248 
249     ret["write_and_go"] = std::string(stateMachine.getStateName(write_and_go));
250 
251     nlohmann::json iovobj;
252     iovobj["size"] = iov.size();
253     iovobj["used"] = iovused;
254     ret["iov"] = iovobj;
255 
256     nlohmann::json msg;
257     msg["used"] = msglist.size();
258     msg["curr"] = msgcurr;
259     msg["bytes"] = msgbytes;
260     ret["msglist"] = msg;
261 
262     nlohmann::json ilist;
263     ilist["size"] = reservedItems.size();
264     ret["itemlist"] = ilist;
265 
266     nlohmann::json talloc;
267     talloc["size"] = temp_alloc.size();
268     ret["temp_alloc_list"] = talloc;
269 
270     ret["ssl"] = ssl.toJSON();
271     ret["total_recv"] = totalRecv;
272     ret["total_send"] = totalSend;
273 
274     ret["datatype"] = mcbp::datatype::to_string(datatype.getRaw()).c_str();
275 
276     return ret;
277 }
278 
setDCP(bool dcp)279 void Connection::setDCP(bool dcp) {
280     Connection::dcp = dcp;
281 
282     if (isSslEnabled()) {
283         try {
284             // Make sure that we have space for up to a single TLS frame
285             // in our send buffer (so that we can stick all of the mutations
286             // in that buffer)
287             write->ensureCapacity(TlsFrameSize);
288         } catch (const std::bad_alloc&) {
289         }
290     }
291 }
292 
restartAuthentication()293 void Connection::restartAuthentication() {
294     if (authenticated && domain == cb::sasl::Domain::External) {
295         externalAuthManager->logoff(username);
296     }
297     sasl_conn.reset();
298     setInternal(false);
299     authenticated = false;
300     username = "";
301 }
302 
dropPrivilege(cb::rbac::Privilege privilege)303 cb::engine_errc Connection::dropPrivilege(cb::rbac::Privilege privilege) {
304     if (privilegeContext.dropPrivilege(privilege)) {
305         return cb::engine_errc::success;
306     }
307 
308     return cb::engine_errc::no_access;
309 }
310 
checkPrivilege( cb::rbac::Privilege privilege, Cookie& cookie)311 cb::rbac::PrivilegeAccess Connection::checkPrivilege(
312         cb::rbac::Privilege privilege, Cookie& cookie) {
313     cb::rbac::PrivilegeAccess ret;
314     unsigned int retries = 0;
315     const unsigned int max_retries = 100;
316 
317     while ((ret = privilegeContext.check(privilege)) ==
318                    cb::rbac::PrivilegeAccess::Stale &&
319            retries < max_retries) {
320         ++retries;
321         const auto opcode = cookie.getRequest(Cookie::PacketContent::Header)
322                                     .getClientOpcode();
323         const std::string command(to_string(opcode));
324 
325         // The privilege context we had could have been a dummy entry
326         // (created when the client connected, and used until the
327         // connection authenticates). Let's try to automatically update it,
328         // but let the client deal with whatever happens after
329         // a single update.
330         try {
331             privilegeContext = cb::rbac::createContext(
332                     getUsername(), getDomain(), all_buckets[bucketIndex].name);
333         } catch (const cb::rbac::NoSuchBucketException&) {
334             // Remove all access to the bucket
335             privilegeContext =
336                     cb::rbac::createContext(getUsername(), getDomain(), "");
337             LOG_INFO(
338                     "{}: RBAC: Connection::checkPrivilege({}) {} No access "
339                     "to "
340                     "bucket [{}]. command: [{}] new privilege set: {}",
341                     getId(),
342                     to_string(privilege),
343                     getDescription(),
344                     all_buckets[bucketIndex].name,
345                     command,
346                     privilegeContext.to_string());
347         } catch (const cb::rbac::Exception& error) {
348             LOG_WARNING(
349                     "{}: RBAC: Connection::checkPrivilege({}) {}: An "
350                     "exception occurred. command: [{}] bucket: [{}] UUID:"
351                     "[{}] message: {}",
352                     getId(),
353                     to_string(privilege),
354                     getDescription(),
355                     command,
356                     all_buckets[bucketIndex].name,
357                     cookie.getEventId(),
358                     error.what());
359             // Add a textual error as well
360             cookie.setErrorContext("An exception occurred. command: [" +
361                                    command + "]");
362             return cb::rbac::PrivilegeAccess::Fail;
363         }
364     }
365 
366     if (retries == max_retries) {
367         LOG_INFO(
368                 "{}: RBAC: Gave up rebuilding privilege context after {} "
369                 "times. Let the client handle the stale authentication "
370                 "context",
371                 getId(),
372                 retries);
373 
374     } else if (retries > 1) {
375         LOG_INFO("{}: RBAC: Had to rebuild privilege context {} times",
376                  getId(),
377                  retries);
378     }
379 
380     if (ret == cb::rbac::PrivilegeAccess::Fail) {
381         const auto opcode = cookie.getRequest(Cookie::PacketContent::Header)
382                                     .getClientOpcode();
383         const std::string command(to_string(opcode));
384         const std::string privilege_string = cb::rbac::to_string(privilege);
385         const std::string context = privilegeContext.to_string();
386 
387         if (Settings::instance().isPrivilegeDebug()) {
388             audit_privilege_debug(*this,
389                                   command,
390                                   all_buckets[bucketIndex].name,
391                                   privilege_string,
392                                   context);
393 
394             LOG_INFO(
395                     "{}: RBAC privilege debug:{} command:[{}] bucket:[{}] "
396                     "privilege:[{}] context:{}",
397                     getId(),
398                     getDescription(),
399                     command,
400                     all_buckets[bucketIndex].name,
401                     privilege_string,
402                     context);
403 
404             return cb::rbac::PrivilegeAccess::Ok;
405         } else {
406             LOG_INFO(
407                     "{} RBAC {} missing privilege {} for {} in bucket:[{}] "
408                     "with context: "
409                     "{} UUID:[{}]",
410                     getId(),
411                     getDescription(),
412                     privilege_string,
413                     command,
414                     all_buckets[bucketIndex].name,
415                     context,
416                     cookie.getEventId());
417             // Add a textual error as well
418             cookie.setErrorContext("Authorization failure: can't execute " +
419                                    command + " operation without the " +
420                                    privilege_string + " privilege");
421         }
422     }
423 
424     return ret;
425 }
426 
getBucket() const427 Bucket& Connection::getBucket() const {
428     return all_buckets[getBucketIndex()];
429 }
430 
getBucketEngine() const431 EngineIface* Connection::getBucketEngine() const {
432     return getBucket().getEngine();
433 }
434 
remapErrorCode(ENGINE_ERROR_CODE code)435 ENGINE_ERROR_CODE Connection::remapErrorCode(ENGINE_ERROR_CODE code) {
436     if (xerror_support) {
437         return code;
438     }
439 
440     // Check our whitelist
441     switch (code) {
442     case ENGINE_SUCCESS: // FALLTHROUGH
443     case ENGINE_KEY_ENOENT: // FALLTHROUGH
444     case ENGINE_KEY_EEXISTS: // FALLTHROUGH
445     case ENGINE_ENOMEM: // FALLTHROUGH
446     case ENGINE_NOT_STORED: // FALLTHROUGH
447     case ENGINE_EINVAL: // FALLTHROUGH
448     case ENGINE_ENOTSUP: // FALLTHROUGH
449     case ENGINE_EWOULDBLOCK: // FALLTHROUGH
450     case ENGINE_E2BIG: // FALLTHROUGH
451     case ENGINE_DISCONNECT: // FALLTHROUGH
452     case ENGINE_NOT_MY_VBUCKET: // FALLTHROUGH
453     case ENGINE_TMPFAIL: // FALLTHROUGH
454     case ENGINE_ERANGE: // FALLTHROUGH
455     case ENGINE_ROLLBACK: // FALLTHROUGH
456     case ENGINE_EBUSY: // FALLTHROUGH
457     case ENGINE_DELTA_BADVAL: // FALLTHROUGH
458     case ENGINE_PREDICATE_FAILED:
459     case ENGINE_FAILED:
460         return code;
461 
462     case ENGINE_LOCKED:
463         return ENGINE_KEY_EEXISTS;
464     case ENGINE_LOCKED_TMPFAIL:
465         return ENGINE_TMPFAIL;
466     case ENGINE_UNKNOWN_COLLECTION:
467     case ENGINE_COLLECTIONS_MANIFEST_IS_AHEAD:
468         return isCollectionsSupported() ? code : ENGINE_EINVAL;
469 
470     case ENGINE_EACCESS:break;
471     case ENGINE_NO_BUCKET:break;
472     case ENGINE_AUTH_STALE:break;
473     case ENGINE_DURABILITY_INVALID_LEVEL:
474     case ENGINE_DURABILITY_IMPOSSIBLE:
475     case ENGINE_SYNC_WRITE_PENDING:
476         break;
477     case ENGINE_SYNC_WRITE_IN_PROGRESS:
478     case ENGINE_SYNC_WRITE_RECOMMIT_IN_PROGRESS:
479         // we can return tmpfail to old clients and have them retry the
480         // operation
481         return ENGINE_TMPFAIL;
482     case ENGINE_SYNC_WRITE_AMBIGUOUS:
483     case ENGINE_DCP_STREAMID_INVALID:
484         break;
485     }
486 
487     // Seems like the rest of the components in our system isn't
488     // prepared to receive access denied or authentincation stale.
489     // For now we should just disconnect them
490     auto errc = cb::make_error_condition(cb::engine_errc(code));
491     LOG_WARNING(
492             "{} - Client {} not aware of extended error code ({}). "
493             "Disconnecting",
494             getId(),
495             getDescription().c_str(),
496             errc.message().c_str());
497     setTerminationReason("XError not enabled on client");
498 
499     return ENGINE_DISCONNECT;
500 }
501 
resetUsernameCache()502 void Connection::resetUsernameCache() {
503     if (sasl_conn.isInitialized()) {
504         username = sasl_conn.getUsername();
505         domain = sasl_conn.getDomain();
506     } else {
507         username = "unknown";
508         domain = cb::sasl::Domain::Local;
509     }
510 
511     updateDescription();
512 }
513 
updateDescription()514 void Connection::updateDescription() {
515     description.assign("[ " + getPeername() + " - " + getSockname());
516     if (authenticated) {
517         description += " (";
518         if (isInternal()) {
519             description += "System, ";
520         }
521         description += cb::tagUserData(getUsername());
522 
523         if (domain == cb::sasl::Domain::External) {
524             description += " (LDAP)";
525         }
526         description += ")";
527     } else {
528         description += " (not authenticated)";
529     }
530     description += " ]";
531 }
532 
setBucketIndex(int bucketIndex)533 void Connection::setBucketIndex(int bucketIndex) {
534     Connection::bucketIndex.store(bucketIndex, std::memory_order_relaxed);
535 
536     // Update the privilege context. If a problem occurs within the RBAC
537     // module we'll assign an empty privilege context to the connection.
538     try {
539         if (authenticated) {
540             // The user have logged in, so we should create a context
541             // representing the users context in the desired bucket.
542             privilegeContext = cb::rbac::createContext(
543                     username, getDomain(), all_buckets[bucketIndex].name);
544         } else if (is_default_bucket_enabled() &&
545                    strcmp("default", all_buckets[bucketIndex].name) == 0) {
546             // We've just connected to the _default_ bucket, _AND_ the client
547             // is unknown.
548             // Personally I think the "default bucket" concept is a really
549             // really bad idea, but we need to be backwards compatible for
550             // a while... lets look up a profile named "default" and
551             // assign that. It should only contain access to the default
552             // bucket.
553             privilegeContext = cb::rbac::createContext(
554                     "default", getDomain(), all_buckets[bucketIndex].name);
555         } else {
556             // The user has not authenticated, and this isn't for the
557             // "default bucket". Assign an empty profile which won't give
558             // you any privileges.
559             privilegeContext = cb::rbac::PrivilegeContext{getDomain()};
560         }
561     } catch (const cb::rbac::Exception&) {
562         privilegeContext = cb::rbac::PrivilegeContext{getDomain()};
563     }
564 
565     if (bucketIndex == 0) {
566         // If we're connected to the no bucket we should return
567         // no bucket instead of EACCESS. Lets give the connection all
568         // possible bucket privileges
569         privilegeContext.setBucketPrivileges();
570     }
571 }
572 
addCpuTime(std::chrono::nanoseconds ns)573 void Connection::addCpuTime(std::chrono::nanoseconds ns) {
574     total_cpu_time += ns;
575     min_sched_time = std::min(min_sched_time, ns);
576     max_sched_time = std::max(min_sched_time, ns);
577 }
578 
enqueueServerEvent(std::unique_ptr<ServerEvent> event)579 void Connection::enqueueServerEvent(std::unique_ptr<ServerEvent> event) {
580     server_events.push(std::move(event));
581 }
582 
unregisterEvent()583 bool Connection::unregisterEvent() {
584     if (!registered_in_libevent) {
585         LOG_WARNING(
586                 "Connection::unregisterEvent: Not registered in libevent - "
587                 "ignoring unregister attempt");
588         return false;
589     }
590 
591     cb_assert(socketDescriptor != INVALID_SOCKET);
592 
593     if (event_del(event.get()) == -1) {
594         LOG_WARNING("Failed to remove connection to libevent: {}",
595                     cb_strerror());
596         return false;
597     }
598 
599     registered_in_libevent = false;
600     return true;
601 }
602 
registerEvent()603 bool Connection::registerEvent() {
604     if (registered_in_libevent) {
605         LOG_WARNING(
606                 "Connection::registerEvent: Already registered in"
607                 " libevent - ignoring register attempt");
608         return false;
609     }
610 
611     if (event_add(event.get(), nullptr) == -1) {
612         LOG_WARNING("Failed to add connection to libevent: {}", cb_strerror());
613         return false;
614     }
615 
616     registered_in_libevent = true;
617     return true;
618 }
619 
updateEvent(const short new_flags)620 bool Connection::updateEvent(const short new_flags) {
621     if (ssl.isEnabled() && ssl.isConnected() && (new_flags & EV_READ)) {
622         /*
623          * If we want more data and we have SSL, that data might be inside
624          * SSL's internal buffers rather than inside the socket buffer. In
625          * that case signal an EV_READ event without actually polling the
626          * socket.
627          */
628         if (ssl.havePendingInputData()) {
629             // signal a call to the handler
630             event_active(event.get(), EV_READ, 0);
631             return true;
632         }
633     }
634 
635     if (ev_flags == new_flags) {
636         // We do "cache" the current libevent state (using EV_PERSIST) to avoid
637         // having to re-register it when it doesn't change (which it mostly
638         // don't).
639         return true;
640     }
641 
642     if (!unregisterEvent()) {
643         LOG_WARNING(
644                 "{}: Failed to remove connection from event notification "
645                 "library. Shutting down connection {}",
646                 getId(),
647                 getDescription());
648         return false;
649     }
650 
651     if (event_assign(event.get(),
652                      base,
653                      socketDescriptor,
654                      new_flags,
655                      event_handler,
656                      reinterpret_cast<void*>(this)) == -1) {
657         LOG_WARNING(
658                 "{}: Failed to set up event notification. "
659                 "Shutting down connection {}",
660                 getId(),
661                 getDescription());
662         return false;
663     }
664     ev_flags = new_flags;
665 
666     if (!registerEvent()) {
667         LOG_WARNING(
668                 "{}: Failed to add connection to the event notification "
669                 "library. Shutting down connection {}",
670                 getId(),
671                 getDescription());
672         return false;
673     }
674 
675     return true;
676 }
677 
initializeEvent()678 bool Connection::initializeEvent() {
679     short event_flags = (EV_READ | EV_PERSIST);
680 
681     event.reset(event_new(base,
682                           socketDescriptor,
683                           event_flags,
684                           event_handler,
685                           reinterpret_cast<void*>(this)));
686 
687     if (!event) {
688         throw std::bad_alloc();
689     }
690     ev_flags = event_flags;
691 
692     return registerEvent();
693 }
694 
shrinkBuffers()695 void Connection::shrinkBuffers() {
696     // We share the buffers with the thread, so we don't need to worry
697     // about the read and write buffer.
698 
699     if (msglist.size() > MSG_LIST_HIGHWAT) {
700         try {
701             msglist.resize(MSG_LIST_INITIAL);
702             msglist.shrink_to_fit();
703         } catch (const std::bad_alloc&) {
704             LOG_WARNING("{}: Failed to shrink msglist down to {} elements.",
705                         getId(),
706                         MSG_LIST_INITIAL);
707         }
708     }
709 
710     if (iov.size() > IOV_LIST_HIGHWAT) {
711         try {
712             iov.resize(IOV_LIST_INITIAL);
713             iov.shrink_to_fit();
714         } catch (const std::bad_alloc&) {
715             LOG_WARNING("{}: Failed to shrink iov down to {} elements.",
716                         getId(),
717                         IOV_LIST_INITIAL);
718         }
719     }
720 }
721 
setAuthenticated(bool authenticated)722 void Connection::setAuthenticated(bool authenticated) {
723     Connection::authenticated = authenticated;
724     if (authenticated) {
725         updateDescription();
726         privilegeContext = cb::rbac::createContext(username, getDomain(), "");
727     } else {
728         resetUsernameCache();
729         privilegeContext = cb::rbac::PrivilegeContext{getDomain()};
730     }
731 }
732 
tryAuthFromSslCert(const std::string& userName)733 bool Connection::tryAuthFromSslCert(const std::string& userName) {
734     username.assign(userName);
735     domain = cb::sasl::Domain::Local;
736 
737     try {
738         auto context =
739                 cb::rbac::createInitialContext(getUsername(), getDomain());
740         setAuthenticated(true);
741         setInternal(context.second);
742         audit_auth_success(*this);
743         LOG_INFO(
744                 "{}: Client {} authenticated as '{}' via X509 "
745                 "certificate",
746                 getId(),
747                 getPeername(),
748                 cb::UserDataView(getUsername()));
749         // Connections authenticated by using X.509 certificates should not
750         // be able to use SASL to change it's identity.
751         saslAuthEnabled = false;
752     } catch (const cb::rbac::NoSuchUserException& e) {
753         setAuthenticated(false);
754         LOG_WARNING("{}: User [{}] is not defined as a user in Couchbase",
755                     getId(),
756                     cb::UserDataView(e.what()));
757         return false;
758     }
759     return true;
760 }
761 
logSslErrorInfo(const std::string& method, int rval)762 void Connection::logSslErrorInfo(const std::string& method, int rval) {
763     const int error = ssl.getError(rval);
764     unsigned long code = ERR_peek_error();
765     if (code == 0) {
766         LOG_WARNING("{}: ERROR: {} returned {} with error {}",
767                     getId(),
768                     method,
769                     rval,
770                     error);
771         return;
772     }
773 
774     try {
775         std::string errmsg(method + "() returned " +
776                            std::to_string(rval) + " with error " +
777                            std::to_string(error));
778         while ((code = ERR_get_error()) != 0) {
779             std::vector<char> ssl_err(1024);
780             ERR_error_string_n(
781                                code, ssl_err.data(), ssl_err.size());
782             LOG_WARNING("{}: {}: {}", getId(), errmsg, ssl_err.data());
783         }
784     } catch (const std::bad_alloc&) {
785         // unable to print error message; continue.
786         LOG_WARNING("{}: {}() returned {} with error {}",
787                     getId(),
788                     method,
789                     rval,
790                     error);
791     }
792 }
793 
sslPreConnection()794 int Connection::sslPreConnection() {
795     int r = ssl.accept();
796     if (r == 1) {
797         ssl.drainBioSendPipe(socketDescriptor);
798         ssl.setConnected();
799         auto certResult = ssl.getCertUserName();
800         bool disconnect = false;
801         switch (certResult.first) {
802         case cb::x509::Status::NoMatch:
803         case cb::x509::Status::Error:
804             disconnect = true;
805             break;
806         case cb::x509::Status::NotPresent:
807             if (Settings::instance().getClientCertMode() ==
808                 cb::x509::Mode::Mandatory) {
809                 disconnect = true;
810             } else if (is_default_bucket_enabled()) {
811                 associate_bucket(*this, "default");
812             }
813             break;
814         case cb::x509::Status::Success:
815             if (!tryAuthFromSslCert(certResult.second)) {
816                 disconnect = true;
817                 // Don't print an error message... already logged
818                 certResult.second.resize(0);
819             }
820         }
821         if (disconnect) {
822             // Set the username to "[unknown]" if we failed to pick
823             // out a username from the certificate to avoid the
824             // audit event being "empty"
825             if (certResult.first == cb::x509::Status::NotPresent) {
826                 audit_auth_failure(
827                         *this, "Client did not provide an X.509 certificate");
828             } else {
829                 audit_auth_failure(
830                         *this,
831                         "Failed to use client provided X.509 certificate");
832             }
833             cb::net::set_econnreset();
834             if (!certResult.second.empty()) {
835                 LOG_WARNING(
836                         "{}: SslPreConnection: disconnection client due to"
837                         " error [{}]",
838                         getId(),
839                         certResult.second);
840             }
841             return -1;
842         }
843 
844          LOG_INFO("{}: Using SSL cipher:{}",
845                  getId(),
846                  ssl.getCurrentCipherName());
847     } else {
848         if (ssl.getError(r) == SSL_ERROR_WANT_READ) {
849             ssl.drainBioSendPipe(socketDescriptor);
850             cb::net::set_ewouldblock();
851             return -1;
852         } else {
853             logSslErrorInfo("SSL_accept", r);
854             cb::net::set_econnreset();
855             return -1;
856         }
857     }
858 
859     return 0;
860 }
861 
recv(char* dest, size_t nbytes)862 int Connection::recv(char* dest, size_t nbytes) {
863     if (nbytes == 0) {
864         throw std::logic_error("Connection::recv: Can't read 0 bytes");
865     }
866 
867     int res = -1;
868     if (ssl.isEnabled()) {
869         ssl.drainBioRecvPipe(socketDescriptor);
870 
871         if (ssl.hasError()) {
872             cb::net::set_econnreset();
873             return -1;
874         }
875 
876         if (!ssl.isConnected()) {
877             res = sslPreConnection();
878             if (res == -1) {
879                 return -1;
880             }
881         }
882 
883         /* The SSL negotiation might be complete at this time */
884         if (ssl.isConnected()) {
885             res = sslRead(dest, nbytes);
886         }
887     } else {
888         res = (int)::cb::net::recv(socketDescriptor, dest, nbytes, 0);
889         if (res > 0) {
890             totalRecv += res;
891         }
892     }
893 
894     return res;
895 }
896 
sendmsg(struct msghdr* m)897 ssize_t Connection::sendmsg(struct msghdr* m) {
898     ssize_t res = 0;
899     if (ssl.isEnabled()) {
900         for (int ii = 0; ii < int(m->msg_iovlen); ++ii) {
901             int n = sslWrite(reinterpret_cast<char*>(m->msg_iov[ii].iov_base),
902                              m->msg_iov[ii].iov_len);
903             if (n > 0) {
904                 res += n;
905                 if (n != int(m->msg_iov[ii].iov_len)) {
906                     // We didnt' send the entire chunk. return the number
907                     // of bytes sent so far to the caller and let them
908                     // deal with adjusting the pointers and retry
909                     return res;
910                 }
911             } else {
912                 // We failed to send the data over ssl. it might be
913                 // because the underlying socket buffer is full, or
914                 // if there is a real error with the socket or inside
915                 // OpenSSL. If the error is because we the network
916                 // is full we'll return the number of bytes we've sent
917                 // so far (so that it may adjust the iov_base and iov_len
918                 // fields before it'll try to call us again and the
919                 // send will most likely fail again, but this we'll
920                 // return -1 and when the caller checks it'll see it is
921                 // because the network buffer is full).
922                 auto error = cb::net::get_socket_error();
923                 if (cb::net::is_blocking(error) && res > 0) {
924                     return res;
925                 }
926                 return -1;
927             }
928         }
929         ssl.drainBioSendPipe(socketDescriptor);
930         return res;
931     } else {
932         res = cb::net::sendmsg(socketDescriptor, m, 0);
933         if (res > 0) {
934             totalSend += res;
935         }
936     }
937 
938     return res;
939 }
940 
941 /**
942  * Adjust the msghdr by "removing" n bytes of data from it.
943  *
944  * @param m the msgheader to update
945  * @param nbytes
946  * @return the number of bytes left in the current iov entry
947  */
adjust_msghdr(cb::Pipe& pipe, struct msghdr* m, ssize_t nbytes)948 size_t adjust_msghdr(cb::Pipe& pipe, struct msghdr* m, ssize_t nbytes) {
949     auto rbuf = pipe.rdata();
950 
951     // We've written some of the data. Remove the completed
952     // iovec entries from the list of pending writes.
953     while (m->msg_iovlen > 0 && nbytes >= ssize_t(m->msg_iov->iov_len)) {
954         if (rbuf.data() == static_cast<const uint8_t*>(m->msg_iov->iov_base)) {
955             pipe.consumed(m->msg_iov->iov_len);
956             rbuf = pipe.rdata();
957         }
958         nbytes -= (ssize_t)m->msg_iov->iov_len;
959         m->msg_iovlen--;
960         m->msg_iov++;
961     }
962 
963     // Might have written just part of the last iovec entry;
964     // adjust it so the next write will do the rest.
965     if (nbytes > 0) {
966         if (rbuf.data() == static_cast<const uint8_t*>(m->msg_iov->iov_base)) {
967             pipe.consumed(nbytes);
968         }
969         m->msg_iov->iov_base =
970                 (void*)((unsigned char*)m->msg_iov->iov_base + nbytes);
971         m->msg_iov->iov_len -= nbytes;
972     }
973 
974     return m->msg_iov->iov_len;
975 }
976 
transmit()977 Connection::TransmitResult Connection::transmit() {
978     if (ssl.isEnabled()) {
979         // We use OpenSSL to write data into a buffer before we send it
980         // over the wire... Lets go ahead and drain that BIO pipe before
981         // we may do anything else.
982         ssl.drainBioSendPipe(socketDescriptor);
983         if (ssl.morePendingOutput()) {
984             if (ssl.hasError() || !updateEvent(EV_WRITE | EV_PERSIST)) {
985                 setTerminationReason("Failed to send data to client");
986                 setState(StateMachine::State::closing);
987                 return TransmitResult::HardError;
988             }
989             return TransmitResult::SoftError;
990         }
991 
992         // The output buffer is completely drained (well, put in the kernel
993         // buffer to send to the client). Go ahead and send more data
994     }
995 
996     while (msgcurr < msglist.size() && msglist[msgcurr].msg_iovlen == 0) {
997         /* Finished writing the current msg; advance to the next. */
998         msgcurr++;
999     }
1000 
1001     if (msgcurr < msglist.size()) {
1002         ssize_t res;
1003         struct msghdr* m = &msglist[msgcurr];
1004 
1005         res = sendmsg(m);
1006         auto error = cb::net::get_socket_error();
1007         if (res > 0) {
1008             get_thread_stats(this)->bytes_written += res;
1009 
1010             if (adjust_msghdr(*write, m, res) == 0) {
1011                 msgcurr++;
1012                 if (msgcurr == msglist.size()) {
1013                     // We sent the final chunk of data.. In our SSL connections
1014                     // we might however have data spooled in the SSL buffers
1015                     // which needs to be drained before we may consider the
1016                     // transmission complete (note that our sendmsg tried
1017                     // to drain the buffers before returning).
1018                     if (ssl.isEnabled() && ssl.morePendingOutput()) {
1019                         if (ssl.hasError() ||
1020                             !updateEvent(EV_WRITE | EV_PERSIST)) {
1021                             setTerminationReason(
1022                                     "Failed to send data to client");
1023                             setState(StateMachine::State::closing);
1024                             return TransmitResult::HardError;
1025                         }
1026                         return TransmitResult::SoftError;
1027                     }
1028                     return TransmitResult::Complete;
1029                 }
1030             }
1031 
1032             return TransmitResult::Incomplete;
1033         }
1034 
1035         if (res == -1 && cb::net::is_blocking(error)) {
1036             if (!updateEvent(EV_WRITE | EV_PERSIST)) {
1037                 setTerminationReason("Failed to send data to client");
1038                 setState(StateMachine::State::closing);
1039                 return TransmitResult::HardError;
1040             }
1041             return TransmitResult::SoftError;
1042         }
1043 
1044         // if res == 0 or res == -1 and error is not EAGAIN or EWOULDBLOCK,
1045         // we have a real error, on which we close the connection
1046         if (res == -1) {
1047             if (cb::net::is_closed_conn(error)) {
1048                 LOG_INFO("{}: Failed to send data; peer closed the connection",
1049                          getId());
1050             } else {
1051                 LOG_WARNING("Failed to write, and not due to blocking: {}",
1052                             cb_strerror(error));
1053             }
1054         } else {
1055             // sendmsg should return the number of bytes written, but we
1056             // sent 0 bytes. That shouldn't be possible unless we
1057             // requested to write 0 bytes (otherwise we should have gotten
1058             // -1 with EWOULDBLOCK)
1059             // Log the request buffer so that we can look into this
1060             LOG_WARNING("{} - sendmsg returned 0", socketDescriptor);
1061             for (int ii = 0; ii < int(m->msg_iovlen); ++ii) {
1062                 LOG_WARNING(
1063                         "\t{} - {}", socketDescriptor, m->msg_iov[ii].iov_len);
1064             }
1065         }
1066 
1067         setTerminationReason("Failed to send data to client");
1068         setState(StateMachine::State::closing);
1069         return TransmitResult::HardError;
1070     } else {
1071         return TransmitResult::Complete;
1072     }
1073 }
1074 
1075 /**
1076  * To protect us from someone flooding a connection with bogus data causing
1077  * the connection to eat up all available memory, break out and start
1078  * looking at the data I've got after a number of reallocs...
1079  */
tryReadNetwork()1080 Connection::TryReadResult Connection::tryReadNetwork() {
1081     // When we get here we've either got an empty buffer, or we've got
1082     // a buffer with less than a packet header filled in.
1083     //
1084     // Verify that assumption!!!
1085     if (read->rsize() >= sizeof(cb::mcbp::Request)) {
1086         // The above don't hold true ;)
1087         throw std::logic_error(
1088                 "tryReadNetwork: Expected the input buffer to be empty or "
1089                 "contain a partial header");
1090     }
1091 
1092     // Make sure we can fit the header into the input buffer
1093     try {
1094         read->ensureCapacity(sizeof(cb::mcbp::Request) - read->rsize());
1095     } catch (const std::bad_alloc&) {
1096         LOG_WARNING(
1097                 "{}: Failed to allocate memory for package header. Closing "
1098                 "connection {}",
1099                 getId(),
1100                 getDescription());
1101         return TryReadResult::MemoryError;
1102     }
1103 
1104     Connection* c = this;
1105     const auto res = read->produce([c](cb::byte_buffer buffer) -> ssize_t {
1106         return c->recv(reinterpret_cast<char*>(buffer.data()), buffer.size());
1107     });
1108 
1109     if (res > 0) {
1110         get_thread_stats(this)->bytes_read += res;
1111         return TryReadResult::DataReceived;
1112     }
1113 
1114     if (res == 0) {
1115         LOG_DEBUG(
1116                 "{} Closing connection as the other side closed the "
1117                 "connection {}",
1118                 getId(),
1119                 getDescription());
1120         return TryReadResult::SocketClosed;
1121     }
1122 
1123     const auto error = cb::net::get_socket_error();
1124     if (cb::net::is_blocking(error)) {
1125         return TryReadResult::NoDataReceived;
1126     }
1127 
1128     // There was an error reading from the socket. There isn't much we
1129     // can do about that apart from logging it and close the connection.
1130     // Keep this as INFO as it isn't a problem with the memcached server,
1131     // it is a network issue (or a bad client not closing the connection
1132     // cleanly)
1133     LOG_INFO("{} Closing connection {} due to read error: {}",
1134              getId(),
1135              getDescription(),
1136              cb_strerror(error));
1137     return TryReadResult::SocketError;
1138 }
1139 
sslRead(char* dest, size_t nbytes)1140 int Connection::sslRead(char* dest, size_t nbytes) {
1141     int ret = 0;
1142 
1143     while (ret < int(nbytes)) {
1144         int n;
1145         ssl.drainBioRecvPipe(socketDescriptor);
1146         if (ssl.hasError()) {
1147             cb::net::set_econnreset();
1148             return -1;
1149         }
1150         n = ssl.read(dest + ret, (int)(nbytes - ret));
1151         if (n > 0) {
1152             ret += n;
1153         } else {
1154             /* n < 0 and n == 0 require a check of SSL error*/
1155             const int error = ssl.getError(n);
1156 
1157             switch (error) {
1158             case SSL_ERROR_WANT_READ:
1159                 /*
1160                  * Drain the buffers and retry if we've got data in
1161                  * our input buffers
1162                  */
1163                 if (ssl.moreInputAvailable()) {
1164                     /* our recv buf has data feed the BIO */
1165                     ssl.drainBioRecvPipe(socketDescriptor);
1166                 } else if (ret > 0) {
1167                     /* nothing in our recv buf, return what we have */
1168                     return ret;
1169                 } else {
1170                     cb::net::set_ewouldblock();
1171                     return -1;
1172                 }
1173                 break;
1174 
1175             case SSL_ERROR_ZERO_RETURN:
1176                 /* The TLS/SSL connection has been closed (cleanly). */
1177                 return 0;
1178 
1179             default:
1180                 logSslErrorInfo("SSL_read", n);
1181                 cb::net::set_econnreset();
1182                 return -1;
1183             }
1184         }
1185     }
1186 
1187     return ret;
1188 }
1189 
sslWrite(const char* src, size_t nbytes)1190 int Connection::sslWrite(const char* src, size_t nbytes) {
1191     // Start by trying to send everything we've already got
1192     // buffered in our bio
1193     ssl.drainBioSendPipe(socketDescriptor);
1194     if (ssl.hasError()) {
1195         cb::net::set_econnreset();
1196         return -1;
1197     }
1198 
1199     // If the network socket is full there isn't much point
1200     // of trying to add more to SSL
1201     if (ssl.morePendingOutput()) {
1202         cb::net::set_ewouldblock();
1203         return -1;
1204     }
1205 
1206     // We've got an empty buffer for SSL to operate on,
1207     // so lets get to it.
1208     do {
1209         const auto n = ssl.write(src, int(nbytes));
1210         if (n > 0) {
1211             // we've successfully sent some bytes to
1212             // SSL. Lets try to flush it to the network
1213             // socket buffers.
1214             ssl.drainBioSendPipe(socketDescriptor);
1215             if (ssl.hasError()) {
1216                 cb::net::set_econnreset();
1217                 return -1;
1218             }
1219             // Return the number of bytes submitted to SSL
1220             return n;
1221         } else if (n == 0) {
1222             // Closed connection.
1223             cb::net::set_econnreset();
1224             return -1;
1225         } else {
1226             const int error = ssl.getError(n);
1227             if (error == SSL_ERROR_WANT_WRITE) {
1228                 ssl.drainBioSendPipe(socketDescriptor);
1229                 if (ssl.morePendingOutput()) {
1230                     cb::net::set_ewouldblock();
1231                     return -1;
1232                 }
1233                 // We've got space in the network buffer.
1234                 // retry the operation (and openssl will
1235                 // try to fill the network buffer again)
1236             } else {
1237                 // We enountered an error we don't have code
1238                 // to handle. Reset the connection
1239                 logSslErrorInfo("SSL_write", n);
1240                 cb::net::set_econnreset();
1241                 return -1;
1242             }
1243         }
1244     } while (true);
1245 }
1246 
useCookieSendResponse(std::size_t size) const1247 bool Connection::useCookieSendResponse(std::size_t size) const {
1248     // The limit for when to copy the data into a separate response
1249     // buffer instead of using the IO vector to send the data. The limit
1250     // is currently hardcoded, but it should probably be possible to tune
1251     // the value. (when the cost of copying exceeds the cost of creating
1252     // two (or 3) extra TLS frames. Start by keep it fixed to see if it
1253     // makes any difference in showfast.
1254     static constexpr std::size_t SslCopyLimit = 4096;
1255     return isSslEnabled() && size < SslCopyLimit;
1256 }
1257 
dcpUseWriteBuffer(size_t size) const1258 bool Connection::dcpUseWriteBuffer(size_t size) const {
1259     return isSslEnabled() && size < write->wsize();
1260 }
1261 
addMsgHdr(bool reset)1262 void Connection::addMsgHdr(bool reset) {
1263     if (reset) {
1264         msgcurr = 0;
1265         msglist.clear();
1266         iovused = 0;
1267     }
1268 
1269     msglist.emplace_back();
1270 
1271     struct msghdr& msg = msglist.back();
1272 
1273     /* this wipes msg_iovlen, msg_control, msg_controllen, and
1274        msg_flags, the last 3 of which aren't defined on solaris: */
1275     memset(&msg, 0, sizeof(struct msghdr));
1276 
1277     msg.msg_iov = &iov.data()[iovused];
1278 
1279     msgbytes = 0;
1280     STATS_MAX(this, msgused_high_watermark, gsl::narrow<int>(msglist.size()));
1281 }
1282 
addIov(const void* buf, size_t len)1283 void Connection::addIov(const void* buf, size_t len) {
1284     if (len == 0) {
1285         return;
1286     }
1287 
1288     struct msghdr* m = &msglist.back();
1289 
1290     /* We may need to start a new msghdr if this one is full. */
1291     if (m->msg_iovlen == IOV_MAX) {
1292         addMsgHdr(false);
1293     }
1294 
1295     ensureIovSpace();
1296 
1297     // Update 'm' as we may have added an additional msghdr
1298     m = &msglist.back();
1299     // If this entry is right after the previous one we can just
1300     // extend the previous entry instead of adding a new one
1301     bool addNewEntry = true;
1302     if (m->msg_iovlen > 0) {
1303         auto& prev = m->msg_iov[m->msg_iovlen - 1];
1304         const auto* p = static_cast<const char*>(prev.iov_base) + prev.iov_len;
1305         if (buf == p) {
1306             prev.iov_len += len;
1307             addNewEntry = false;
1308         }
1309     }
1310 
1311     if (addNewEntry) {
1312         m->msg_iov[m->msg_iovlen].iov_base = (void*)buf;
1313         m->msg_iov[m->msg_iovlen].iov_len = len;
1314         ++iovused;
1315         STATS_MAX(this, iovused_high_watermark, gsl::narrow<int>(getIovUsed()));
1316         m->msg_iovlen++;
1317     }
1318 
1319     msgbytes += len;
1320 }
1321 
releaseReservedItems()1322 void Connection::releaseReservedItems() {
1323     auto* bucketEngine = getBucket().getEngine();
1324     for (auto* it : reservedItems) {
1325         bucketEngine->release(it);
1326     }
1327     reservedItems.clear();
1328 }
1329 
ensureIovSpace()1330 void Connection::ensureIovSpace() {
1331     if (iovused < iov.size()) {
1332         // There is still size in the list
1333         return;
1334     }
1335 
1336     // Try to double the size of the array
1337     iov.resize(iov.size() * 2);
1338 
1339     /* Point all the msghdr structures at the new list. */
1340     size_t ii;
1341     int iovnum;
1342     for (ii = 0, iovnum = 0; ii < msglist.size(); ii++) {
1343         msglist[ii].msg_iov = &iov[iovnum];
1344         iovnum += msglist[ii].msg_iovlen;
1345     }
1346 }
1347 
enableSSL(const std::string& cert, const std::string& pkey)1348 bool Connection::enableSSL(const std::string& cert, const std::string& pkey) {
1349     if (ssl.enable(cert, pkey)) {
1350         if (Settings::instance().getVerbose() > 1) {
1351             ssl.dumpCipherList(getId());
1352         }
1353 
1354         return true;
1355     }
1356 
1357     return false;
1358 }
1359 
Connection(FrontEndThread& thr)1360 Connection::Connection(FrontEndThread& thr)
1361     : socketDescriptor(INVALID_SOCKET),
1362       connectedToSystemPort(false),
1363       base(nullptr),
1364       thread(thr),
1365       peername("unknown"),
1366       sockname("unknown"),
1367       stateMachine(*this),
1368       max_reqs_per_event(Settings::instance().getRequestsPerEventNotification(
1369               EventPriority::Default)) {
1370     updateDescription();
1371     cookies.emplace_back(std::unique_ptr<Cookie>{new Cookie(*this)});
1372     setConnectionId(peername.c_str());
1373     msglist.reserve(MSG_LIST_INITIAL);
1374     iov.resize(IOV_LIST_INITIAL);
1375 }
1376 
Connection(SOCKET sfd, event_base* b, const ListeningPort& ifc, FrontEndThread& thr)1377 Connection::Connection(SOCKET sfd,
1378                        event_base* b,
1379                        const ListeningPort& ifc,
1380                        FrontEndThread& thr)
1381     : socketDescriptor(sfd),
1382       connectedToSystemPort(ifc.system),
1383       base(b),
1384       thread(thr),
1385       parent_port(ifc.port),
1386       peername(cb::net::getpeername(socketDescriptor)),
1387       sockname(cb::net::getsockname(socketDescriptor)),
1388       stateMachine(*this),
1389       max_reqs_per_event(Settings::instance().getRequestsPerEventNotification(
1390               EventPriority::Default)) {
1391     setTcpNoDelay(true);
1392     updateDescription();
1393     cookies.emplace_back(std::unique_ptr<Cookie>{new Cookie(*this)});
1394     msglist.reserve(MSG_LIST_INITIAL);
1395     iov.resize(IOV_LIST_INITIAL);
1396 
1397     if (ifc.isSslPort()) {
1398         if (!enableSSL(ifc.sslCert, ifc.sslKey)) {
1399             throw std::runtime_error(std::to_string(getId()) +
1400                                      " Failed to enable SSL");
1401         }
1402     }
1403 
1404     if (!initializeEvent()) {
1405         throw std::runtime_error("Failed to initialize event structure");
1406     }
1407     setConnectionId(peername.c_str());
1408 }
1409 
~Connection()1410 Connection::~Connection() {
1411     cb::audit::addSessionTerminated(*this);
1412 
1413     if (connectedToSystemPort) {
1414         --stats.system_conns;
1415     }
1416     if (authenticated && domain == cb::sasl::Domain::External) {
1417         externalAuthManager->logoff(username);
1418     }
1419 
1420     releaseReservedItems();
1421     for (auto* ptr : temp_alloc) {
1422         cb_free(ptr);
1423     }
1424     if (socketDescriptor != INVALID_SOCKET) {
1425         LOG_DEBUG("{} - Closing socket descriptor", getId());
1426         safe_close(socketDescriptor);
1427     }
1428 }
1429 
setTerminationReason(std::string reason)1430 void Connection::setTerminationReason(std::string reason) {
1431     if (terminationReason.empty()) {
1432         terminationReason = std::move(reason);
1433     } else {
1434         terminationReason.append(";");
1435         terminationReason.append(reason);
1436     }
1437 }
1438 
setState(StateMachine::State next_state)1439 void Connection::setState(StateMachine::State next_state) {
1440     stateMachine.setCurrentState(next_state);
1441 }
1442 
runStateMachinery()1443 void Connection::runStateMachinery() {
1444     if (Settings::instance().getVerbose() > 1) {
1445         do {
1446             LOG_DEBUG("{} - Running task: {}",
1447                       getId(),
1448                       stateMachine.getCurrentStateName());
1449         } while (stateMachine.execute());
1450     } else {
1451         while (stateMachine.execute()) {
1452             // empty
1453         }
1454     }
1455 }
1456 
setAgentName(cb::const_char_buffer name)1457 void Connection::setAgentName(cb::const_char_buffer name) {
1458     auto size = std::min(name.size(), agentName.size() - 1);
1459     std::copy(name.begin(), name.begin() + size, agentName.begin());
1460     agentName[size] = '\0';
1461 }
1462 
setConnectionId(cb::const_char_buffer uuid)1463 void Connection::setConnectionId(cb::const_char_buffer uuid) {
1464     auto size = std::min(uuid.size(), connectionId.size() - 1);
1465     std::copy(uuid.begin(), uuid.begin() + size, connectionId.begin());
1466     // the uuid string shall always be zero terminated
1467     connectionId[size] = '\0';
1468 }
1469 
shouldDelete()1470 bool Connection::shouldDelete() {
1471     return getState() == StateMachine::State ::destroyed;
1472 }
1473 
setInternal(bool internal)1474 void Connection::setInternal(bool internal) {
1475     Connection::internal = internal;
1476 }
1477 
getNumberOfCookies() const1478 size_t Connection::getNumberOfCookies() const {
1479     size_t ret = 0;
1480     for (const auto& cookie : cookies) {
1481         if (cookie) {
1482             ++ret;
1483         }
1484     }
1485 
1486     return ret;
1487 }
1488 
isPacketAvailable() const1489 bool Connection::isPacketAvailable() const {
1490     auto buffer = read->rdata();
1491 
1492     if (buffer.size() < sizeof(cb::mcbp::Request)) {
1493         // we don't have the header, so we can't even look at the body
1494         // length
1495         return false;
1496     }
1497 
1498     const auto* req = reinterpret_cast<const cb::mcbp::Request*>(buffer.data());
1499     return buffer.size() >= sizeof(cb::mcbp::Request) + req->getBodylen();
1500 }
1501 
processServerEvents()1502 bool Connection::processServerEvents() {
1503     if (server_events.empty()) {
1504         return false;
1505     }
1506 
1507     const auto before = getState();
1508 
1509     // We're waiting for the next command to arrive from the client
1510     // and we've got a server event to process. Let's start
1511     // processing the server events (which might toggle our state)
1512     if (server_events.front()->execute(*this)) {
1513         server_events.pop();
1514     }
1515 
1516     return getState() != before;
1517 }
1518 
runEventLoop(short which)1519 void Connection::runEventLoop(short which) {
1520     conn_loan_buffers(this);
1521     currentEvent = which;
1522     numEvents = max_reqs_per_event;
1523 
1524     try {
1525         runStateMachinery();
1526     } catch (const std::exception& e) {
1527         setTerminationReason(std::string("Received exception: ") + e.what());
1528         bool logged = false;
1529         if (getState() == StateMachine::State::execute ||
1530             getState() == StateMachine::State::validate) {
1531             try {
1532                 // Converting the cookie to json -> string could probably
1533                 // cause too much memory allcation. We don't want that to
1534                 // cause us to crash..
1535                 std::stringstream ss;
1536                 nlohmann::json array = nlohmann::json::array();
1537                 for (const auto& cookie : cookies) {
1538                     if (cookie) {
1539                         try {
1540                             array.push_back(cookie->toJSON());
1541                         } catch (const std::exception&) {
1542                             // ignore
1543                         }
1544                     }
1545                 }
1546                 LOG_ERROR(
1547                         R"({}: exception occurred in runloop during packet execution. Cookie info: {} - closing connection ({}): {})",
1548                         getId(),
1549                         array.dump(),
1550                         getDescription(),
1551                         e.what());
1552                 logged = true;
1553             } catch (const std::bad_alloc&) {
1554                 // none
1555             }
1556         }
1557 
1558         if (!logged) {
1559             try {
1560                 LOG_ERROR(
1561                         R"({}: exception occurred in runloop (state: "{}") - closing connection ({}): {})",
1562                         getId(),
1563                         getStateName(),
1564                         getDescription(),
1565                         e.what());
1566             } catch (const std::exception&) {
1567                 // Ditch logging.. just shut down the connection
1568             }
1569         }
1570 
1571         setState(StateMachine::State::closing);
1572         /*
1573          * In addition to setting the state to conn_closing
1574          * we need to move execution foward by executing
1575          * conn_closing() and the subsequent functions
1576          * i.e. conn_pending_close() or conn_immediate_close()
1577          */
1578         try {
1579             runStateMachinery();
1580         } catch (const std::exception& e) {
1581             try {
1582                 LOG_ERROR(
1583                         R"({}: exception occurred in runloop whilst attempting to close connection ({}): {})",
1584                         getId(),
1585                         getDescription(),
1586                         e.what());
1587             } catch (const std::exception&) {
1588                 // Drop logging
1589             }
1590         }
1591     }
1592 
1593     conn_return_buffers(this);
1594 }
1595 
close()1596 bool Connection::close() {
1597     bool ewb = false;
1598     uint32_t rc = refcount;
1599 
1600     for (auto& cookie : cookies) {
1601         if (cookie) {
1602             rc += cookie->getRefcount();
1603             if (cookie->isEwouldblock()) {
1604                 ewb = true;
1605             } else {
1606                 cookie->reset();
1607             }
1608         }
1609     }
1610 
1611     if (getState() == StateMachine::State::closing) {
1612         // We don't want any network notifications anymore..
1613         if (registered_in_libevent) {
1614             unregisterEvent();
1615         }
1616 
1617         // Shut down the read end of the socket to avoid more data
1618         // to arrive
1619         shutdown(socketDescriptor, SHUT_RD);
1620 
1621         // Release all reserved items!
1622         releaseReservedItems();
1623     }
1624 
1625     // Notify interested parties that the connection is currently being
1626     // disconnected
1627     propagateDisconnect();
1628 
1629     if (isDCP()) {
1630         // DCP channels work a bit different.. they use the refcount
1631         // to track if it has a reference in the engine
1632         ewb = false;
1633     }
1634 
1635     if (rc > 1 || ewb) {
1636         setState(StateMachine::State::pending_close);
1637         return false;
1638     }
1639     setState(StateMachine::State::immediate_close);
1640     return true;
1641 }
1642 
propagateDisconnect() const1643 void Connection::propagateDisconnect() const {
1644     for (auto& cookie : cookies) {
1645         if (cookie) {
1646             perform_callbacks(ON_DISCONNECT, nullptr, cookie.get());
1647         }
1648     }
1649 }
1650 
signalIfIdle()1651 bool Connection::signalIfIdle() {
1652     for (const auto& c : cookies) {
1653         if (c->isEwouldblock()) {
1654             return false;
1655         }
1656     }
1657 
1658     if (stateMachine.isIdleState()) {
1659         thread.notification.push(this);
1660         notify_thread(thread);
1661         return true;
1662     }
1663 
1664     return false;
1665 }
1666 
setPriority(Connection::Priority priority)1667 void Connection::setPriority(Connection::Priority priority) {
1668     Connection::priority.store(priority);
1669     switch (priority) {
1670     case Priority::High:
1671         max_reqs_per_event =
1672                 Settings::instance().getRequestsPerEventNotification(
1673                         EventPriority::High);
1674         return;
1675     case Priority::Medium:
1676         max_reqs_per_event =
1677                 Settings::instance().getRequestsPerEventNotification(
1678                         EventPriority::Medium);
1679         return;
1680     case Priority::Low:
1681         max_reqs_per_event =
1682                 Settings::instance().getRequestsPerEventNotification(
1683                         EventPriority::Low);
1684         return;
1685     }
1686     throw std::invalid_argument("Unkown priority: " +
1687                                 std::to_string(int(priority)));
1688 }
1689 
selectedBucketIsXattrEnabled() const1690 bool Connection::selectedBucketIsXattrEnabled() const {
1691     auto* bucketEngine = getBucketEngine();
1692     if (bucketEngine) {
1693         return Settings::instance().isXattrEnabled() &&
1694                bucketEngine->isXattrEnabled();
1695     }
1696     return Settings::instance().isXattrEnabled();
1697 }
1698 
add_packet_to_send_pipe( cb::const_byte_buffer packet)1699 ENGINE_ERROR_CODE Connection::add_packet_to_send_pipe(
1700         cb::const_byte_buffer packet) {
1701     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
1702     write->produce([this, packet, &ret](cb::byte_buffer buffer) -> size_t {
1703         if (buffer.size() < packet.size()) {
1704             ret = ENGINE_E2BIG;
1705             return 0;
1706         }
1707 
1708         std::copy(packet.begin(), packet.end(), buffer.begin());
1709         addIov(buffer.data(), packet.size());
1710         return packet.size();
1711     });
1712 
1713     return ret;
1714 }
1715 
1716 ////////////////////////////////////////////////////////////////////////////
1717 //                                                                        //
1718 //                   DCP Message producer interface                       //
1719 //                                                                        //
1720 ////////////////////////////////////////////////////////////////////////////
1721 
get_failover_log(uint32_t opaque, Vbid vbucket)1722 ENGINE_ERROR_CODE Connection::get_failover_log(uint32_t opaque, Vbid vbucket) {
1723     cb::mcbp::Request req = {};
1724     req.setMagic(cb::mcbp::Magic::ClientRequest);
1725     req.setOpcode(cb::mcbp::ClientOpcode::DcpGetFailoverLog);
1726     req.setOpaque(opaque);
1727     req.setVBucket(vbucket);
1728 
1729     return add_packet_to_send_pipe(req.getFrame());
1730 }
1731 
stream_req(uint32_t opaque, Vbid vbucket, uint32_t flags, uint64_t start_seqno, uint64_t end_seqno, uint64_t vbucket_uuid, uint64_t snap_start_seqno, uint64_t snap_end_seqno, const std::string& request_value)1732 ENGINE_ERROR_CODE Connection::stream_req(uint32_t opaque,
1733                                          Vbid vbucket,
1734                                          uint32_t flags,
1735                                          uint64_t start_seqno,
1736                                          uint64_t end_seqno,
1737                                          uint64_t vbucket_uuid,
1738                                          uint64_t snap_start_seqno,
1739                                          uint64_t snap_end_seqno,
1740                                          const std::string& request_value) {
1741     using Framebuilder = cb::mcbp::FrameBuilder<cb::mcbp::Request>;
1742     using cb::mcbp::Request;
1743     using cb::mcbp::request::DcpStreamReqPayload;
1744 
1745     auto size = sizeof(Request) + sizeof(DcpStreamReqPayload) +
1746                 request_value.size();
1747 
1748     std::vector<uint8_t> buffer(size);
1749 
1750     Framebuilder builder({buffer.data(), buffer.size()});
1751     builder.setMagic(cb::mcbp::Magic::ClientRequest);
1752     builder.setOpcode(cb::mcbp::ClientOpcode::DcpStreamReq);
1753     builder.setOpaque(opaque);
1754     builder.setVBucket(vbucket);
1755 
1756     DcpStreamReqPayload payload;
1757     payload.setFlags(flags);
1758     payload.setStartSeqno(start_seqno);
1759     payload.setEndSeqno(end_seqno);
1760     payload.setVbucketUuid(vbucket_uuid);
1761     payload.setSnapStartSeqno(snap_start_seqno);
1762     payload.setSnapEndSeqno(snap_end_seqno);
1763 
1764     builder.setExtras(
1765             {reinterpret_cast<const uint8_t*>(&payload), sizeof(payload)});
1766 
1767     if (request_value.empty()) {
1768         builder.setValue(request_value);
1769     }
1770 
1771     return add_packet_to_send_pipe(builder.getFrame()->getFrame());
1772 }
1773 
add_stream_rsp(uint32_t opaque, uint32_t dialogopaque, cb::mcbp::Status status)1774 ENGINE_ERROR_CODE Connection::add_stream_rsp(uint32_t opaque,
1775                                              uint32_t dialogopaque,
1776                                              cb::mcbp::Status status) {
1777     cb::mcbp::response::DcpAddStreamPayload extras;
1778     extras.setOpaque(dialogopaque);
1779     uint8_t buffer[sizeof(cb::mcbp::Response) + sizeof(extras)];
1780     cb::mcbp::ResponseBuilder builder({buffer, sizeof(buffer)});
1781     builder.setMagic(cb::mcbp::Magic::ClientResponse);
1782     builder.setOpcode(cb::mcbp::ClientOpcode::DcpAddStream);
1783     builder.setStatus(status);
1784     builder.setOpaque(opaque);
1785     builder.setExtras(extras.getBuffer());
1786 
1787     return add_packet_to_send_pipe(builder.getFrame()->getFrame());
1788 }
1789 
marker_rsp(uint32_t opaque, cb::mcbp::Status status)1790 ENGINE_ERROR_CODE Connection::marker_rsp(uint32_t opaque,
1791                                          cb::mcbp::Status status) {
1792     cb::mcbp::Response response{};
1793     response.setMagic(cb::mcbp::Magic::ClientResponse);
1794     response.setOpcode(cb::mcbp::ClientOpcode::DcpSnapshotMarker);
1795     response.setExtlen(0);
1796     response.setStatus(status);
1797     response.setBodylen(0);
1798     response.setOpaque(opaque);
1799 
1800     return add_packet_to_send_pipe(
1801             {reinterpret_cast<const uint8_t*>(&response), sizeof(response)});
1802 }
1803 
set_vbucket_state_rsp(uint32_t opaque, cb::mcbp::Status status)1804 ENGINE_ERROR_CODE Connection::set_vbucket_state_rsp(uint32_t opaque,
1805                                                     cb::mcbp::Status status) {
1806     uint8_t buffer[sizeof(cb::mcbp::Response)];
1807     cb::mcbp::ResponseBuilder builder({buffer, sizeof(buffer)});
1808     builder.setMagic(cb::mcbp::Magic::ClientResponse);
1809     builder.setOpcode(cb::mcbp::ClientOpcode::DcpSetVbucketState);
1810     builder.setStatus(status);
1811     builder.setOpaque(opaque);
1812 
1813     return add_packet_to_send_pipe(builder.getFrame()->getFrame());
1814 }
1815 
stream_end(uint32_t opaque, Vbid vbucket, uint32_t flags, cb::mcbp::DcpStreamId sid)1816 ENGINE_ERROR_CODE Connection::stream_end(uint32_t opaque,
1817                                          Vbid vbucket,
1818                                          uint32_t flags,
1819                                          cb::mcbp::DcpStreamId sid) {
1820     using Framebuilder = cb::mcbp::FrameBuilder<cb::mcbp::Request>;
1821     using cb::mcbp::Request;
1822     using cb::mcbp::request::DcpStreamEndPayload;
1823     uint8_t buffer[sizeof(Request) + sizeof(DcpStreamEndPayload) +
1824                    sizeof(cb::mcbp::DcpStreamIdFrameInfo)];
1825 
1826     Framebuilder builder({buffer, sizeof(buffer)});
1827     builder.setMagic(sid ? cb::mcbp::Magic::AltClientRequest
1828                          : cb::mcbp::Magic::ClientRequest);
1829     builder.setOpcode(cb::mcbp::ClientOpcode::DcpStreamEnd);
1830     builder.setOpaque(opaque);
1831     builder.setVBucket(vbucket);
1832 
1833     DcpStreamEndPayload payload;
1834     payload.setFlags(flags);
1835 
1836     builder.setExtras(
1837             {reinterpret_cast<const uint8_t*>(&payload), sizeof(payload)});
1838 
1839     if (sid) {
1840         cb::mcbp::DcpStreamIdFrameInfo framedSid(sid);
1841         builder.setFramingExtras(framedSid.getBuf());
1842     }
1843 
1844     return add_packet_to_send_pipe(builder.getFrame()->getFrame());
1845 }
1846 
marker(uint32_t opaque, Vbid vbucket, uint64_t start_seqno, uint64_t end_seqno, uint32_t flags, boost::optional<uint64_t> hcs, boost::optional<uint64_t> mvs, cb::mcbp::DcpStreamId sid)1847 ENGINE_ERROR_CODE Connection::marker(uint32_t opaque,
1848                                      Vbid vbucket,
1849                                      uint64_t start_seqno,
1850                                      uint64_t end_seqno,
1851                                      uint32_t flags,
1852                                      boost::optional<uint64_t> hcs,
1853                                      boost::optional<uint64_t> mvs,
1854                                      cb::mcbp::DcpStreamId sid) {
1855     using Framebuilder = cb::mcbp::FrameBuilder<cb::mcbp::Request>;
1856     using cb::mcbp::Request;
1857     using cb::mcbp::request::DcpSnapshotMarkerV1Payload;
1858     using cb::mcbp::request::DcpSnapshotMarkerV2_0Value;
1859     using cb::mcbp::request::DcpSnapshotMarkerV2xPayload;
1860 
1861     // Allocate the buffer to be big enough for all cases, which will be the
1862     // v2.0 packet
1863     const auto size = sizeof(Request) + sizeof(cb::mcbp::DcpStreamIdFrameInfo) +
1864                       sizeof(DcpSnapshotMarkerV2xPayload) +
1865                       sizeof(DcpSnapshotMarkerV2_0Value);
1866     std::vector<uint8_t> buffer(size);
1867 
1868     Framebuilder builder({buffer.data(), buffer.size()});
1869     builder.setMagic(sid ? cb::mcbp::Magic::AltClientRequest
1870                          : cb::mcbp::Magic::ClientRequest);
1871     builder.setOpcode(cb::mcbp::ClientOpcode::DcpSnapshotMarker);
1872     builder.setOpaque(opaque);
1873     builder.setVBucket(vbucket);
1874 
1875     if (sid) {
1876         cb::mcbp::DcpStreamIdFrameInfo framedSid(sid);
1877         builder.setFramingExtras(framedSid.getBuf());
1878     }
1879 
1880     cb::mcbp::encodeDcpSnapshotMarker(
1881             builder, start_seqno, end_seqno, flags, hcs, mvs);
1882 
1883     return add_packet_to_send_pipe(builder.getFrame()->getFrame());
1884 }
1885 
/**
 * Send a DCP Mutation message for the item `it` to the peer.
 *
 * Two encodings are used, chosen by dcpUseWriteBuffer(total):
 *  - accepted: the complete packet (header, optional stream-id framing
 *    extras, extras, key and value) is built directly in the write buffer;
 *    `it` keeps ownership of the item and releases it on return.
 *  - otherwise: the item is handed to reserveItem() and released from `it`.
 *    Only the header + optional framing extras + extras are copied into the
 *    write buffer. The key (from the item info, possibly with the
 *    collection-ID stripped) and the value are added with addIov() without
 *    being copied.
 *
 * @param opaque    opaque placed in the request header
 * @param it        the item to send
 * @param vbucket   vbucket placed in the request header
 * @param by_seqno  encoded in the DcpMutationPayload extras
 * @param rev_seqno encoded in the DcpMutationPayload extras
 * @param lock_time encoded in the DcpMutationPayload extras
 * @param nru       encoded in the DcpMutationPayload extras
 * @param sid       optional stream-id; when set the AltClientRequest magic
 *                  is used and a DcpStreamIdFrameInfo is sent as framing
 *                  extras
 * @return ENGINE_SUCCESS when queued; ENGINE_FAILED if the item info could
 *         not be read or the item could not be reserved; ENGINE_E2BIG if
 *         the write buffer offered less space than the header needs
 */
ENGINE_ERROR_CODE Connection::mutation(uint32_t opaque,
                                       cb::unique_item_ptr it,
                                       Vbid vbucket,
                                       uint64_t by_seqno,
                                       uint64_t rev_seqno,
                                       uint32_t lock_time,
                                       uint8_t nru,
                                       cb::mcbp::DcpStreamId sid) {
    item_info info;
    if (!bucket_get_item_info(*this, it.get(), &info)) {
        LOG_WARNING("{}: Failed to get item info", getId());
        return ENGINE_FAILED;
    }

    char* root = reinterpret_cast<char*>(info.value[0].iov_base);
    cb::char_buffer value{root, info.value[0].iov_len};

    auto key = info.key;
    // The client doesn't support collections, so must not send an encoded key
    if (!isCollectionsSupported()) {
        key = key.makeDocKeyWithoutCollectionID();
    }

    cb::mcbp::request::DcpMutationPayload extras(
            by_seqno,
            rev_seqno,
            info.flags,
            gsl::narrow<uint32_t>(info.exptime),
            lock_time,
            nru);

    // Only placed on the wire when sid is set
    cb::mcbp::DcpStreamIdFrameInfo frameExtras(sid);

    // Full encoded size; decides whether the packet is built directly in
    // the write buffer
    const auto total = sizeof(extras) + key.size() + value.size() +
                       (sid ? sizeof(cb::mcbp::DcpStreamIdFrameInfo) : 0) +
                       sizeof(cb::mcbp::Request);
    if (dcpUseWriteBuffer(total)) {
        cb::mcbp::RequestBuilder builder(write->wdata());
        builder.setMagic(sid ? cb::mcbp::Magic::AltClientRequest
                             : cb::mcbp::Magic::ClientRequest);
        builder.setOpcode(cb::mcbp::ClientOpcode::DcpMutation);
        if (sid) {
            builder.setFramingExtras(frameExtras.getBuf());
        }
        builder.setExtras(extras.getBuffer());
        builder.setKey({key.data(), key.size()});
        builder.setValue(value);
        builder.setOpaque(opaque);
        builder.setVBucket(vbucket);
        builder.setCas(info.cas);
        builder.setDatatype(cb::mcbp::Datatype(info.datatype));

        auto packet = builder.getFrame()->getFrame();
        addIov(packet.data(), packet.size());
        write->produced(packet.size());
        return ENGINE_SUCCESS;
    }

    if (!reserveItem(it.get())) {
        LOG_WARNING("{}: Failed to grow item array", getId());
        return ENGINE_FAILED;
    }

    // we've reserved the item, and it'll be released when we're done sending
    // the item.
    it.release();

    // Build the fixed-size header locally; key and value are referenced
    // via addIov below rather than copied
    cb::mcbp::Request req = {};
    req.setMagic(sid ? cb::mcbp::Magic::AltClientRequest
                     : cb::mcbp::Magic::ClientRequest);
    req.setOpcode(cb::mcbp::ClientOpcode::DcpMutation);
    req.setExtlen(gsl::narrow<uint8_t>(sizeof(extras)));
    req.setKeylen(gsl::narrow<uint16_t>(key.size()));
    req.setBodylen(gsl::narrow<uint32_t>(
            sizeof(extras) + key.size() + value.size() +
            (sid ? sizeof(cb::mcbp::DcpStreamIdFrameInfo) : 0)));
    req.setOpaque(opaque);
    req.setVBucket(vbucket);
    req.setCas(info.cas);
    req.setDatatype(cb::mcbp::Datatype(info.datatype));

    if (sid) {
        req.setFramingExtraslen(sizeof(cb::mcbp::DcpStreamIdFrameInfo));
    }

    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
    write->produce([this, &req, &frameExtras, &extras, &value, &ret, &key, sid](
                           cb::byte_buffer wbuf) -> size_t {
        // Bytes copied into the write buffer: header + extras (+ frame info)
        size_t headerSize = sizeof(extras) + sizeof(req);
        if (sid) {
            headerSize += sizeof(frameExtras);
        }
        if (wbuf.size() < headerSize) {
            ret = ENGINE_E2BIG;
            return 0;
        }

        auto nextWbuf = std::copy_n(reinterpret_cast<const uint8_t*>(&req),
                                    sizeof(req),
                                    wbuf.begin());

        if (sid) {
            // Add the optional stream-ID
            nextWbuf =
                    std::copy_n(reinterpret_cast<const uint8_t*>(&frameExtras),
                                sizeof(frameExtras),
                                nextWbuf);
        }

        nextWbuf = std::copy_n(reinterpret_cast<const uint8_t*>(&extras),
                               sizeof(extras),
                               nextWbuf);

        // Add the header (request header, optional frame-extras and extras)
        addIov(wbuf.data(), headerSize);

        // Add the key
        addIov(key.data(), key.size());

        // Add the value
        addIov(value.data(), value.size());

        // Only the copied header consumes write-buffer space
        return headerSize;
    });

    return ret;
}
2014 
/**
 * Queue an already encoded deletion/expiration header for sending, followed
 * by the item's key and optional value.
 *
 * `packet` (request header, optional framing extras and extras) is copied
 * into the write buffer. The key and, when info.nbytes > 0, the value at
 * info.value[0].iov_base are added with addIov() directly from their current
 * location; they are not copied into the write buffer. The callers in this
 * file reserve the item (reserveItem) before calling, presumably so that
 * memory stays valid until it has been sent.
 *
 * @param info   item info; supplies the optional value
 * @param packet encoded header bytes to copy into the write buffer
 * @param key    key to send after the header
 * @return ENGINE_SUCCESS when queued, or ENGINE_E2BIG when the write buffer
 *         offered less than packet.size() plus the maximum leb128-encoded
 *         CollectionID size
 */
ENGINE_ERROR_CODE Connection::deletionInner(const item_info& info,
                                            cb::const_byte_buffer packet,
                                            const DocKey& key) {
    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
    write->produce([this, &packet, &info, &ret, &key](
                           cb::byte_buffer buffer) -> size_t {
        // NOTE(review): only packet.size() bytes are copied below, so the
        // extra leb128 CollectionID slack looks like a leftover - confirm
        // before relaxing this check.
        if (buffer.size() <
            (packet.size() +
             cb::mcbp::unsigned_leb128<CollectionIDType>::getMaxSize())) {
            ret = ENGINE_E2BIG;
            return 0;
        }

        std::copy(packet.begin(), packet.end(), buffer.begin());

        // Add the header (copied into the write buffer above)
        addIov(buffer.data(), packet.size());

        // Add the key
        addIov(key.data(), key.size());

        // Add the optional payload (xattr)
        if (info.nbytes > 0) {
            addIov(info.value[0].iov_base, info.nbytes);
        }

        // Only the copied header consumes write-buffer space
        return packet.size();
    });

    return ret;
}
2046 
deletion(uint32_t opaque, cb::unique_item_ptr it, Vbid vbucket, uint64_t by_seqno, uint64_t rev_seqno, cb::mcbp::DcpStreamId sid)2047 ENGINE_ERROR_CODE Connection::deletion(uint32_t opaque,
2048                                        cb::unique_item_ptr it,
2049                                        Vbid vbucket,
2050                                        uint64_t by_seqno,
2051                                        uint64_t rev_seqno,
2052                                        cb::mcbp::DcpStreamId sid) {
2053     // Should be using the V2 callback
2054     if (isCollectionsSupported()) {
2055         LOG_WARNING("{}: Connection::deletion: called when collections-enabled",
2056                     getId());
2057         return ENGINE_FAILED;
2058     }
2059 
2060     item_info info;
2061     if (!bucket_get_item_info(*this, it.get(), &info)) {
2062         LOG_WARNING("{}: Connection::deletion: Failed to get item info",
2063                     getId());
2064         return ENGINE_FAILED;
2065     }
2066 
2067     auto key = info.key;
2068     if (!isCollectionsSupported()) {
2069         key = info.key.makeDocKeyWithoutCollectionID();
2070     }
2071     char* root = reinterpret_cast<char*>(info.value[0].iov_base);
2072     cb::char_buffer value{root, info.value[0].iov_len};
2073 
2074     cb::mcbp::DcpStreamIdFrameInfo frameInfo(sid);
2075     cb::mcbp::request::DcpDeletionV1Payload extdata(by_seqno, rev_seqno);
2076 
2077     const auto total = sizeof(extdata) + key.size() + value.size() +
2078                        (sid ? sizeof(cb::mcbp::DcpStreamIdFrameInfo) : 0) +
2079                        sizeof(cb::mcbp::Request);
2080 
2081     if (dcpUseWriteBuffer(total)) {
2082         cb::mcbp::RequestBuilder builder(write->wdata());
2083 
2084         builder.setMagic(sid ? cb::mcbp::Magic::AltClientRequest
2085                              : cb::mcbp::Magic::ClientRequest);
2086         builder.setOpcode(cb::mcbp::ClientOpcode::DcpDeletion);
2087         if (sid) {
2088             builder.setFramingExtras(frameInfo.getBuf());
2089         }
2090         builder.setExtras(extdata.getBuffer());
2091         builder.setKey({key.data(), key.size()});
2092         builder.setValue(value);
2093         builder.setOpaque(opaque);
2094         builder.setVBucket(vbucket);
2095         builder.setCas(info.cas);
2096         builder.setDatatype(cb::mcbp::Datatype(info.datatype));
2097 
2098         auto packet = builder.getFrame()->getFrame();
2099         addIov(packet.data(), packet.size());
2100         write->produced(packet.size());
2101         return ENGINE_SUCCESS;
2102     }
2103 
2104     if (!reserveItem(it.get())) {
2105         LOG_WARNING("{}: Connection::deletion: Failed to grow item array",
2106                     getId());
2107         return ENGINE_FAILED;
2108     }
2109 
2110     // we've reserved the item, and it'll be released when we're done sending
2111     // the item.
2112     it.release();
2113 
2114     using cb::mcbp::Request;
2115     using cb::mcbp::request::DcpDeletionV1Payload;
2116     uint8_t blob[sizeof(Request) + sizeof(DcpDeletionV1Payload) +
2117                  sizeof(cb::mcbp::DcpStreamIdFrameInfo)];
2118     auto& req = *reinterpret_cast<Request*>(blob);
2119     req.setMagic(sid ? cb::mcbp::Magic::AltClientRequest
2120                      : cb::mcbp::Magic::ClientRequest);
2121     req.setOpcode(cb::mcbp::ClientOpcode::DcpDeletion);
2122     req.setExtlen(gsl::narrow<uint8_t>(sizeof(DcpDeletionV1Payload)));
2123     req.setKeylen(gsl::narrow<uint16_t>(key.size()));
2124     req.setBodylen(gsl::narrow<uint32_t>(
2125             sizeof(DcpDeletionV1Payload) + key.size() + info.nbytes +
2126             (sid ? sizeof(cb::mcbp::DcpStreamIdFrameInfo) : 0)));
2127     req.setOpaque(opaque);
2128     req.setVBucket(vbucket);
2129     req.setCas(info.cas);
2130     req.setDatatype(cb::mcbp::Datatype(info.datatype));
2131 
2132     auto* ptr = blob + sizeof(Request);
2133     if (sid) {
2134         auto buf = frameInfo.getBuf();
2135         std::copy(buf.begin(), buf.end(), ptr);
2136         ptr += buf.size();
2137         req.setFramingExtraslen(buf.size());
2138     }
2139 
2140     std::copy(extdata.getBuffer().begin(), extdata.getBuffer().end(), ptr);
2141     cb::const_byte_buffer packetBuffer{
2142             blob,
2143             sizeof(Request) + sizeof(DcpDeletionV1Payload) +
2144                     (sid ? sizeof(cb::mcbp::DcpStreamIdFrameInfo) : 0)};
2145 
2146     return deletionInner(info, packetBuffer, key);
2147 }
2148 
deletion_v2(uint32_t opaque, cb::unique_item_ptr it, Vbid vbucket, uint64_t by_seqno, uint64_t rev_seqno, uint32_t delete_time, cb::mcbp::DcpStreamId sid)2149 ENGINE_ERROR_CODE Connection::deletion_v2(uint32_t opaque,
2150                                           cb::unique_item_ptr it,
2151                                           Vbid vbucket,
2152                                           uint64_t by_seqno,
2153                                           uint64_t rev_seqno,
2154                                           uint32_t delete_time,
2155                                           cb::mcbp::DcpStreamId sid) {
2156     item_info info;
2157     if (!bucket_get_item_info(*this, it.get(), &info)) {
2158         LOG_WARNING("{}: Connection::deletion_v2: Failed to get item info",
2159                     getId());
2160         return ENGINE_FAILED;
2161     }
2162 
2163     auto key = info.key;
2164     if (!isCollectionsSupported()) {
2165         key = info.key.makeDocKeyWithoutCollectionID();
2166     }
2167 
2168     cb::mcbp::request::DcpDeletionV2Payload extras(
2169             by_seqno, rev_seqno, delete_time);
2170     cb::mcbp::DcpStreamIdFrameInfo frameInfo(sid);
2171     char* root = reinterpret_cast<char*>(info.value[0].iov_base);
2172     cb::char_buffer value{root, info.value[0].iov_len};
2173 
2174     const auto total = sizeof(extras) + key.size() + value.size() +
2175                        (sid ? sizeof(cb::mcbp::DcpStreamIdFrameInfo) : 0) +
2176                        sizeof(cb::mcbp::Request);
2177 
2178     if (dcpUseWriteBuffer(total)) {
2179         cb::mcbp::RequestBuilder builder(write->wdata());
2180         builder.setMagic(sid ? cb::mcbp::Magic::AltClientRequest
2181                              : cb::mcbp::Magic::ClientRequest);
2182         builder.setOpcode(cb::mcbp::ClientOpcode::DcpDeletion);
2183         if (sid) {
2184             builder.setFramingExtras(frameInfo.getBuf());
2185         }
2186         builder.setExtras(extras.getBuffer());
2187         builder.setKey({key.data(), key.size()});
2188         builder.setValue(value);
2189         builder.setOpaque(opaque);
2190         builder.setVBucket(vbucket);
2191         builder.setCas(info.cas);
2192         builder.setDatatype(cb::mcbp::Datatype(info.datatype));
2193 
2194         auto packet = builder.getFrame()->getFrame();
2195         addIov(packet.data(), packet.size());
2196         write->produced(packet.size());
2197         return ENGINE_SUCCESS;
2198     }
2199 
2200     if (!reserveItem(it.get())) {
2201         LOG_WARNING("{}: Connection::deletion_v2: Failed to grow item array",
2202                     getId());
2203         return ENGINE_FAILED;
2204     }
2205 
2206     // we've reserved the item, and it'll be released when we're done sending
2207     // the item.
2208     it.release();
2209 
2210     // Make blob big enough for either delete or expiry
2211     uint8_t blob[sizeof(cb::mcbp::Request) + sizeof(extras) +
2212                  sizeof(frameInfo)] = {};
2213     const size_t payloadLen = sizeof(extras);
2214     const size_t frameInfoLen = sid ? sizeof(frameInfo) : 0;
2215 
2216     auto& req = *reinterpret_cast<cb::mcbp::Request*>(blob);
2217     req.setMagic(sid ? cb::mcbp::Magic::AltClientRequest
2218                      : cb::mcbp::Magic::ClientRequest);
2219 
2220     req.setOpcode(cb::mcbp::ClientOpcode::DcpDeletion);
2221     req.setExtlen(gsl::narrow<uint8_t>(payloadLen));
2222     req.setKeylen(gsl::narrow<uint16_t>(key.size()));
2223     req.setBodylen(gsl::narrow<uint32_t>(payloadLen +
2224                                          gsl::narrow<uint16_t>(key.size()) +
2225                                          info.nbytes + frameInfoLen));
2226     req.setOpaque(opaque);
2227     req.setVBucket(vbucket);
2228     req.setCas(info.cas);
2229     req.setDatatype(cb::mcbp::Datatype(info.datatype));
2230     auto size = sizeof(cb::mcbp::Request);
2231     auto* ptr = blob + size;
2232     if (sid) {
2233         auto buf = frameInfo.getBuf();
2234         std::copy(buf.begin(), buf.end(), ptr);
2235         ptr += buf.size();
2236         size += buf.size();
2237     }
2238 
2239     auto buffer = extras.getBuffer();
2240     std::copy(buffer.begin(), buffer.end(), ptr);
2241     size += buffer.size();
2242 
2243     return deletionInner(info, {blob, size}, key);
2244 }
2245 
expiration(uint32_t opaque, cb::unique_item_ptr it, Vbid vbucket, uint64_t by_seqno, uint64_t rev_seqno, uint32_t delete_time, cb::mcbp::DcpStreamId sid)2246 ENGINE_ERROR_CODE Connection::expiration(uint32_t opaque,
2247                                          cb::unique_item_ptr it,
2248                                          Vbid vbucket,
2249                                          uint64_t by_seqno,
2250                                          uint64_t rev_seqno,
2251                                          uint32_t delete_time,
2252                                          cb::mcbp::DcpStreamId sid) {
2253     item_info info;
2254     if (!bucket_get_item_info(*this, it.get(), &info)) {
2255         LOG_WARNING("{}: Connection::expiration: Failed to get item info",
2256                     getId());
2257         return ENGINE_FAILED;
2258     }
2259 
2260     auto key = info.key;
2261     if (!isCollectionsSupported()) {
2262         key = info.key.makeDocKeyWithoutCollectionID();
2263     }
2264 
2265     cb::mcbp::request::DcpExpirationPayload extras(
2266             by_seqno, rev_seqno, delete_time);
2267     cb::mcbp::DcpStreamIdFrameInfo frameInfo(sid);
2268     char* root = reinterpret_cast<char*>(info.value[0].iov_base);
2269     cb::char_buffer value{root, info.value[0].iov_len};
2270 
2271     const auto total = sizeof(extras) + key.size() + value.size() +
2272                        (sid ? sizeof(cb::mcbp::DcpStreamIdFrameInfo) : 0) +
2273                        sizeof(cb::mcbp::Request);
2274 
2275     if (dcpUseWriteBuffer(total)) {
2276         cb::mcbp::RequestBuilder builder(write->wdata());
2277         builder.setMagic(sid ? cb::mcbp::Magic::AltClientRequest
2278                              : cb::mcbp::Magic::ClientRequest);
2279         builder.setOpcode(cb::mcbp::ClientOpcode::DcpExpiration);
2280         if (sid) {
2281             builder.setFramingExtras(frameInfo.getBuf());
2282         }
2283         builder.setExtras(extras.getBuffer());
2284         builder.setKey({key.data(), key.size()});
2285         builder.setValue(value);
2286         builder.setOpaque(opaque);
2287         builder.setVBucket(vbucket);
2288         builder.setCas(info.cas);
2289         builder.setDatatype(cb::mcbp::Datatype(info.datatype));
2290 
2291         auto packet = builder.getFrame()->getFrame();
2292         addIov(packet.data(), packet.size());
2293         write->produced(packet.size());
2294         return ENGINE_SUCCESS;
2295     }
2296 
2297     if (!reserveItem(it.get())) {
2298         LOG_WARNING("{}: Connection::expiration: Failed to grow item array",
2299                     getId());
2300         return ENGINE_FAILED;
2301     }
2302 
2303     // we've reserved the item, and it'll be released when we're done sending
2304     // the item.
2305     it.release();
2306 
2307     // Make blob big enough for either delete or expiry
2308     uint8_t blob[sizeof(cb::mcbp::Request) + sizeof(extras) +
2309                  sizeof(frameInfo)] = {};
2310     const size_t payloadLen = sizeof(extras);
2311     const size_t frameInfoLen = sid ? sizeof(frameInfo) : 0;
2312 
2313     auto& req = *reinterpret_cast<cb::mcbp::Request*>(blob);
2314     req.setMagic(sid ? cb::mcbp::Magic::AltClientRequest
2315                      : cb::mcbp::Magic::ClientRequest);
2316 
2317     req.setOpcode(cb::mcbp::ClientOpcode::DcpExpiration);
2318     req.setExtlen(gsl::narrow<uint8_t>(payloadLen));
2319     req.setKeylen(gsl::narrow<uint16_t>(key.size()));
2320     req.setBodylen(gsl::narrow<uint32_t>(payloadLen +
2321                                          gsl::narrow<uint16_t>(key.size()) +
2322                                          info.nbytes + frameInfoLen));
2323     req.setOpaque(opaque);
2324     req.setVBucket(vbucket);
2325     req.setCas(info.cas);
2326     req.setDatatype(cb::mcbp::Datatype(info.datatype));
2327     auto size = sizeof(cb::mcbp::Request);
2328     auto* ptr = blob + size;
2329     if (sid) {
2330         auto buf = frameInfo.getBuf();
2331         std::copy(buf.begin(), buf.end(), ptr);
2332         ptr += buf.size();
2333         size += buf.size();
2334     }
2335 
2336     auto buffer = extras.getBuffer();
2337     std::copy(buffer.begin(), buffer.end(), ptr);
2338     size += buffer.size();
2339 
2340     return deletionInner(info, {blob, size}, key);
2341 }
2342 
set_vbucket_state(uint32_t opaque, Vbid vbucket, vbucket_state_t state)2343 ENGINE_ERROR_CODE Connection::set_vbucket_state(uint32_t opaque,
2344                                                 Vbid vbucket,
2345                                                 vbucket_state_t state) {
2346     if (!is_valid_vbucket_state_t(state)) {
2347         return ENGINE_EINVAL;
2348     }
2349 
2350     cb::mcbp::request::DcpSetVBucketState extras;
2351     extras.setState(static_cast<uint8_t>(state));
2352     uint8_t buffer[sizeof(cb::mcbp::Request) + sizeof(extras)];
2353     cb::mcbp::RequestBuilder builder({buffer, sizeof(buffer)});
2354     builder.setMagic(cb::mcbp::Magic::ClientRequest);
2355     builder.setOpcode(cb::mcbp::ClientOpcode::DcpSetVbucketState);
2356     builder.setOpaque(opaque);
2357     builder.setVBucket(vbucket);
2358     builder.setExtras(extras.getBuffer());
2359 
2360     return add_packet_to_send_pipe(builder.getFrame()->getFrame());
2361 }
2362 
noop(uint32_t opaque)2363 ENGINE_ERROR_CODE Connection::noop(uint32_t opaque) {
2364     uint8_t buffer[sizeof(cb::mcbp::Request)];
2365     cb::mcbp::RequestBuilder builder({buffer, sizeof(buffer)});
2366     builder.setMagic(cb::mcbp::Magic::ClientRequest);
2367     builder.setOpcode(cb::mcbp::ClientOpcode::DcpNoop);
2368     builder.setOpaque(opaque);
2369 
2370     return add_packet_to_send_pipe(builder.getFrame()->getFrame());
2371 }
2372 
buffer_acknowledgement(uint32_t opaque, Vbid vbucket, uint32_t buffer_bytes)2373 ENGINE_ERROR_CODE Connection::buffer_acknowledgement(uint32_t opaque,
2374                                                      Vbid vbucket,
2375                                                      uint32_t buffer_bytes) {
2376     cb::mcbp::request::DcpBufferAckPayload extras;
2377     extras.setBufferBytes(buffer_bytes);
2378     uint8_t buffer[sizeof(cb::mcbp::Request) + sizeof(extras)];
2379     cb::mcbp::RequestBuilder builder({buffer, sizeof(buffer)});
2380     builder.setMagic(cb::mcbp::Magic::ClientRequest);
2381     builder.setOpcode(cb::mcbp::ClientOpcode::DcpBufferAcknowledgement);
2382     builder.setOpaque(opaque);
2383     builder.setVBucket(vbucket);
2384     builder.setExtras(extras.getBuffer());
2385 
2386     return add_packet_to_send_pipe(builder.getFrame()->getFrame());
2387 }
2388 
control(uint32_t opaque, cb::const_char_buffer key, cb::const_char_buffer value)2389 ENGINE_ERROR_CODE Connection::control(uint32_t opaque,
2390                                       cb::const_char_buffer key,
2391                                       cb::const_char_buffer value) {
2392     std::vector<uint8_t> buffer;
2393     buffer.resize(sizeof(cb::mcbp::Request) + key.size() + value.size());
2394     cb::mcbp::RequestBuilder builder({buffer.data(), buffer.size()});
2395 
2396     builder.setMagic(cb::mcbp::Magic::ClientRequest);
2397     builder.setOpcode(cb::mcbp::ClientOpcode::DcpControl);
2398     builder.setOpaque(opaque);
2399     builder.setKey({reinterpret_cast<const uint8_t*>(key.data()), key.size()});
2400     builder.setValue(
2401             {reinterpret_cast<const uint8_t*>(value.data()), value.size()});
2402     return add_packet_to_send_pipe(builder.getFrame()->getFrame());
2403 }
2404 
system_event(uint32_t opaque, Vbid vbucket, mcbp::systemevent::id event, uint64_t bySeqno, mcbp::systemevent::version version, cb::const_byte_buffer key, cb::const_byte_buffer eventData, cb::mcbp::DcpStreamId sid)2405 ENGINE_ERROR_CODE Connection::system_event(uint32_t opaque,
2406                                            Vbid vbucket,
2407                                            mcbp::systemevent::id event,
2408                                            uint64_t bySeqno,
2409                                            mcbp::systemevent::version version,
2410                                            cb::const_byte_buffer key,
2411                                            cb::const_byte_buffer eventData,
2412                                            cb::mcbp::DcpStreamId sid) {
2413     cb::mcbp::request::DcpSystemEventPayload extras(bySeqno, event, version);
2414     std::vector<uint8_t> buffer;
2415     buffer.resize(sizeof(cb::mcbp::Request) + sizeof(extras) + key.size() +
2416                   eventData.size() + sizeof(cb::mcbp::DcpStreamIdFrameInfo));
2417     cb::mcbp::RequestBuilder builder({buffer.data(), buffer.size()});
2418 
2419     builder.setMagic(sid ? cb::mcbp::Magic::AltClientRequest
2420                          : cb::mcbp::Magic::ClientRequest);
2421     builder.setOpcode(cb::mcbp::ClientOpcode::DcpSystemEvent);
2422     builder.setOpaque(opaque);
2423     builder.setVBucket(vbucket);
2424     builder.setDatatype(cb::mcbp::Datatype::Raw);
2425     builder.setExtras(extras.getBuffer());
2426     if (sid) {
2427         cb::mcbp::DcpStreamIdFrameInfo framedSid(sid);
2428         builder.setFramingExtras(framedSid.getBuf());
2429     }
2430     builder.setKey(key);
2431     builder.setValue(eventData);
2432 
2433     return add_packet_to_send_pipe(builder.getFrame()->getFrame());
2434 }
2435 
get_error_map(uint32_t opaque, uint16_t version)2436 ENGINE_ERROR_CODE Connection::get_error_map(uint32_t opaque, uint16_t version) {
2437     cb::mcbp::request::GetErrmapPayload body;
2438     body.setVersion(version);
2439     uint8_t buffer[sizeof(cb::mcbp::Request) + sizeof(body)];
2440     cb::mcbp::RequestBuilder builder({buffer, sizeof(buffer)});
2441     builder.setMagic(cb::mcbp::Magic::ClientRequest);
2442     builder.setOpcode(cb::mcbp::ClientOpcode::GetErrorMap);
2443     builder.setOpaque(opaque);
2444     builder.setValue(body.getBuffer());
2445 
2446     return add_packet_to_send_pipe(builder.getFrame()->getFrame());
2447 }
2448 
prepare(uint32_t opaque, cb::unique_item_ptr it, Vbid vbucket, uint64_t by_seqno, uint64_t rev_seqno, uint32_t lock_time, uint8_t nru, DocumentState document_state, cb::durability::Level level)2449 ENGINE_ERROR_CODE Connection::prepare(uint32_t opaque,
2450                                       cb::unique_item_ptr it,
2451                                       Vbid vbucket,
2452                                       uint64_t by_seqno,
2453                                       uint64_t rev_seqno,
2454                                       uint32_t lock_time,
2455                                       uint8_t nru,
2456                                       DocumentState document_state,
2457                                       cb::durability::Level level) {
2458     item_info info;
2459     if (!bucket_get_item_info(*this, it.get(), &info)) {
2460         LOG_WARNING("{}: Connection::prepare: Failed to get item info",
2461                     getId());
2462         return ENGINE_FAILED;
2463     }
2464 
2465     char* root = reinterpret_cast<char*>(info.value[0].iov_base);
2466     cb::char_buffer buffer{root, info.value[0].iov_len};
2467 
2468     auto key = info.key;
2469 
2470     // The client doesn't support collections, so must not send an encoded key
2471     if (!isCollectionsSupported()) {
2472         key = key.makeDocKeyWithoutCollectionID();
2473     }
2474 
2475     cb::mcbp::request::DcpPreparePayload extras(
2476             by_seqno,
2477             rev_seqno,
2478             info.flags,
2479             gsl::narrow<uint32_t>(info.exptime),
2480             lock_time,
2481             nru);
2482     if (document_state == DocumentState::Deleted) {
2483         extras.setDeleted(uint8_t(1));
2484     }
2485     extras.setDurabilityLevel(level);
2486 
2487     size_t total = sizeof(extras) + key.size() + buffer.size() +
2488                    sizeof(cb::mcbp::Request);
2489     if (dcpUseWriteBuffer(total)) {
2490         // Format a local copy and send
2491         cb::mcbp::RequestBuilder builder(write->wdata());
2492         builder.setMagic(cb::mcbp::Magic::ClientRequest);
2493         builder.setOpcode(cb::mcbp::ClientOpcode::DcpPrepare);
2494         builder.setExtras(extras.getBuffer());
2495         builder.setKey({key.data(), key.size()});
2496         builder.setOpaque(opaque);
2497         builder.setVBucket(vbucket);
2498         builder.setCas(info.cas);
2499         builder.setDatatype(cb::mcbp::Datatype(info.datatype));
2500         builder.setValue(buffer);
2501 
2502         auto packet = builder.getFrame()->getFrame();
2503         addIov(packet.data(), packet.size());
2504         write->produced(packet.size());
2505         return ENGINE_SUCCESS;
2506     }
2507 
2508     // Use an IO vector instead
2509     if (!reserveItem(it.get())) {
2510         LOG_WARNING("{}: Connection::prepare: Failed to grow item array",
2511                     getId());
2512         return ENGINE_FAILED;
2513     }
2514 
2515     // we've reserved the item, and it'll be released when we're done sending
2516     // the item.
2517     it.release();
2518 
2519     cb::mcbp::Request req = {};
2520     req.setMagic(cb::mcbp::Magic::ClientRequest);
2521     req.setOpcode(cb::mcbp::ClientOpcode::DcpPrepare);
2522     req.setExtlen(gsl::narrow<uint8_t>(sizeof(extras)));
2523     req.setKeylen(gsl::narrow<uint16_t>(key.size()));
2524     req.setBodylen(
2525             gsl::narrow<uint32_t>(sizeof(extras) + key.size() + buffer.size()));
2526     req.setOpaque(opaque);
2527     req.setVBucket(vbucket);
2528     req.setCas(info.cas);
2529     req.setDatatype(cb::mcbp::Datatype(info.datatype));
2530 
2531     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2532     write->produce([this, &req, &extras, &buffer, &ret, &key](
2533                            cb::byte_buffer wbuf) -> size_t {
2534         const size_t total = sizeof(extras) + sizeof(req);
2535         if (wbuf.size() < total) {
2536             ret = ENGINE_E2BIG;
2537             return 0;
2538         }
2539 
2540         std::copy_n(reinterpret_cast<const uint8_t*>(&req),
2541                     sizeof(req),
2542                     wbuf.begin());
2543         std::copy_n(reinterpret_cast<const uint8_t*>(&extras),
2544                     sizeof(extras),
2545                     wbuf.begin() + sizeof(req));
2546 
2547         // Add the header
2548         addIov(wbuf.data(), sizeof(req) + sizeof(extras));
2549 
2550         // Add the key
2551         addIov(key.data(), key.size());
2552 
2553         // Add the value
2554         addIov(buffer.data(), buffer.size());
2555         return total;
2556     });
2557 
2558     return ret;
2559 }
2560 
seqno_acknowledged(uint32_t opaque, Vbid vbucket, uint64_t prepared_seqno)2561 ENGINE_ERROR_CODE Connection::seqno_acknowledged(uint32_t opaque,
2562                                                  Vbid vbucket,
2563                                                  uint64_t prepared_seqno) {
2564     cb::mcbp::request::DcpSeqnoAcknowledgedPayload extras(prepared_seqno);
2565     uint8_t buffer[sizeof(cb::mcbp::Request) + sizeof(extras)];
2566     cb::mcbp::RequestBuilder builder({buffer, sizeof(buffer)});
2567     builder.setMagic(cb::mcbp::Magic::ClientRequest);
2568     builder.setOpcode(cb::mcbp::ClientOpcode::DcpSeqnoAcknowledged);
2569     builder.setOpaque(opaque);
2570     builder.setVBucket(vbucket);
2571     builder.setExtras(extras.getBuffer());
2572     return add_packet_to_send_pipe(builder.getFrame()->getFrame());
2573 }
2574 
commit(uint32_t opaque, Vbid vbucket, const DocKey& key_, uint64_t prepare_seqno, uint64_t commit_seqno)2575 ENGINE_ERROR_CODE Connection::commit(uint32_t opaque,
2576                                      Vbid vbucket,
2577                                      const DocKey& key_,
2578                                      uint64_t prepare_seqno,
2579                                      uint64_t commit_seqno) {
2580     cb::mcbp::request::DcpCommitPayload extras(prepare_seqno, commit_seqno);
2581     auto key = key_;
2582     if (!isCollectionsSupported()) {
2583         // The client doesn't support collections, don't send an encoded key
2584         key = key.makeDocKeyWithoutCollectionID();
2585     }
2586     const size_t totalBytes =
2587             sizeof(cb::mcbp::Request) + sizeof(extras) + key.size();
2588     std::vector<uint8_t> buffer(totalBytes);
2589     cb::mcbp::RequestBuilder builder({buffer.data(), buffer.size()});
2590     builder.setMagic(cb::mcbp::Magic::ClientRequest);
2591     builder.setOpcode(cb::mcbp::ClientOpcode::DcpCommit);
2592     builder.setOpaque(opaque);
2593     builder.setVBucket(vbucket);
2594     builder.setExtras(extras.getBuffer());
2595     builder.setKey(cb::const_char_buffer(key));
2596     return add_packet_to_send_pipe(builder.getFrame()->getFrame());
2597 }
2598 
abort(uint32_t opaque, Vbid vbucket, const DocKey& key_, uint64_t prepared_seqno, uint64_t abort_seqno)2599 ENGINE_ERROR_CODE Connection::abort(uint32_t opaque,
2600                                     Vbid vbucket,
2601                                     const DocKey& key_,
2602                                     uint64_t prepared_seqno,
2603                                     uint64_t abort_seqno) {
2604     cb::mcbp::request::DcpAbortPayload extras(prepared_seqno, abort_seqno);
2605     auto key = key_;
2606     if (!isCollectionsSupported()) {
2607         // The client doesn't support collections, don't send an encoded key
2608         key = key.makeDocKeyWithoutCollectionID();
2609     }
2610     const size_t totalBytes =
2611             sizeof(cb::mcbp::Request) + sizeof(extras) + key.size();
2612     std::vector<uint8_t> buffer(totalBytes);
2613     cb::mcbp::RequestBuilder builder({buffer.data(), buffer.size()});
2614     builder.setMagic(cb::mcbp::Magic::ClientRequest);
2615     builder.setOpcode(cb::mcbp::ClientOpcode::DcpAbort);
2616     builder.setOpaque(opaque);
2617     builder.setVBucket(vbucket);
2618     builder.setExtras(extras.getBuffer());
2619     builder.setKey(cb::const_char_buffer(key));
2620     return add_packet_to_send_pipe(builder.getFrame()->getFrame());
2621 }
2622 
2623 ////////////////////////////////////////////////////////////////////////////
2624 //                                                                        //
2625 //               End DCP Message producer interface                       //
2626 //                                                                        //
2627 ////////////////////////////////////////////////////////////////////////////
2628