1 // Copyright 2017, OpenCensus Authors
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
7 // http://www.apache.org/licenses/LICENSE-2.0
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
23 "go.opencensus.io/metric/metricdata"
24 "go.opencensus.io/metric/metricproducer"
25 "go.opencensus.io/stats"
26 "go.opencensus.io/stats/internal"
27 "go.opencensus.io/tag"
// Worker bootstrap: build the shared worker, launch its start method on a
// separate goroutine, and install record as internal.DefaultRecorder.
// NOTE(review): the enclosing function header is not visible in this excerpt.
31 defaultWorker = newWorker()
32 go defaultWorker.start()
33 internal.DefaultRecorder = record
// measureRef is the value type of worker.measures; it carries the views
// attached to one measure name.
36 type measureRef struct {
// views is used as a set: tryRegisterView inserts keys with empty struct values.
38 views map[*viewInternal]struct{}
// measures is keyed by measure name (see getMeasureRef and tryRegisterView).
42 measures map[string]*measureRef
// views is keyed by view name; tryRegisterView adds entries and
// unregisterView deletes them.
43 views map[string]*viewInternal
// startTimes holds one timestamp per view, written by reportView and toMetric.
44 startTimes map[*viewInternal]time.Time
// defaultWorker is the worker whose command channel the package-level
// functions in this file send their requests to.
52 var defaultWorker *worker
// defaultReportingDuration is the period of the ticker created in newWorker.
54 var defaultReportingDuration = 10 * time.Second
56 // Find returns a registered view associated with this name.
57 // If no registered view is found, nil is returned.
58 func Find(name string) (v *View) {
// Build the lookup request for the worker.
59 req := &getViewByNameReq{
// Unbuffered channel carried in the request, typed for *getViewByNameResp.
// NOTE(review): presumably the reply is received from it; that receive is not
// visible in this excerpt.
61 c: make(chan *getViewByNameResp),
// Enqueue the request on the default worker's command channel.
63 defaultWorker.c <- req
68 // Register begins collecting data for the given views.
69 // Once a view is registered, it reports data to the registered exporters.
70 func Register(views ...*View) error {
71 req := ®isterViewReq{
73 err: make(chan error),
75 defaultWorker.c <- req
79 // Unregister the given views. Data will no longer be exported for these views
80 // after Unregister returns.
81 // It is not necessary to unregister from views you expect to collect for the
82 // duration of your program execution.
83 func Unregister(views ...*View) {
// Collect the name of every view into a slice of the same length.
// A nil *View among the arguments would panic on the field access below.
84 names := make([]string, len(views))
85 for i := range views {
86 names[i] = views[i].Name
// Build the unregister request for the worker.
88 req := &unregisterFromViewReq{
// Unbuffered signalling channel carried in the request.
// NOTE(review): presumably this function waits on it before returning; the
// receive is not visible in this excerpt.
90 done: make(chan struct{}),
// Enqueue the request on the default worker's command channel.
92 defaultWorker.c <- req
96 // RetrieveData gets a snapshot of the data collected for the view registered
97 // with the given name. It is intended for testing only.
98 func RetrieveData(viewName string) ([]*Row, error) {
// Build the retrieval request for the worker.
99 req := &retrieveDataReq{
// Unbuffered channel carried in the request, typed for *retrieveDataResp.
102 c: make(chan *retrieveDataResp),
// Enqueue the request on the default worker's command channel.
104 defaultWorker.c <- req
// Return the rows and error carried by resp.
// NOTE(review): the assignment of resp is not visible in this excerpt;
// presumably it is received from req.c.
106 return resp.rows, resp.err
// record builds a request from its arguments and enqueues it on the default
// worker's command channel. It is the function installed as
// internal.DefaultRecorder.
// NOTE(review): the construction of req is only partly visible in this excerpt.
109 func record(tags *tag.Map, ms interface{}, attachments map[string]interface{}) {
// Single-value type assertion: panics if ms does not hold a []stats.Measurement.
112 ms: ms.([]stats.Measurement),
113 attachments: attachments,
// Enqueue the request; no wait for a reply is visible in this excerpt.
116 defaultWorker.c <- req
119 // SetReportingPeriod sets the interval between reporting aggregated views in
120 // the program. If duration is less than or equal to zero, it enables the
123 // Note: each exporter makes different promises about what the lowest supported
124 // duration is. For example, the Stackdriver exporter recommends a value no
125 // lower than 1 minute. Consult each exporter per your needs.
126 func SetReportingPeriod(d time.Duration) {
127 // TODO(acetechnologist): ensure that the duration d is more than a certain
// Build the period-change request (its fields are not visible in this excerpt).
129 req := &setReportingPeriodReq{
// Enqueue the request on the default worker's command channel.
133 defaultWorker.c <- req
134 <-req.c // don't return until the timer is set to the new duration.
// newWorker sets up a worker's state: empty measure, view and start-time
// maps, a ticker with period defaultReportingDuration, a buffered command
// channel, and unbuffered quit and done channels.
137 func newWorker() *worker {
139 measures: make(map[string]*measureRef),
140 views: make(map[string]*viewInternal),
141 startTimes: make(map[*viewInternal]time.Time),
// The ticker is created here, so it is already running before start is called.
142 timer: time.NewTicker(defaultReportingDuration),
// Buffered: up to 1024 commands can be queued before a sender blocks.
143 c: make(chan command, 1024),
144 quit: make(chan bool),
145 done: make(chan bool),
// start adds the worker as a producer on the global metric producer manager.
// The visible remainder calls reportUsage with the current time.
// NOTE(review): the control flow surrounding that call is not visible in this
// excerpt.
149 func (w *worker) start() {
150 prodMgr := metricproducer.GlobalManager()
151 prodMgr.AddProducer(w)
// Report usage, stamped with the current time.
158 w.reportUsage(time.Now())
// stop removes the worker from the global metric producer manager.
// NOTE(review): any further shutdown steps are not visible in this excerpt.
168 func (w *worker) stop() {
169 prodMgr := metricproducer.GlobalManager()
170 prodMgr.DeleteProducer(w)
// getMeasureRef looks up the measureRef stored under name in w.measures. The
// later visible lines build a measureRef with an empty view set and store it
// under name.
// NOTE(review): the return statements and the start of the new measureRef
// literal are not visible in this excerpt; presumably an existing entry is
// returned as-is and a new one is created only on a miss.
176 func (w *worker) getMeasureRef(name string) *measureRef {
// An entry already exists for this name.
177 if mr, ok := w.measures[name]; ok {
// Fresh, empty set of views for the new entry.
182 views: make(map[*viewInternal]struct{}),
// Store the entry so later lookups under this name find it.
184 w.measures[name] = mr
// tryRegisterView converts v with newViewInternal, stores the result in
// w.views under the view's name, and adds it to the view set of the
// measureRef for the view's measure. If a view with that name is already
// registered and same reports a difference, an error is returned.
// NOTE(review): the handling of err from newViewInternal and the success-path
// return statements are not visible in this excerpt.
188 func (w *worker) tryRegisterView(v *View) (*viewInternal, error) {
191 vi, err := newViewInternal(v)
// A view with this name is already registered.
195 if x, ok := w.views[vi.view.Name]; ok {
// Same name but a different definition: refuse the registration.
196 if !x.view.same(vi.view) {
197 return nil, fmt.Errorf("cannot register view %q; a different view with the same name is already registered", v.Name)
200 // the view is already registered so there is nothing to do and the
201 // command is considered successful.
// Index the view by its name, then attach it to its measure's view set.
204 w.views[vi.view.Name] = vi
205 ref := w.getMeasureRef(vi.view.Measure.Name())
206 ref.views[vi] = struct{}{}
// unregisterView removes the entry for viewName from w.views.
// NOTE(review): other statements of this function are not visible in this
// excerpt.
210 func (w *worker) unregisterView(viewName string) {
// delete is a no-op when viewName is not present.
213 delete(w.views, viewName)
// reportView reads v's collected rows and calls ExportView with viewData on
// every value ranged from exporters.
// NOTE(review): the body of the isSubscribed guard and the start of the
// viewData literal are not visible in this excerpt.
216 func (w *worker) reportView(v *viewInternal, now time.Time) {
// Guard for views for which isSubscribed reports false.
217 if !v.isSubscribed() {
220 rows := v.collectedRows()
// Check whether a start time has been stored for v.
221 _, ok := w.startTimes[v]
// Store now as v's start time.
// NOTE(review): presumably guarded by !ok; the condition line is not visible.
223 w.startTimes[v] = now
// The Start field is v's stored start time.
227 Start: w.startTimes[v],
// Hand the view data to each exporter.
232 for e := range exporters {
233 e.ExportView(viewData)
// reportUsage iterates over every view in w.views.
// NOTE(review): the loop body is not visible in this excerpt; presumably it
// reports each view with the supplied time.
238 func (w *worker) reportUsage(now time.Time) {
239 for _, v := range w.views {
// toMetric converts v to a *metricdata.Metric by calling viewToMetric with now
// and a start time. For views whose descriptor type is TypeGaugeInt64 or
// TypeGaugeFloat64 the start time is the zero time.Time.
// NOTE(review): the body of the isSubscribed guard, the condition around the
// startTimes write, and the branch line before the second startTime
// assignment are not visible in this excerpt; presumably non-gauge views use
// the start time stored in w.startTimes.
244 func (w *worker) toMetric(v *viewInternal, now time.Time) *metricdata.Metric {
// Guard for views for which isSubscribed reports false.
245 if !v.isSubscribed() {
// Check whether a start time has been stored for v.
249 _, ok := w.startTimes[v]
// Store now as v's start time.
251 w.startTimes[v] = now
254 var startTime time.Time
255 if v.metricDescriptor.Type == metricdata.TypeGaugeInt64 ||
256 v.metricDescriptor.Type == metricdata.TypeGaugeFloat64 {
// Gauge types get the zero time as their start time.
257 startTime = time.Time{}
259 startTime = w.startTimes[v]
262 return viewToMetric(v, now, startTime)
265 // Read reads all view data and returns them as metrics.
266 // It is typically invoked by metric reader to export stats in metric format.
267 func (w *worker) Read() []*metricdata.Metric {
271 metrics := make([]*metricdata.Metric, 0, len(w.views))
272 for _, v := range w.views {
273 metric := w.toMetric(v, now)
275 metrics = append(metrics, metric)