/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package grpc

import (
	"context"
	"errors"
	"io"
	"math"
	"strconv"
	"sync"
	"time"

	"golang.org/x/net/trace"
	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/encoding"
	"google.golang.org/grpc/grpclog"
	"google.golang.org/grpc/internal/binarylog"
	"google.golang.org/grpc/internal/channelz"
	"google.golang.org/grpc/internal/grpcrand"
	"google.golang.org/grpc/internal/transport"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/peer"
	"google.golang.org/grpc/stats"
	"google.golang.org/grpc/status"
)

// StreamHandler defines the handler called by the gRPC server to complete the
// execution of a streaming RPC. If a StreamHandler returns an error, it
// should be produced by the status package, or else gRPC will use
// codes.Unknown as the status code and err.Error() as the status message
// of the RPC.
type StreamHandler func(srv interface{}, stream ServerStream) error

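// A minimal sketch of the kind of StreamHandler that generated code produces
// for a bidirectional method; the echo loop and the EchoRequest type here are
// hypothetical, not part of this package:
//
//	func echoStreamHandler(srv interface{}, stream ServerStream) error {
//		for {
//			in := new(EchoRequest)
//			if err := stream.RecvMsg(in); err != nil {
//				if err == io.EOF {
//					return nil
//				}
//				return err // should carry a status, per the contract above
//			}
//			if err := stream.SendMsg(in); err != nil {
//				return err
//			}
//		}
//	}
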
// StreamDesc represents a streaming RPC service's method specification.
type StreamDesc struct {
	StreamName string
	Handler    StreamHandler

	// At least one of these is true.
	ServerStreams bool
	ClientStreams bool
}

// Stream defines the common interface a client or server stream has to satisfy.
//
// Deprecated: See ClientStream and ServerStream documentation instead.
type Stream interface {
	// Deprecated: See ClientStream and ServerStream documentation instead.
	Context() context.Context
	// Deprecated: See ClientStream and ServerStream documentation instead.
	SendMsg(m interface{}) error
	// Deprecated: See ClientStream and ServerStream documentation instead.
	RecvMsg(m interface{}) error
}

// ClientStream defines the client-side behavior of a streaming RPC.
//
// All errors returned from ClientStream methods are compatible with the
// status package.
type ClientStream interface {
	// Header returns the header metadata received from the server if there
	// is any. It blocks if the metadata is not ready to read.
	Header() (metadata.MD, error)
	// Trailer returns the trailer metadata from the server, if there is any.
	// It must only be called after stream.CloseAndRecv has returned, or
	// stream.Recv has returned a non-nil error (including io.EOF).
	Trailer() metadata.MD
	// CloseSend closes the send direction of the stream. It closes the stream
	// when a non-nil error is encountered. It is also not safe to call
	// CloseSend concurrently with SendMsg.
	CloseSend() error
	// Context returns the context for this stream.
	//
	// It should not be called until after Header or RecvMsg has returned. Once
	// called, subsequent client-side retries are disabled.
	Context() context.Context
	// SendMsg is generally called by generated code. On error, SendMsg aborts
	// the stream. If the error was generated by the client, the status is
	// returned directly; otherwise, io.EOF is returned and the status of
	// the stream may be discovered using RecvMsg.
	//
	// SendMsg blocks until:
	//   - There is sufficient flow control to schedule m with the transport, or
	//   - The stream is done, or
	//   - The stream breaks.
	//
	// SendMsg does not wait until the message is received by the server. An
	// untimely stream closure may result in lost messages. To ensure delivery,
	// users should ensure the RPC completed successfully using RecvMsg.
	//
	// It is safe to have a goroutine calling SendMsg and another goroutine
	// calling RecvMsg on the same stream at the same time, but it is not safe
	// to call SendMsg on the same stream in different goroutines. It is also
	// not safe to call CloseSend concurrently with SendMsg.
	SendMsg(m interface{}) error
	// RecvMsg blocks until it receives a message into m or the stream is
	// done. It returns io.EOF when the stream completes successfully. On
	// any other error, the stream is aborted and the error contains the RPC
	// status.
	//
	// It is safe to have a goroutine calling SendMsg and another goroutine
	// calling RecvMsg on the same stream at the same time, but it is not
	// safe to call RecvMsg on the same stream in different goroutines.
	RecvMsg(m interface{}) error
}

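// A minimal concurrency sketch under the rules above: one goroutine may call
// SendMsg while another calls RecvMsg on the same stream, but neither method
// may be called from two goroutines at once, and CloseSend must not race with
// SendMsg. The stream, msgs, and EchoReply names here are hypothetical:
//
//	go func() {
//		for _, m := range msgs {
//			if err := stream.SendMsg(m); err != nil {
//				return // the real status will surface via RecvMsg
//			}
//		}
//		stream.CloseSend() // same goroutine as SendMsg, so no race
//	}()
//	for {
//		reply := new(EchoReply)
//		if err := stream.RecvMsg(reply); err != nil {
//			break // io.EOF means the stream completed successfully
//		}
//	}
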
// NewStream creates a new Stream for the client side. This is typically
// called by generated code. ctx is used for the lifetime of the stream.
//
// To ensure resources are not leaked due to the stream returned, one of the
// following actions must be performed:
//
//  1. Call Close on the ClientConn.
//  2. Cancel the context provided.
//  3. Call RecvMsg until a non-nil error is returned. A protobuf-generated
//     client-streaming RPC, for instance, might use the helper function
//     CloseAndRecv (note that CloseSend does not Recv, therefore is not
//     guaranteed to release all resources).
//  4. Receive a non-nil, non-io.EOF error from Header or SendMsg.
//
// If none of the above happen, a goroutine and a context will be leaked, and grpc
// will not call the optionally-configured stats handler with a stats.End message.
func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
	// Allow the interceptor to see all applicable call options: those
	// configured as defaults from dial options as well as per-call options.
	opts = combine(cc.dopts.callOptions, opts)

	if cc.dopts.streamInt != nil {
		return cc.dopts.streamInt(ctx, desc, cc, method, newClientStream, opts...)
	}
	return newClientStream(ctx, desc, cc, method, opts...)
}

// NewClientStream is a wrapper for ClientConn.NewStream.
func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
	return cc.NewStream(ctx, desc, method, opts...)
}

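// A usage sketch of the cleanup rules listed above: cancel the provided
// context when done (rule 2) and drain with RecvMsg until a non-nil error
// (rule 3) so the stats handler receives its stats.End. The descriptor,
// method name, and message values here are hypothetical:
//
//	ctx, cancel := context.WithCancel(context.Background())
//	defer cancel() // rule 2: releases the stream's goroutine and context
//	stream, err := cc.NewStream(ctx, &StreamDesc{
//		StreamName:    "Watch",
//		ServerStreams: true,
//	}, "/example.Watcher/Watch")
//	if err != nil {
//		return err
//	}
//	if err := stream.SendMsg(req); err != nil {
//		return err // rule 4 also covers a non-nil, non-io.EOF SendMsg error
//	}
//	for {
//		if err := stream.RecvMsg(msg); err != nil {
//			break // rule 3: io.EOF on success, a status error otherwise
//		}
//	}
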
func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
	if channelz.IsOn() {
		cc.incrCallsStarted()
		defer func() {
			if err != nil {
				cc.incrCallsFailed()
			}
		}()
	}
	c := defaultCallInfo()
	// Provide an opportunity for the first RPC to see the first service config
	// provided by the resolver.
	if err := cc.waitForResolvedAddrs(ctx); err != nil {
		return nil, err
	}
	mc := cc.GetMethodConfig(method)
	if mc.WaitForReady != nil {
		c.failFast = !*mc.WaitForReady
	}

	// Possible context leak:
	// The cancel function for the child context we create will only be called
	// when RecvMsg returns a non-nil error, if the ClientConn is closed, or if
	// an error is generated by SendMsg.
	// https://github.com/grpc/grpc-go/issues/1818.
	var cancel context.CancelFunc
	if mc.Timeout != nil && *mc.Timeout >= 0 {
		ctx, cancel = context.WithTimeout(ctx, *mc.Timeout)
	} else {
		ctx, cancel = context.WithCancel(ctx)
	}
	defer func() {
		if err != nil {
			cancel()
		}
	}()

	for _, o := range opts {
		if err := o.before(c); err != nil {
			return nil, toRPCErr(err)
		}
	}
	c.maxSendMessageSize = getMaxSize(mc.MaxReqSize, c.maxSendMessageSize, defaultClientMaxSendMessageSize)
	c.maxReceiveMessageSize = getMaxSize(mc.MaxRespSize, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
	if err := setCallInfoCodec(c); err != nil {
		return nil, err
	}

	callHdr := &transport.CallHdr{
		Host:           cc.authority,
		Method:         method,
		ContentSubtype: c.contentSubtype,
	}

	// Set our outgoing compression according to the UseCompressor CallOption, if
	// set. In that case, also find the compressor from the encoding package.
	// Otherwise, use the compressor configured by the WithCompressor DialOption,
	// if set.
	var cp Compressor
	var comp encoding.Compressor
	if ct := c.compressorType; ct != "" {
		callHdr.SendCompress = ct
		if ct != encoding.Identity {
			comp = encoding.GetCompressor(ct)
			if comp == nil {
				return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
			}
		}
	} else if cc.dopts.cp != nil {
		callHdr.SendCompress = cc.dopts.cp.Type()
		cp = cc.dopts.cp
	}
	if c.creds != nil {
		callHdr.Creds = c.creds
	}
	var trInfo traceInfo
	if EnableTracing {
		trInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method)
		trInfo.firstLine.client = true
		if deadline, ok := ctx.Deadline(); ok {
			trInfo.firstLine.deadline = time.Until(deadline)
		}
		trInfo.tr.LazyLog(&trInfo.firstLine, false)
		ctx = trace.NewContext(ctx, trInfo.tr)
	}
	ctx = newContextWithRPCInfo(ctx, c.failFast)
	sh := cc.dopts.copts.StatsHandler
	var beginTime time.Time
	if sh != nil {
		ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: c.failFast})
		beginTime = time.Now()
		begin := &stats.Begin{
			Client:    true,
			BeginTime: beginTime,
			FailFast:  c.failFast,
		}
		sh.HandleRPC(ctx, begin)
	}

	cs := &clientStream{
		callHdr:      callHdr,
		ctx:          ctx,
		methodConfig: &mc,
		opts:         opts,
		callInfo:     c,
		cc:           cc,
		desc:         desc,
		codec:        c.codec,
		cp:           cp,
		comp:         comp,
		cancel:       cancel,
		beginTime:    beginTime,
		firstAttempt: true,
	}
	if !cc.dopts.disableRetry {
		cs.retryThrottler = cc.retryThrottler.Load().(*retryThrottler)
	}
	cs.binlog = binarylog.GetMethodLogger(method)

	cs.callInfo.stream = cs
	// Only this initial attempt has stats/tracing.
	// TODO(dfawley): move to newAttempt when per-attempt stats are implemented.
	if err := cs.newAttemptLocked(sh, trInfo); err != nil {
		cs.finish(err)
		return nil, err
	}

	op := func(a *csAttempt) error { return a.newStream() }
	if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }); err != nil {
		cs.finish(err)
		return nil, err
	}

	if cs.binlog != nil {
		md, _ := metadata.FromOutgoingContext(ctx)
		logEntry := &binarylog.ClientHeader{
			OnClientSide: true,
			Header:       md,
			MethodName:   method,
			Authority:    cs.cc.authority,
		}
		if deadline, ok := ctx.Deadline(); ok {
			logEntry.Timeout = time.Until(deadline)
			if logEntry.Timeout < 0 {
				logEntry.Timeout = 0
			}
		}
		cs.binlog.Log(logEntry)
	}

	if desc != unaryStreamDesc {
		// Listen on cc and stream contexts to cleanup when the user closes the
		// ClientConn or cancels the stream context. In all other cases, an error
		// should already be injected into the recv buffer by the transport, which
		// the client will eventually receive, and then we will cancel the stream's
		// context in clientStream.finish.
		go func() {
			select {
			case <-cc.ctx.Done():
				cs.finish(ErrClientConnClosing)
			case <-ctx.Done():
				cs.finish(toRPCErr(ctx.Err()))
			}
		}()
	}
	return cs, nil
}

func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo traceInfo) error {
	cs.attempt = &csAttempt{
		cs:           cs,
		dc:           cs.cc.dopts.dc,
		statsHandler: sh,
		trInfo:       trInfo,
	}

	if err := cs.ctx.Err(); err != nil {
		return toRPCErr(err)
	}
	t, done, err := cs.cc.getTransport(cs.ctx, cs.callInfo.failFast, cs.callHdr.Method)
	if err != nil {
		return err
	}
	cs.attempt.t = t
	cs.attempt.done = done
	return nil
}

func (a *csAttempt) newStream() error {
	cs := a.cs
	cs.callHdr.PreviousAttempts = cs.numRetries
	s, err := a.t.NewStream(cs.ctx, cs.callHdr)
	if err != nil {
		return toRPCErr(err)
	}
	cs.attempt.s = s
	cs.attempt.p = &parser{r: s}
	return nil
}

// clientStream implements a client side Stream.
type clientStream struct {
	callHdr  *transport.CallHdr
	opts     []CallOption
	callInfo *callInfo
	cc       *ClientConn
	desc     *StreamDesc

	codec baseCodec
	cp    Compressor
	comp  encoding.Compressor

	cancel context.CancelFunc // cancels all attempts

	sentLast  bool // sent an end stream
	beginTime time.Time

	methodConfig *MethodConfig

	ctx context.Context // the application's context, wrapped by stats/tracing

	retryThrottler *retryThrottler // The throttler active when the RPC began.

	binlog *binarylog.MethodLogger // Binary logger, can be nil.
	// serverHeaderBinlogged indicates whether the server header has been
	// logged. The header is logged the first time one of the following
	// happens: stream.Header() or stream.Recv().
	//
	// It's only read and used by Recv() and Header(), so it doesn't need to be
	// synchronized.
	serverHeaderBinlogged bool

	mu                      sync.Mutex
	firstAttempt            bool       // if true, transparent retry is valid
	numRetries              int        // exclusive of transparent retry attempt(s)
	numRetriesSincePushback int        // retries since pushback; to reset backoff
	finished                bool       // TODO: replace with atomic cmpxchg or sync.Once?
	attempt                 *csAttempt // the active client stream attempt
	// TODO(hedging): hedging will have multiple attempts simultaneously.
	committed  bool                       // active attempt committed for retry?
	buffer     []func(a *csAttempt) error // operations to replay on retry
	bufferSize int                        // current size of buffer
}

// csAttempt implements a single transport stream attempt within a
// clientStream.
type csAttempt struct {
	cs   *clientStream
	t    transport.ClientTransport
	s    *transport.Stream
	p    *parser
	done func(balancer.DoneInfo)

	finished  bool
	dc        Decompressor
	decomp    encoding.Compressor
	decompSet bool

	mu sync.Mutex // guards trInfo.tr
	// trInfo.tr is set when created (if EnableTracing is true),
	// and cleared when the finish method is called.
	trInfo traceInfo

	statsHandler stats.Handler
}

func (cs *clientStream) commitAttemptLocked() {
	cs.committed = true
	cs.buffer = nil
}

func (cs *clientStream) commitAttempt() {
	cs.mu.Lock()
	cs.commitAttemptLocked()
	cs.mu.Unlock()
}

// shouldRetry returns nil if the RPC should be retried; otherwise it returns
// the error that should be returned by the operation.
func (cs *clientStream) shouldRetry(err error) error {
	if cs.attempt.s == nil && !cs.callInfo.failFast {
		// In the event of any error from NewStream (attempt.s == nil), we
		// never attempted to write anything to the wire, so we can retry
		// indefinitely for non-fail-fast RPCs.
		return nil
	}
	if cs.finished || cs.committed {
		// RPC is finished or committed; cannot retry.
		return err
	}
	// Wait for the trailers.
	if cs.attempt.s != nil {
		<-cs.attempt.s.Done()
	}
	if cs.firstAttempt && !cs.callInfo.failFast && (cs.attempt.s == nil || cs.attempt.s.Unprocessed()) {
		// First attempt, wait-for-ready, stream unprocessed: transparently retry.
		cs.firstAttempt = false
		return nil
	}
	cs.firstAttempt = false
	if cs.cc.dopts.disableRetry {
		return err
	}

	pushback := 0
	hasPushback := false
	if cs.attempt.s != nil {
		if to, toErr := cs.attempt.s.TrailersOnly(); toErr != nil || !to {
			return err
		}

		// TODO(retry): Move down if the spec changes to not check server pushback
		// before considering this a failure for throttling.
		sps := cs.attempt.s.Trailer()["grpc-retry-pushback-ms"]
		if len(sps) == 1 {
			var e error
			if pushback, e = strconv.Atoi(sps[0]); e != nil || pushback < 0 {
				grpclog.Infof("Server retry pushback specified to abort (%q).", sps[0])
				cs.retryThrottler.throttle() // This counts as a failure for throttling.
				return err
			}
			hasPushback = true
		} else if len(sps) > 1 {
			grpclog.Warningf("Server retry pushback specified multiple values (%q); not retrying.", sps)
			cs.retryThrottler.throttle() // This counts as a failure for throttling.
			return err
		}
	}

	var code codes.Code
	if cs.attempt.s != nil {
		code = cs.attempt.s.Status().Code()
	} else {
		code = status.Convert(err).Code()
	}

	rp := cs.methodConfig.retryPolicy
	if rp == nil || !rp.retryableStatusCodes[code] {
		return err
	}

	// Note: the ordering here is important; we count this as a failure
	// only if the code matched a retryable code.
	if cs.retryThrottler.throttle() {
		return err
	}
	if cs.numRetries+1 >= rp.maxAttempts {
		return err
	}

	var dur time.Duration
	if hasPushback {
		dur = time.Millisecond * time.Duration(pushback)
		cs.numRetriesSincePushback = 0
	} else {
		fact := math.Pow(rp.backoffMultiplier, float64(cs.numRetriesSincePushback))
		cur := float64(rp.initialBackoff) * fact
		if max := float64(rp.maxBackoff); cur > max {
			cur = max
		}
		dur = time.Duration(grpcrand.Int63n(int64(cur)))
		cs.numRetriesSincePushback++
	}

	// TODO(dfawley): we could eagerly fail here if dur puts us past the
	// deadline, but unsure if it is worth doing.
	t := time.NewTimer(dur)
	select {
	case <-t.C:
		cs.numRetries++
		return nil
	case <-cs.ctx.Done():
		t.Stop()
		return status.FromContextError(cs.ctx.Err()).Err()
	}
}

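// The retryPolicy consumed above is normally supplied through the service
// config. A sketch of the JSON shape, following the gRPC retry design
// (gRFC A6); the service name is hypothetical:
//
//	{
//	  "methodConfig": [{
//	    "name": [{"service": "example.Echo"}],
//	    "retryPolicy": {
//	      "maxAttempts": 4,
//	      "initialBackoff": "0.1s",
//	      "maxBackoff": "1s",
//	      "backoffMultiplier": 2,
//	      "retryableStatusCodes": ["UNAVAILABLE"]
//	    }
//	  }]
//	}
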
// Returns nil if a retry was performed and succeeded; error otherwise.
func (cs *clientStream) retryLocked(lastErr error) error {
	for {
		cs.attempt.finish(lastErr)
		if err := cs.shouldRetry(lastErr); err != nil {
			cs.commitAttemptLocked()
			return err
		}
		if err := cs.newAttemptLocked(nil, traceInfo{}); err != nil {
			return err
		}
		if lastErr = cs.replayBufferLocked(); lastErr == nil {
			return nil
		}
	}
}

func (cs *clientStream) Context() context.Context {
	cs.commitAttempt()
	// No need to lock before using attempt, since we know it is committed and
	// cannot change.
	return cs.attempt.s.Context()
}

// withRetry runs op on the current attempt and, if it fails with a retryable
// error, creates a new attempt, replays the buffered operations, and tries
// again. onSuccess is invoked after op succeeds.
func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) error {
	cs.mu.Lock()
	for {
		if cs.committed {
			cs.mu.Unlock()
			return op(cs.attempt)
		}
		a := cs.attempt
		cs.mu.Unlock()
		err := op(a)
		cs.mu.Lock()
		if a != cs.attempt {
			// We started another attempt already.
			continue
		}
		if err == io.EOF {
			<-a.s.Done()
		}
		if err == nil || (err == io.EOF && a.s.Status().Code() == codes.OK) {
			onSuccess()
			cs.mu.Unlock()
			return err
		}
		if err := cs.retryLocked(err); err != nil {
			cs.mu.Unlock()
			return err
		}
	}
}

func (cs *clientStream) Header() (metadata.MD, error) {
	var m metadata.MD
	err := cs.withRetry(func(a *csAttempt) error {
		var err error
		m, err = a.s.Header()
		return toRPCErr(err)
	}, cs.commitAttemptLocked)
	if err != nil {
		cs.finish(err)
		return nil, err
	}
	if cs.binlog != nil && !cs.serverHeaderBinlogged {
		// Only log if binary log is on and header has not been logged.
		logEntry := &binarylog.ServerHeader{
			OnClientSide: true,
			Header:       m,
			PeerAddr:     nil,
		}
		if peer, ok := peer.FromContext(cs.Context()); ok {
			logEntry.PeerAddr = peer.Addr
		}
		cs.binlog.Log(logEntry)
		cs.serverHeaderBinlogged = true
	}
	return m, err
}

func (cs *clientStream) Trailer() metadata.MD {
	// On RPC failure, we never need to retry, because this function is only
	// valid to call after RecvMsg() has returned a non-nil error. We would
	// have retried earlier if necessary.
	//
	// Commit the attempt anyway, just in case users are not following those
	// directions -- it will prevent races and should not meaningfully impact
	// performance.
	cs.commitAttempt()
	if cs.attempt.s == nil {
		return nil
	}
	return cs.attempt.s.Trailer()
}

func (cs *clientStream) replayBufferLocked() error {
	a := cs.attempt
	for _, f := range cs.buffer {
		if err := f(a); err != nil {
			return err
		}
	}
	return nil
}

func (cs *clientStream) bufferForRetryLocked(sz int, op func(a *csAttempt) error) {
	// Note: we still will buffer if retry is disabled (for transparent retries).
	if cs.committed {
		return
	}
	cs.bufferSize += sz
	if cs.bufferSize > cs.callInfo.maxRetryRPCBufferSize {
		cs.commitAttemptLocked()
		return
	}
	cs.buffer = append(cs.buffer, op)
}

func (cs *clientStream) SendMsg(m interface{}) (err error) {
	defer func() {
		if err != nil && err != io.EOF {
			// Call finish on the client stream for errors generated by this SendMsg
			// call, as these indicate problems created by this client. (Transport
			// errors are converted to an io.EOF error in csAttempt.sendMsg; the real
			// error will be returned from RecvMsg eventually in that case, or be
			// retried.)
			cs.finish(err)
		}
	}()
	if cs.sentLast {
		return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
	}
	if !cs.desc.ClientStreams {
		cs.sentLast = true
	}
	data, err := encode(cs.codec, m)
	if err != nil {
		return err
	}
	compData, err := compress(data, cs.cp, cs.comp)
	if err != nil {
		return err
	}
	hdr, payload := msgHeader(data, compData)
	// TODO(dfawley): should we be checking len(data) instead?
	if len(payload) > *cs.callInfo.maxSendMessageSize {
		return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), *cs.callInfo.maxSendMessageSize)
	}
	msgBytes := data // Store the pointer before setting to nil. For binary logging.
	op := func(a *csAttempt) error {
		err := a.sendMsg(m, hdr, payload, data)
		// nil out the message and uncomp when replaying; they are only needed for
		// stats which is disabled for subsequent attempts.
		m, data = nil, nil
		return err
	}
	err = cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) })
	if cs.binlog != nil && err == nil {
		cs.binlog.Log(&binarylog.ClientMessage{
			OnClientSide: true,
			Message:      msgBytes,
		})
	}
	return
}

func (cs *clientStream) RecvMsg(m interface{}) error {
	if cs.binlog != nil && !cs.serverHeaderBinlogged {
		// Call Header() to binary log header if it's not already logged.
		cs.Header()
	}
	var recvInfo *payloadInfo
	if cs.binlog != nil {
		recvInfo = &payloadInfo{}
	}
	err := cs.withRetry(func(a *csAttempt) error {
		return a.recvMsg(m, recvInfo)
	}, cs.commitAttemptLocked)
	if cs.binlog != nil && err == nil {
		cs.binlog.Log(&binarylog.ServerMessage{
			OnClientSide: true,
			Message:      recvInfo.uncompressedBytes,
		})
	}
	if err != nil || !cs.desc.ServerStreams {
		// err != nil or non-server-streaming indicates end of stream.
		cs.finish(err)

		if cs.binlog != nil {
			// finish will not log Trailer. Log Trailer here.
			logEntry := &binarylog.ServerTrailer{
				OnClientSide: true,
				Trailer:      cs.Trailer(),
				Err:          err,
			}
			if logEntry.Err == io.EOF {
				logEntry.Err = nil
			}
			if peer, ok := peer.FromContext(cs.Context()); ok {
				logEntry.PeerAddr = peer.Addr
			}
			cs.binlog.Log(logEntry)
		}
	}
	return err
}

func (cs *clientStream) CloseSend() error {
	if cs.sentLast {
		// TODO: return an error and finish the stream instead, due to API misuse?
		return nil
	}
	cs.sentLast = true
	op := func(a *csAttempt) error {
		a.t.Write(a.s, nil, nil, &transport.Options{Last: true})
		// Always return nil; io.EOF is the only error that might make sense
		// instead, but there is no need to signal the client to call RecvMsg
		// as the only use left for the stream after CloseSend is to call
		// RecvMsg. This also matches historical behavior.
		return nil
	}
	cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) })
	if cs.binlog != nil {
		cs.binlog.Log(&binarylog.ClientHalfClose{
			OnClientSide: true,
		})
	}
	// We never return an error here, for historical reasons.
	return nil
}

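// A sketch of the client-streaming call pattern CloseSend participates in;
// generated CloseAndRecv helpers bundle the same steps. The stream, msgs,
// and reply values here are hypothetical:
//
//	for _, m := range msgs {
//		if err := stream.SendMsg(m); err != nil {
//			break // the real status arrives via RecvMsg
//		}
//	}
//	if err := stream.CloseSend(); err != nil {
//		return err // always nil today; checked for forward compatibility
//	}
//	if err := stream.RecvMsg(reply); err != nil {
//		return err // for client-streaming RPCs, nil means success
//	}
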
func (cs *clientStream) finish(err error) {
	if err == io.EOF {
		// Ending a stream with EOF indicates a success.
		err = nil
	}
	cs.mu.Lock()
	if cs.finished {
		cs.mu.Unlock()
		return
	}
	cs.finished = true
	cs.commitAttemptLocked()
	cs.mu.Unlock()
	// For binary logging, only log cancel in finish (could be caused by the RPC
	// ctx being canceled or the ClientConn closing). Trailer will be logged in
	// RecvMsg.
	//
	// Only one of cancel or trailer needs to be logged. In the cases where
	// users don't call RecvMsg, users must have already canceled the RPC.
	if cs.binlog != nil && status.Code(err) == codes.Canceled {
		cs.binlog.Log(&binarylog.Cancel{
			OnClientSide: true,
		})
	}
	if err == nil {
		cs.retryThrottler.successfulRPC()
	}
	if channelz.IsOn() {
		if err != nil {
			cs.cc.incrCallsFailed()
		} else {
			cs.cc.incrCallsSucceeded()
		}
	}
	if cs.attempt != nil {
		cs.attempt.finish(err)
		// after functions all rely upon having a stream.
		if cs.attempt.s != nil {
			for _, o := range cs.opts {
				o.after(cs.callInfo)
			}
		}
	}
	cs.cancel()
}

func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error {
	cs := a.cs
	if a.trInfo.tr != nil {
		a.mu.Lock()
		if a.trInfo.tr != nil {
			a.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
		}
		a.mu.Unlock()
	}
	if err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams}); err != nil {
		if !cs.desc.ClientStreams {
			// For non-client-streaming RPCs, we return nil instead of EOF on error
			// because the generated code requires it. finish is not called; RecvMsg()
			// will call it with the stream's status independently.
			return nil
		}
		return io.EOF
	}
	if a.statsHandler != nil {
		a.statsHandler.HandleRPC(cs.ctx, outPayload(true, m, data, payld, time.Now()))
	}
	if channelz.IsOn() {
		a.t.IncrMsgSent()
	}
	return nil
}

func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) {
	cs := a.cs
	if a.statsHandler != nil && payInfo == nil {
		payInfo = &payloadInfo{}
	}

	if !a.decompSet {
		// Block until we receive headers containing received message encoding.
		if ct := a.s.RecvCompress(); ct != "" && ct != encoding.Identity {
			if a.dc == nil || a.dc.Type() != ct {
				// No configured decompressor, or it does not match the incoming
				// message encoding; attempt to find a registered compressor that does.
				a.dc = nil
				a.decomp = encoding.GetCompressor(ct)
			}
		} else {
			// No compression is used; disable our decompressor.
			a.dc = nil
		}
		// Only initialize this state once per stream.
		a.decompSet = true
	}
	err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decomp)
	if err != nil {
		if err == io.EOF {
			if statusErr := a.s.Status().Err(); statusErr != nil {
				return statusErr
			}
			return io.EOF // indicates successful end of stream.
		}
		return toRPCErr(err)
	}
	if a.trInfo.tr != nil {
		a.mu.Lock()
		if a.trInfo.tr != nil {
			a.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
		}
		a.mu.Unlock()
	}
	if a.statsHandler != nil {
		a.statsHandler.HandleRPC(cs.ctx, &stats.InPayload{
			Client:   true,
			RecvTime: time.Now(),
			Payload:  m,
			// TODO truncate large payload.
			Data:   payInfo.uncompressedBytes,
			Length: len(payInfo.uncompressedBytes),
		})
	}
	if channelz.IsOn() {
		a.t.IncrMsgRecv()
	}
	if cs.desc.ServerStreams {
		// Subsequent messages should be received by subsequent RecvMsg calls.
		return nil
	}
	// Special handling for non-server-stream rpcs.
	// This recv expects EOF or errors, so we don't collect inPayload.
	err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, nil, a.decomp)
	if err == nil {
		return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
	}
	if err == io.EOF {
		return a.s.Status().Err() // non-server streaming Recv returns nil on success
	}
	return toRPCErr(err)
}

func (a *csAttempt) finish(err error) {
	a.mu.Lock()
	if a.finished {
		a.mu.Unlock()
		return
	}
	a.finished = true
	if err == io.EOF {
		// Ending a stream with EOF indicates a success.
		err = nil
	}
	if a.s != nil {
		a.t.CloseStream(a.s, err)
	}

	if a.done != nil {
		br := false
		var tr metadata.MD
		if a.s != nil {
			br = a.s.BytesReceived()
			tr = a.s.Trailer()
		}
		a.done(balancer.DoneInfo{
			Err:           err,
			Trailer:       tr,
			BytesSent:     a.s != nil,
			BytesReceived: br,
		})
	}
	if a.statsHandler != nil {
		end := &stats.End{
			Client:    true,
			BeginTime: a.cs.beginTime,
			EndTime:   time.Now(),
			Error:     err,
		}
		a.statsHandler.HandleRPC(a.cs.ctx, end)
	}
	if a.trInfo.tr != nil {
		if err == nil {
			a.trInfo.tr.LazyPrintf("RPC: [OK]")
		} else {
			a.trInfo.tr.LazyPrintf("RPC: [%v]", err)
			a.trInfo.tr.SetError()
		}
		a.trInfo.tr.Finish()
		a.trInfo.tr = nil
	}
	a.mu.Unlock()
}

func (ac *addrConn) newClientStream(ctx context.Context, desc *StreamDesc, method string, t transport.ClientTransport, opts ...CallOption) (_ ClientStream, err error) {
	ac.mu.Lock()
	if ac.transport != t {
		ac.mu.Unlock()
		return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use")
	}
	// transition to CONNECTING state when an attempt starts
	if ac.state != connectivity.Connecting {
		ac.updateConnectivityState(connectivity.Connecting)
		ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
	}
	ac.mu.Unlock()

	if t == nil {
		// TODO: return RPC error here?
		return nil, errors.New("transport provided is nil")
	}
	// defaultCallInfo contains unnecessary info (e.g. failFast and
	// maxRetryRPCBufferSize), so we just initialize an empty struct.
	c := &callInfo{}

	for _, o := range opts {
		if err := o.before(c); err != nil {
			return nil, toRPCErr(err)
		}
	}
	c.maxReceiveMessageSize = getMaxSize(nil, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
	c.maxSendMessageSize = getMaxSize(nil, c.maxSendMessageSize, defaultServerMaxSendMessageSize)

	// Possible context leak:
	// The cancel function for the child context we create will only be called
	// when RecvMsg returns a non-nil error, if the ClientConn is closed, or if
	// an error is generated by SendMsg.
	// https://github.com/grpc/grpc-go/issues/1818.
	ctx, cancel := context.WithCancel(ctx)
	defer func() {
		if err != nil {
			cancel()
		}
	}()

	if err := setCallInfoCodec(c); err != nil {
		return nil, err
	}

	callHdr := &transport.CallHdr{
		Host:           ac.cc.authority,
		Method:         method,
		ContentSubtype: c.contentSubtype,
	}

	// Set our outgoing compression according to the UseCompressor CallOption, if
	// set. In that case, also find the compressor from the encoding package.
	// Otherwise, use the compressor configured by the WithCompressor DialOption,
	// if set.
	var cp Compressor
	var comp encoding.Compressor
	if ct := c.compressorType; ct != "" {
		callHdr.SendCompress = ct
		if ct != encoding.Identity {
			comp = encoding.GetCompressor(ct)
			if comp == nil {
				return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
			}
		}
	} else if ac.cc.dopts.cp != nil {
		callHdr.SendCompress = ac.cc.dopts.cp.Type()
		cp = ac.cc.dopts.cp
	}
	if c.creds != nil {
		callHdr.Creds = c.creds
	}

	as := &addrConnStream{
		callHdr:  callHdr,
		ac:       ac,
		ctx:      ctx,
		cancel:   cancel,
		opts:     opts,
		callInfo: c,
		desc:     desc,
		codec:    c.codec,
		cp:       cp,
		comp:     comp,
		t:        t,
	}

	as.callInfo.stream = as
	s, err := as.t.NewStream(as.ctx, as.callHdr)
	if err != nil {
		err = toRPCErr(err)
		return nil, err
	}
	as.s = s
	as.p = &parser{r: s}
	ac.incrCallsStarted()
	if desc != unaryStreamDesc {
		// Listen on cc and stream contexts to cleanup when the user closes the
		// ClientConn or cancels the stream context. In all other cases, an error
		// should already be injected into the recv buffer by the transport, which
		// the client will eventually receive, and then we will cancel the stream's
		// context in clientStream.finish.
		go func() {
			select {
			case <-ac.ctx.Done():
				as.finish(status.Error(codes.Canceled, "grpc: the SubConn is closing"))
			case <-ctx.Done():
				as.finish(toRPCErr(ctx.Err()))
			}
		}()
	}
	return as, nil
}

// addrConnStream implements a ClientStream created directly on an addrConn,
// bypassing the retry and load-balancing machinery of clientStream.
type addrConnStream struct {
	s         *transport.Stream
	ac        *addrConn
	callHdr   *transport.CallHdr
	cancel    context.CancelFunc
	opts      []CallOption
	callInfo  *callInfo
	t         transport.ClientTransport
	ctx       context.Context
	sentLast  bool
	desc      *StreamDesc
	codec     baseCodec
	cp        Compressor
	comp      encoding.Compressor
	decompSet bool
	dc        Decompressor
	decomp    encoding.Compressor
	p         *parser
	mu        sync.Mutex
	finished  bool
}

func (as *addrConnStream) Header() (metadata.MD, error) {
	m, err := as.s.Header()
	if err != nil {
		as.finish(toRPCErr(err))
	}
	return m, err
}

func (as *addrConnStream) Trailer() metadata.MD {
	return as.s.Trailer()
}

func (as *addrConnStream) CloseSend() error {
	if as.sentLast {
		// TODO: return an error and finish the stream instead, due to API misuse?
		return nil
	}
	as.sentLast = true

	as.t.Write(as.s, nil, nil, &transport.Options{Last: true})
	// Always return nil; io.EOF is the only error that might make sense
	// instead, but there is no need to signal the client to call RecvMsg
	// as the only use left for the stream after CloseSend is to call
	// RecvMsg. This also matches historical behavior.
	return nil
}

func (as *addrConnStream) Context() context.Context {
	return as.s.Context()
}

func (as *addrConnStream) SendMsg(m interface{}) (err error) {
	defer func() {
		if err != nil && err != io.EOF {
			// Call finish on the client stream for errors generated by this SendMsg
			// call, as these indicate problems created by this client. (Transport
			// errors are converted to an io.EOF error in csAttempt.sendMsg; the real
			// error will be returned from RecvMsg eventually in that case, or be
			// retried.)
			as.finish(err)
		}
	}()
	if as.sentLast {
		return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
	}
	if !as.desc.ClientStreams {
		as.sentLast = true
	}
	data, err := encode(as.codec, m)
	if err != nil {
		return err
	}
	compData, err := compress(data, as.cp, as.comp)
	if err != nil {
		return err
	}
	hdr, payld := msgHeader(data, compData)
	// TODO(dfawley): should we be checking len(data) instead?
	if len(payld) > *as.callInfo.maxSendMessageSize {
		return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payld), *as.callInfo.maxSendMessageSize)
	}

	if err := as.t.Write(as.s, hdr, payld, &transport.Options{Last: !as.desc.ClientStreams}); err != nil {
		if !as.desc.ClientStreams {
			// For non-client-streaming RPCs, we return nil instead of EOF on error
			// because the generated code requires it. finish is not called; RecvMsg()
			// will call it with the stream's status independently.
			return nil
		}
		return io.EOF
	}

	if channelz.IsOn() {
		as.t.IncrMsgSent()
	}
	return nil
}

func (as *addrConnStream) RecvMsg(m interface{}) (err error) {
	defer func() {
		if err != nil || !as.desc.ServerStreams {
			// err != nil or non-server-streaming indicates end of stream.
			as.finish(err)
		}
	}()

	if !as.decompSet {
		// Block until we receive headers containing received message encoding.
		if ct := as.s.RecvCompress(); ct != "" && ct != encoding.Identity {
			if as.dc == nil || as.dc.Type() != ct {
				// No configured decompressor, or it does not match the incoming
				// message encoding; attempt to find a registered compressor that does.
				as.dc = nil
				as.decomp = encoding.GetCompressor(ct)
			}
		} else {
			// No compression is used; disable our decompressor.
			as.dc = nil
		}
		// Only initialize this state once per stream.
		as.decompSet = true
	}
	err = recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp)
	if err != nil {
		if err == io.EOF {
			if statusErr := as.s.Status().Err(); statusErr != nil {
				return statusErr
			}
			return io.EOF // indicates successful end of stream.
		}
		return toRPCErr(err)
	}

	if channelz.IsOn() {
		as.t.IncrMsgRecv()
	}
	if as.desc.ServerStreams {
		// Subsequent messages should be received by subsequent RecvMsg calls.
		return nil
	}

	// Special handling for non-server-stream rpcs.
	// This recv expects EOF or errors, so we don't collect inPayload.
	err = recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp)
	if err == nil {
		return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
	}
	if err == io.EOF {
		return as.s.Status().Err() // non-server streaming Recv returns nil on success
	}
	return toRPCErr(err)
}

func (as *addrConnStream) finish(err error) {
	as.mu.Lock()
	if as.finished {
		as.mu.Unlock()
		return
	}
	as.finished = true
	if err == io.EOF {
		// Ending a stream with EOF indicates a success.
		err = nil
	}
	if as.s != nil {
		as.t.CloseStream(as.s, err)
	}

	if err != nil {
		as.ac.incrCallsFailed()
	} else {
		as.ac.incrCallsSucceeded()
	}
	as.cancel()
	as.mu.Unlock()
}

// ServerStream defines the server-side behavior of a streaming RPC.
//
// All errors returned from ServerStream methods are compatible with the
// status package.
type ServerStream interface {
	// SetHeader sets the header metadata. It may be called multiple times.
	// When called multiple times, all the provided metadata will be merged.
	// All the metadata will be sent out when one of the following happens:
	//   - ServerStream.SendHeader() is called;
	//   - The first response is sent out;
	//   - An RPC status is sent out (error or success).
	SetHeader(metadata.MD) error
	// SendHeader sends the header metadata.
	// The provided md and headers set by SetHeader() will be sent.
	// It fails if called multiple times.
	SendHeader(metadata.MD) error
	// SetTrailer sets the trailer metadata which will be sent with the RPC status.
	// When called more than once, all the provided metadata will be merged.
	SetTrailer(metadata.MD)
	// Context returns the context for this stream.
	Context() context.Context
	// SendMsg sends a message. On error, SendMsg aborts the stream and the
	// error is returned directly.
	//
	// SendMsg blocks until:
	//   - There is sufficient flow control to schedule m with the transport, or
	//   - The stream is done, or
	//   - The stream breaks.
	//
	// SendMsg does not wait until the message is received by the client. An
	// untimely stream closure may result in lost messages.
	//
	// It is safe to have a goroutine calling SendMsg and another goroutine
	// calling RecvMsg on the same stream at the same time, but it is not safe
	// to call SendMsg on the same stream in different goroutines.
	SendMsg(m interface{}) error
	// RecvMsg blocks until it receives a message into m or the stream is
	// done. It returns io.EOF when the client has performed a CloseSend. On
	// any non-EOF error, the stream is aborted and the error contains the
	// RPC status.
	//
	// It is safe to have a goroutine calling SendMsg and another goroutine
	// calling RecvMsg on the same stream at the same time, but it is not
	// safe to call RecvMsg on the same stream in different goroutines.
	RecvMsg(m interface{}) error
}

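// A minimal sketch of a handler using ServerStream through a generated
// server-streaming wrapper, whose Send calls SendMsg underneath; the server
// type, message types, and field names here are hypothetical:
//
//	func (s *routeGuideServer) ListFeatures(req *Rectangle, stream RouteGuide_ListFeaturesServer) error {
//		stream.SetTrailer(metadata.Pairs("count", strconv.Itoa(len(s.features))))
//		for _, f := range s.features {
//			if err := stream.Send(f); err != nil {
//				return err // stream is aborted; the error carries the status
//			}
//		}
//		return nil // success: an OK status is sent with the trailers
//	}
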
// serverStream implements a server side Stream.
type serverStream struct {
	ctx   context.Context
	t     transport.ServerTransport
	s     *transport.Stream
	p     *parser
	codec baseCodec

	cp     Compressor
	dc     Decompressor
	comp   encoding.Compressor
	decomp encoding.Compressor

	maxReceiveMessageSize int
	maxSendMessageSize    int
	trInfo                *traceInfo

	statsHandler stats.Handler

	binlog *binarylog.MethodLogger
	// serverHeaderBinlogged indicates whether the server header has been
	// logged. It will happen when one of the following two happens:
	// stream.SendHeader() or stream.Send().
	//
	// It's only checked in send and sendHeader, so it doesn't need to be
	// synchronized.
	serverHeaderBinlogged bool

	mu sync.Mutex // protects trInfo.tr after the service handler runs.
}

func (ss *serverStream) Context() context.Context {
	return ss.ctx
}

func (ss *serverStream) SetHeader(md metadata.MD) error {
	if md.Len() == 0 {
		return nil
	}
	return ss.s.SetHeader(md)
}

func (ss *serverStream) SendHeader(md metadata.MD) error {
	err := ss.t.WriteHeader(ss.s, md)
	if ss.binlog != nil && !ss.serverHeaderBinlogged {
		h, _ := ss.s.Header()
		ss.binlog.Log(&binarylog.ServerHeader{
			Header: h,
		})
		ss.serverHeaderBinlogged = true
	}
	return err
}

func (ss *serverStream) SetTrailer(md metadata.MD) {
	if md.Len() == 0 {
		return
	}
	ss.s.SetTrailer(md)
}

func (ss *serverStream) SendMsg(m interface{}) (err error) {
	defer func() {
		if ss.trInfo != nil {
			ss.mu.Lock()
			if ss.trInfo.tr != nil {
				if err == nil {
					ss.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
				} else {
					ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
					ss.trInfo.tr.SetError()
				}
			}
			ss.mu.Unlock()
		}
		if err != nil && err != io.EOF {
			st, _ := status.FromError(toRPCErr(err))
			ss.t.WriteStatus(ss.s, st)
			// Non-user specified status was sent out. This should be an error
			// case (e.g. a server-side cancellation).
			//
			// This is not handled specifically now. The user will return a final
			// status from the service handler; we will log that error instead.
			// This behavior is similar to an interceptor.
		}
		if channelz.IsOn() && err == nil {
			ss.t.IncrMsgSent()
		}
	}()
	data, err := encode(ss.codec, m)
	if err != nil {
		return err
	}
	compData, err := compress(data, ss.cp, ss.comp)
	if err != nil {
		return err
	}
	hdr, payload := msgHeader(data, compData)
	// TODO(dfawley): should we be checking len(data) instead?
	if len(payload) > ss.maxSendMessageSize {
		return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), ss.maxSendMessageSize)
	}
	if err := ss.t.Write(ss.s, hdr, payload, &transport.Options{Last: false}); err != nil {
		return toRPCErr(err)
	}
	if ss.binlog != nil {
		if !ss.serverHeaderBinlogged {
			h, _ := ss.s.Header()
			ss.binlog.Log(&binarylog.ServerHeader{
				Header: h,
			})
			ss.serverHeaderBinlogged = true
		}
		ss.binlog.Log(&binarylog.ServerMessage{
			Message: data,
		})
	}
	if ss.statsHandler != nil {
		ss.statsHandler.HandleRPC(ss.s.Context(), outPayload(false, m, data, payload, time.Now()))
	}
	return nil
}

func (ss *serverStream) RecvMsg(m interface{}) (err error) {
	defer func() {
		if ss.trInfo != nil {
			ss.mu.Lock()
			if ss.trInfo.tr != nil {
				if err == nil {
					ss.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
				} else if err != io.EOF {
					ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
					ss.trInfo.tr.SetError()
				}
			}
			ss.mu.Unlock()
		}
		if err != nil && err != io.EOF {
			st, _ := status.FromError(toRPCErr(err))
			ss.t.WriteStatus(ss.s, st)
			// Non-user specified status was sent out. This should be an error
			// case (e.g. a server-side cancellation).
			//
			// This is not handled specifically now. The user will return a final
			// status from the service handler; we will log that error instead.
			// This behavior is similar to an interceptor.
		}
		if channelz.IsOn() && err == nil {
			ss.t.IncrMsgRecv()
		}
	}()
	var payInfo *payloadInfo
	if ss.statsHandler != nil || ss.binlog != nil {
		payInfo = &payloadInfo{}
	}
	if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxReceiveMessageSize, payInfo, ss.decomp); err != nil {
		if err == io.EOF {
			if ss.binlog != nil {
				ss.binlog.Log(&binarylog.ClientHalfClose{})
			}
			return err
		}
		if err == io.ErrUnexpectedEOF {
			err = status.Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
		}
		return toRPCErr(err)
	}
	if ss.statsHandler != nil {
		ss.statsHandler.HandleRPC(ss.s.Context(), &stats.InPayload{
			RecvTime: time.Now(),
			Payload:  m,
			// TODO truncate large payload.
			Data:   payInfo.uncompressedBytes,
			Length: len(payInfo.uncompressedBytes),
		})
	}
	if ss.binlog != nil {
		ss.binlog.Log(&binarylog.ClientMessage{
			Message: payInfo.uncompressedBytes,
		})
	}
	return nil
}

// MethodFromServerStream returns the method string for the input stream.
// The returned string is in the format of "/service/method".
func MethodFromServerStream(stream ServerStream) (string, bool) {
	return Method(stream.Context())
}

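// A usage sketch: MethodFromServerStream is typically called from a stream
// interceptor, where the handler receives the raw ServerStream. The logging
// here is illustrative:
//
//	func methodLogger(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error {
//		if m, ok := MethodFromServerStream(ss); ok {
//			grpclog.Infof("streaming RPC: %s", m)
//		}
//		return handler(srv, ss)
//	}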