Code refactoring for bpa operator
[icn.git] / cmd / bpa-operator / vendor / go.opencensus.io / stats / view / worker_commands.go
1 // Copyright 2017, OpenCensus Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 //
15
16 package view
17
18 import (
19         "errors"
20         "fmt"
21         "strings"
22         "time"
23
24         "go.opencensus.io/stats"
25         "go.opencensus.io/stats/internal"
26         "go.opencensus.io/tag"
27 )
28
29 type command interface {
30         handleCommand(w *worker)
31 }
32
33 // getViewByNameReq is the command to get a view given its name.
34 type getViewByNameReq struct {
35         name string
36         c    chan *getViewByNameResp
37 }
38
39 type getViewByNameResp struct {
40         v *View
41 }
42
43 func (cmd *getViewByNameReq) handleCommand(w *worker) {
44         v := w.views[cmd.name]
45         if v == nil {
46                 cmd.c <- &getViewByNameResp{nil}
47                 return
48         }
49         cmd.c <- &getViewByNameResp{v.view}
50 }
51
52 // registerViewReq is the command to register a view.
53 type registerViewReq struct {
54         views []*View
55         err   chan error
56 }
57
58 func (cmd *registerViewReq) handleCommand(w *worker) {
59         for _, v := range cmd.views {
60                 if err := v.canonicalize(); err != nil {
61                         cmd.err <- err
62                         return
63                 }
64         }
65         var errstr []string
66         for _, view := range cmd.views {
67                 vi, err := w.tryRegisterView(view)
68                 if err != nil {
69                         errstr = append(errstr, fmt.Sprintf("%s: %v", view.Name, err))
70                         continue
71                 }
72                 internal.SubscriptionReporter(view.Measure.Name())
73                 vi.subscribe()
74         }
75         if len(errstr) > 0 {
76                 cmd.err <- errors.New(strings.Join(errstr, "\n"))
77         } else {
78                 cmd.err <- nil
79         }
80 }
81
82 // unregisterFromViewReq is the command to unregister to a view. Has no
83 // impact on the data collection for client that are pulling data from the
84 // library.
85 type unregisterFromViewReq struct {
86         views []string
87         done  chan struct{}
88 }
89
90 func (cmd *unregisterFromViewReq) handleCommand(w *worker) {
91         for _, name := range cmd.views {
92                 vi, ok := w.views[name]
93                 if !ok {
94                         continue
95                 }
96
97                 // Report pending data for this view before removing it.
98                 w.reportView(vi, time.Now())
99
100                 vi.unsubscribe()
101                 if !vi.isSubscribed() {
102                         // this was the last subscription and view is not collecting anymore.
103                         // The collected data can be cleared.
104                         vi.clearRows()
105                 }
106                 w.unregisterView(name)
107         }
108         cmd.done <- struct{}{}
109 }
110
111 // retrieveDataReq is the command to retrieve data for a view.
112 type retrieveDataReq struct {
113         now time.Time
114         v   string
115         c   chan *retrieveDataResp
116 }
117
118 type retrieveDataResp struct {
119         rows []*Row
120         err  error
121 }
122
123 func (cmd *retrieveDataReq) handleCommand(w *worker) {
124         vi, ok := w.views[cmd.v]
125         if !ok {
126                 cmd.c <- &retrieveDataResp{
127                         nil,
128                         fmt.Errorf("cannot retrieve data; view %q is not registered", cmd.v),
129                 }
130                 return
131         }
132
133         if !vi.isSubscribed() {
134                 cmd.c <- &retrieveDataResp{
135                         nil,
136                         fmt.Errorf("cannot retrieve data; view %q has no subscriptions or collection is not forcibly started", cmd.v),
137                 }
138                 return
139         }
140         cmd.c <- &retrieveDataResp{
141                 vi.collectedRows(),
142                 nil,
143         }
144 }
145
146 // recordReq is the command to record data related to multiple measures
147 // at once.
148 type recordReq struct {
149         tm          *tag.Map
150         ms          []stats.Measurement
151         attachments map[string]interface{}
152         t           time.Time
153 }
154
155 func (cmd *recordReq) handleCommand(w *worker) {
156         for _, m := range cmd.ms {
157                 if (m == stats.Measurement{}) { // not registered
158                         continue
159                 }
160                 ref := w.getMeasureRef(m.Measure().Name())
161                 for v := range ref.views {
162                         v.addSample(cmd.tm, m.Value(), cmd.attachments, time.Now())
163                 }
164         }
165 }
166
167 // setReportingPeriodReq is the command to modify the duration between
168 // reporting the collected data to the registered clients.
169 type setReportingPeriodReq struct {
170         d time.Duration
171         c chan bool
172 }
173
174 func (cmd *setReportingPeriodReq) handleCommand(w *worker) {
175         w.timer.Stop()
176         if cmd.d <= 0 {
177                 w.timer = time.NewTicker(defaultReportingDuration)
178         } else {
179                 w.timer = time.NewTicker(cmd.d)
180         }
181         cmd.c <- true
182 }