1 // Copyright 2017, OpenCensus Authors
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
7 // http://www.apache.org/licenses/LICENSE-2.0
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
25 ocstats "go.opencensus.io/stats"
26 "go.opencensus.io/stats/view"
27 "go.opencensus.io/tag"
28 "google.golang.org/grpc/codes"
29 "google.golang.org/grpc/grpclog"
30 "google.golang.org/grpc/stats"
31 "google.golang.org/grpc/status"
// grpcInstrumentationKey is the string-based type of the context keys defined
// by this file (see rpcDataKey). Using a dedicated named type rather than a
// plain string keeps these keys distinct from string keys set by other code.
34 type grpcInstrumentationKey string
36 // rpcData holds the instrumentation RPC data that is needed between the start
37 // and end of a call. It holds the info that this package needs to keep track
38 // of between the various GRPC events.
40 // sentCount, sentBytes, recvCount and recvBytes have to be the first words
41 // in order to be 64-bit aligned on 32-bit architectures.
42 sentCount, sentBytes, recvCount, recvBytes int64 // access atomically
44 // startTime represents the time at which TagRPC was invoked at the
45 // beginning of an RPC. It is an approximation of the time when the
46 // application code invoked GRPC code.
51 // The following variables define the default hard-coded auxiliary data used by
52 // both the default GRPC client and GRPC server metrics.
// DefaultBytesDistribution is built by view.Distribution from bounds that run
// from 1024 up to 4294967296 (2^10 through 2^32).
54 DefaultBytesDistribution = view.Distribution(1024, 2048, 4096, 16384, 65536, 262144, 1048576, 4194304, 16777216, 67108864, 268435456, 1073741824, 4294967296)
// DefaultMillisecondsDistribution is built by view.Distribution from bounds
// that run from 0.01 up to 100000 and are spaced more finely at the low end.
55 DefaultMillisecondsDistribution = view.Distribution(0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1, 2, 3, 4, 5, 6, 8, 10, 13, 16, 20, 25, 30, 40, 50, 65, 80, 100, 130, 160, 200, 250, 300, 400, 500, 650, 800, 1000, 2000, 5000, 10000, 20000, 50000, 100000)
// DefaultMessageCountDistribution is built by view.Distribution from bounds at
// the powers of two from 1 through 65536.
56 DefaultMessageCountDistribution = view.Distribution(1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65536)
59 // Server tags are applied to the context used to process each RPC, as well as
60 // the measures at the end of each RPC.
// The error result of tag.NewKey is discarded for both keys.
// NOTE(review): presumably safe because the names are fixed literals — confirm
// against tag.NewKey's name validation rules.
// KeyServerMethod is the tag key named "grpc_server_method".
62 KeyServerMethod, _ = tag.NewKey("grpc_server_method")
// KeyServerStatus is the tag key named "grpc_server_status".
63 KeyServerStatus, _ = tag.NewKey("grpc_server_status")
66 // Client tags are applied to measures at the end of each RPC.
// The error result of tag.NewKey is discarded for both keys.
// NOTE(review): presumably safe because the names are fixed literals — confirm
// against tag.NewKey's name validation rules.
// KeyClientMethod is the tag key named "grpc_client_method".
68 KeyClientMethod, _ = tag.NewKey("grpc_client_method")
// KeyClientStatus is the tag key named "grpc_client_status".
69 KeyClientStatus, _ = tag.NewKey("grpc_client_status")
// rpcDataKey is the context key under which the handlers in this file look up
// the per-RPC *rpcData, via ctx.Value(rpcDataKey).
73 rpcDataKey = grpcInstrumentationKey("opencensus-rpcData")
// methodName turns a full method name, e.g. "/package.Service/Method", into
// the value recorded under the method tag by dropping every leading "/"
// character. Slashes after the first non-slash character are left untouched,
// and a name made up only of slashes becomes the empty string.
func methodName(fullname string) string {
	// TrimLeft treats its second argument as a set of characters, so any
	// number of leading slashes is removed, not just one.
	const leadingSlash = "/"
	trimmed := strings.TrimLeft(fullname, leadingSlash)
	return trimmed
}
80 // statsHandleRPC processes the RPC events.
// It switches on the concrete type of s: outgoing payloads are passed to
// handleRPCOutPayload and incoming payloads to handleRPCInPayload, while
// Begin, header and trailer events are matched but deliberately ignored.
81 func statsHandleRPC(ctx context.Context, s stats.RPCStats) {
82 switch st := s.(type) {
83 case *stats.Begin, *stats.OutHeader, *stats.InHeader, *stats.InTrailer, *stats.OutTrailer:
84 // do nothing for client
85 case *stats.OutPayload:
86 handleRPCOutPayload(ctx, st)
87 case *stats.InPayload:
88 handleRPCInPayload(ctx, st)
// Report the dynamic type of an event this switch does not expect.
92 grpclog.Infof("unexpected stats: %T", st)
// handleRPCOutPayload accounts for one outgoing message of an RPC. It looks up
// the *rpcData stored in ctx under rpcDataKey, adds the payload's Length to
// sentBytes and increments sentCount.
96 func handleRPCOutPayload(ctx context.Context, s *stats.OutPayload) {
97 d, ok := ctx.Value(rpcDataKey).(*rpcData)
// Diagnostic for a context that does not carry a *rpcData.
100 grpclog.Infoln("Failed to retrieve *rpcData from context.")
// The counters are updated with atomic adds; the matching atomic loads are in
// handleRPCEnd.
105 atomic.AddInt64(&d.sentBytes, int64(s.Length))
106 atomic.AddInt64(&d.sentCount, 1)
// handleRPCInPayload accounts for one incoming message of an RPC. It looks up
// the *rpcData stored in ctx under rpcDataKey, adds the payload's Length to
// recvBytes and increments recvCount.
109 func handleRPCInPayload(ctx context.Context, s *stats.InPayload) {
110 d, ok := ctx.Value(rpcDataKey).(*rpcData)
// Diagnostic for a context that does not carry a *rpcData.
113 grpclog.Infoln("Failed to retrieve *rpcData from context.")
// The counters are updated with atomic adds; the matching atomic loads are in
// handleRPCEnd.
118 atomic.AddInt64(&d.recvBytes, int64(s.Length))
119 atomic.AddInt64(&d.recvCount, 1)
// handleRPCEnd records the per-RPC measurements once an RPC has finished. It
// reads the *rpcData stored in ctx under rpcDataKey, derives the latency from
// d.startTime and a status string from s.Error, and reports the accumulated
// byte and message counters together with the latency through
// ocstats.RecordWithTags.
122 func handleRPCEnd(ctx context.Context, s *stats.End) {
123 d, ok := ctx.Value(rpcDataKey).(*rpcData)
// Diagnostic for a context that does not carry a *rpcData.
126 grpclog.Infoln("Failed to retrieve *rpcData from context.")
131 elapsedTime := time.Since(d.startTime)
// The s declared here is a *status.Status that shadows the *stats.End
// parameter within its scope; it is what statusCodeToString receives.
135 s, ok := status.FromError(s.Error)
137 st = statusCodeToString(s)
// Express the elapsed time as fractional milliseconds.
143 latencyMillis := float64(elapsedTime) / float64(time.Millisecond)
// Client measures: tagged with both the method name and the status.
145 ocstats.RecordWithTags(ctx,
147 tag.Upsert(KeyClientMethod, methodName(d.method)),
148 tag.Upsert(KeyClientStatus, st),
150 ClientSentBytesPerRPC.M(atomic.LoadInt64(&d.sentBytes)),
151 ClientSentMessagesPerRPC.M(atomic.LoadInt64(&d.sentCount)),
152 ClientReceivedMessagesPerRPC.M(atomic.LoadInt64(&d.recvCount)),
153 ClientReceivedBytesPerRPC.M(atomic.LoadInt64(&d.recvBytes)),
154 ClientRoundtripLatency.M(latencyMillis))
// Server measures: only the status tag is upserted here.
// NOTE(review): presumably the server method tag is already present on ctx,
// as the comment on the server tag keys suggests — confirm.
156 ocstats.RecordWithTags(ctx,
158 tag.Upsert(KeyServerStatus, st),
160 ServerSentBytesPerRPC.M(atomic.LoadInt64(&d.sentBytes)),
161 ServerSentMessagesPerRPC.M(atomic.LoadInt64(&d.sentCount)),
162 ServerReceivedMessagesPerRPC.M(atomic.LoadInt64(&d.recvCount)),
163 ServerReceivedBytesPerRPC.M(atomic.LoadInt64(&d.recvBytes)),
164 ServerLatency.M(latencyMillis))
// statusCodeToString maps the code of s to the string used as the status tag
// value. Known codes are returned as upper-case, underscore-separated names
// (for example "INVALID_ARGUMENT"); the final return renders a code as "CODE_"
// followed by its decimal value.
168 func statusCodeToString(s *status.Status) string {
169 // see https://github.com/grpc/grpc/blob/master/doc/statuscodes.md
170 switch c := s.Code(); c {
177 case codes.InvalidArgument:
178 return "INVALID_ARGUMENT"
179 case codes.DeadlineExceeded:
180 return "DEADLINE_EXCEEDED"
183 case codes.AlreadyExists:
184 return "ALREADY_EXISTS"
185 case codes.PermissionDenied:
186 return "PERMISSION_DENIED"
187 case codes.ResourceExhausted:
188 return "RESOURCE_EXHAUSTED"
189 case codes.FailedPrecondition:
190 return "FAILED_PRECONDITION"
193 case codes.OutOfRange:
194 return "OUT_OF_RANGE"
195 case codes.Unimplemented:
196 return "UNIMPLEMENTED"
199 case codes.Unavailable:
203 case codes.Unauthenticated:
204 return "UNAUTHENTICATED"
// Numeric rendering, e.g. "CODE_42", built with strconv rather than fmt.
206 return "CODE_" + strconv.FormatInt(int64(c), 10)