1package gomemcached 2 3import ( 4 "encoding/binary" 5 "fmt" 6 "io" 7 "sync" 8) 9 10// MCResponse is memcached response 11type MCResponse struct { 12 // The command opcode of the command that sent the request 13 Opcode CommandCode 14 // The status of the response 15 Status Status 16 // The opaque sent in the request 17 Opaque uint32 18 // The CAS identifier (if applicable) 19 Cas uint64 20 // Extras, key, and body for this response 21 Extras, Key, Body []byte 22 // If true, this represents a fatal condition and we should hang up 23 Fatal bool 24 // Datatype identifier 25 DataType uint8 26} 27 28// A debugging string representation of this response 29func (res MCResponse) String() string { 30 return fmt.Sprintf("{MCResponse status=%v keylen=%d, extralen=%d, bodylen=%d}", 31 res.Status, len(res.Key), len(res.Extras), len(res.Body)) 32} 33 34// Response as an error. 35func (res *MCResponse) Error() string { 36 return fmt.Sprintf("MCResponse status=%v, opcode=%v, opaque=%v, msg: %s", 37 res.Status, res.Opcode, res.Opaque, string(res.Body)) 38} 39 40func errStatus(e error) Status { 41 status := Status(0xffff) 42 if res, ok := e.(*MCResponse); ok { 43 status = res.Status 44 } 45 return status 46} 47 48// IsNotFound is true if this error represents a "not found" response. 49func IsNotFound(e error) bool { 50 return errStatus(e) == KEY_ENOENT 51} 52 53// IsFatal is false if this error isn't believed to be fatal to a connection. 54func IsFatal(e error) bool { 55 if e == nil { 56 return false 57 } 58 _, ok := isFatal[errStatus(e)] 59 if ok { 60 return true 61 } 62 return false 63} 64 65// Size is number of bytes this response consumes on the wire. 66func (res *MCResponse) Size() int { 67 return HDR_LEN + len(res.Extras) + len(res.Key) + len(res.Body) 68} 69 70func (res *MCResponse) fillHeaderBytes(data []byte) int { 71 pos := 0 72 data[pos] = RES_MAGIC 73 pos++ 74 data[pos] = byte(res.Opcode) 75 pos++ 76 binary.BigEndian.PutUint16(data[pos:pos+2], 77 uint16(len(res.Key))) 78 pos += 2 79 80 // 4 81 data[pos] = byte(len(res.Extras)) 82 pos++ 83 // Data type 84 if res.DataType != 0 { 85 data[pos] = byte(res.DataType) 86 } else { 87 data[pos] = 0 88 } 89 pos++ 90 binary.BigEndian.PutUint16(data[pos:pos+2], uint16(res.Status)) 91 pos += 2 92 93 // 8 94 binary.BigEndian.PutUint32(data[pos:pos+4], 95 uint32(len(res.Body)+len(res.Key)+len(res.Extras))) 96 pos += 4 97 98 // 12 99 binary.BigEndian.PutUint32(data[pos:pos+4], res.Opaque) 100 pos += 4 101 102 // 16 103 binary.BigEndian.PutUint64(data[pos:pos+8], res.Cas) 104 pos += 8 105 106 if len(res.Extras) > 0 { 107 copy(data[pos:pos+len(res.Extras)], res.Extras) 108 pos += len(res.Extras) 109 } 110 111 if len(res.Key) > 0 { 112 copy(data[pos:pos+len(res.Key)], res.Key) 113 pos += len(res.Key) 114 } 115 116 return pos 117} 118 119// HeaderBytes will get just the header bytes for this response. 120func (res *MCResponse) HeaderBytes() []byte { 121 data := make([]byte, HDR_LEN+len(res.Extras)+len(res.Key)) 122 123 res.fillHeaderBytes(data) 124 125 return data 126} 127 128// Bytes will return the actual bytes transmitted for this response. 129func (res *MCResponse) Bytes() []byte { 130 data := make([]byte, res.Size()) 131 132 pos := res.fillHeaderBytes(data) 133 134 copy(data[pos:pos+len(res.Body)], res.Body) 135 136 return data 137} 138 139// Transmit will send this response message across a writer. 140func (res *MCResponse) Transmit(w io.Writer) (n int, err error) { 141 if len(res.Body) < 128 { 142 n, err = w.Write(res.Bytes()) 143 } else { 144 n, err = w.Write(res.HeaderBytes()) 145 if err == nil { 146 m := 0 147 m, err = w.Write(res.Body) 148 m += n 149 } 150 } 151 return 152} 153 154// Receive will fill this MCResponse with the data from this reader. 155func (res *MCResponse) Receive(r io.Reader, hdrBytes []byte) (n int, err error) { 156 if len(hdrBytes) < HDR_LEN { 157 hdrBytes = []byte{ 158 0, 0, 0, 0, 0, 0, 0, 0, 159 0, 0, 0, 0, 0, 0, 0, 0, 160 0, 0, 0, 0, 0, 0, 0, 0} 161 } 162 n, err = io.ReadFull(r, hdrBytes) 163 if err != nil { 164 return n, err 165 } 166 167 if hdrBytes[0] != RES_MAGIC && hdrBytes[0] != REQ_MAGIC { 168 return n, fmt.Errorf("bad magic: 0x%02x", hdrBytes[0]) 169 } 170 171 klen := int(binary.BigEndian.Uint16(hdrBytes[2:4])) 172 elen := int(hdrBytes[4]) 173 174 res.Opcode = CommandCode(hdrBytes[1]) 175 res.DataType = uint8(hdrBytes[5]) 176 res.Status = Status(binary.BigEndian.Uint16(hdrBytes[6:8])) 177 res.Opaque = binary.BigEndian.Uint32(hdrBytes[12:16]) 178 res.Cas = binary.BigEndian.Uint64(hdrBytes[16:24]) 179 180 bodyLen := int(binary.BigEndian.Uint32(hdrBytes[8:12])) - (klen + elen) 181 182 //defer function to debug the panic seen with MB-15557 183 defer func() { 184 if e := recover(); e != nil { 185 err = fmt.Errorf(`Panic in Receive. Response %v \n 186 key len %v extra len %v bodylen %v`, res, klen, elen, bodyLen) 187 } 188 }() 189 190 buf := make([]byte, klen+elen+bodyLen) 191 m, err := io.ReadFull(r, buf) 192 if err == nil { 193 res.Extras = buf[0:elen] 194 res.Key = buf[elen : klen+elen] 195 res.Body = buf[klen+elen:] 196 } 197 198 return n + m, err 199} 200 201type MCResponsePool struct { 202 pool *sync.Pool 203} 204 205func NewMCResponsePool() *MCResponsePool { 206 rv := &MCResponsePool{ 207 pool: &sync.Pool{ 208 New: func() interface{} { 209 return &MCResponse{} 210 }, 211 }, 212 } 213 214 return rv 215} 216 217func (this *MCResponsePool) Get() *MCResponse { 218 return this.pool.Get().(*MCResponse) 219} 220 221func (this *MCResponsePool) Put(r *MCResponse) { 222 if r == nil { 223 return 224 } 225 226 r.Extras = nil 227 r.Key = nil 228 r.Body = nil 229 r.Fatal = false 230 231 this.pool.Put(r) 232} 233 234type StringMCResponsePool struct { 235 pool *sync.Pool 236 size int 237} 238 239func NewStringMCResponsePool(size int) *StringMCResponsePool { 240 rv := &StringMCResponsePool{ 241 pool: &sync.Pool{ 242 New: func() interface{} { 243 return make(map[string]*MCResponse, size) 244 }, 245 }, 246 size: size, 247 } 248 249 return rv 250} 251 252func (this *StringMCResponsePool) Get() map[string]*MCResponse { 253 return this.pool.Get().(map[string]*MCResponse) 254} 255 256func (this *StringMCResponsePool) Put(m map[string]*MCResponse) { 257 if m == nil || len(m) > 2*this.size { 258 return 259 } 260 261 for k := range m { 262 m[k] = nil 263 delete(m, k) 264 } 265 266 this.pool.Put(m) 267} 268