1// Copyright (c) 2017 Couchbase, Inc. 2// Licensed under the Apache License, Version 2.0 (the "License"); 3// you may not use this file except in compliance with the 4// License. You may obtain a copy of the License at 5// http://www.apache.org/licenses/LICENSE-2.0 6// Unless required by applicable law or agreed to in writing, 7// software distributed under the License is distributed on an "AS 8// IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 9// express or implied. See the License for the specific language 10// governing permissions and limitations under the License. 11package indexer 12 13import ( 14 "fmt" 15 "hash/crc32" 16 "io/ioutil" 17 "math/rand" 18 "os" 19 "sort" 20 "sync" 21 22 "github.com/couchbase/indexing/secondary/common" 23 "github.com/couchbase/indexing/secondary/indexer" 24 "github.com/mschoch/smat" 25) 26 27// Fuzz using state machine driven by byte stream 28func Fuzz(data []byte) int { 29 return smat.Fuzz(&smatContext{}, smat.ActionID('S'), smat.ActionID('T'), actionMap, data) 30} 31 32var numItems int 33var keys [][]byte 34var vals [][]byte 35var wg sync.WaitGroup 36var smatDebug = false 37 38func smatLog(prefix, format string, args ...interface{}) { 39 if smatDebug { 40 fmt.Print(prefix) 41 fmt.Printf(format, args...) 42 } 43} 44 45type smatContext struct { 46 plasmaDir string 47 48 slice indexer.Slice 49 infos []indexer.SnapshotInfo 50 snap indexer.Snapshot 51 mutationMeta *indexer.MutationMeta 52 53 actions int 54} 55 56// Action map 57var actionMap = smat.ActionMap{ 58 smat.ActionID('a'): action(" createSlice", createSliceFunc), 59 smat.ActionID('b'): action(" insert", insertFunc), 60 smat.ActionID('c'): action(" delete", deleteFunc), 61 smat.ActionID('d'): action(" storageStatistics", storageStatisticsFunc), 62 smat.ActionID('e'): action(" createSnapshot", createSnapshotFunc), 63 smat.ActionID('f'): action(" scan", scanFunc), 64 smat.ActionID('g'): action(" rollback", rollbackFunc), 65 smat.ActionID('h'): action(" close", closeFunc), 66} 67 68var runningPercentActions []smat.PercentAction 69 70// Init function 71func init() { 72 var ids []int 73 for actionId := range actionMap { 74 ids = append(ids, int(actionId)) 75 } 76 sort.Ints(ids) 77 78 pct := 100 / len(actionMap) 79 for _, actionId := range ids { 80 runningPercentActions = append(runningPercentActions, 81 smat.PercentAction{Percent: pct, Action: smat.ActionID(actionId)}) 82 } 83 actionMap[smat.ActionID('S')] = action("SETUP", setupFunc) 84 actionMap[smat.ActionID('T')] = action("TEARDOWN", teardownFunc) 85 numItems = 5000000 86 keys = make([][]byte, numItems) 87 vals = make([][]byte, numItems) 88} 89 90func running(next byte) smat.ActionID { 91 return smat.PercentExecute(next, runningPercentActions...) 92} 93 94func action(name string, f func(ctx smat.Context) (smat.State, error)) func(ctx smat.Context) (smat.State, error) { 95 return func(ctx smat.Context) (smat.State, error) { 96 c := ctx.(*smatContext) 97 c.actions++ 98 99 smatLog(" ", "%s\n", name) 100 101 return f(ctx) 102 } 103} 104 105// setup function 106func setupFunc(ctx smat.Context) (next smat.State, err error) { 107 c := ctx.(*smatContext) 108 109 c.plasmaDir, err = ioutil.TempDir("/tmp/", "plasma") 110 if err != nil { 111 return nil, err 112 } 113 return running, nil 114} 115 116// Teardown function 117func teardownFunc(ctx smat.Context) (next smat.State, err error) { 118 c := ctx.(*smatContext) 119 120 if c.plasmaDir != "" { 121 os.RemoveAll(c.plasmaDir) 122 } 123 return nil, err 124} 125 126// Helper functions 127// TODO: Parameterize these functions to do only plasma slice or both plasma and MOI slices and reuse in go tests 128// Create the plasma slice 129func createSliceFunc(ctx smat.Context) (next smat.State, err error) { 130 c := ctx.(*smatContext) 131 132 config := common.SystemConfig.SectionConfig( 133 "indexer.", true) 134 config.SetValue("settings.moi.debug", true) 135 indexerStats := &indexer.IndexerStats{} 136 indexerStats.Init() 137 stats := &indexer.IndexStats{} 138 stats.Init() 139 idxDefn := common.IndexDefn{ 140 DefnId: common.IndexDefnId(200), 141 Name: "plasma_slice_test", 142 Using: common.PlasmaDB, 143 Bucket: "default", 144 IsPrimary: false, 145 SecExprs: []string{"Testing"}, 146 ExprType: common.N1QL, 147 PartitionScheme: common.HASH, 148 PartitionKey: "Testing"} 149 150 instID, err := common.NewIndexInstId() 151 if err != nil { 152 return nil, err 153 } 154 c.slice, err = indexer.NewPlasmaSlice(c.plasmaDir, 0, idxDefn, instID, common.PartitionId(0), false, 1, config, stats, indexerStats) 155 if err != nil { 156 return nil, err 157 } 158 fmt.Print(c.slice, "\n") 159 c.mutationMeta = indexer.NewMutationMeta() 160 for i := 0; i < numItems; i++ { 161 keys[i] = []byte(fmt.Sprintf("perf%v", rand.Intn(10000000000))) 162 vals[i] = []byte(fmt.Sprintf("body%v", rand.Intn(10000000000))) 163 } 164 return running, nil 165} 166 167// Data insert function 168func insertFunc(ctx smat.Context) (next smat.State, err error) { 169 c := ctx.(*smatContext) 170 171 for i := 0; i < numItems; i++ { 172 c.mutationMeta.SetVBId(int(crc32.ChecksumIEEE(vals[i])) % 1024) 173 c.slice.Insert(keys[i], vals[i], c.mutationMeta) 174 } 175 176 return running, nil 177} 178 179// Data delete function 180func deleteFunc(ctx smat.Context) (next smat.State, err error) { 181 c := ctx.(*smatContext) 182 183 for i := 0; i < rand.Intn(numItems/4); i++ { 184 c.mutationMeta.SetVBId(int(crc32.ChecksumIEEE(vals[i])) % 1024) 185 c.slice.Delete(vals[i], c.mutationMeta) 186 } 187 188 return running, nil 189} 190 191// Printing storage statistics for logging purposes only 192func storageStatisticsFunc(ctx smat.Context) (next smat.State, err error) { 193 c := ctx.(*smatContext) 194 195 storageStatistics, err := c.slice.Statistics() 196 if err != nil { 197 return nil, err 198 } 199 fmt.Print("Statistics \n") 200 fmt.Print(storageStatistics) 201 202 return running, nil 203} 204 205// Create snapshot, Open snapshot and Get snapshots 206func createSnapshotFunc(ctx smat.Context) (next smat.State, err error) { 207 c := ctx.(*smatContext) 208 var info indexer.SnapshotInfo 209 210 ts := common.NewTsVbuuid("default", 1024) 211 for i := uint64(1); i < uint64(1024); i++ { 212 ts.Seqnos[i] = uint64(1000000 + i) 213 ts.Vbuuids[i] = uint64(2000000 + i) 214 ts.Snapshots[i] = [2]uint64{i, i + 10} 215 } 216 info, err = c.slice.NewSnapshot(ts, true) 217 if err != nil { 218 return nil, err 219 } 220 fmt.Print(info, "\n") 221 222 c.snap, err = c.slice.OpenSnapshot(info) 223 if err != nil { 224 return nil, err 225 } 226 fmt.Print(c.snap, "\n") 227 228 c.infos, err = c.slice.GetSnapshots() 229 if err != nil { 230 return nil, err 231 } 232 233 return running, nil 234} 235 236// Do Scans 237// TODO: Add Range queries 238func scanFunc(ctx smat.Context) (next smat.State, err error) { 239 c := ctx.(*smatContext) 240 241 reader := c.slice.GetReaderContext() 242 reader.Init() 243 scanReq := new(indexer.ScanRequest) 244 scanReq.Ctx = reader 245 stopch := make(indexer.StopChannel) 246 for i := 1; i < 5; i++ { 247 if scanReq.Low == nil && scanReq.High == nil { 248 var statsCountTotal uint64 249 statsCountTotal, err7 := c.snap.StatCountTotal() 250 if err7 != nil { 251 return nil, err7 252 } 253 fmt.Print("Stats Count Total : ", statsCountTotal, "\n") 254 } else { 255 var countRange uint64 256 countRange, err11 := c.snap.CountRange(scanReq.Ctx, scanReq.Low, scanReq.High, scanReq.Incl, stopch) 257 if err11 != nil { 258 return nil, err11 259 } 260 fmt.Print("Count Range : ", countRange, "\n") 261 } 262 263 var countTotal uint64 264 countTotal, err10 := c.snap.CountTotal(scanReq.Ctx, stopch) 265 if err10 != nil { 266 return nil, err10 267 } 268 fmt.Print("Count Total : ", countTotal, "\n") 269 270 var countLookup uint64 271 countLookup, err12 := c.snap.CountLookup(scanReq.Ctx, scanReq.Keys, stopch) 272 if err12 != nil { 273 return nil, err12 274 } 275 fmt.Print("Count Lookup : ", countLookup, "\n") 276 } 277 return running, nil 278} 279 280// Function to do a rollback 281func rollbackFunc(ctx smat.Context) (next smat.State, err error) { 282 c := ctx.(*smatContext) 283 284 err = c.slice.Rollback(c.infos[0]) 285 if err != nil { 286 return nil, err 287 } 288 err = c.slice.RollbackToZero() 289 if err != nil { 290 return nil, err 291 } 292 293 return running, nil 294} 295 296// Cleanup function to close snapshots , clsose and destroy slices 297func closeFunc(ctx smat.Context) (next smat.State, err error) { 298 c := ctx.(*smatContext) 299 300 for i := 1; i < 5; i++ { 301 c.snap.Close() 302 } 303 c.slice.Close() 304 c.slice.Destroy() 305 306 err = os.RemoveAll(c.plasmaDir) 307 if err != nil { 308 return nil, err 309 } 310 return running, nil 311} 312