Code refactoring for bpa operator
[icn.git] / cmd / bpa-operator / vendor / contrib.go.opencensus.io / exporter / ocagent / ocagent.go
1 // Copyright 2018, OpenCensus Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 package ocagent
16
17 import (
18         "context"
19         "errors"
20         "fmt"
21         "sync"
22         "time"
23
24         "google.golang.org/api/support/bundler"
25         "google.golang.org/grpc"
26         "google.golang.org/grpc/credentials"
27         "google.golang.org/grpc/metadata"
28
29         "go.opencensus.io/plugin/ocgrpc"
30         "go.opencensus.io/resource"
31         "go.opencensus.io/stats/view"
32         "go.opencensus.io/trace"
33
34         commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
35         agentmetricspb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/metrics/v1"
36         agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
37         metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
38         resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
39         tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
40 )
41
42 var startupMu sync.Mutex
43 var startTime time.Time
44
45 func init() {
46         startupMu.Lock()
47         startTime = time.Now()
48         startupMu.Unlock()
49 }
50
51 var _ trace.Exporter = (*Exporter)(nil)
52 var _ view.Exporter = (*Exporter)(nil)
53
54 type Exporter struct {
55         connectionState int32
56
57         // mu protects the non-atomic and non-channel variables
58         mu sync.RWMutex
59         // senderMu protects the concurrent unsafe traceExporter client
60         senderMu           sync.RWMutex
61         started            bool
62         stopped            bool
63         agentAddress       string
64         serviceName        string
65         canDialInsecure    bool
66         traceExporter      agenttracepb.TraceService_ExportClient
67         metricsExporter    agentmetricspb.MetricsService_ExportClient
68         nodeInfo           *commonpb.Node
69         grpcClientConn     *grpc.ClientConn
70         reconnectionPeriod time.Duration
71         resource           *resourcepb.Resource
72         compressor         string
73         headers            map[string]string
74
75         startOnce      sync.Once
76         stopCh         chan bool
77         disconnectedCh chan bool
78
79         backgroundConnectionDoneCh chan bool
80
81         traceBundler *bundler.Bundler
82
83         // viewDataBundler is the bundler to enable conversion
84         // from OpenCensus-Go view.Data to metricspb.Metric.
85         // Please do not confuse it with metricsBundler!
86         viewDataBundler *bundler.Bundler
87
88         clientTransportCredentials credentials.TransportCredentials
89 }
90
91 func NewExporter(opts ...ExporterOption) (*Exporter, error) {
92         exp, err := NewUnstartedExporter(opts...)
93         if err != nil {
94                 return nil, err
95         }
96         if err := exp.Start(); err != nil {
97                 return nil, err
98         }
99         return exp, nil
100 }
101
102 const spanDataBufferSize = 300
103
104 func NewUnstartedExporter(opts ...ExporterOption) (*Exporter, error) {
105         e := new(Exporter)
106         for _, opt := range opts {
107                 opt.withExporter(e)
108         }
109         traceBundler := bundler.NewBundler((*trace.SpanData)(nil), func(bundle interface{}) {
110                 e.uploadTraces(bundle.([]*trace.SpanData))
111         })
112         traceBundler.DelayThreshold = 2 * time.Second
113         traceBundler.BundleCountThreshold = spanDataBufferSize
114         e.traceBundler = traceBundler
115
116         viewDataBundler := bundler.NewBundler((*view.Data)(nil), func(bundle interface{}) {
117                 e.uploadViewData(bundle.([]*view.Data))
118         })
119         viewDataBundler.DelayThreshold = 2 * time.Second
120         viewDataBundler.BundleCountThreshold = 500 // TODO: (@odeke-em) make this configurable.
121         e.viewDataBundler = viewDataBundler
122         e.nodeInfo = NodeWithStartTime(e.serviceName)
123         e.resource = resourceProtoFromEnv()
124
125         return e, nil
126 }
127
128 const (
129         maxInitialConfigRetries = 10
130         maxInitialTracesRetries = 10
131 )
132
133 var (
134         errAlreadyStarted = errors.New("already started")
135         errNotStarted     = errors.New("not started")
136         errStopped        = errors.New("stopped")
137         errNoConnection   = errors.New("no active connection")
138 )
139
140 // Start dials to the agent, establishing a connection to it. It also
141 // initiates the Config and Trace services by sending over the initial
142 // messages that consist of the node identifier. Start invokes a background
143 // connector that will reattempt connections to the agent periodically
144 // if the connection dies.
145 func (ae *Exporter) Start() error {
146         var err = errAlreadyStarted
147         ae.startOnce.Do(func() {
148                 ae.mu.Lock()
149                 defer ae.mu.Unlock()
150
151                 ae.started = true
152                 ae.disconnectedCh = make(chan bool, 1)
153                 ae.stopCh = make(chan bool)
154                 ae.backgroundConnectionDoneCh = make(chan bool)
155
156                 ae.setStateDisconnected()
157                 go ae.indefiniteBackgroundConnection()
158
159                 err = nil
160         })
161
162         return err
163 }
164
165 func (ae *Exporter) prepareAgentAddress() string {
166         if ae.agentAddress != "" {
167                 return ae.agentAddress
168         }
169         return fmt.Sprintf("%s:%d", DefaultAgentHost, DefaultAgentPort)
170 }
171
172 func (ae *Exporter) enableConnectionStreams(cc *grpc.ClientConn) error {
173         ae.mu.RLock()
174         started := ae.started
175         nodeInfo := ae.nodeInfo
176         ae.mu.RUnlock()
177
178         if !started {
179                 return errNotStarted
180         }
181
182         ae.mu.Lock()
183         // If the previous clientConn was non-nil, close it
184         if ae.grpcClientConn != nil {
185                 _ = ae.grpcClientConn.Close()
186         }
187         ae.grpcClientConn = cc
188         ae.mu.Unlock()
189
190         if err := ae.createTraceServiceConnection(ae.grpcClientConn, nodeInfo); err != nil {
191                 return err
192         }
193
194         return ae.createMetricsServiceConnection(ae.grpcClientConn, nodeInfo)
195 }
196
197 func (ae *Exporter) createTraceServiceConnection(cc *grpc.ClientConn, node *commonpb.Node) error {
198         // Initiate the trace service by sending over node identifier info.
199         traceSvcClient := agenttracepb.NewTraceServiceClient(cc)
200         ctx := context.Background()
201         if len(ae.headers) > 0 {
202                 ctx = metadata.NewOutgoingContext(ctx, metadata.New(ae.headers))
203         }
204         traceExporter, err := traceSvcClient.Export(ctx)
205         if err != nil {
206                 return fmt.Errorf("Exporter.Start:: TraceServiceClient: %v", err)
207         }
208
209         firstTraceMessage := &agenttracepb.ExportTraceServiceRequest{
210                 Node:     node,
211                 Resource: ae.resource,
212         }
213         if err := traceExporter.Send(firstTraceMessage); err != nil {
214                 return fmt.Errorf("Exporter.Start:: Failed to initiate the Config service: %v", err)
215         }
216
217         ae.mu.Lock()
218         ae.traceExporter = traceExporter
219         ae.mu.Unlock()
220
221         // Initiate the config service by sending over node identifier info.
222         configStream, err := traceSvcClient.Config(context.Background())
223         if err != nil {
224                 return fmt.Errorf("Exporter.Start:: ConfigStream: %v", err)
225         }
226         firstCfgMessage := &agenttracepb.CurrentLibraryConfig{Node: node}
227         if err := configStream.Send(firstCfgMessage); err != nil {
228                 return fmt.Errorf("Exporter.Start:: Failed to initiate the Config service: %v", err)
229         }
230
231         // In the background, handle trace configurations that are beamed down
232         // by the agent, but also reply to it with the applied configuration.
233         go ae.handleConfigStreaming(configStream)
234
235         return nil
236 }
237
238 func (ae *Exporter) createMetricsServiceConnection(cc *grpc.ClientConn, node *commonpb.Node) error {
239         metricsSvcClient := agentmetricspb.NewMetricsServiceClient(cc)
240         metricsExporter, err := metricsSvcClient.Export(context.Background())
241         if err != nil {
242                 return fmt.Errorf("MetricsExporter: failed to start the service client: %v", err)
243         }
244         // Initiate the metrics service by sending over the first message just containing the Node and Resource.
245         firstMetricsMessage := &agentmetricspb.ExportMetricsServiceRequest{
246                 Node:     node,
247                 Resource: ae.resource,
248         }
249         if err := metricsExporter.Send(firstMetricsMessage); err != nil {
250                 return fmt.Errorf("MetricsExporter:: failed to send the first message: %v", err)
251         }
252
253         ae.mu.Lock()
254         ae.metricsExporter = metricsExporter
255         ae.mu.Unlock()
256
257         // With that we are good to go and can start sending metrics
258         return nil
259 }
260
261 func (ae *Exporter) dialToAgent() (*grpc.ClientConn, error) {
262         addr := ae.prepareAgentAddress()
263         var dialOpts []grpc.DialOption
264         if ae.clientTransportCredentials != nil {
265                 dialOpts = append(dialOpts, grpc.WithTransportCredentials(ae.clientTransportCredentials))
266         } else if ae.canDialInsecure {
267                 dialOpts = append(dialOpts, grpc.WithInsecure())
268         }
269         if ae.compressor != "" {
270                 dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor(ae.compressor)))
271         }
272         dialOpts = append(dialOpts, grpc.WithStatsHandler(&ocgrpc.ClientHandler{}))
273
274         ctx := context.Background()
275         if len(ae.headers) > 0 {
276                 ctx = metadata.NewOutgoingContext(ctx, metadata.New(ae.headers))
277         }
278         return grpc.DialContext(ctx, addr, dialOpts...)
279 }
280
281 func (ae *Exporter) handleConfigStreaming(configStream agenttracepb.TraceService_ConfigClient) error {
282         // Note: We haven't yet implemented configuration sending so we
283         // should NOT be changing connection states within this function for now.
284         for {
285                 recv, err := configStream.Recv()
286                 if err != nil {
287                         // TODO: Check if this is a transient error or exponential backoff-able.
288                         return err
289                 }
290                 cfg := recv.Config
291                 if cfg == nil {
292                         continue
293                 }
294
295                 // Otherwise now apply the trace configuration sent down from the agent
296                 if psamp := cfg.GetProbabilitySampler(); psamp != nil {
297                         trace.ApplyConfig(trace.Config{DefaultSampler: trace.ProbabilitySampler(psamp.SamplingProbability)})
298                 } else if csamp := cfg.GetConstantSampler(); csamp != nil {
299                         alwaysSample := csamp.Decision == tracepb.ConstantSampler_ALWAYS_ON
300                         if alwaysSample {
301                                 trace.ApplyConfig(trace.Config{DefaultSampler: trace.AlwaysSample()})
302                         } else {
303                                 trace.ApplyConfig(trace.Config{DefaultSampler: trace.NeverSample()})
304                         }
305                 } else { // TODO: Add the rate limiting sampler here
306                 }
307
308                 // Then finally send back to upstream the newly applied configuration
309                 err = configStream.Send(&agenttracepb.CurrentLibraryConfig{Config: &tracepb.TraceConfig{Sampler: cfg.Sampler}})
310                 if err != nil {
311                         return err
312                 }
313         }
314 }
315
316 // Stop shuts down all the connections and resources
317 // related to the exporter.
318 func (ae *Exporter) Stop() error {
319         ae.mu.RLock()
320         cc := ae.grpcClientConn
321         started := ae.started
322         stopped := ae.stopped
323         ae.mu.RUnlock()
324
325         if !started {
326                 return errNotStarted
327         }
328         if stopped {
329                 // TODO: tell the user that we've already stopped, so perhaps a sentinel error?
330                 return nil
331         }
332
333         ae.Flush()
334
335         // Now close the underlying gRPC connection.
336         var err error
337         if cc != nil {
338                 err = cc.Close()
339         }
340
341         // At this point we can change the state variables: started and stopped
342         ae.mu.Lock()
343         ae.started = false
344         ae.stopped = true
345         ae.mu.Unlock()
346         close(ae.stopCh)
347
348         // Ensure that the backgroundConnector returns
349         <-ae.backgroundConnectionDoneCh
350
351         return err
352 }
353
354 func (ae *Exporter) ExportSpan(sd *trace.SpanData) {
355         if sd == nil {
356                 return
357         }
358         _ = ae.traceBundler.Add(sd, 1)
359 }
360
361 func (ae *Exporter) ExportTraceServiceRequest(batch *agenttracepb.ExportTraceServiceRequest) error {
362         if batch == nil || len(batch.Spans) == 0 {
363                 return nil
364         }
365
366         select {
367         case <-ae.stopCh:
368                 return errStopped
369
370         default:
371                 if !ae.connected() {
372                         return errNoConnection
373                 }
374
375                 ae.senderMu.Lock()
376                 err := ae.traceExporter.Send(batch)
377                 ae.senderMu.Unlock()
378                 if err != nil {
379                         ae.setStateDisconnected()
380                         return err
381                 }
382                 return nil
383         }
384 }
385
386 func (ae *Exporter) ExportView(vd *view.Data) {
387         if vd == nil {
388                 return
389         }
390         _ = ae.viewDataBundler.Add(vd, 1)
391 }
392
393 func ocSpanDataToPbSpans(sdl []*trace.SpanData) []*tracepb.Span {
394         if len(sdl) == 0 {
395                 return nil
396         }
397         protoSpans := make([]*tracepb.Span, 0, len(sdl))
398         for _, sd := range sdl {
399                 if sd != nil {
400                         protoSpans = append(protoSpans, ocSpanToProtoSpan(sd))
401                 }
402         }
403         return protoSpans
404 }
405
406 func (ae *Exporter) uploadTraces(sdl []*trace.SpanData) {
407         select {
408         case <-ae.stopCh:
409                 return
410
411         default:
412                 if !ae.connected() {
413                         return
414                 }
415
416                 protoSpans := ocSpanDataToPbSpans(sdl)
417                 if len(protoSpans) == 0 {
418                         return
419                 }
420                 ae.senderMu.Lock()
421                 err := ae.traceExporter.Send(&agenttracepb.ExportTraceServiceRequest{
422                         Spans: protoSpans,
423                 })
424                 ae.senderMu.Unlock()
425                 if err != nil {
426                         ae.setStateDisconnected()
427                 }
428         }
429 }
430
431 func ocViewDataToPbMetrics(vdl []*view.Data) []*metricspb.Metric {
432         if len(vdl) == 0 {
433                 return nil
434         }
435         metrics := make([]*metricspb.Metric, 0, len(vdl))
436         for _, vd := range vdl {
437                 if vd != nil {
438                         vmetric, err := viewDataToMetric(vd)
439                         // TODO: (@odeke-em) somehow report this error, if it is non-nil.
440                         if err == nil && vmetric != nil {
441                                 metrics = append(metrics, vmetric)
442                         }
443                 }
444         }
445         return metrics
446 }
447
448 func (ae *Exporter) uploadViewData(vdl []*view.Data) {
449         select {
450         case <-ae.stopCh:
451                 return
452
453         default:
454                 if !ae.connected() {
455                         return
456                 }
457
458                 protoMetrics := ocViewDataToPbMetrics(vdl)
459                 if len(protoMetrics) == 0 {
460                         return
461                 }
462                 err := ae.metricsExporter.Send(&agentmetricspb.ExportMetricsServiceRequest{
463                         Metrics: protoMetrics,
464                         // TODO:(@odeke-em)
465                         // a) Figure out how to derive a Node from the environment
466                         // b) Figure out how to derive a Resource from the environment
467                         // or better letting users of the exporter configure it.
468                 })
469                 if err != nil {
470                         ae.setStateDisconnected()
471                 }
472         }
473 }
474
475 func (ae *Exporter) Flush() {
476         ae.traceBundler.Flush()
477         ae.viewDataBundler.Flush()
478 }
479
480 func resourceProtoFromEnv() *resourcepb.Resource {
481         rs, _ := resource.FromEnv(context.Background())
482         if rs == nil {
483                 return nil
484         }
485
486         rprs := &resourcepb.Resource{
487                 Type: rs.Type,
488         }
489         if rs.Labels != nil {
490                 rprs.Labels = make(map[string]string)
491                 for k, v := range rs.Labels {
492                         rprs.Labels[k] = v
493                 }
494         }
495         return rprs
496 }