1// unit test for xdcr pipeline factory.
2package main
3
4import (
5	"errors"
6	"flag"
7	"fmt"
8	base "github.com/couchbase/goxdcr/base"
9	"github.com/couchbase/goxdcr/factory"
10	"github.com/couchbase/goxdcr/log"
11	"github.com/couchbase/goxdcr/metadata_svc"
12	"github.com/couchbase/goxdcr/parts"
13	"github.com/couchbase/goxdcr/replication_manager"
14	"github.com/couchbase/goxdcr/service_impl"
15	utilities "github.com/couchbase/goxdcr/utils"
16	"github.com/couchbase/goxdcr/tests/common"
17	"os"
18)
19
20var options struct {
21	sourceKVHost      string //source kv host name
22	sourceKVAdminPort uint64 //source kv admin port
23	sourceBucket      string // source bucket
24	targetBucket      string //target bucket
25
26	// parameters of remote cluster
27	remoteUuid             string // remote cluster uuid
28	remoteName             string // remote cluster name
29	remoteHostName         string // remote cluster host name
30	remoteUserName         string //remote cluster userName
31	remotePassword         string //remote cluster password
32	remoteDemandEncryption uint64 // whether encryption is needed
33	remoteCertificateFile  string // file containing certificate for encryption
34}
35
36const (
37	NUM_SOURCE_CONN = 2
38	NUM_TARGET_CONN = 3
39)
40
41func argParse() {
42	flag.Uint64Var(&options.sourceKVAdminPort, "sourceKVAdminPort", 9000,
43		"admin port number for source kv")
44	flag.StringVar(&options.sourceBucket, "source_bucket", "default",
45		"bucket to replicate from")
46	flag.StringVar(&options.targetBucket, "target_bucket", "target",
47		"bucket to replicate to")
48
49	flag.StringVar(&options.remoteUuid, "remoteUuid", "1234567",
50		"remote cluster uuid")
51	flag.StringVar(&options.remoteName, "remoteName", "remote",
52		"remote cluster name")
53	flag.StringVar(&options.remoteHostName, "remoteHostName", "127.0.0.1:9000",
54		"remote cluster host name")
55	flag.StringVar(&options.remoteUserName, "remoteUserName", "Administrator", "remote cluster userName")
56	flag.StringVar(&options.remotePassword, "remotePassword", "welcome", "remote cluster password")
57	flag.Uint64Var(&options.remoteDemandEncryption, "remoteDemandEncryption", 0, "whether encryption is needed")
58	flag.StringVar(&options.remoteCertificateFile, "remoteCertificateFile", "", "file containing certificate for encryption")
59
60	flag.Parse()
61}
62
63func usage() {
64	fmt.Fprintf(os.Stderr, "Usage : %s [OPTIONS] \n", os.Args[0])
65	flag.PrintDefaults()
66}
67
68func main() {
69	fmt.Println("Start Testing ...")
70	argParse()
71	fmt.Println("Done with parsing the arguments")
72	err := invokeFactory()
73	if err == nil {
74		fmt.Println("Test passed.")
75	} else {
76		fmt.Println(err)
77	}
78}
79
80func invokeFactory() error {
81	cluster_info_svc := service_impl.NewClusterInfoSvc(nil)
82
83	utils := utilities.NewUtilities()
84	top_svc, err := service_impl.NewXDCRTopologySvc(uint16(options.sourceKVAdminPort), base.AdminportNumber, 12001, true, cluster_info_svc, nil, utils)
85	if err != nil {
86		fmt.Printf("Error starting xdcr topology service. err=%v\n", err)
87		os.Exit(1)
88	}
89
90	options.sourceKVHost, err = top_svc.MyHost()
91	if err != nil {
92		fmt.Printf("Error getting current host. err=%v\n", err)
93		os.Exit(1)
94	}
95
96	msvc, err := metadata_svc.NewMetaKVMetadataSvc(nil)
97	if err != nil {
98		fmt.Printf("Error creating metadata service. err=%v\n", err)
99		os.Exit(1)
100	}
101
102	audit_svc, err := service_impl.NewAuditSvc(top_svc, nil, utils)
103	if err != nil {
104		fmt.Printf("Error starting audit service. err=%v\n", err)
105		os.Exit(1)
106	}
107
108	uilog_svc := service_impl.NewUILogSvc(top_svc, nil, utils)
109	remote_cluster_svc, err := metadata_svc.NewRemoteClusterService(uilog_svc, msvc, top_svc, cluster_info_svc, nil, utils)
110	if err != nil {
111		fmt.Println(err.Error())
112		return err
113	}
114
115	repl_spec_svc, err := metadata_svc.NewReplicationSpecService(uilog_svc, remote_cluster_svc, msvc, top_svc, cluster_info_svc, nil)
116	if err != nil {
117		fmt.Println(err.Error())
118		return err
119	}
120
121	processSetting_svc := metadata_svc.NewGlobalSettingsSvc(msvc, nil)
122	bucketSettings_svc := metadata_svc.NewBucketSettingsService(msvc, top_svc, nil)
123	internalSettings_svc := metadata_svc.NewInternalSettingsSvc(msvc, nil)
124
125	checkpoints_svc := metadata_svc.NewCheckpointsService(msvc, nil)
126	capi_svc := service_impl.NewCAPIService(cluster_info_svc, nil, utils)
127
128	replication_manager.StartReplicationManager(options.sourceKVHost, base.AdminportNumber,
129		repl_spec_svc,
130		remote_cluster_svc,
131		cluster_info_svc, top_svc, metadata_svc.NewReplicationSettingsSvc(msvc, nil), checkpoints_svc, capi_svc, audit_svc, uilog_svc, processSetting_svc, bucketSettings_svc, internalSettings_svc)
132
133	fac := factory.NewXDCRFactory(repl_spec_svc, remote_cluster_svc, cluster_info_svc, top_svc, checkpoints_svc, capi_svc, uilog_svc, bucketSettings_svc, log.DefaultLoggerContext, log.DefaultLoggerContext, nil, nil)
134
135	// create remote cluster reference needed by replication
136	err = common.CreateTestRemoteCluster(remote_cluster_svc, options.remoteUuid, options.remoteName, options.remoteHostName, options.remoteUserName, options.remotePassword,
137		options.remoteDemandEncryption, options.remoteCertificateFile)
138	if err != nil {
139		fmt.Println(err.Error())
140		return err
141	}
142
143	defer common.DeleteTestRemoteCluster(remote_cluster_svc, options.remoteName)
144
145	remoteClusterRef, err := remote_cluster_svc.RemoteClusterByRefName(options.remoteName, false)
146	if err != nil {
147		fmt.Println(err.Error())
148		return err
149	}
150
151	replSpec, err := repl_spec_svc.ConstructNewReplicationSpec(options.sourceBucket, remoteClusterRef.Uuid, options.targetBucket)
152	if err != nil {
153		fmt.Println(err.Error())
154		return err
155	}
156
157	replSpec.Settings.SourceNozzlePerNode = NUM_SOURCE_CONN
158	replSpec.Settings.TargetNozzlePerNode = NUM_TARGET_CONN
159	err = repl_spec_svc.AddReplicationSpec(replSpec)
160	if err != nil {
161		return err
162	}
163	defer repl_spec_svc.DelReplicationSpec(replSpec.Id)
164
165	pl, err := fac.NewPipeline(replSpec.Id, nil)
166	if err != nil {
167		return err
168	}
169
170	sources := pl.Sources()
171	targets := pl.Targets()
172
173	if len(sources) != NUM_SOURCE_CONN {
174		return errors.New(fmt.Sprintf("incorrect source nozzles. expected %v; actual %v", NUM_SOURCE_CONN, len(sources)))
175	}
176	if len(targets) != NUM_TARGET_CONN {
177		return errors.New(fmt.Sprintf("incorrect target nozzles. expected %v; actual %v", NUM_TARGET_CONN, len(targets)))
178	}
179	for sourceId, source := range sources {
180		_, ok := source.(*parts.DcpNozzle)
181		if !ok {
182			return errors.New(fmt.Sprintf("incorrect nozzle type for source nozzle %v.", sourceId))
183		}
184
185		// validate connector in source nozzles
186		connector := source.Connector()
187		if connector == nil {
188			return errors.New(fmt.Sprintf("no connector defined in source nozzle %v.", sourceId))
189		}
190		_, ok = connector.(*parts.Router)
191		if !ok {
192			return errors.New(fmt.Sprintf("incorrect connector type in source nozzle %v.", sourceId))
193		}
194		downStreamParts := source.Connector().DownStreams()
195		if len(downStreamParts) != NUM_TARGET_CONN {
196			return errors.New(fmt.Sprintf("incorrect number of downstream parts for source nozzle %v. expected %v; actual %v", sourceId, NUM_TARGET_CONN, len(downStreamParts)))
197		}
198		for partId := range downStreamParts {
199			if _, ok := targets[partId]; !ok {
200				return errors.New(fmt.Sprintf("invalid downstream part %v for source nozzle %v.", partId, sourceId))
201			}
202		}
203	}
204	//validate that target nozzles do not have connectors
205	for targetId, target := range targets {
206		_, ok := target.(*parts.XmemNozzle)
207		if !ok {
208			return errors.New(fmt.Sprintf("incorrect nozzle type for target nozzle %v.", targetId))
209		}
210		if target.Connector() != nil {
211			return errors.New(fmt.Sprintf("target nozzle %v has connector, which is invalid.", targetId))
212		}
213	}
214
215	return nil
216}
217