xref: /3.0.3-GA/couchbase-cli/pump_tap.py (revision 777028a7)
1#!/usr/bin/env python
2
3import logging
4import os
5import random
6import simplejson as json
7import socket
8import struct
9import time
10
11import cb_bin_client
12import couchbaseConstants
13
14import pump
15import pump_mc
16import pump_cb
17
try:
    import ctypes
except ImportError:
    # `sys` is not imported at the top of this file, so bring it into scope
    # here; without it this fallback path fails with a NameError.
    import sys

    # Retry the import with cb_path removed from sys.path (presumably
    # something under that directory breaks the ctypes import -- confirm),
    # and put the path back only if the retry succeeded.
    cb_path = '/opt/couchbase/lib/python'
    while cb_path in sys.path:
        sys.path.remove(cb_path)
    try:
        import ctypes
    except ImportError:
        sys.exit('error: could not import ctypes module')
    else:
        sys.path.insert(0, cb_path)
30
31# TODO: (1) TAPDumpSource - handle TAP_FLAG_NETWORK_BYTE_ORDER.
32
33class TAPDumpSource(pump.Source):
34    """Can read from cluster/server/bucket via TAP dump."""
35
36    def __init__(self, opts, spec, source_bucket, source_node,
37                 source_map, sink_map, ctl, cur):
38        super(TAPDumpSource, self).__init__(opts, spec, source_bucket, source_node,
39                                            source_map, sink_map, ctl, cur)
40        self.tap_done = False
41        self.tap_conn = None
42        self.tap_name = opts.process_name
43        self.ack_last = False # True when the last TAP msg had TAP_FLAG_ACK.
44        self.cmd_last = None
45        self.num_msg = 0
46
47        self.recv_min_bytes = int(opts.extra.get("recv_min_bytes", 4096))
48        self.vbucket_list = getattr(opts, "vbucket_list", None)
49
50    @staticmethod
51    def can_handle(opts, spec):
52        return (spec.startswith("http://") or
53                spec.startswith("couchbase://"))
54
55    @staticmethod
56    def check(opts, spec):
57        err, map = pump.rest_couchbase(opts, spec)
58        if err:
59            return err, map
60        if not map or not map.get('buckets'):
61            return ("error: no TAP'able buckets at source: %s;"
62                    " please check your username/password to the cluster" %
63                    (spec)), None
64        return 0, map
65
66    @staticmethod
67    def provide_design(opts, source_spec, source_bucket, source_map):
68        spec_parts = source_map.get('spec_parts')
69        if not spec_parts:
70            return "error: no design spec_parts", None
71        host, port, user, pswd, path = spec_parts
72
73        source_nodes = pump.filter_bucket_nodes(source_bucket, spec_parts)
74        if not source_nodes:
75            source_nodes = source_bucket['nodes']
76            if not source_nodes:
77                return ("error: no design source node; spec_parts: %s" %
78                        (spec_parts,), None)
79
80        couch_api_base = source_nodes[0].get('couchApiBase')
81        if not couch_api_base:
82            return 0, None # No couchApiBase; probably not 2.0.
83
84        err, ddocs_json, ddocs = \
85            pump.rest_request_json(host, int(port), user, pswd,
86                                   "/pools/default/buckets/%s/ddocs" %
87                                   (source_bucket['name']),
88                                   reason="provide_design")
89        if err and "response: 404" in err: # A 404/not-found likely means 2.0-DP4.
90            ddocs_json = None
91            ddocs_url = couch_api_base + "/_all_docs"
92            ddocs_qry = "?startkey=\"_design/\"&endkey=\"_design0\"&include_docs=true"
93
94            host, port, user, pswd, path = \
95                pump.parse_spec(opts, ddocs_url, 8092)
96            # Not using user/pwd as 2.0-DP4 CAPI did not support auth.
97            err, ddocs_json, ddocs = \
98                pump.rest_request_json(host, int(port), None, None,
99                                       path + ddocs_qry,
100                                       reason="provide_design-2.0DP4")
101        if err:
102            return err, None
103
104        if not ddocs.get('rows', None):
105            return 0, None
106        else:
107            return 0, json.dumps(ddocs.get('rows', []))
108
109    def provide_batch(self):
110        cur_sleep = 0.2
111        cur_retry = 0
112        max_retry = self.opts.extra['max_retry']
113
114        while True:
115            if self.tap_done:
116                return 0, None
117
118            rv, tap_conn = self.get_tap_conn()
119            if rv != 0:
120                self.tap_done = True
121                return rv, None
122
123            rv, batch = self.provide_batch_actual(tap_conn)
124            if rv == 0:
125                return rv, batch
126
127            if self.tap_conn:
128                self.tap_conn.close()
129                self.tap_conn = None
130
131            if cur_retry >= max_retry:
132                self.tap_done = True
133                return rv, batch
134
135            logging.warn("backoff: %s, sleeping: %s, on error: %s" %
136                         (cur_retry, cur_sleep, rv))
137            time.sleep(cur_sleep)
138            cur_sleep = min(cur_sleep * 2, 20) # Max backoff sleep 20 seconds.
139            cur_retry = cur_retry + 1
140
141    def provide_batch_actual(self, tap_conn):
142        batch = pump.Batch(self)
143
144        batch_max_size = self.opts.extra['batch_max_size']
145        batch_max_bytes = self.opts.extra['batch_max_bytes']
146        try:
147            while (not self.tap_done and
148                   batch.size() < batch_max_size and
149                   batch.bytes < batch_max_bytes):
150                # TODO: (1) TAPDumpSource - provide_batch timeout on inactivity.
151
152                rv, cmd, vbucket_id, key, flg, exp, cas, meta, val, \
153                    opaque, need_ack, dtype = self.read_tap_conn(tap_conn)
154                if rv != 0:
155                    self.tap_done = True
156                    return rv, batch
157
158                if (cmd == couchbaseConstants.CMD_TAP_MUTATION or
159                    cmd == couchbaseConstants.CMD_TAP_DELETE):
160                    if not self.skip(key, vbucket_id):
161                        msg = (cmd, vbucket_id, key, flg, exp, cas, meta, val, 0, dtype, 0)
162                        batch.append(msg, len(val))
163                        self.num_msg += 1
164                    if cmd == couchbaseConstants.CMD_TAP_DELETE:
165                        batch.adjust_size += 1
166                elif cmd == couchbaseConstants.CMD_TAP_OPAQUE:
167                    if not need_ack:
168                        pass
169                elif cmd == couchbaseConstants.CMD_NOOP:
170                    # 1.8.x servers might not end the TAP dump on an empty bucket,
171                    # so we treat 2 NOOP's in a row as the end and proactively close.
172                    # Only do this when there've been no msgs to avoid closing
173                    # during a slow backfill.
174                    if (self.cmd_last == couchbaseConstants.CMD_NOOP and
175                        self.num_msg == 0 and
176                        batch.size() <= 0):
177                        self.tap_done = True
178                        logging.debug("Receive 2 NOOp's in a row. Close it proactively.")
179                        return 0, batch
180                elif cmd == couchbaseConstants.CMD_TAP_FLUSH:
181                    logging.warn("stopping: saw CMD_TAP_FLUSH")
182                    self.tap_done = True
183                    break
184                else:
185                    s = str(pump.CMD_STR.get(cmd, cmd))
186                    logging.warn("warning: unexpected TAP message: " + s)
187                    return "unexpected TAP message: " + s, batch
188
189                if need_ack:
190                    self.ack_last = True
191                    try:
192                        tap_conn._sendMsg(cmd, '', '', opaque, vbucketId=0,
193                                          fmt=couchbaseConstants.RES_PKT_FMT,
194                                          magic=couchbaseConstants.RES_MAGIC_BYTE)
195                    except socket.error:
196                        return ("error: socket.error on send();"
197                                " perhaps the source server: %s was rebalancing"
198                                " or had connectivity/server problems" %
199                                (self.source_node['hostname'])), batch
200                    except EOFError:
201                        return ("error: EOFError on socket send();"
202                                " perhaps the source server: %s was rebalancing"
203                                " or had connectivity/server problems" %
204                                (self.source_node['hostname'])), batch
205
206                    # Close the batch when there's an ACK handshake, so
207                    # the server can concurrently send us the next batch.
208                    # If we are slow, our slow ACK's will naturally slow
209                    # down the server.
210                    return 0, batch
211
212                self.ack_last = False
213                self.cmd_last = cmd
214
215        except EOFError:
216            if batch.size() <= 0 and self.ack_last:
217                # A closed conn after an ACK means clean end of TAP dump.
218                self.tap_done = True
219            else:
220                logging.warn("Connection is closed prematurely")
221
222        if batch.size() <= 0:
223            return 0, None
224
225        return 0, batch
226
227    def get_tap_conn(self):
228        """Return previously connected tap conn."""
229
230        if not self.tap_conn:
231            host = self.source_node['hostname'].split(':')[0]
232            port = self.source_node['ports']['direct']
233            version = self.source_node['version'] # Ex: '2.0.0-1944-rel-community' or '1.8.1-937-rel-community'.
234
235            logging.debug("  TAPDumpSource connecting mc: " +
236                          host + ":" + str(port))
237
238            self.tap_conn = cb_bin_client.MemcachedClient(host, port)
239            if not self.tap_conn:
240                return "error: could not connect to memcached: " + \
241                    host + ":" + str(port), None
242
243            sasl_user = str(pump.get_username(self.source_bucket.get("name", self.opts.username)))
244            sasl_pswd = str(pump.get_password(self.source_bucket.get("saslPassword", self.opts.password)))
245            if sasl_user:
246                try:
247                    self.tap_conn.sasl_auth_cram_md5(sasl_user, sasl_pswd)
248                except cb_bin_client.MemcachedError:
249                    try:
250                        self.tap_conn.sasl_auth_plain(sasl_user, sasl_pswd)
251                    except EOFError:
252                        return "error: SASL auth error: %s:%s, user: %s" % \
253                            (host, port, sasl_user), None
254                    except cb_bin_client.MemcachedError:
255                        return "error: SASL auth failed: %s:%s, user: %s" % \
256                            (host, port, sasl_user), None
257                    except socket.error:
258                        return "error: SASL auth socket error: %s:%s, user: %s" % \
259                            (host, port, sasl_user), None
260                except EOFError:
261                    return "error: SASL auth error: %s:%s, user: %s" % \
262                        (host, port, sasl_user), None
263                except socket.error:
264                    return "error: SASL auth socket error: %s:%s, user: %s" % \
265                        (host, port, sasl_user), None
266
267            try:
268                self.tap_conn.hello()
269            except Exception, e:
270                logging.warn("fail to call hello command, maybe it is not supported.")
271                pass
272
273            # We explicitly do not use TAP_FLAG_REGISTERED_CLIENT,
274            # as that is for checkpoint/incremental backup only.
275            #
276            tap_opts = {couchbaseConstants.TAP_FLAG_DUMP: '',
277                        couchbaseConstants.TAP_FLAG_SUPPORT_ACK: ''}
278
279            self.tap_conn.tap_fix_flag_byteorder = version.split(".") >= ["2", "0", "0"]
280            if self.tap_conn.tap_fix_flag_byteorder:
281                tap_opts[couchbaseConstants.TAP_FLAG_TAP_FIX_FLAG_BYTEORDER] = ''
282
283            if self.vbucket_list:
284                tap_opts[couchbaseConstants.TAP_FLAG_LIST_VBUCKETS] = ''
285
286            ext, val = TAPDumpSource.encode_tap_connect_opts(tap_opts, vblist=self.vbucket_list)
287            self.tap_conn._sendCmd(couchbaseConstants.CMD_TAP_CONNECT,
288                                   self.tap_name, val, 0, ext)
289
290        return 0, self.tap_conn
291
292    def read_tap_conn(self, tap_conn):
293        buf, cmd, vbucket_id, opaque, cas, keylen, extlen, data, datalen, dtype = \
294            self.recv_msg(tap_conn.s, getattr(tap_conn, 'buf', ''))
295        tap_conn.buf = buf
296
297        rv = 0
298        metalen = flags = ttl = flg = exp = 0
299        meta = key = val = ext = ''
300        need_ack = False
301
302        if data:
303            ext = data[0:extlen]
304            if extlen == 8:
305                metalen, flags, ttl = \
306                    struct.unpack(couchbaseConstants.TAP_GENERAL_PKT_FMT, ext)
307            elif extlen == 16:
308                metalen, flags, ttl, flg, exp = \
309                    struct.unpack(couchbaseConstants.TAP_MUTATION_PKT_FMT, ext)
310                if not tap_conn.tap_fix_flag_byteorder:
311                    flg = socket.ntohl(flg)
312            need_ack = flags & couchbaseConstants.TAP_FLAG_ACK
313            meta_start = extlen
314            key_start = meta_start + metalen
315            val_start = key_start + keylen
316
317            meta = data[meta_start:key_start]
318            key = data[key_start:val_start]
319            val = data[val_start:]
320        elif datalen:
321            rv = "error: could not read full TAP message body"
322
323        return rv, cmd, vbucket_id, key, flg, exp, cas, meta, val, opaque, need_ack, dtype
324
325    def recv_msg(self, sock, buf):
326        pkt, buf = self.recv(sock, couchbaseConstants.MIN_RECV_PACKET, buf)
327        if not pkt:
328            raise EOFError()
329        magic, cmd, keylen, extlen, dtype, errcode, datalen, opaque, cas = \
330            struct.unpack(couchbaseConstants.REQ_PKT_FMT, pkt)
331        if magic != couchbaseConstants.REQ_MAGIC_BYTE:
332            raise Exception("unexpected recv_msg magic: " + str(magic))
333        data, buf = self.recv(sock, datalen, buf)
334        return buf, cmd, errcode, opaque, cas, keylen, extlen, data, datalen, dtype
335
336    def recv(self, skt, nbytes, buf):
337        recv_arr = [ buf ]
338        recv_tot = len(buf) # In bytes.
339
340        while recv_tot < nbytes:
341            data = None
342            try:
343                data = skt.recv(max(nbytes - len(buf), self.recv_min_bytes))
344            except socket.timeout:
345                logging.error("error: recv socket.timeout")
346            except Exception, e:
347                msg = str(e)
348                if msg.find("Connection reset by peer") >= 0:
349                    logging.error("error: recv exception: " + \
350                        "%s, %s may be inactive" % (msg, self.source_node["hostname"]))
351                else:
352                    logging.error("error: recv exception: " + str(e))
353
354
355            if not data:
356                return None, ''
357
358            recv_arr.append(data)
359            recv_tot += len(data)
360
361        joined = ''.join(recv_arr)
362
363        return joined[:nbytes], joined[nbytes:]
364
365    @staticmethod
366    def encode_tap_connect_opts(opts, backfill=False, vblist=None):
367        header = 0
368        val = []
369        for op in sorted(opts.keys()):
370            header |= op
371            if op in couchbaseConstants.TAP_FLAG_TYPES:
372                val.append(struct.pack(couchbaseConstants.TAP_FLAG_TYPES[op],
373                                       opts[op]))
374            elif backfill and op == couchbaseConstants.TAP_FLAG_CHECKPOINT:
375                if opts[op][2] >= 0:
376                    val.append(struct.pack(">HHQ",
377                                           opts[op][0], opts[op][1], opts[op][2]))
378            elif vblist and op == couchbaseConstants.TAP_FLAG_LIST_VBUCKETS:
379                vblist = json.loads(vblist)
380                if isinstance(vblist, dict):
381                    for node, vbucketlist in vblist.iteritems():
382                        val.append(struct.pack(">H", len(vbucketlist)))
383                        for v in vbucketlist:
384                            val.append(struct.pack(">H", int(v)))
385                elif isinstance(vblist, list):
386                    val.append(struct.pack(">H", len(vblist)))
387                    for v in vblist:
388                        val.append(struct.pack(">H", int(v)))
389            else:
390                val.append(opts[op])
391
392        return struct.pack(">I", header), ''.join(val)
393
394    @staticmethod
395    def total_msgs(opts, source_bucket, source_node, source_map):
396        source_name = source_node.get("hostname", None)
397        if not source_name:
398            return 0, None
399
400        spec = source_map['spec']
401        name = source_bucket['name']
402        vbuckets_num = len(source_bucket['vBucketServerMap']['vBucketMap'])
403        if not vbuckets_num:
404            return 0, None
405
406        vbucket_list = getattr(opts, "vbucket_list", None)
407
408        stats_vals = {}
409        host, port, user, pswd, _ = pump.parse_spec(opts, spec, 8091)
410        for stats in ["curr_items", "vb_active_resident_items_ratio"]:
411            path = "/pools/default/buckets/%s/stats/%s" % (name, stats)
412            err, json, data = pump.rest_request_json(host, int(port),
413                                                     user, pswd, path,
414                                                     reason="total_msgs")
415            if err:
416                return 0, None
417
418            nodeStats = data.get("nodeStats", None)
419            if not nodeStats:
420                return 0, None
421            vals = nodeStats.get(source_name, None)
422            if not vals:
423                return 0, None
424            stats_vals[stats] = vals[-1]
425
426        total_msgs = stats_vals["curr_items"]
427        resident_ratio = stats_vals["vb_active_resident_items_ratio"]
428        if 0 < resident_ratio < 100:
429            #for DGM case, server will transfer both in-memory items and backfill all items on disk
430            total_msgs += (resident_ratio/100.0) * stats_vals["curr_items"]
431        return 0, int(total_msgs)
432
433class TapSink(pump_cb.CBSink):
    """Smart client using tap protocol to stream data to couchbase cluster."""
435    def __init__(self, opts, spec, source_bucket, source_node,
436                 source_map, sink_map, ctl, cur):
437        super(TapSink, self).__init__(opts, spec, source_bucket, source_node,
438                                     source_map, sink_map, ctl, cur)
439
440        self.tap_flags = couchbaseConstants.TAP_FLAG_NETWORK_BYTE_ORDER
441        self.vbucket_list = getattr(opts, "vbucket_list", None)
442
443    @staticmethod
444    def check_base(opts, spec):
445        #Overwrite CBSink.check_base() to allow replica vbucket state
446
447        op = getattr(opts, "destination_operation", None)
448        if op not in [None, 'set', 'add', 'get']:
449            return ("error: --destination-operation unsupported value: %s" +
450                    "; use set, add, get") % op
451
452        return pump.EndPoint.check_base(opts, spec)
453
454    def scatter_gather(self, mconns, batch):
455        sink_map_buckets = self.sink_map['buckets']
456        if len(sink_map_buckets) != 1:
457            return "error: CBSink.run() expected 1 bucket in sink_map", None, None
458
459        vbuckets_num = len(sink_map_buckets[0]['vBucketServerMap']['vBucketMap'])
460        vbuckets = batch.group_by_vbucket_id(vbuckets_num, self.rehash)
461
462        # Scatter or send phase.
463        for vbucket_id, msgs in vbuckets.iteritems():
464            rv, conn = self.find_conn(mconns, vbucket_id, msgs)
465            if rv != 0:
466                return rv, None, None
467            rv = self.send_msgs(conn, msgs, self.operation(),
468                                vbucket_id=vbucket_id)
469            if rv != 0:
470                return rv, None, None
471
472        # Yield to let other threads do stuff while server's processing.
473        time.sleep(0.01)
474
475        # Gather or recv phase.
476        # Only process the last msg which requires ack request
477        last_msg = []
478        retry_batch = None
479        need_refresh = False
480        for vbucket_id, msgs in vbuckets.iteritems():
481            last_msg.append(msgs[-1])
482            rv, conn = self.find_conn(mconns, vbucket_id, msgs)
483            if rv != 0:
484                return rv, None, None
485            rv, retry, refresh = \
486                self.recv_msgs(conn, last_msg, vbucket_id=vbucket_id, verify_opaque=False)
487            if rv != 0:
488                return rv, None, None
489            if retry:
490                retry_batch = batch
491            if refresh:
492                need_refresh = True
493
494        return 0, retry_batch, retry_batch and not need_refresh
495
496    def find_conn(self, mconns, vbucket_id, msgs):
497        if not self.vbucket_list:
498            return super(TapSink, self).find_conn(mconns, vbucket_id, msgs)
499
500        vbuckets = json.loads(self.vbucket_list)
501        if isinstance(vbuckets, list):
502            if vbucket_id not in vbuckets:
503                return "error: unexpected vbucket id:" + str(vbucket_id), None
504            return super(TapSink, self).find_conn(mconns, vbucket_id, msgs)
505
506        bucket = self.sink_map['buckets'][0]
507        serverList = bucket['vBucketServerMap']['serverList']
508        if serverList:
509            port_number = serverList[0].split(":")[-1]
510        else:
511            port_number = 11210
512
513        conn = None
514        if isinstance(vbuckets, dict):
515            for node, vblist in vbuckets.iteritems():
516                for vbucket in vblist:
517                    if int(vbucket) == int(vbucket_id):
518                        host_port = "%s:%s" % (node, port_number)
519                        conn = mconns.get(host_port, None)
520                        if not conn:
521                            user = bucket['name']
522                            pswd = bucket['saslPassword']
523                            rv, conn = TapSink.connect_mc(node, port_number, user, pswd)
524                            if rv != 0:
525                                logging.error("error: CBSink.connect() for send: " + rv)
526                                return rv, None
527                            mconns[host_port] = conn
528                            for i, msg in enumerate(msgs):
529                                msg_format_length = len(msg)
530                                if msg_format_length > 8:
531                                    try:
532                                        conn.hello()
533                                    except Exception, e:
534                                        logging.warn("fail to call hello command, maybe it is not supported")
535                                        pass
536                                break
537                        break
538        return 0, conn
539
540    def send_msgs(self, conn, msgs, operation, vbucket_id=None):
541        m = []
542        #Ask for acknowledgement for the last msg of batch
543        msg_format_length = 0
544        for i, msg in enumerate(msgs):
545            if not msg_format_length:
546                msg_format_length = len(msg)
547            cmd, vbucket_id_msg, key, flg, exp, cas, meta, val = msg[:8]
548            seqno = dtype = nmeta = 0
549            if msg_format_length > 8:
550                seqno, dtype, nmeta = msg[8:]
551            if vbucket_id is not None:
552                vbucket_id_msg = vbucket_id
553
554            if self.skip(key, vbucket_id_msg):
555                continue
556
557            if cmd == couchbaseConstants.CMD_GET:
558                val, flg, exp, cas = '', 0, 0, 0
559            if cmd == couchbaseConstants.CMD_NOOP:
560                key, val, flg, exp, cas = '', '', 0, 0, 0
561            need_ack = (i == len(msgs)-1)
562            rv, req = self.cmd_request(cmd, vbucket_id_msg, key, val,
563                                       ctypes.c_uint32(flg).value,
564                                       exp, cas, meta, i, need_ack, dtype)
565            if rv != 0:
566                return rv
567            self.append_req(m, req)
568        if m:
569            try:
570                conn.s.send(''.join(m))
571            except socket.error, e:
572                return "error: conn.send() exception: %s" % (e)
573        return 0
574
575    def cmd_request(self, cmd, vbucket_id, key, val, flg, exp, cas, meta, opaque, need_ack, dtype):
576        if meta:
577            seq_no = str(meta)
578            if len(seq_no) > 8:
579                seq_no = seq_no[0:8]
580            if len(seq_no) < 8:
581                # The seq_no might be 32-bits from 2.0DP4, so pad with 0x00's.
582                seq_no = ('\x00\x00\x00\x00\x00\x00\x00\x00' + seq_no)[-8:]
583            check_seqno, = struct.unpack(">Q", seq_no)
584            if not check_seqno:
585                seq_no = ('\x00\x00\x00\x00\x00\x00\x00\x00' + 1)[-8:]
586        else:
587            seq_no = ('\x00\x00\x00\x00\x00\x00\x00\x00' + 1)[-8:]
588        metalen = len(str(seq_no))
589        ttl = -1
590        tap_flags = self.tap_flags
591        if need_ack:
592            tap_flags |= couchbaseConstants.TAP_FLAG_ACK
593        if cmd == couchbaseConstants.CMD_TAP_MUTATION:
594            ext = struct.pack(couchbaseConstants.TAP_MUTATION_PKT_FMT,
595                              metalen,
596                              tap_flags,
597                              ttl, flg, exp)
598        elif cmd == couchbaseConstants.CMD_TAP_DELETE:
599            ext = struct.pack(couchbaseConstants.TAP_GENERAL_PKT_FMT,
600                              metalen,
601                              tap_flags, ttl)
602        else:
603            return "error: MCSink - unknown tap cmd for request: " + str(cmd), None
604
605        hdr = self.cmd_header(cmd, vbucket_id, key, val, ext, cas, opaque, metalen, dtype)
606        return 0, (hdr, ext, seq_no, key, val)
607
608    def cmd_header(self, cmd, vbucket_id, key, val, ext, cas, opaque, metalen,
609                   dtype=0,
610                   fmt=couchbaseConstants.REQ_PKT_FMT,
611                   magic=couchbaseConstants.REQ_MAGIC_BYTE):
612        #MB-11902
613        dtype = 0
614        return struct.pack(fmt, magic, cmd,
615                           len(key), len(ext), dtype, vbucket_id,
616                           metalen + len(key) + len(val) + len(ext), opaque, cas)
617
618    def append_req(self, m, req):
619        hdr, ext, seq_no, key, val = req
620        m.append(hdr)
621        if ext:
622            m.append(ext)
623        if seq_no:
624            m.append(seq_no)
625        if key:
626            m.append(str(key))
627        if val:
628            m.append(str(val))
629