1package dataport 2 3import "testing" 4import "time" 5import "fmt" 6 7import "github.com/couchbase/indexing/secondary/logging" 8import c "github.com/couchbase/indexing/secondary/common" 9import protobuf "github.com/couchbase/indexing/secondary/protobuf/data" 10 11func TestTimeout(t *testing.T) { 12 logging.SetLogLevel(logging.Silent) 13 14 raddr := "localhost:8888" 15 maxBuckets, maxvbuckets, mutChanSize := 2, 4, 100 16 17 // start server 18 appch := make(chan interface{}, mutChanSize) 19 prefix := "indexer.dataport." 20 dconfig := c.SystemConfig.SectionConfig(prefix, true /*trim*/) 21 daemon, err := NewServer(raddr, maxvbuckets, dconfig, appch) 22 if err != nil { 23 t.Fatal(err) 24 } 25 26 // start endpoint 27 config := c.SystemConfig.SectionConfig("projector.dataport.", true /*trim*/) 28 endp, err := NewRouterEndpoint("clust", "topic", raddr, maxvbuckets, config) 29 if err != nil { 30 t.Fatal(err) 31 } 32 33 vbmaps := makeVbmaps(maxvbuckets, maxBuckets) // vbmaps 34 35 // send StreamBegin 36 for _, vbmap := range vbmaps { 37 for i := 0; i < len(vbmap.Vbuckets); i++ { // for N vbuckets 38 vbno, vbuuid := vbmap.Vbuckets[i], vbmap.Vbuuids[i] 39 kv := c.NewKeyVersions(uint64(0), []byte("Bourne"), 1) 40 kv.AddStreamBegin() 41 dkv := &c.DataportKeyVersions{ 42 Bucket: vbmap.Bucket, Vbno: vbno, Vbuuid: vbuuid, Kv: kv, 43 } 44 if err := endp.Send(dkv); err != nil { 45 t.Fatal(err) 46 } 47 } 48 } 49 50 go func() { // this routine will keep one connection alive 51 for i := 0; ; i++ { 52 vbmap := vbmaps[0] // keep sending sync for first vbucket alone 53 idx := i % len(vbmap.Vbuckets) 54 vbno, vbuuid := vbmap.Vbuckets[idx], vbmap.Vbuuids[idx] 55 // send sync messages 56 kv := c.NewKeyVersions(10, nil, 1) 57 kv.AddSync() 58 dkv := &c.DataportKeyVersions{ 59 Bucket: vbmap.Bucket, Vbno: vbno, Vbuuid: vbuuid, Kv: kv, 60 } 61 if endp.Send(dkv); err != nil { 62 t.Fatal(err) 63 } 64 <-time.After( 65 time.Duration(dconfig["tcpReadDeadline"].Int()) * time.Millisecond) 66 } 67 }() 68 69 wait := true 70 for wait { 71 select { 72 case msg := <-appch: 73 switch ce := msg.(type) { 74 case []*protobuf.VbKeyVersions: 75 case ConnectionError: 76 ref := maxvbuckets 77 t.Logf("%T %v \n", ce, ce) 78 if len(ce) != 2 { 79 t.Fatal("mismatch in ConnectionError") 80 } 81 refBuckets := map[string]bool{"default0": true, "default1": true} 82 for bucket, vbnos := range ce { 83 delete(refBuckets, bucket) 84 if len(vbnos) != ref { 85 t.Fatalf("mismatch in ConnectionError %v %v", vbnos, ref) 86 } 87 } 88 if len(refBuckets) > 0 { 89 t.Fatalf("mismatch in ConnectionError %v", refBuckets) 90 } 91 wait = false 92 93 default: 94 t.Fatalf("expected connection error %T", msg) 95 } 96 } 97 } 98 99 <-time.After(100 * time.Millisecond) 100 endp.Close() 101 102 <-time.After(100 * time.Millisecond) 103 daemon.Close() 104} 105 106func TestLoopback(t *testing.T) { 107 logging.SetLogLevel(logging.Silent) 108 109 raddr := "localhost:8888" 110 maxBuckets, maxvbuckets, mutChanSize := 2, 32, 100 111 112 // start server 113 appch := make(chan interface{}, mutChanSize) 114 prefix := "indexer.dataport." 115 config := c.SystemConfig.SectionConfig(prefix, true /*trim*/) 116 daemon, err := NewServer(raddr, maxvbuckets, config, appch) 117 if err != nil { 118 t.Fatal(err) 119 } 120 121 // start endpoint 122 config = c.SystemConfig.SectionConfig("projector.dataport.", true /*trim*/) 123 endp, err := NewRouterEndpoint("clust", "topic", raddr, maxvbuckets, config) 124 if err != nil { 125 t.Fatal(err) 126 } 127 128 vbmaps := makeVbmaps(maxvbuckets, maxBuckets) // vbmaps 129 130 // send StreamBegin 131 for _, vbmap := range vbmaps { 132 for i := 0; i < len(vbmap.Vbuckets); i++ { // for N vbuckets 133 vbno, vbuuid := vbmap.Vbuckets[i], vbmap.Vbuuids[i] 134 kv := c.NewKeyVersions(uint64(0), []byte("Bourne"), 1) 135 kv.AddStreamBegin() 136 dkv := &c.DataportKeyVersions{ 137 Bucket: vbmap.Bucket, Vbno: vbno, Vbuuid: vbuuid, Kv: kv, 138 } 139 if err := endp.Send(dkv); err != nil { 140 t.Fatal(err) 141 } 142 } 143 } 144 145 count, seqno := 200, 1 146 for i := 1; i <= count; i += 2 { 147 nVbs, nMuts, nIndexes := maxvbuckets, 5, 5 148 dkvs := dataKeyVersions("default0", seqno, nVbs, nMuts, nIndexes) 149 dkvs = append(dkvs, dataKeyVersions("default1", seqno, nVbs, nMuts, nIndexes)...) 150 for _, dkv := range dkvs { 151 if err := endp.Send(dkv); err != nil { 152 t.Fatal(err) 153 } 154 } 155 seqno += nMuts 156 157 // gather 158 pvbs := make([]*protobuf.VbKeyVersions, 0) 159 loop: 160 for { 161 select { 162 case msg := <-appch: 163 //t.Logf("%T %v\n", msg, msg) 164 if pvbsSub, ok := msg.([]*protobuf.VbKeyVersions); !ok { 165 t.Fatalf("unexpected type in loopback %T", msg) 166 } else { 167 pvbs = append(pvbs, pvbsSub...) 168 } 169 case <-time.After(10 * time.Millisecond): 170 break loop 171 } 172 } 173 commands := make(map[byte]int) 174 for _, vb := range protobuf2VbKeyVersions(pvbs) { 175 for _, kv := range vb.Kvs { 176 for _, cmd := range kv.Commands { 177 if _, ok := commands[byte(cmd)]; !ok { 178 commands[byte(cmd)] = 0 179 } 180 commands[byte(cmd)]++ 181 } 182 } 183 } 184 if StreamBegins, ok := commands[c.StreamBegin]; ok && StreamBegins != 64 { 185 t.Fatalf("unexpected response %v", StreamBegins) 186 } 187 if commands[c.Upsert] != 1600 { 188 t.Fatalf("unexpected response %v", commands[c.Upsert]) 189 } 190 } 191 192 endp.Close() 193 daemon.Close() 194} 195 196func BenchmarkLoopback(b *testing.B) { 197 logging.SetLogLevel(logging.Silent) 198 199 raddr := "localhost:8888" 200 maxBuckets, maxvbuckets, mutChanSize := 2, 32, 100 201 202 // start server 203 appch := make(chan interface{}, mutChanSize) 204 prefix := "indexer.dataport." 205 config := c.SystemConfig.SectionConfig(prefix, true /*trim*/) 206 daemon, err := NewServer(raddr, maxvbuckets, config, appch) 207 if err != nil { 208 b.Fatal(err) 209 } 210 211 // start endpoint 212 config = c.SystemConfig.SectionConfig("projector.dataport.", true /*trim*/) 213 endp, err := NewRouterEndpoint("clust", "topic", raddr, maxvbuckets, config) 214 if err != nil { 215 b.Fatal(err) 216 } 217 218 vbmaps := makeVbmaps(maxvbuckets, maxBuckets) 219 220 // send StreamBegin 221 for _, vbmap := range vbmaps { 222 for i := 0; i < len(vbmap.Vbuckets); i++ { // for N vbuckets 223 vbno, vbuuid := vbmap.Vbuckets[i], vbmap.Vbuuids[i] 224 kv := c.NewKeyVersions(uint64(0), []byte("Bourne"), 1) 225 kv.AddStreamBegin() 226 dkv := &c.DataportKeyVersions{ 227 Bucket: vbmap.Bucket, Vbno: vbno, Vbuuid: vbuuid, Kv: kv, 228 } 229 if err := endp.Send(dkv); err != nil { 230 b.Fatal(err) 231 } 232 } 233 } 234 235 go func() { 236 nVbs, nMuts, nIndexes, seqno := maxvbuckets, 5, 5, 1 237 for { 238 dkvs := dataKeyVersions("default0", seqno, nVbs, nMuts, nIndexes) 239 dkvs = append(dkvs, dataKeyVersions("default1", seqno, nVbs, nMuts, nIndexes)...) 240 for _, dkv := range dkvs { 241 endp.Send(dkv) 242 } 243 seqno += nMuts 244 } 245 }() 246 247 b.ResetTimer() 248 for i := 0; i < b.N; i++ { 249 select { 250 case msg := <-appch: 251 if _, ok := msg.([]*protobuf.VbKeyVersions); !ok { 252 b.Fatalf("unexpected type in loopback %T", msg) 253 } 254 } 255 } 256 257 endp.Close() 258 daemon.Close() 259} 260 261func makeVbmaps(maxvbuckets int, maxBuckets int) []*c.VbConnectionMap { 262 vbmaps := make([]*c.VbConnectionMap, 0, maxBuckets) 263 for i := 0; i < maxBuckets; i++ { 264 vbmap := &c.VbConnectionMap{ 265 Bucket: fmt.Sprintf("default%v", i), 266 Vbuckets: make([]uint16, 0, maxvbuckets), 267 Vbuuids: make([]uint64, 0, maxvbuckets), 268 } 269 for i := 0; i < maxvbuckets; i++ { 270 vbmap.Vbuckets = append(vbmap.Vbuckets, uint16(i)) 271 vbmap.Vbuuids = append(vbmap.Vbuuids, uint64(i*10)) 272 } 273 vbmaps = append(vbmaps, vbmap) 274 } 275 return vbmaps 276} 277 278func dataKeyVersions(bucket string, seqno, nVbs, nMuts, nIndexes int) []*c.DataportKeyVersions { 279 dkvs := make([]*c.DataportKeyVersions, 0) 280 for i := 0; i < nVbs; i++ { // for N vbuckets 281 vbno, vbuuid := uint16(i), uint64(i*10) 282 for j := 0; j < nMuts; j++ { 283 kv := c.NewKeyVersions(uint64(seqno+j), []byte("Bourne"), nIndexes) 284 for k := 0; k < nIndexes; k++ { 285 key := fmt.Sprintf("bangalore%v", k) 286 oldkey := fmt.Sprintf("varanasi%v", k) 287 kv.AddUpsert(uint64(k), []byte(key), []byte(oldkey)) 288 } 289 dkv := &c.DataportKeyVersions{ 290 Bucket: bucket, Vbno: vbno, Vbuuid: vbuuid, Kv: kv, 291 } 292 dkvs = append(dkvs, dkv) 293 } 294 } 295 return dkvs 296} 297