Code refactoring for bpa operator
[icn.git] / cmd / bpa-operator / vendor / k8s.io / client-go / util / workqueue / metrics.go
1 /*
2 Copyright 2016 The Kubernetes Authors.
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8     http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 package workqueue
18
19 import (
20         "sync"
21         "time"
22
23         "k8s.io/apimachinery/pkg/util/clock"
24 )
25
26 // This file provides abstractions for setting the provider (e.g., prometheus)
27 // of metrics.
28
29 type queueMetrics interface {
30         add(item t)
31         get(item t)
32         done(item t)
33         updateUnfinishedWork()
34 }
35
36 // GaugeMetric represents a single numerical value that can arbitrarily go up
37 // and down.
38 type GaugeMetric interface {
39         Inc()
40         Dec()
41 }
42
43 // SettableGaugeMetric represents a single numerical value that can arbitrarily go up
44 // and down. (Separate from GaugeMetric to preserve backwards compatibility.)
45 type SettableGaugeMetric interface {
46         Set(float64)
47 }
48
49 // CounterMetric represents a single numerical value that only ever
50 // goes up.
51 type CounterMetric interface {
52         Inc()
53 }
54
55 // SummaryMetric captures individual observations.
56 type SummaryMetric interface {
57         Observe(float64)
58 }
59
60 type noopMetric struct{}
61
62 func (noopMetric) Inc()            {}
63 func (noopMetric) Dec()            {}
64 func (noopMetric) Set(float64)     {}
65 func (noopMetric) Observe(float64) {}
66
67 // defaultQueueMetrics expects the caller to lock before setting any metrics.
68 type defaultQueueMetrics struct {
69         clock clock.Clock
70
71         // current depth of a workqueue
72         depth GaugeMetric
73         // total number of adds handled by a workqueue
74         adds CounterMetric
75         // how long an item stays in a workqueue
76         latency SummaryMetric
77         // how long processing an item from a workqueue takes
78         workDuration         SummaryMetric
79         addTimes             map[t]time.Time
80         processingStartTimes map[t]time.Time
81
82         // how long have current threads been working?
83         unfinishedWorkSeconds   SettableGaugeMetric
84         longestRunningProcessor SettableGaugeMetric
85 }
86
87 func (m *defaultQueueMetrics) add(item t) {
88         if m == nil {
89                 return
90         }
91
92         m.adds.Inc()
93         m.depth.Inc()
94         if _, exists := m.addTimes[item]; !exists {
95                 m.addTimes[item] = m.clock.Now()
96         }
97 }
98
99 func (m *defaultQueueMetrics) get(item t) {
100         if m == nil {
101                 return
102         }
103
104         m.depth.Dec()
105         m.processingStartTimes[item] = m.clock.Now()
106         if startTime, exists := m.addTimes[item]; exists {
107                 m.latency.Observe(m.sinceInMicroseconds(startTime))
108                 delete(m.addTimes, item)
109         }
110 }
111
112 func (m *defaultQueueMetrics) done(item t) {
113         if m == nil {
114                 return
115         }
116
117         if startTime, exists := m.processingStartTimes[item]; exists {
118                 m.workDuration.Observe(m.sinceInMicroseconds(startTime))
119                 delete(m.processingStartTimes, item)
120         }
121 }
122
123 func (m *defaultQueueMetrics) updateUnfinishedWork() {
124         // Note that a summary metric would be better for this, but prometheus
125         // doesn't seem to have non-hacky ways to reset the summary metrics.
126         var total float64
127         var oldest float64
128         for _, t := range m.processingStartTimes {
129                 age := m.sinceInMicroseconds(t)
130                 total += age
131                 if age > oldest {
132                         oldest = age
133                 }
134         }
135         // Convert to seconds; microseconds is unhelpfully granular for this.
136         total /= 1000000
137         m.unfinishedWorkSeconds.Set(total)
138         m.longestRunningProcessor.Set(oldest) // in microseconds.
139 }
140
141 type noMetrics struct{}
142
143 func (noMetrics) add(item t)            {}
144 func (noMetrics) get(item t)            {}
145 func (noMetrics) done(item t)           {}
146 func (noMetrics) updateUnfinishedWork() {}
147
148 // Gets the time since the specified start in microseconds.
149 func (m *defaultQueueMetrics) sinceInMicroseconds(start time.Time) float64 {
150         return float64(m.clock.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds())
151 }
152
153 type retryMetrics interface {
154         retry()
155 }
156
157 type defaultRetryMetrics struct {
158         retries CounterMetric
159 }
160
161 func (m *defaultRetryMetrics) retry() {
162         if m == nil {
163                 return
164         }
165
166         m.retries.Inc()
167 }
168
169 // MetricsProvider generates various metrics used by the queue.
170 type MetricsProvider interface {
171         NewDepthMetric(name string) GaugeMetric
172         NewAddsMetric(name string) CounterMetric
173         NewLatencyMetric(name string) SummaryMetric
174         NewWorkDurationMetric(name string) SummaryMetric
175         NewUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric
176         NewLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric
177         NewRetriesMetric(name string) CounterMetric
178 }
179
180 type noopMetricsProvider struct{}
181
182 func (_ noopMetricsProvider) NewDepthMetric(name string) GaugeMetric {
183         return noopMetric{}
184 }
185
186 func (_ noopMetricsProvider) NewAddsMetric(name string) CounterMetric {
187         return noopMetric{}
188 }
189
190 func (_ noopMetricsProvider) NewLatencyMetric(name string) SummaryMetric {
191         return noopMetric{}
192 }
193
194 func (_ noopMetricsProvider) NewWorkDurationMetric(name string) SummaryMetric {
195         return noopMetric{}
196 }
197
198 func (_ noopMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric {
199         return noopMetric{}
200 }
201
202 func (_ noopMetricsProvider) NewLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric {
203         return noopMetric{}
204 }
205
206 func (_ noopMetricsProvider) NewRetriesMetric(name string) CounterMetric {
207         return noopMetric{}
208 }
209
210 var globalMetricsFactory = queueMetricsFactory{
211         metricsProvider: noopMetricsProvider{},
212 }
213
214 type queueMetricsFactory struct {
215         metricsProvider MetricsProvider
216
217         onlyOnce sync.Once
218 }
219
220 func (f *queueMetricsFactory) setProvider(mp MetricsProvider) {
221         f.onlyOnce.Do(func() {
222                 f.metricsProvider = mp
223         })
224 }
225
226 func (f *queueMetricsFactory) newQueueMetrics(name string, clock clock.Clock) queueMetrics {
227         mp := f.metricsProvider
228         if len(name) == 0 || mp == (noopMetricsProvider{}) {
229                 return noMetrics{}
230         }
231         return &defaultQueueMetrics{
232                 clock:                   clock,
233                 depth:                   mp.NewDepthMetric(name),
234                 adds:                    mp.NewAddsMetric(name),
235                 latency:                 mp.NewLatencyMetric(name),
236                 workDuration:            mp.NewWorkDurationMetric(name),
237                 unfinishedWorkSeconds:   mp.NewUnfinishedWorkSecondsMetric(name),
238                 longestRunningProcessor: mp.NewLongestRunningProcessorMicrosecondsMetric(name),
239                 addTimes:                map[t]time.Time{},
240                 processingStartTimes:    map[t]time.Time{},
241         }
242 }
243
244 func newRetryMetrics(name string) retryMetrics {
245         var ret *defaultRetryMetrics
246         if len(name) == 0 {
247                 return ret
248         }
249         return &defaultRetryMetrics{
250                 retries: globalMetricsFactory.metricsProvider.NewRetriesMetric(name),
251         }
252 }
253
254 // SetProvider sets the metrics provider for all subsequently created work
255 // queues. Only the first call has an effect.
256 func SetProvider(metricsProvider MetricsProvider) {
257         globalMetricsFactory.setProvider(metricsProvider)
258 }