Remove BPA from Makefile
[icn.git] / cmd / bpa-operator / vendor / google.golang.org / grpc / internal / channelz / funcs.go
1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 // Package channelz defines APIs for enabling channelz service, entry
20 // registration/deletion, and accessing channelz data. It also defines channelz
21 // metric struct formats.
22 //
23 // All APIs in this package are experimental.
24 package channelz
25
26 import (
27         "sort"
28         "sync"
29         "sync/atomic"
30         "time"
31
32         "google.golang.org/grpc/grpclog"
33 )
34
35 const (
36         defaultMaxTraceEntry int32 = 30
37 )
38
39 var (
40         db    dbWrapper
41         idGen idGenerator
42         // EntryPerPage defines the number of channelz entries to be shown on a web page.
43         EntryPerPage  = int64(50)
44         curState      int32
45         maxTraceEntry = defaultMaxTraceEntry
46 )
47
48 // TurnOn turns on channelz data collection.
49 func TurnOn() {
50         if !IsOn() {
51                 NewChannelzStorage()
52                 atomic.StoreInt32(&curState, 1)
53         }
54 }
55
56 // IsOn returns whether channelz data collection is on.
57 func IsOn() bool {
58         return atomic.CompareAndSwapInt32(&curState, 1, 1)
59 }
60
61 // SetMaxTraceEntry sets maximum number of trace entry per entity (i.e. channel/subchannel).
62 // Setting it to 0 will disable channel tracing.
63 func SetMaxTraceEntry(i int32) {
64         atomic.StoreInt32(&maxTraceEntry, i)
65 }
66
67 // ResetMaxTraceEntryToDefault resets the maximum number of trace entry per entity to default.
68 func ResetMaxTraceEntryToDefault() {
69         atomic.StoreInt32(&maxTraceEntry, defaultMaxTraceEntry)
70 }
71
72 func getMaxTraceEntry() int {
73         i := atomic.LoadInt32(&maxTraceEntry)
74         return int(i)
75 }
76
77 // dbWarpper wraps around a reference to internal channelz data storage, and
78 // provide synchronized functionality to set and get the reference.
79 type dbWrapper struct {
80         mu sync.RWMutex
81         DB *channelMap
82 }
83
84 func (d *dbWrapper) set(db *channelMap) {
85         d.mu.Lock()
86         d.DB = db
87         d.mu.Unlock()
88 }
89
90 func (d *dbWrapper) get() *channelMap {
91         d.mu.RLock()
92         defer d.mu.RUnlock()
93         return d.DB
94 }
95
96 // NewChannelzStorage initializes channelz data storage and id generator.
97 //
98 // Note: This function is exported for testing purpose only. User should not call
99 // it in most cases.
100 func NewChannelzStorage() {
101         db.set(&channelMap{
102                 topLevelChannels: make(map[int64]struct{}),
103                 channels:         make(map[int64]*channel),
104                 listenSockets:    make(map[int64]*listenSocket),
105                 normalSockets:    make(map[int64]*normalSocket),
106                 servers:          make(map[int64]*server),
107                 subChannels:      make(map[int64]*subChannel),
108         })
109         idGen.reset()
110 }
111
112 // GetTopChannels returns a slice of top channel's ChannelMetric, along with a
113 // boolean indicating whether there's more top channels to be queried for.
114 //
115 // The arg id specifies that only top channel with id at or above it will be included
116 // in the result. The returned slice is up to a length of the arg maxResults or
117 // EntryPerPage if maxResults is zero, and is sorted in ascending id order.
118 func GetTopChannels(id int64, maxResults int64) ([]*ChannelMetric, bool) {
119         return db.get().GetTopChannels(id, maxResults)
120 }
121
122 // GetServers returns a slice of server's ServerMetric, along with a
123 // boolean indicating whether there's more servers to be queried for.
124 //
125 // The arg id specifies that only server with id at or above it will be included
126 // in the result. The returned slice is up to a length of the arg maxResults or
127 // EntryPerPage if maxResults is zero, and is sorted in ascending id order.
128 func GetServers(id int64, maxResults int64) ([]*ServerMetric, bool) {
129         return db.get().GetServers(id, maxResults)
130 }
131
132 // GetServerSockets returns a slice of server's (identified by id) normal socket's
133 // SocketMetric, along with a boolean indicating whether there's more sockets to
134 // be queried for.
135 //
136 // The arg startID specifies that only sockets with id at or above it will be
137 // included in the result. The returned slice is up to a length of the arg maxResults
138 // or EntryPerPage if maxResults is zero, and is sorted in ascending id order.
139 func GetServerSockets(id int64, startID int64, maxResults int64) ([]*SocketMetric, bool) {
140         return db.get().GetServerSockets(id, startID, maxResults)
141 }
142
143 // GetChannel returns the ChannelMetric for the channel (identified by id).
144 func GetChannel(id int64) *ChannelMetric {
145         return db.get().GetChannel(id)
146 }
147
148 // GetSubChannel returns the SubChannelMetric for the subchannel (identified by id).
149 func GetSubChannel(id int64) *SubChannelMetric {
150         return db.get().GetSubChannel(id)
151 }
152
153 // GetSocket returns the SocketInternalMetric for the socket (identified by id).
154 func GetSocket(id int64) *SocketMetric {
155         return db.get().GetSocket(id)
156 }
157
158 // GetServer returns the ServerMetric for the server (identified by id).
159 func GetServer(id int64) *ServerMetric {
160         return db.get().GetServer(id)
161 }
162
163 // RegisterChannel registers the given channel c in channelz database with ref
164 // as its reference name, and add it to the child list of its parent (identified
165 // by pid). pid = 0 means no parent. It returns the unique channelz tracking id
166 // assigned to this channel.
167 func RegisterChannel(c Channel, pid int64, ref string) int64 {
168         id := idGen.genID()
169         cn := &channel{
170                 refName:     ref,
171                 c:           c,
172                 subChans:    make(map[int64]string),
173                 nestedChans: make(map[int64]string),
174                 id:          id,
175                 pid:         pid,
176                 trace:       &channelTrace{createdTime: time.Now(), events: make([]*TraceEvent, 0, getMaxTraceEntry())},
177         }
178         if pid == 0 {
179                 db.get().addChannel(id, cn, true, pid, ref)
180         } else {
181                 db.get().addChannel(id, cn, false, pid, ref)
182         }
183         return id
184 }
185
186 // RegisterSubChannel registers the given channel c in channelz database with ref
187 // as its reference name, and add it to the child list of its parent (identified
188 // by pid). It returns the unique channelz tracking id assigned to this subchannel.
189 func RegisterSubChannel(c Channel, pid int64, ref string) int64 {
190         if pid == 0 {
191                 grpclog.Error("a SubChannel's parent id cannot be 0")
192                 return 0
193         }
194         id := idGen.genID()
195         sc := &subChannel{
196                 refName: ref,
197                 c:       c,
198                 sockets: make(map[int64]string),
199                 id:      id,
200                 pid:     pid,
201                 trace:   &channelTrace{createdTime: time.Now(), events: make([]*TraceEvent, 0, getMaxTraceEntry())},
202         }
203         db.get().addSubChannel(id, sc, pid, ref)
204         return id
205 }
206
207 // RegisterServer registers the given server s in channelz database. It returns
208 // the unique channelz tracking id assigned to this server.
209 func RegisterServer(s Server, ref string) int64 {
210         id := idGen.genID()
211         svr := &server{
212                 refName:       ref,
213                 s:             s,
214                 sockets:       make(map[int64]string),
215                 listenSockets: make(map[int64]string),
216                 id:            id,
217         }
218         db.get().addServer(id, svr)
219         return id
220 }
221
222 // RegisterListenSocket registers the given listen socket s in channelz database
223 // with ref as its reference name, and add it to the child list of its parent
224 // (identified by pid). It returns the unique channelz tracking id assigned to
225 // this listen socket.
226 func RegisterListenSocket(s Socket, pid int64, ref string) int64 {
227         if pid == 0 {
228                 grpclog.Error("a ListenSocket's parent id cannot be 0")
229                 return 0
230         }
231         id := idGen.genID()
232         ls := &listenSocket{refName: ref, s: s, id: id, pid: pid}
233         db.get().addListenSocket(id, ls, pid, ref)
234         return id
235 }
236
237 // RegisterNormalSocket registers the given normal socket s in channelz database
238 // with ref as its reference name, and add it to the child list of its parent
239 // (identified by pid). It returns the unique channelz tracking id assigned to
240 // this normal socket.
241 func RegisterNormalSocket(s Socket, pid int64, ref string) int64 {
242         if pid == 0 {
243                 grpclog.Error("a NormalSocket's parent id cannot be 0")
244                 return 0
245         }
246         id := idGen.genID()
247         ns := &normalSocket{refName: ref, s: s, id: id, pid: pid}
248         db.get().addNormalSocket(id, ns, pid, ref)
249         return id
250 }
251
252 // RemoveEntry removes an entry with unique channelz trakcing id to be id from
253 // channelz database.
254 func RemoveEntry(id int64) {
255         db.get().removeEntry(id)
256 }
257
258 // TraceEventDesc is what the caller of AddTraceEvent should provide to describe the event to be added
259 // to the channel trace.
260 // The Parent field is optional. It is used for event that will be recorded in the entity's parent
261 // trace also.
262 type TraceEventDesc struct {
263         Desc     string
264         Severity Severity
265         Parent   *TraceEventDesc
266 }
267
268 // AddTraceEvent adds trace related to the entity with specified id, using the provided TraceEventDesc.
269 func AddTraceEvent(id int64, desc *TraceEventDesc) {
270         if getMaxTraceEntry() == 0 {
271                 return
272         }
273         db.get().traceEvent(id, desc)
274 }
275
276 // channelMap is the storage data structure for channelz.
277 // Methods of channelMap can be divided in two two categories with respect to locking.
278 // 1. Methods acquire the global lock.
279 // 2. Methods that can only be called when global lock is held.
280 // A second type of method need always to be called inside a first type of method.
281 type channelMap struct {
282         mu               sync.RWMutex
283         topLevelChannels map[int64]struct{}
284         servers          map[int64]*server
285         channels         map[int64]*channel
286         subChannels      map[int64]*subChannel
287         listenSockets    map[int64]*listenSocket
288         normalSockets    map[int64]*normalSocket
289 }
290
291 func (c *channelMap) addServer(id int64, s *server) {
292         c.mu.Lock()
293         s.cm = c
294         c.servers[id] = s
295         c.mu.Unlock()
296 }
297
298 func (c *channelMap) addChannel(id int64, cn *channel, isTopChannel bool, pid int64, ref string) {
299         c.mu.Lock()
300         cn.cm = c
301         cn.trace.cm = c
302         c.channels[id] = cn
303         if isTopChannel {
304                 c.topLevelChannels[id] = struct{}{}
305         } else {
306                 c.findEntry(pid).addChild(id, cn)
307         }
308         c.mu.Unlock()
309 }
310
311 func (c *channelMap) addSubChannel(id int64, sc *subChannel, pid int64, ref string) {
312         c.mu.Lock()
313         sc.cm = c
314         sc.trace.cm = c
315         c.subChannels[id] = sc
316         c.findEntry(pid).addChild(id, sc)
317         c.mu.Unlock()
318 }
319
320 func (c *channelMap) addListenSocket(id int64, ls *listenSocket, pid int64, ref string) {
321         c.mu.Lock()
322         ls.cm = c
323         c.listenSockets[id] = ls
324         c.findEntry(pid).addChild(id, ls)
325         c.mu.Unlock()
326 }
327
328 func (c *channelMap) addNormalSocket(id int64, ns *normalSocket, pid int64, ref string) {
329         c.mu.Lock()
330         ns.cm = c
331         c.normalSockets[id] = ns
332         c.findEntry(pid).addChild(id, ns)
333         c.mu.Unlock()
334 }
335
336 // removeEntry triggers the removal of an entry, which may not indeed delete the entry, if it has to
337 // wait on the deletion of its children and until no other entity's channel trace references it.
338 // It may lead to a chain of entry deletion. For example, deleting the last socket of a gracefully
339 // shutting down server will lead to the server being also deleted.
340 func (c *channelMap) removeEntry(id int64) {
341         c.mu.Lock()
342         c.findEntry(id).triggerDelete()
343         c.mu.Unlock()
344 }
345
346 // c.mu must be held by the caller
347 func (c *channelMap) decrTraceRefCount(id int64) {
348         e := c.findEntry(id)
349         if v, ok := e.(tracedChannel); ok {
350                 v.decrTraceRefCount()
351                 e.deleteSelfIfReady()
352         }
353 }
354
355 // c.mu must be held by the caller.
356 func (c *channelMap) findEntry(id int64) entry {
357         var v entry
358         var ok bool
359         if v, ok = c.channels[id]; ok {
360                 return v
361         }
362         if v, ok = c.subChannels[id]; ok {
363                 return v
364         }
365         if v, ok = c.servers[id]; ok {
366                 return v
367         }
368         if v, ok = c.listenSockets[id]; ok {
369                 return v
370         }
371         if v, ok = c.normalSockets[id]; ok {
372                 return v
373         }
374         return &dummyEntry{idNotFound: id}
375 }
376
377 // c.mu must be held by the caller
378 // deleteEntry simply deletes an entry from the channelMap. Before calling this
379 // method, caller must check this entry is ready to be deleted, i.e removeEntry()
380 // has been called on it, and no children still exist.
381 // Conditionals are ordered by the expected frequency of deletion of each entity
382 // type, in order to optimize performance.
383 func (c *channelMap) deleteEntry(id int64) {
384         var ok bool
385         if _, ok = c.normalSockets[id]; ok {
386                 delete(c.normalSockets, id)
387                 return
388         }
389         if _, ok = c.subChannels[id]; ok {
390                 delete(c.subChannels, id)
391                 return
392         }
393         if _, ok = c.channels[id]; ok {
394                 delete(c.channels, id)
395                 delete(c.topLevelChannels, id)
396                 return
397         }
398         if _, ok = c.listenSockets[id]; ok {
399                 delete(c.listenSockets, id)
400                 return
401         }
402         if _, ok = c.servers[id]; ok {
403                 delete(c.servers, id)
404                 return
405         }
406 }
407
408 func (c *channelMap) traceEvent(id int64, desc *TraceEventDesc) {
409         c.mu.Lock()
410         child := c.findEntry(id)
411         childTC, ok := child.(tracedChannel)
412         if !ok {
413                 c.mu.Unlock()
414                 return
415         }
416         childTC.getChannelTrace().append(&TraceEvent{Desc: desc.Desc, Severity: desc.Severity, Timestamp: time.Now()})
417         if desc.Parent != nil {
418                 parent := c.findEntry(child.getParentID())
419                 var chanType RefChannelType
420                 switch child.(type) {
421                 case *channel:
422                         chanType = RefChannel
423                 case *subChannel:
424                         chanType = RefSubChannel
425                 }
426                 if parentTC, ok := parent.(tracedChannel); ok {
427                         parentTC.getChannelTrace().append(&TraceEvent{
428                                 Desc:      desc.Parent.Desc,
429                                 Severity:  desc.Parent.Severity,
430                                 Timestamp: time.Now(),
431                                 RefID:     id,
432                                 RefName:   childTC.getRefName(),
433                                 RefType:   chanType,
434                         })
435                         childTC.incrTraceRefCount()
436                 }
437         }
438         c.mu.Unlock()
439 }
440
441 type int64Slice []int64
442
443 func (s int64Slice) Len() int           { return len(s) }
444 func (s int64Slice) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
445 func (s int64Slice) Less(i, j int) bool { return s[i] < s[j] }
446
447 func copyMap(m map[int64]string) map[int64]string {
448         n := make(map[int64]string)
449         for k, v := range m {
450                 n[k] = v
451         }
452         return n
453 }
454
455 func min(a, b int64) int64 {
456         if a < b {
457                 return a
458         }
459         return b
460 }
461
462 func (c *channelMap) GetTopChannels(id int64, maxResults int64) ([]*ChannelMetric, bool) {
463         if maxResults <= 0 {
464                 maxResults = EntryPerPage
465         }
466         c.mu.RLock()
467         l := int64(len(c.topLevelChannels))
468         ids := make([]int64, 0, l)
469         cns := make([]*channel, 0, min(l, maxResults))
470
471         for k := range c.topLevelChannels {
472                 ids = append(ids, k)
473         }
474         sort.Sort(int64Slice(ids))
475         idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
476         count := int64(0)
477         var end bool
478         var t []*ChannelMetric
479         for i, v := range ids[idx:] {
480                 if count == maxResults {
481                         break
482                 }
483                 if cn, ok := c.channels[v]; ok {
484                         cns = append(cns, cn)
485                         t = append(t, &ChannelMetric{
486                                 NestedChans: copyMap(cn.nestedChans),
487                                 SubChans:    copyMap(cn.subChans),
488                         })
489                         count++
490                 }
491                 if i == len(ids[idx:])-1 {
492                         end = true
493                         break
494                 }
495         }
496         c.mu.RUnlock()
497         if count == 0 {
498                 end = true
499         }
500
501         for i, cn := range cns {
502                 t[i].ChannelData = cn.c.ChannelzMetric()
503                 t[i].ID = cn.id
504                 t[i].RefName = cn.refName
505                 t[i].Trace = cn.trace.dumpData()
506         }
507         return t, end
508 }
509
510 func (c *channelMap) GetServers(id, maxResults int64) ([]*ServerMetric, bool) {
511         if maxResults <= 0 {
512                 maxResults = EntryPerPage
513         }
514         c.mu.RLock()
515         l := int64(len(c.servers))
516         ids := make([]int64, 0, l)
517         ss := make([]*server, 0, min(l, maxResults))
518         for k := range c.servers {
519                 ids = append(ids, k)
520         }
521         sort.Sort(int64Slice(ids))
522         idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
523         count := int64(0)
524         var end bool
525         var s []*ServerMetric
526         for i, v := range ids[idx:] {
527                 if count == maxResults {
528                         break
529                 }
530                 if svr, ok := c.servers[v]; ok {
531                         ss = append(ss, svr)
532                         s = append(s, &ServerMetric{
533                                 ListenSockets: copyMap(svr.listenSockets),
534                         })
535                         count++
536                 }
537                 if i == len(ids[idx:])-1 {
538                         end = true
539                         break
540                 }
541         }
542         c.mu.RUnlock()
543         if count == 0 {
544                 end = true
545         }
546
547         for i, svr := range ss {
548                 s[i].ServerData = svr.s.ChannelzMetric()
549                 s[i].ID = svr.id
550                 s[i].RefName = svr.refName
551         }
552         return s, end
553 }
554
555 func (c *channelMap) GetServerSockets(id int64, startID int64, maxResults int64) ([]*SocketMetric, bool) {
556         if maxResults <= 0 {
557                 maxResults = EntryPerPage
558         }
559         var svr *server
560         var ok bool
561         c.mu.RLock()
562         if svr, ok = c.servers[id]; !ok {
563                 // server with id doesn't exist.
564                 c.mu.RUnlock()
565                 return nil, true
566         }
567         svrskts := svr.sockets
568         l := int64(len(svrskts))
569         ids := make([]int64, 0, l)
570         sks := make([]*normalSocket, 0, min(l, maxResults))
571         for k := range svrskts {
572                 ids = append(ids, k)
573         }
574         sort.Sort(int64Slice(ids))
575         idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= startID })
576         count := int64(0)
577         var end bool
578         for i, v := range ids[idx:] {
579                 if count == maxResults {
580                         break
581                 }
582                 if ns, ok := c.normalSockets[v]; ok {
583                         sks = append(sks, ns)
584                         count++
585                 }
586                 if i == len(ids[idx:])-1 {
587                         end = true
588                         break
589                 }
590         }
591         c.mu.RUnlock()
592         if count == 0 {
593                 end = true
594         }
595         var s []*SocketMetric
596         for _, ns := range sks {
597                 sm := &SocketMetric{}
598                 sm.SocketData = ns.s.ChannelzMetric()
599                 sm.ID = ns.id
600                 sm.RefName = ns.refName
601                 s = append(s, sm)
602         }
603         return s, end
604 }
605
606 func (c *channelMap) GetChannel(id int64) *ChannelMetric {
607         cm := &ChannelMetric{}
608         var cn *channel
609         var ok bool
610         c.mu.RLock()
611         if cn, ok = c.channels[id]; !ok {
612                 // channel with id doesn't exist.
613                 c.mu.RUnlock()
614                 return nil
615         }
616         cm.NestedChans = copyMap(cn.nestedChans)
617         cm.SubChans = copyMap(cn.subChans)
618         // cn.c can be set to &dummyChannel{} when deleteSelfFromMap is called. Save a copy of cn.c when
619         // holding the lock to prevent potential data race.
620         chanCopy := cn.c
621         c.mu.RUnlock()
622         cm.ChannelData = chanCopy.ChannelzMetric()
623         cm.ID = cn.id
624         cm.RefName = cn.refName
625         cm.Trace = cn.trace.dumpData()
626         return cm
627 }
628
629 func (c *channelMap) GetSubChannel(id int64) *SubChannelMetric {
630         cm := &SubChannelMetric{}
631         var sc *subChannel
632         var ok bool
633         c.mu.RLock()
634         if sc, ok = c.subChannels[id]; !ok {
635                 // subchannel with id doesn't exist.
636                 c.mu.RUnlock()
637                 return nil
638         }
639         cm.Sockets = copyMap(sc.sockets)
640         // sc.c can be set to &dummyChannel{} when deleteSelfFromMap is called. Save a copy of sc.c when
641         // holding the lock to prevent potential data race.
642         chanCopy := sc.c
643         c.mu.RUnlock()
644         cm.ChannelData = chanCopy.ChannelzMetric()
645         cm.ID = sc.id
646         cm.RefName = sc.refName
647         cm.Trace = sc.trace.dumpData()
648         return cm
649 }
650
651 func (c *channelMap) GetSocket(id int64) *SocketMetric {
652         sm := &SocketMetric{}
653         c.mu.RLock()
654         if ls, ok := c.listenSockets[id]; ok {
655                 c.mu.RUnlock()
656                 sm.SocketData = ls.s.ChannelzMetric()
657                 sm.ID = ls.id
658                 sm.RefName = ls.refName
659                 return sm
660         }
661         if ns, ok := c.normalSockets[id]; ok {
662                 c.mu.RUnlock()
663                 sm.SocketData = ns.s.ChannelzMetric()
664                 sm.ID = ns.id
665                 sm.RefName = ns.refName
666                 return sm
667         }
668         c.mu.RUnlock()
669         return nil
670 }
671
672 func (c *channelMap) GetServer(id int64) *ServerMetric {
673         sm := &ServerMetric{}
674         var svr *server
675         var ok bool
676         c.mu.RLock()
677         if svr, ok = c.servers[id]; !ok {
678                 c.mu.RUnlock()
679                 return nil
680         }
681         sm.ListenSockets = copyMap(svr.listenSockets)
682         c.mu.RUnlock()
683         sm.ID = svr.id
684         sm.RefName = svr.refName
685         sm.ServerData = svr.s.ChannelzMetric()
686         return sm
687 }
688
689 type idGenerator struct {
690         id int64
691 }
692
693 func (i *idGenerator) reset() {
694         atomic.StoreInt64(&i.id, 0)
695 }
696
697 func (i *idGenerator) genID() int64 {
698         return atomic.AddInt64(&i.id, 1)
699 }