1package gocb
2
3import (
4	"encoding/json"
5	"fmt"
6	"net/http"
7	"sync"
8	"time"
9
10	"github.com/google/uuid"
11	"gopkg.in/couchbase/gocbcore.v7"
12)
13
14func diagServiceString(service ServiceType) string {
15	switch service {
16	case MemdService:
17		return "kv"
18	case CapiService:
19		return "view"
20	case MgmtService:
21		return "mgmt"
22	case N1qlService:
23		return "n1ql"
24	case FtsService:
25		return "fts"
26	case CbasService:
27		return "cbas"
28	}
29	return "?"
30}
31
// PingServiceEntry represents a single entry in a ping report.
type PingServiceEntry struct {
	// Service is the type of service that was pinged.
	Service ServiceType
	// Endpoint is the address of the pinged endpoint.  It may be empty when
	// no endpoint could be resolved for the service.
	Endpoint string
	// Success reports whether the ping completed without an error.
	Success bool
	// Latency is the measured duration of the ping.  HTTP based pings leave
	// it at zero when the ping fails.
	Latency time.Duration
}
39
// PingReport encapsulates the details from an executed ping operation.
type PingReport struct {
	// Services holds one entry per pinged endpoint.  Entries are appended
	// by concurrently running pings, so their order is not defined.
	Services []PingServiceEntry
}
44
// jsonPingServiceEntry is the JSON form of a single PingServiceEntry as
// emitted by PingReport.MarshalJSON.
type jsonPingServiceEntry struct {
	// Remote carries the entry's endpoint.
	Remote string `json:"remote"`
	// LatencyUs carries the entry's latency under the "latency_us" key.
	LatencyUs uint64 `json:"latency_us"`
	// Success is serialized under the "success" key.
	Success bool `json:"success"`
}
50
// jsonPingReport is the top-level JSON document produced by
// PingReport.MarshalJSON.
type jsonPingReport struct {
	// Version is the version number of the report format.
	Version int `json:"version"`
	// Id is a unique identifier for the report.
	Id string `json:"id"`
	// Sdk is a string identifying the SDK and core library versions.
	Sdk string `json:"sdk"`
	// Services groups the entries by the short name of their service.
	Services map[string][]jsonPingServiceEntry `json:"services"`
}
57
58// MarshalJSON generates a JSON representation of this ping report.
59func (report *PingReport) MarshalJSON() ([]byte, error) {
60	jsonReport := jsonPingReport{
61		Version:  1,
62		Id:       uuid.New().String(),
63		Sdk:      "gocb/" + Version() + " " + "gocbcore/" + gocbcore.Version(),
64		Services: make(map[string][]jsonPingServiceEntry),
65	}
66
67	for _, service := range report.Services {
68		serviceStr := diagServiceString(service.Service)
69		jsonReport.Services[serviceStr] = append(jsonReport.Services[serviceStr], jsonPingServiceEntry{
70			Remote:    service.Endpoint,
71			LatencyUs: uint64(service.Latency / time.Nanosecond),
72		})
73	}
74
75	return json.Marshal(&jsonReport)
76}
77
78func (b *Bucket) pingKv() (pingsOut []gocbcore.PingResult, errOut error) {
79	signal := make(chan bool, 1)
80
81	op, err := b.client.Ping(func(results []gocbcore.PingResult) {
82		pingsOut = make([]gocbcore.PingResult, len(results))
83		for pingIdx, ping := range results {
84			// We rewrite the cancelled errors into timeout errors here.
85			if ping.Error == gocbcore.ErrCancelled {
86				ping.Error = ErrTimeout
87			}
88			pingsOut[pingIdx] = ping
89		}
90		signal <- true
91	})
92	if err != nil {
93		return nil, err
94	}
95
96	timeoutTmr := gocbcore.AcquireTimer(b.opTimeout)
97	select {
98	case <-signal:
99		gocbcore.ReleaseTimer(timeoutTmr, false)
100		return
101	case <-timeoutTmr.C:
102		gocbcore.ReleaseTimer(timeoutTmr, true)
103		if !op.Cancel() {
104			<-signal
105			return
106		}
107		return nil, ErrTimeout
108	}
109}
110
111// Ping will ping a list of services and verify they are active and
112// responding in an acceptable period of time.
113//
114// Experimental: This API is subject to change at any time.
115func (b *Bucket) Ping(services []ServiceType) (*PingReport, error) {
116	numServices := 0
117	waitCh := make(chan error, 10)
118	report := &PingReport{}
119	var reportLock sync.Mutex
120
121	if services == nil {
122		services = []ServiceType{
123			MemdService,
124			CapiService,
125			N1qlService,
126			FtsService,
127		}
128	}
129
130	httpReq := func(service ServiceType, endpoint, url string) (time.Duration, error) {
131		c := b.cluster
132
133		startTime := time.Now()
134
135		client := b.client.HttpClient()
136
137		reqUri := fmt.Sprintf("%s/%s", endpoint, url)
138		req, err := http.NewRequest("GET", reqUri, nil)
139		if err != nil {
140			return 0, err
141		}
142
143		timeout := 60 * time.Second
144		if service == N1qlService {
145			if b.n1qlTimeout < c.n1qlTimeout {
146				timeout = b.n1qlTimeout
147			} else {
148				timeout = c.n1qlTimeout
149			}
150		} else if service == FtsService {
151			if b.ftsTimeout < c.ftsTimeout {
152				timeout = b.ftsTimeout
153			} else {
154				timeout = c.ftsTimeout
155			}
156		} else if service == CbasService {
157			timeout = c.analyticsTimeout
158		}
159
160		resp, err := doHttpWithTimeout(client, req, timeout)
161		if err != nil {
162			return 0, err
163		}
164
165		err = resp.Body.Close()
166		if err != nil {
167			logDebugf("Failed to close http request: %s", err)
168		}
169
170		pingLatency := time.Now().Sub(startTime)
171
172		return pingLatency, err
173	}
174
175	for _, serviceType := range services {
176		switch serviceType {
177		case MemdService:
178			numServices++
179			go func() {
180				pings, err := b.pingKv()
181				if err != nil {
182					logWarnf("Failed to ping KV for report: %s", err)
183					waitCh <- nil
184					return
185				}
186
187				reportLock.Lock()
188				// We intentionally ignore errors here and simply include
189				// any non-error pings that we have received.  Note that
190				// gocbcore's ping command, when cancelled, still returns
191				// any pings that had occurred before the operation was
192				// cancelled and then marks the rest as errors.
193				for _, ping := range pings {
194					wasSuccess := true
195					if ping.Error != nil {
196						wasSuccess = false
197					}
198
199					report.Services = append(report.Services, PingServiceEntry{
200						Service:  MemdService,
201						Endpoint: ping.Endpoint,
202						Success:  wasSuccess,
203						Latency:  ping.Latency,
204					})
205				}
206				reportLock.Unlock()
207				waitCh <- nil
208			}()
209		case CapiService:
210			// View Service is not currently supported as a ping target
211		case N1qlService:
212			numServices++
213			go func() {
214				pingLatency := time.Duration(0)
215
216				endpoint, err := b.getN1qlEp()
217				if err == nil {
218					pingLatency, err = httpReq(N1qlService, endpoint, "/admin/ping")
219				}
220
221				reportLock.Lock()
222				if err != nil {
223					report.Services = append(report.Services, PingServiceEntry{
224						Service:  N1qlService,
225						Endpoint: endpoint,
226						Success:  false,
227					})
228				} else {
229					report.Services = append(report.Services, PingServiceEntry{
230						Service:  N1qlService,
231						Endpoint: endpoint,
232						Success:  true,
233						Latency:  pingLatency,
234					})
235				}
236				reportLock.Unlock()
237
238				waitCh <- nil
239			}()
240		case FtsService:
241			numServices++
242			go func() {
243				pingLatency := time.Duration(0)
244
245				endpoint, err := b.getFtsEp()
246				if err == nil {
247					pingLatency, err = httpReq(FtsService, endpoint, "/api/ping")
248				}
249
250				reportLock.Lock()
251				if err != nil {
252					report.Services = append(report.Services, PingServiceEntry{
253						Service:  FtsService,
254						Endpoint: endpoint,
255						Success:  false,
256					})
257				} else {
258					report.Services = append(report.Services, PingServiceEntry{
259						Service:  FtsService,
260						Endpoint: endpoint,
261						Success:  true,
262						Latency:  pingLatency,
263					})
264				}
265				reportLock.Unlock()
266
267				waitCh <- nil
268			}()
269		case CbasService:
270			numServices++
271			go func() {
272				pingLatency := time.Duration(0)
273
274				endpoint, err := b.getCbasEp()
275				if err == nil {
276					pingLatency, err = httpReq(CbasService, endpoint, "/admin/ping")
277				}
278
279				reportLock.Lock()
280				if err != nil {
281					report.Services = append(report.Services, PingServiceEntry{
282						Service:  CbasService,
283						Endpoint: endpoint,
284						Success:  false,
285					})
286				} else {
287					report.Services = append(report.Services, PingServiceEntry{
288						Service:  CbasService,
289						Endpoint: endpoint,
290						Success:  true,
291						Latency:  pingLatency,
292					})
293				}
294				reportLock.Unlock()
295
296				waitCh <- nil
297			}()
298		}
299	}
300
301	for i := 0; i < numServices; i++ {
302		<-waitCh
303	}
304
305	return report, nil
306}
307