Remove BPA from Makefile
[icn.git] / cmd / bpa-operator / vendor / go.opencensus.io / stats / view / worker.go
1 // Copyright 2017, OpenCensus Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 //
15
16 package view
17
18 import (
19         "fmt"
20         "sync"
21         "time"
22
23         "go.opencensus.io/metric/metricdata"
24         "go.opencensus.io/metric/metricproducer"
25         "go.opencensus.io/stats"
26         "go.opencensus.io/stats/internal"
27         "go.opencensus.io/tag"
28 )
29
30 func init() {
31         defaultWorker = newWorker()
32         go defaultWorker.start()
33         internal.DefaultRecorder = record
34 }
35
36 type measureRef struct {
37         measure string
38         views   map[*viewInternal]struct{}
39 }
40
41 type worker struct {
42         measures   map[string]*measureRef
43         views      map[string]*viewInternal
44         startTimes map[*viewInternal]time.Time
45
46         timer      *time.Ticker
47         c          chan command
48         quit, done chan bool
49         mu         sync.RWMutex
50 }
51
52 var defaultWorker *worker
53
54 var defaultReportingDuration = 10 * time.Second
55
56 // Find returns a registered view associated with this name.
57 // If no registered view is found, nil is returned.
58 func Find(name string) (v *View) {
59         req := &getViewByNameReq{
60                 name: name,
61                 c:    make(chan *getViewByNameResp),
62         }
63         defaultWorker.c <- req
64         resp := <-req.c
65         return resp.v
66 }
67
68 // Register begins collecting data for the given views.
69 // Once a view is registered, it reports data to the registered exporters.
70 func Register(views ...*View) error {
71         req := &registerViewReq{
72                 views: views,
73                 err:   make(chan error),
74         }
75         defaultWorker.c <- req
76         return <-req.err
77 }
78
79 // Unregister the given views. Data will not longer be exported for these views
80 // after Unregister returns.
81 // It is not necessary to unregister from views you expect to collect for the
82 // duration of your program execution.
83 func Unregister(views ...*View) {
84         names := make([]string, len(views))
85         for i := range views {
86                 names[i] = views[i].Name
87         }
88         req := &unregisterFromViewReq{
89                 views: names,
90                 done:  make(chan struct{}),
91         }
92         defaultWorker.c <- req
93         <-req.done
94 }
95
96 // RetrieveData gets a snapshot of the data collected for the the view registered
97 // with the given name. It is intended for testing only.
98 func RetrieveData(viewName string) ([]*Row, error) {
99         req := &retrieveDataReq{
100                 now: time.Now(),
101                 v:   viewName,
102                 c:   make(chan *retrieveDataResp),
103         }
104         defaultWorker.c <- req
105         resp := <-req.c
106         return resp.rows, resp.err
107 }
108
109 func record(tags *tag.Map, ms interface{}, attachments map[string]interface{}) {
110         req := &recordReq{
111                 tm:          tags,
112                 ms:          ms.([]stats.Measurement),
113                 attachments: attachments,
114                 t:           time.Now(),
115         }
116         defaultWorker.c <- req
117 }
118
119 // SetReportingPeriod sets the interval between reporting aggregated views in
120 // the program. If duration is less than or equal to zero, it enables the
121 // default behavior.
122 //
123 // Note: each exporter makes different promises about what the lowest supported
124 // duration is. For example, the Stackdriver exporter recommends a value no
125 // lower than 1 minute. Consult each exporter per your needs.
126 func SetReportingPeriod(d time.Duration) {
127         // TODO(acetechnologist): ensure that the duration d is more than a certain
128         // value. e.g. 1s
129         req := &setReportingPeriodReq{
130                 d: d,
131                 c: make(chan bool),
132         }
133         defaultWorker.c <- req
134         <-req.c // don't return until the timer is set to the new duration.
135 }
136
137 func newWorker() *worker {
138         return &worker{
139                 measures:   make(map[string]*measureRef),
140                 views:      make(map[string]*viewInternal),
141                 startTimes: make(map[*viewInternal]time.Time),
142                 timer:      time.NewTicker(defaultReportingDuration),
143                 c:          make(chan command, 1024),
144                 quit:       make(chan bool),
145                 done:       make(chan bool),
146         }
147 }
148
149 func (w *worker) start() {
150         prodMgr := metricproducer.GlobalManager()
151         prodMgr.AddProducer(w)
152
153         for {
154                 select {
155                 case cmd := <-w.c:
156                         cmd.handleCommand(w)
157                 case <-w.timer.C:
158                         w.reportUsage(time.Now())
159                 case <-w.quit:
160                         w.timer.Stop()
161                         close(w.c)
162                         w.done <- true
163                         return
164                 }
165         }
166 }
167
168 func (w *worker) stop() {
169         prodMgr := metricproducer.GlobalManager()
170         prodMgr.DeleteProducer(w)
171
172         w.quit <- true
173         <-w.done
174 }
175
176 func (w *worker) getMeasureRef(name string) *measureRef {
177         if mr, ok := w.measures[name]; ok {
178                 return mr
179         }
180         mr := &measureRef{
181                 measure: name,
182                 views:   make(map[*viewInternal]struct{}),
183         }
184         w.measures[name] = mr
185         return mr
186 }
187
188 func (w *worker) tryRegisterView(v *View) (*viewInternal, error) {
189         w.mu.Lock()
190         defer w.mu.Unlock()
191         vi, err := newViewInternal(v)
192         if err != nil {
193                 return nil, err
194         }
195         if x, ok := w.views[vi.view.Name]; ok {
196                 if !x.view.same(vi.view) {
197                         return nil, fmt.Errorf("cannot register view %q; a different view with the same name is already registered", v.Name)
198                 }
199
200                 // the view is already registered so there is nothing to do and the
201                 // command is considered successful.
202                 return x, nil
203         }
204         w.views[vi.view.Name] = vi
205         ref := w.getMeasureRef(vi.view.Measure.Name())
206         ref.views[vi] = struct{}{}
207         return vi, nil
208 }
209
210 func (w *worker) unregisterView(viewName string) {
211         w.mu.Lock()
212         defer w.mu.Unlock()
213         delete(w.views, viewName)
214 }
215
216 func (w *worker) reportView(v *viewInternal, now time.Time) {
217         if !v.isSubscribed() {
218                 return
219         }
220         rows := v.collectedRows()
221         _, ok := w.startTimes[v]
222         if !ok {
223                 w.startTimes[v] = now
224         }
225         viewData := &Data{
226                 View:  v.view,
227                 Start: w.startTimes[v],
228                 End:   time.Now(),
229                 Rows:  rows,
230         }
231         exportersMu.Lock()
232         for e := range exporters {
233                 e.ExportView(viewData)
234         }
235         exportersMu.Unlock()
236 }
237
238 func (w *worker) reportUsage(now time.Time) {
239         for _, v := range w.views {
240                 w.reportView(v, now)
241         }
242 }
243
244 func (w *worker) toMetric(v *viewInternal, now time.Time) *metricdata.Metric {
245         if !v.isSubscribed() {
246                 return nil
247         }
248
249         _, ok := w.startTimes[v]
250         if !ok {
251                 w.startTimes[v] = now
252         }
253
254         var startTime time.Time
255         if v.metricDescriptor.Type == metricdata.TypeGaugeInt64 ||
256                 v.metricDescriptor.Type == metricdata.TypeGaugeFloat64 {
257                 startTime = time.Time{}
258         } else {
259                 startTime = w.startTimes[v]
260         }
261
262         return viewToMetric(v, now, startTime)
263 }
264
265 // Read reads all view data and returns them as metrics.
266 // It is typically invoked by metric reader to export stats in metric format.
267 func (w *worker) Read() []*metricdata.Metric {
268         w.mu.Lock()
269         defer w.mu.Unlock()
270         now := time.Now()
271         metrics := make([]*metricdata.Metric, 0, len(w.views))
272         for _, v := range w.views {
273                 metric := w.toMetric(v, now)
274                 if metric != nil {
275                         metrics = append(metrics, metric)
276                 }
277         }
278         return metrics
279 }