1 // Copyright 2018, OpenCensus Authors
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
7 // http://www.apache.org/licenses/LICENSE-2.0
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
24 "google.golang.org/api/support/bundler"
25 "google.golang.org/grpc"
26 "google.golang.org/grpc/credentials"
27 "google.golang.org/grpc/metadata"
29 "go.opencensus.io/plugin/ocgrpc"
30 "go.opencensus.io/resource"
31 "go.opencensus.io/stats/view"
32 "go.opencensus.io/trace"
34 commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
35 agentmetricspb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/metrics/v1"
36 agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
37 metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
38 resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
39 tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
// startupMu is presumably the lock for startTime; the Lock/Unlock calls are
// not visible in this excerpt — TODO confirm.
42 var startupMu sync.Mutex
// startTime is assigned time.Now() by the statement below.
43 var startTime time.Time
// NOTE(review): the header of the function enclosing this assignment is not
// visible in this excerpt.
47 startTime = time.Now()
// Compile-time assertions that *Exporter implements both trace.Exporter and
// view.Exporter.
51 var _ trace.Exporter = (*Exporter)(nil)
52 var _ view.Exporter = (*Exporter)(nil)
// Exporter sends OpenCensus spans and view data to an agent over gRPC
// streams. Several fields (including the mutexes named in the comments
// below) are declared on lines not visible in this excerpt.
54 type Exporter struct {
57 // mu protects the non-atomic and non-channel variables
59 // senderMu protects the concurrent unsafe traceExporter client
// traceExporter is the trace export stream; spans are written to it with Send.
66 traceExporter agenttracepb.TraceService_ExportClient
// metricsExporter is the metrics export stream; metrics are written to it
// with Send.
67 metricsExporter agentmetricspb.MetricsService_ExportClient
// nodeInfo is built by NodeWithStartTime and handed to the service
// connection helpers as the node identifier.
68 nodeInfo *commonpb.Node
// grpcClientConn is the current connection; enableConnectionStreams closes
// and replaces it when a new connection is installed.
69 grpcClientConn *grpc.ClientConn
// reconnectionPeriod is presumably the delay between reconnection attempts;
// its use is not visible in this excerpt — TODO confirm.
70 reconnectionPeriod time.Duration
// resource is populated from resourceProtoFromEnv and included in the first
// message of each export stream.
71 resource *resourcepb.Resource
// headers, when non-empty, are attached as outgoing gRPC metadata.
73 headers map[string]string
// disconnectedCh is created with a buffer of one in Start.
77 disconnectedCh chan bool
// backgroundConnectionDoneCh is received from in Stop to wait for the
// background connector to return.
79 backgroundConnectionDoneCh chan bool
// traceBundler batches *trace.SpanData and hands each batch to uploadTraces.
81 traceBundler *bundler.Bundler
83 // viewDataBundler is the bundler to enable conversion
84 // from OpenCensus-Go view.Data to metricspb.Metric.
85 // Please do not confuse it with metricsBundler!
86 viewDataBundler *bundler.Bundler
// clientTransportCredentials, when non-nil, are used for dialing in
// preference to an insecure connection.
88 clientTransportCredentials credentials.TransportCredentials
// NewExporter creates an Exporter configured by opts (via
// NewUnstartedExporter) and calls Start on it. The error-handling branches
// and the return statements are on lines not visible in this excerpt.
91 func NewExporter(opts ...ExporterOption) (*Exporter, error) {
92 exp, err := NewUnstartedExporter(opts...)
96 if err := exp.Start(); err != nil {
// spanDataBufferSize is the value assigned to the trace bundler's
// BundleCountThreshold in NewUnstartedExporter.
102 const spanDataBufferSize = 300
// NewUnstartedExporter builds an Exporter from opts. The visible lines wire
// up the span and view-data bundlers and fill in the node and resource
// descriptors; no dial is performed in them. Construction of e, the body of
// the options loop and the return statement are on lines not visible in
// this excerpt.
104 func NewUnstartedExporter(opts ...ExporterOption) (*Exporter, error) {
106 for _, opt := range opts {
// Spans are batched by a bundler whose handler passes each batch to
// uploadTraces.
109 traceBundler := bundler.NewBundler((*trace.SpanData)(nil), func(bundle interface{}) {
110 e.uploadTraces(bundle.([]*trace.SpanData))
// Thresholds: a 2 second delay and spanDataBufferSize items.
112 traceBundler.DelayThreshold = 2 * time.Second
113 traceBundler.BundleCountThreshold = spanDataBufferSize
114 e.traceBundler = traceBundler
// View data is batched the same way and passed to uploadViewData.
116 viewDataBundler := bundler.NewBundler((*view.Data)(nil), func(bundle interface{}) {
117 e.uploadViewData(bundle.([]*view.Data))
119 viewDataBundler.DelayThreshold = 2 * time.Second
120 viewDataBundler.BundleCountThreshold = 500 // TODO: (@odeke-em) make this configurable.
121 e.viewDataBundler = viewDataBundler
// The node descriptor is derived from the service name; the resource comes
// from resourceProtoFromEnv.
122 e.nodeInfo = NodeWithStartTime(e.serviceName)
123 e.resource = resourceProtoFromEnv()
// NOTE(review): the enclosing declaration header and every use of these two
// values are outside this excerpt; by name they look like retry limits for
// the initial config and trace messages — TODO confirm.
129 maxInitialConfigRetries = 10
130 maxInitialTracesRetries = 10
// Lifecycle and connection-state errors. In the visible code, Start
// initialises its result to errAlreadyStarted and ExportTraceServiceRequest
// returns errNoConnection; uses of the other two are not visible in this
// excerpt.
134 errAlreadyStarted = errors.New("already started")
135 errNotStarted = errors.New("not started")
136 errStopped = errors.New("stopped")
137 errNoConnection = errors.New("no active connection")
140 // Start dials to the agent, establishing a connection to it. It also
141 // initiates the Config and Trace services by sending over the initial
142 // messages that consist of the node identifier. Start invokes a background
143 // connector that will reattempt connections to the agent periodically
144 // if the connection dies.
145 func (ae *Exporter) Start() error {
// err starts out as errAlreadyStarted. NOTE(review): presumably the Once
// body overwrites it on the first call so that only repeat calls report
// "already started" — that assignment is not visible in this excerpt.
146 var err = errAlreadyStarted
147 ae.startOnce.Do(func() {
// disconnectedCh has room for one pending value; stopCh and
// backgroundConnectionDoneCh are unbuffered.
152 ae.disconnectedCh = make(chan bool, 1)
153 ae.stopCh = make(chan bool)
154 ae.backgroundConnectionDoneCh = make(chan bool)
// Mark the exporter as disconnected, then start the background connector
// in its own goroutine.
156 ae.setStateDisconnected()
157 go ae.indefiniteBackgroundConnection()
// prepareAgentAddress returns the configured agentAddress when it is
// non-empty; otherwise it returns DefaultAgentHost and DefaultAgentPort
// joined as "host:port".
165 func (ae *Exporter) prepareAgentAddress() string {
166 if ae.agentAddress != "" {
167 return ae.agentAddress
169 return fmt.Sprintf("%s:%d", DefaultAgentHost, DefaultAgentPort)
// enableConnectionStreams makes cc the exporter's current connection,
// closing any previous one, and then opens the trace and metrics service
// streams on it. Any locking and the handling of the `started` snapshot are
// on lines not visible in this excerpt.
172 func (ae *Exporter) enableConnectionStreams(cc *grpc.ClientConn) error {
// Snapshot the state this function needs.
174 started := ae.started
175 nodeInfo := ae.nodeInfo
183 // If the previous clientConn was non-nil, close it
184 if ae.grpcClientConn != nil {
// The error returned by Close is discarded.
185 _ = ae.grpcClientConn.Close()
187 ae.grpcClientConn = cc
// The trace stream is set up first; the error branch of this `if` is not
// visible in this excerpt. The metrics stream result is returned as-is.
190 if err := ae.createTraceServiceConnection(ae.grpcClientConn, nodeInfo); err != nil {
194 return ae.createMetricsServiceConnection(ae.grpcClientConn, nodeInfo)
// createTraceServiceConnection opens the trace Export stream and the Config
// stream on cc, sends a first message on each (the Config one carries node),
// stores the Export stream on the exporter and starts handleConfigStreaming
// in a goroutine. Each visible failure path returns an error naming the
// step that failed. Some statements of this function (error checks, part of
// the first trace message, any locking around the assignment, the final
// return) are on lines not visible in this excerpt.
197 func (ae *Exporter) createTraceServiceConnection(cc *grpc.ClientConn, node *commonpb.Node) error {
198 // Initiate the trace service by sending over node identifier info.
199 traceSvcClient := agenttracepb.NewTraceServiceClient(cc)
200 ctx := context.Background()
// Attach the configured headers, if any, as outgoing metadata for the
// Export stream.
201 if len(ae.headers) > 0 {
202 ctx = metadata.NewOutgoingContext(ctx, metadata.New(ae.headers))
204 traceExporter, err := traceSvcClient.Export(ctx)
206 return fmt.Errorf("Exporter.Start:: TraceServiceClient: %v", err)
209 firstTraceMessage := &agenttracepb.ExportTraceServiceRequest{
211 Resource: ae.resource,
213 if err := traceExporter.Send(firstTraceMessage); err != nil {
// This is the trace Export handshake, so the message names the Trace
// service; that keeps it distinguishable from a Config handshake failure
// reported further down.
214 return fmt.Errorf("Exporter.Start:: Failed to initiate the Trace service: %v", err)
218 ae.traceExporter = traceExporter
221 // Initiate the config service by sending over node identifier info.
// NOTE(review): unlike Export above, this stream is opened with a bare
// context.Background(), so the configured headers are not attached —
// confirm whether that is intended.
222 configStream, err := traceSvcClient.Config(context.Background())
224 return fmt.Errorf("Exporter.Start:: ConfigStream: %v", err)
226 firstCfgMessage := &agenttracepb.CurrentLibraryConfig{Node: node}
227 if err := configStream.Send(firstCfgMessage); err != nil {
228 return fmt.Errorf("Exporter.Start:: Failed to initiate the Config service: %v", err)
231 // In the background, handle trace configurations that are beamed down
232 // by the agent, but also reply to it with the applied configuration.
233 go ae.handleConfigStreaming(configStream)
// createMetricsServiceConnection opens the metrics Export stream on cc,
// sends a first message on it and stores the stream on the exporter. The
// error checks, part of the first message, any locking around the
// assignment and the final return are on lines not visible in this excerpt.
238 func (ae *Exporter) createMetricsServiceConnection(cc *grpc.ClientConn, node *commonpb.Node) error {
239 metricsSvcClient := agentmetricspb.NewMetricsServiceClient(cc)
// NOTE(review): the stream is opened with a bare context.Background(); the
// trace Export stream attaches ae.headers as outgoing metadata, this one
// does not — confirm whether that is intended.
240 metricsExporter, err := metricsSvcClient.Export(context.Background())
242 return fmt.Errorf("MetricsExporter: failed to start the service client: %v", err)
244 // Initiate the metrics service by sending over the first message just containing the Node and Resource.
245 firstMetricsMessage := &agentmetricspb.ExportMetricsServiceRequest{
247 Resource: ae.resource,
249 if err := metricsExporter.Send(firstMetricsMessage); err != nil {
250 return fmt.Errorf("MetricsExporter:: failed to send the first message: %v", err)
254 ae.metricsExporter = metricsExporter
257 // With that we are good to go and can start sending metrics
// dialToAgent creates a gRPC client connection to the address returned by
// prepareAgentAddress. Dial options are assembled from the exporter's
// settings: transport credentials take precedence over the insecure flag,
// a default call compressor is added when one is configured, and the
// OpenCensus gRPC client stats handler is always installed.
261 func (ae *Exporter) dialToAgent() (*grpc.ClientConn, error) {
262 addr := ae.prepareAgentAddress()
263 var dialOpts []grpc.DialOption
264 if ae.clientTransportCredentials != nil {
265 dialOpts = append(dialOpts, grpc.WithTransportCredentials(ae.clientTransportCredentials))
266 } else if ae.canDialInsecure {
// NOTE(review): grpc.WithInsecure is deprecated in newer grpc-go releases
// in favour of WithTransportCredentials(insecure.NewCredentials()) — check
// against the grpc version this module pins.
267 dialOpts = append(dialOpts, grpc.WithInsecure())
269 if ae.compressor != "" {
270 dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor(ae.compressor)))
272 dialOpts = append(dialOpts, grpc.WithStatsHandler(&ocgrpc.ClientHandler{}))
// NOTE(review): the headers are attached to the context handed to
// DialContext; RPCs made later take their own contexts, so confirm this
// metadata is not expected to reach them from here.
274 ctx := context.Background()
275 if len(ae.headers) > 0 {
276 ctx = metadata.NewOutgoingContext(ctx, metadata.New(ae.headers))
278 return grpc.DialContext(ctx, addr, dialOpts...)
// handleConfigStreaming receives trace configuration from the agent on
// configStream, applies the sampler it carries through trace.ApplyConfig and
// sends the applied sampler back on the same stream. The loop structure,
// the error returns and the definition of cfg are on lines not visible in
// this excerpt.
281 func (ae *Exporter) handleConfigStreaming(configStream agenttracepb.TraceService_ConfigClient) error {
282 // Note: We haven't yet implemented configuration sending so we
283 // should NOT be changing connection states within this function for now.
285 recv, err := configStream.Recv()
287 // TODO: Check if this is a transient error or exponential backoff-able.
295 // Otherwise now apply the trace configuration sent down from the agent
// A probability sampler is checked for first; a constant sampler is only
// considered when no probability sampler is present.
296 if psamp := cfg.GetProbabilitySampler(); psamp != nil {
297 trace.ApplyConfig(trace.Config{DefaultSampler: trace.ProbabilitySampler(psamp.SamplingProbability)})
298 } else if csamp := cfg.GetConstantSampler(); csamp != nil {
// alwaysSample is true only for the ALWAYS_ON decision. NOTE(review): the
// branch choosing between the two ApplyConfig calls below is not visible —
// presumably it is keyed on alwaysSample.
299 alwaysSample := csamp.Decision == tracepb.ConstantSampler_ALWAYS_ON
301 trace.ApplyConfig(trace.Config{DefaultSampler: trace.AlwaysSample()})
303 trace.ApplyConfig(trace.Config{DefaultSampler: trace.NeverSample()})
305 } else { // TODO: Add the rate limiting sampler here
308 // Then finally send back to upstream the newly applied configuration
309 err = configStream.Send(&agenttracepb.CurrentLibraryConfig{Config: &tracepb.TraceConfig{Sampler: cfg.Sampler}})
316 // Stop shuts down all the connections and resources
317 // related to the exporter.
318 func (ae *Exporter) Stop() error {
// Snapshot the connection and the lifecycle flags. NOTE(review): the state
// checks, the connection close and the flag updates described by the
// comments below are on lines not visible in this excerpt.
320 cc := ae.grpcClientConn
321 started := ae.started
322 stopped := ae.stopped
329 // TODO: tell the user that we've already stopped, so perhaps a sentinel error?
335 // Now close the underlying gRPC connection.
341 // At this point we can change the state variables: started and stopped
348 // Ensure that the backgroundConnector returns
// Blocks until a value arrives on (or the channel is closed for)
// backgroundConnectionDoneCh.
349 <-ae.backgroundConnectionDoneCh
// ExportSpan queues sd on the trace bundler with a size of 1. The error
// returned by Add is discarded. Any guard preceding the Add call is on
// lines not visible in this excerpt.
354 func (ae *Exporter) ExportSpan(sd *trace.SpanData) {
358 _ = ae.traceBundler.Add(sd, 1)
// ExportTraceServiceRequest sends batch directly on the trace export stream
// rather than going through the bundler. A nil batch or one without spans
// is checked for up front. errNoConnection is returned on a path whose
// condition is not visible in this excerpt, and the exporter is marked
// disconnected on a path following Send — presumably when Send fails; TODO
// confirm.
361 func (ae *Exporter) ExportTraceServiceRequest(batch *agenttracepb.ExportTraceServiceRequest) error {
362 if batch == nil || len(batch.Spans) == 0 {
372 return errNoConnection
376 err := ae.traceExporter.Send(batch)
379 ae.setStateDisconnected()
// ExportView queues vd on the view-data bundler with a size of 1. The error
// returned by Add is discarded. Any guard preceding the Add call is on
// lines not visible in this excerpt.
386 func (ae *Exporter) ExportView(vd *view.Data) {
390 _ = ae.viewDataBundler.Add(vd, 1)
// ocSpanDataToPbSpans converts OpenCensus span data to proto spans using
// ocSpanToProtoSpan, appending results in input order to a slice whose
// capacity is len(sdl). Any filtering of the input (for example of nil
// entries) is on lines not visible in this excerpt.
393 func ocSpanDataToPbSpans(sdl []*trace.SpanData) []*tracepb.Span {
397 protoSpans := make([]*tracepb.Span, 0, len(sdl))
398 for _, sd := range sdl {
400 protoSpans = append(protoSpans, ocSpanToProtoSpan(sd))
// uploadTraces converts sdl to proto spans and sends them in a single
// request on the trace export stream. An empty conversion result is
// checked for before sending, and the exporter is marked disconnected on a
// path following Send — presumably when Send fails; TODO confirm. The
// preconditions evaluated before the conversion and any locking around
// Send are on lines not visible in this excerpt.
406 func (ae *Exporter) uploadTraces(sdl []*trace.SpanData) {
416 protoSpans := ocSpanDataToPbSpans(sdl)
417 if len(protoSpans) == 0 {
421 err := ae.traceExporter.Send(&agenttracepb.ExportTraceServiceRequest{
426 ae.setStateDisconnected()
// ocViewDataToPbMetrics converts each view.Data with viewDataToMetric and
// collects the results in input order. Entries whose conversion returns an
// error or a nil metric are left out without being reported. Any filtering
// of the input before conversion is on lines not visible in this excerpt.
431 func ocViewDataToPbMetrics(vdl []*view.Data) []*metricspb.Metric {
435 metrics := make([]*metricspb.Metric, 0, len(vdl))
436 for _, vd := range vdl {
438 vmetric, err := viewDataToMetric(vd)
439 // TODO: (@odeke-em) somehow report this error, if it is non-nil.
440 if err == nil && vmetric != nil {
441 metrics = append(metrics, vmetric)
// uploadViewData converts vdl to proto metrics and sends them in a single
// request on the metrics export stream. An empty conversion result is
// checked for before sending, and the exporter is marked disconnected on a
// path following Send — presumably when Send fails; TODO confirm. The
// preconditions evaluated before the conversion are on lines not visible in
// this excerpt.
448 func (ae *Exporter) uploadViewData(vdl []*view.Data) {
458 protoMetrics := ocViewDataToPbMetrics(vdl)
459 if len(protoMetrics) == 0 {
// In the visible lines the request carries only Metrics; the comments
// inside the literal list Node and Resource derivation as open items.
462 err := ae.metricsExporter.Send(&agentmetricspb.ExportMetricsServiceRequest{
463 Metrics: protoMetrics,
465 // a) Figure out how to derive a Node from the environment
466 // b) Figure out how to derive a Resource from the environment
467 // or better letting users of the exporter configure it.
470 ae.setStateDisconnected()
// Flush calls Flush on the trace bundler and then on the view-data bundler.
475 func (ae *Exporter) Flush() {
476 ae.traceBundler.Flush()
477 ae.viewDataBundler.Flush()
480 func resourceProtoFromEnv() *resourcepb.Resource {
481 rs, _ := resource.FromEnv(context.Background())
486 rprs := &resourcepb.Resource{
489 if rs.Labels != nil {
490 rprs.Labels = make(map[string]string)
491 for k, v := range rs.Labels {