3 * Copyright 2018 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 // Package channelz defines APIs for enabling channelz service, entry
20 // registration/deletion, and accessing channelz data. It also defines channelz
21 // metric struct formats.
23 // All APIs in this package are experimental.
32 "google.golang.org/grpc/grpclog"
36 defaultMaxTraceEntry int32 = 30
42 // EntryPerPage defines the number of channelz entries to be shown on a web page.
43 EntryPerPage = int64(50)
45 maxTraceEntry = defaultMaxTraceEntry
48 // TurnOn turns on channelz data collection.
52 atomic.StoreInt32(&curState, 1)
56 // IsOn returns whether channelz data collection is on.
58 return atomic.CompareAndSwapInt32(&curState, 1, 1)
61 // SetMaxTraceEntry sets the maximum number of trace entries per entity (i.e. channel/subchannel).
62 // Setting it to 0 will disable channel tracing.
63 func SetMaxTraceEntry(i int32) {
64 atomic.StoreInt32(&maxTraceEntry, i)
67 // ResetMaxTraceEntryToDefault resets the maximum number of trace entries per entity to the default.
68 func ResetMaxTraceEntryToDefault() {
69 atomic.StoreInt32(&maxTraceEntry, defaultMaxTraceEntry)
72 func getMaxTraceEntry() int {
73 i := atomic.LoadInt32(&maxTraceEntry)
77 // dbWrapper wraps around a reference to internal channelz data storage, and
78 // provides synchronized functionality to set and get the reference.
79 type dbWrapper struct {
84 func (d *dbWrapper) set(db *channelMap) {
90 func (d *dbWrapper) get() *channelMap {
96 // NewChannelzStorage initializes channelz data storage and id generator.
98 // Note: This function is exported for testing purposes only. Users should not call
100 func NewChannelzStorage() {
102 topLevelChannels: make(map[int64]struct{}),
103 channels: make(map[int64]*channel),
104 listenSockets: make(map[int64]*listenSocket),
105 normalSockets: make(map[int64]*normalSocket),
106 servers: make(map[int64]*server),
107 subChannels: make(map[int64]*subChannel),
112 // GetTopChannels returns a slice of top channels' ChannelMetric, along with a
113 // boolean indicating whether there are more top channels to be queried for.
115 // The arg id specifies that only top channels with id at or above it will be included
116 // in the result. The returned slice is up to a length of the arg maxResults or
117 // EntryPerPage if maxResults is zero, and is sorted in ascending id order.
118 func GetTopChannels(id int64, maxResults int64) ([]*ChannelMetric, bool) {
119 return db.get().GetTopChannels(id, maxResults)
122 // GetServers returns a slice of servers' ServerMetric, along with a
123 // boolean indicating whether there are more servers to be queried for.
125 // The arg id specifies that only servers with id at or above it will be included
126 // in the result. The returned slice is up to a length of the arg maxResults or
127 // EntryPerPage if maxResults is zero, and is sorted in ascending id order.
128 func GetServers(id int64, maxResults int64) ([]*ServerMetric, bool) {
129 return db.get().GetServers(id, maxResults)
132 // GetServerSockets returns a slice of server's (identified by id) normal socket's
133 // SocketMetric, along with a boolean indicating whether there are more sockets to
136 // The arg startID specifies that only sockets with id at or above it will be
137 // included in the result. The returned slice is up to a length of the arg maxResults
138 // or EntryPerPage if maxResults is zero, and is sorted in ascending id order.
139 func GetServerSockets(id int64, startID int64, maxResults int64) ([]*SocketMetric, bool) {
140 return db.get().GetServerSockets(id, startID, maxResults)
143 // GetChannel returns the ChannelMetric for the channel (identified by id).
144 func GetChannel(id int64) *ChannelMetric {
145 return db.get().GetChannel(id)
148 // GetSubChannel returns the SubChannelMetric for the subchannel (identified by id).
149 func GetSubChannel(id int64) *SubChannelMetric {
150 return db.get().GetSubChannel(id)
153 // GetSocket returns the SocketMetric for the socket (identified by id).
154 func GetSocket(id int64) *SocketMetric {
155 return db.get().GetSocket(id)
158 // GetServer returns the ServerMetric for the server (identified by id).
159 func GetServer(id int64) *ServerMetric {
160 return db.get().GetServer(id)
163 // RegisterChannel registers the given channel c in channelz database with ref
164 // as its reference name, and adds it to the child list of its parent (identified
165 // by pid). pid = 0 means no parent. It returns the unique channelz tracking id
166 // assigned to this channel.
167 func RegisterChannel(c Channel, pid int64, ref string) int64 {
172 subChans: make(map[int64]string),
173 nestedChans: make(map[int64]string),
176 trace: &channelTrace{createdTime: time.Now(), events: make([]*TraceEvent, 0, getMaxTraceEntry())},
179 db.get().addChannel(id, cn, true, pid, ref)
181 db.get().addChannel(id, cn, false, pid, ref)
186 // RegisterSubChannel registers the given channel c in channelz database with ref
187 // as its reference name, and adds it to the child list of its parent (identified
188 // by pid). It returns the unique channelz tracking id assigned to this subchannel.
189 func RegisterSubChannel(c Channel, pid int64, ref string) int64 {
191 grpclog.Error("a SubChannel's parent id cannot be 0")
198 sockets: make(map[int64]string),
201 trace: &channelTrace{createdTime: time.Now(), events: make([]*TraceEvent, 0, getMaxTraceEntry())},
203 db.get().addSubChannel(id, sc, pid, ref)
207 // RegisterServer registers the given server s in channelz database. It returns
208 // the unique channelz tracking id assigned to this server.
209 func RegisterServer(s Server, ref string) int64 {
214 sockets: make(map[int64]string),
215 listenSockets: make(map[int64]string),
218 db.get().addServer(id, svr)
222 // RegisterListenSocket registers the given listen socket s in channelz database
223 // with ref as its reference name, and adds it to the child list of its parent
224 // (identified by pid). It returns the unique channelz tracking id assigned to
225 // this listen socket.
226 func RegisterListenSocket(s Socket, pid int64, ref string) int64 {
228 grpclog.Error("a ListenSocket's parent id cannot be 0")
232 ls := &listenSocket{refName: ref, s: s, id: id, pid: pid}
233 db.get().addListenSocket(id, ls, pid, ref)
237 // RegisterNormalSocket registers the given normal socket s in channelz database
238 // with ref as its reference name, and adds it to the child list of its parent
239 // (identified by pid). It returns the unique channelz tracking id assigned to
240 // this normal socket.
241 func RegisterNormalSocket(s Socket, pid int64, ref string) int64 {
243 grpclog.Error("a NormalSocket's parent id cannot be 0")
247 ns := &normalSocket{refName: ref, s: s, id: id, pid: pid}
248 db.get().addNormalSocket(id, ns, pid, ref)
252 // RemoveEntry removes the entry whose unique channelz tracking id is id from
253 // the channelz database.
254 func RemoveEntry(id int64) {
255 db.get().removeEntry(id)
258 // TraceEventDesc is what the caller of AddTraceEvent should provide to describe the event to be added
259 // to the channel trace.
260 // The Parent field is optional. It is used for an event that will be recorded in the entity's parent
262 type TraceEventDesc struct {
265 Parent *TraceEventDesc
268 // AddTraceEvent adds trace related to the entity with specified id, using the provided TraceEventDesc.
269 func AddTraceEvent(id int64, desc *TraceEventDesc) {
270 if getMaxTraceEntry() == 0 {
273 db.get().traceEvent(id, desc)
276 // channelMap is the storage data structure for channelz.
277 // Methods of channelMap can be divided into two categories with respect to locking.
278 // 1. Methods that acquire the global lock.
279 // 2. Methods that can only be called when the global lock is held.
280 // A method of the second type must always be called inside a method of the first type.
281 type channelMap struct {
283 topLevelChannels map[int64]struct{}
284 servers map[int64]*server
285 channels map[int64]*channel
286 subChannels map[int64]*subChannel
287 listenSockets map[int64]*listenSocket
288 normalSockets map[int64]*normalSocket
291 func (c *channelMap) addServer(id int64, s *server) {
298 func (c *channelMap) addChannel(id int64, cn *channel, isTopChannel bool, pid int64, ref string) {
304 c.topLevelChannels[id] = struct{}{}
306 c.findEntry(pid).addChild(id, cn)
311 func (c *channelMap) addSubChannel(id int64, sc *subChannel, pid int64, ref string) {
315 c.subChannels[id] = sc
316 c.findEntry(pid).addChild(id, sc)
320 func (c *channelMap) addListenSocket(id int64, ls *listenSocket, pid int64, ref string) {
323 c.listenSockets[id] = ls
324 c.findEntry(pid).addChild(id, ls)
328 func (c *channelMap) addNormalSocket(id int64, ns *normalSocket, pid int64, ref string) {
331 c.normalSockets[id] = ns
332 c.findEntry(pid).addChild(id, ns)
336 // removeEntry triggers the removal of an entry, which may not actually delete the entry if it has to
337 // wait on the deletion of its children and until no other entity's channel trace references it.
338 // It may lead to a chain of entry deletion. For example, deleting the last socket of a gracefully
339 // shutting down server will lead to the server being also deleted.
340 func (c *channelMap) removeEntry(id int64) {
342 c.findEntry(id).triggerDelete()
346 // c.mu must be held by the caller
347 func (c *channelMap) decrTraceRefCount(id int64) {
349 if v, ok := e.(tracedChannel); ok {
350 v.decrTraceRefCount()
351 e.deleteSelfIfReady()
355 // c.mu must be held by the caller.
356 func (c *channelMap) findEntry(id int64) entry {
359 if v, ok = c.channels[id]; ok {
362 if v, ok = c.subChannels[id]; ok {
365 if v, ok = c.servers[id]; ok {
368 if v, ok = c.listenSockets[id]; ok {
371 if v, ok = c.normalSockets[id]; ok {
374 return &dummyEntry{idNotFound: id}
377 // c.mu must be held by the caller
378 // deleteEntry simply deletes an entry from the channelMap. Before calling this
379 // method, the caller must check that this entry is ready to be deleted, i.e. removeEntry()
380 // has been called on it, and no children still exist.
381 // Conditionals are ordered by the expected frequency of deletion of each entity
382 // type, in order to optimize performance.
383 func (c *channelMap) deleteEntry(id int64) {
385 if _, ok = c.normalSockets[id]; ok {
386 delete(c.normalSockets, id)
389 if _, ok = c.subChannels[id]; ok {
390 delete(c.subChannels, id)
393 if _, ok = c.channels[id]; ok {
394 delete(c.channels, id)
395 delete(c.topLevelChannels, id)
398 if _, ok = c.listenSockets[id]; ok {
399 delete(c.listenSockets, id)
402 if _, ok = c.servers[id]; ok {
403 delete(c.servers, id)
408 func (c *channelMap) traceEvent(id int64, desc *TraceEventDesc) {
410 child := c.findEntry(id)
411 childTC, ok := child.(tracedChannel)
416 childTC.getChannelTrace().append(&TraceEvent{Desc: desc.Desc, Severity: desc.Severity, Timestamp: time.Now()})
417 if desc.Parent != nil {
418 parent := c.findEntry(child.getParentID())
419 var chanType RefChannelType
420 switch child.(type) {
422 chanType = RefChannel
424 chanType = RefSubChannel
426 if parentTC, ok := parent.(tracedChannel); ok {
427 parentTC.getChannelTrace().append(&TraceEvent{
428 Desc: desc.Parent.Desc,
429 Severity: desc.Parent.Severity,
430 Timestamp: time.Now(),
432 RefName: childTC.getRefName(),
435 childTC.incrTraceRefCount()
441 type int64Slice []int64
443 func (s int64Slice) Len() int { return len(s) }
444 func (s int64Slice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
445 func (s int64Slice) Less(i, j int) bool { return s[i] < s[j] }
447 func copyMap(m map[int64]string) map[int64]string {
448 n := make(map[int64]string)
449 for k, v := range m {
455 func min(a, b int64) int64 {
462 func (c *channelMap) GetTopChannels(id int64, maxResults int64) ([]*ChannelMetric, bool) {
464 maxResults = EntryPerPage
467 l := int64(len(c.topLevelChannels))
468 ids := make([]int64, 0, l)
469 cns := make([]*channel, 0, min(l, maxResults))
471 for k := range c.topLevelChannels {
474 sort.Sort(int64Slice(ids))
475 idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
478 var t []*ChannelMetric
479 for i, v := range ids[idx:] {
480 if count == maxResults {
483 if cn, ok := c.channels[v]; ok {
484 cns = append(cns, cn)
485 t = append(t, &ChannelMetric{
486 NestedChans: copyMap(cn.nestedChans),
487 SubChans: copyMap(cn.subChans),
491 if i == len(ids[idx:])-1 {
501 for i, cn := range cns {
502 t[i].ChannelData = cn.c.ChannelzMetric()
504 t[i].RefName = cn.refName
505 t[i].Trace = cn.trace.dumpData()
510 func (c *channelMap) GetServers(id, maxResults int64) ([]*ServerMetric, bool) {
512 maxResults = EntryPerPage
515 l := int64(len(c.servers))
516 ids := make([]int64, 0, l)
517 ss := make([]*server, 0, min(l, maxResults))
518 for k := range c.servers {
521 sort.Sort(int64Slice(ids))
522 idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
525 var s []*ServerMetric
526 for i, v := range ids[idx:] {
527 if count == maxResults {
530 if svr, ok := c.servers[v]; ok {
532 s = append(s, &ServerMetric{
533 ListenSockets: copyMap(svr.listenSockets),
537 if i == len(ids[idx:])-1 {
547 for i, svr := range ss {
548 s[i].ServerData = svr.s.ChannelzMetric()
550 s[i].RefName = svr.refName
555 func (c *channelMap) GetServerSockets(id int64, startID int64, maxResults int64) ([]*SocketMetric, bool) {
557 maxResults = EntryPerPage
562 if svr, ok = c.servers[id]; !ok {
563 // server with id doesn't exist.
567 svrskts := svr.sockets
568 l := int64(len(svrskts))
569 ids := make([]int64, 0, l)
570 sks := make([]*normalSocket, 0, min(l, maxResults))
571 for k := range svrskts {
574 sort.Sort(int64Slice(ids))
575 idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= startID })
578 for i, v := range ids[idx:] {
579 if count == maxResults {
582 if ns, ok := c.normalSockets[v]; ok {
583 sks = append(sks, ns)
586 if i == len(ids[idx:])-1 {
595 var s []*SocketMetric
596 for _, ns := range sks {
597 sm := &SocketMetric{}
598 sm.SocketData = ns.s.ChannelzMetric()
600 sm.RefName = ns.refName
606 func (c *channelMap) GetChannel(id int64) *ChannelMetric {
607 cm := &ChannelMetric{}
611 if cn, ok = c.channels[id]; !ok {
612 // channel with id doesn't exist.
616 cm.NestedChans = copyMap(cn.nestedChans)
617 cm.SubChans = copyMap(cn.subChans)
618 // cn.c can be set to &dummyChannel{} when deleteSelfFromMap is called. Save a copy of cn.c when
619 // holding the lock to prevent potential data race.
622 cm.ChannelData = chanCopy.ChannelzMetric()
624 cm.RefName = cn.refName
625 cm.Trace = cn.trace.dumpData()
629 func (c *channelMap) GetSubChannel(id int64) *SubChannelMetric {
630 cm := &SubChannelMetric{}
634 if sc, ok = c.subChannels[id]; !ok {
635 // subchannel with id doesn't exist.
639 cm.Sockets = copyMap(sc.sockets)
640 // sc.c can be set to &dummyChannel{} when deleteSelfFromMap is called. Save a copy of sc.c when
641 // holding the lock to prevent potential data race.
644 cm.ChannelData = chanCopy.ChannelzMetric()
646 cm.RefName = sc.refName
647 cm.Trace = sc.trace.dumpData()
651 func (c *channelMap) GetSocket(id int64) *SocketMetric {
652 sm := &SocketMetric{}
654 if ls, ok := c.listenSockets[id]; ok {
656 sm.SocketData = ls.s.ChannelzMetric()
658 sm.RefName = ls.refName
661 if ns, ok := c.normalSockets[id]; ok {
663 sm.SocketData = ns.s.ChannelzMetric()
665 sm.RefName = ns.refName
672 func (c *channelMap) GetServer(id int64) *ServerMetric {
673 sm := &ServerMetric{}
677 if svr, ok = c.servers[id]; !ok {
681 sm.ListenSockets = copyMap(svr.listenSockets)
684 sm.RefName = svr.refName
685 sm.ServerData = svr.s.ChannelzMetric()
689 type idGenerator struct {
693 func (i *idGenerator) reset() {
694 atomic.StoreInt64(&i.id, 0)
697 func (i *idGenerator) genID() int64 {
698 return atomic.AddInt64(&i.id, 1)