Code refactoring for bpa operator
[icn.git] / cmd / bpa-operator / vendor / google.golang.org / grpc / stream.go
1 /*
2  *
3  * Copyright 2014 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 package grpc
20
21 import (
22         "context"
23         "errors"
24         "io"
25         "math"
26         "strconv"
27         "sync"
28         "time"
29
30         "golang.org/x/net/trace"
31         "google.golang.org/grpc/balancer"
32         "google.golang.org/grpc/codes"
33         "google.golang.org/grpc/connectivity"
34         "google.golang.org/grpc/encoding"
35         "google.golang.org/grpc/grpclog"
36         "google.golang.org/grpc/internal/binarylog"
37         "google.golang.org/grpc/internal/channelz"
38         "google.golang.org/grpc/internal/grpcrand"
39         "google.golang.org/grpc/internal/transport"
40         "google.golang.org/grpc/metadata"
41         "google.golang.org/grpc/peer"
42         "google.golang.org/grpc/stats"
43         "google.golang.org/grpc/status"
44 )
45
46 // StreamHandler defines the handler called by gRPC server to complete the
47 // execution of a streaming RPC. If a StreamHandler returns an error, it
48 // should be produced by the status package, or else gRPC will use
49 // codes.Unknown as the status code and err.Error() as the status message
50 // of the RPC.
51 type StreamHandler func(srv interface{}, stream ServerStream) error
52
53 // StreamDesc represents a streaming RPC service's method specification.
54 type StreamDesc struct {
55         StreamName string
56         Handler    StreamHandler
57
58         // At least one of these is true.
59         ServerStreams bool
60         ClientStreams bool
61 }
62
63 // Stream defines the common interface a client or server stream has to satisfy.
64 //
65 // Deprecated: See ClientStream and ServerStream documentation instead.
66 type Stream interface {
67         // Deprecated: See ClientStream and ServerStream documentation instead.
68         Context() context.Context
69         // Deprecated: See ClientStream and ServerStream documentation instead.
70         SendMsg(m interface{}) error
71         // Deprecated: See ClientStream and ServerStream documentation instead.
72         RecvMsg(m interface{}) error
73 }
74
75 // ClientStream defines the client-side behavior of a streaming RPC.
76 //
77 // All errors returned from ClientStream methods are compatible with the
78 // status package.
79 type ClientStream interface {
80         // Header returns the header metadata received from the server if there
81         // is any. It blocks if the metadata is not ready to read.
82         Header() (metadata.MD, error)
83         // Trailer returns the trailer metadata from the server, if there is any.
84         // It must only be called after stream.CloseAndRecv has returned, or
85         // stream.Recv has returned a non-nil error (including io.EOF).
86         Trailer() metadata.MD
87         // CloseSend closes the send direction of the stream. It closes the stream
88         // when non-nil error is met. It is also not safe to call CloseSend
89         // concurrently with SendMsg.
90         CloseSend() error
91         // Context returns the context for this stream.
92         //
93         // It should not be called until after Header or RecvMsg has returned. Once
94         // called, subsequent client-side retries are disabled.
95         Context() context.Context
96         // SendMsg is generally called by generated code. On error, SendMsg aborts
97         // the stream. If the error was generated by the client, the status is
98         // returned directly; otherwise, io.EOF is returned and the status of
99         // the stream may be discovered using RecvMsg.
100         //
101         // SendMsg blocks until:
102         //   - There is sufficient flow control to schedule m with the transport, or
103         //   - The stream is done, or
104         //   - The stream breaks.
105         //
106         // SendMsg does not wait until the message is received by the server. An
107         // untimely stream closure may result in lost messages. To ensure delivery,
108         // users should ensure the RPC completed successfully using RecvMsg.
109         //
110         // It is safe to have a goroutine calling SendMsg and another goroutine
111         // calling RecvMsg on the same stream at the same time, but it is not safe
112         // to call SendMsg on the same stream in different goroutines. It is also
113         // not safe to call CloseSend concurrently with SendMsg.
114         SendMsg(m interface{}) error
115         // RecvMsg blocks until it receives a message into m or the stream is
116         // done. It returns io.EOF when the stream completes successfully. On
117         // any other error, the stream is aborted and the error contains the RPC
118         // status.
119         //
120         // It is safe to have a goroutine calling SendMsg and another goroutine
121         // calling RecvMsg on the same stream at the same time, but it is not
122         // safe to call RecvMsg on the same stream in different goroutines.
123         RecvMsg(m interface{}) error
124 }
125
126 // NewStream creates a new Stream for the client side. This is typically
127 // called by generated code. ctx is used for the lifetime of the stream.
128 //
129 // To ensure resources are not leaked due to the stream returned, one of the following
130 // actions must be performed:
131 //
132 //      1. Call Close on the ClientConn.
133 //      2. Cancel the context provided.
134 //      3. Call RecvMsg until a non-nil error is returned. A protobuf-generated
135 //         client-streaming RPC, for instance, might use the helper function
136 //         CloseAndRecv (note that CloseSend does not Recv, therefore is not
137 //         guaranteed to release all resources).
138 //      4. Receive a non-nil, non-io.EOF error from Header or SendMsg.
139 //
140 // If none of the above happen, a goroutine and a context will be leaked, and grpc
141 // will not call the optionally-configured stats handler with a stats.End message.
142 func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
143         // allow interceptor to see all applicable call options, which means those
144         // configured as defaults from dial option as well as per-call options
145         opts = combine(cc.dopts.callOptions, opts)
146
147         if cc.dopts.streamInt != nil {
148                 return cc.dopts.streamInt(ctx, desc, cc, method, newClientStream, opts...)
149         }
150         return newClientStream(ctx, desc, cc, method, opts...)
151 }
152
153 // NewClientStream is a wrapper for ClientConn.NewStream.
154 func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
155         return cc.NewStream(ctx, desc, method, opts...)
156 }
157
158 func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
159         if channelz.IsOn() {
160                 cc.incrCallsStarted()
161                 defer func() {
162                         if err != nil {
163                                 cc.incrCallsFailed()
164                         }
165                 }()
166         }
167         c := defaultCallInfo()
168         // Provide an opportunity for the first RPC to see the first service config
169         // provided by the resolver.
170         if err := cc.waitForResolvedAddrs(ctx); err != nil {
171                 return nil, err
172         }
173         mc := cc.GetMethodConfig(method)
174         if mc.WaitForReady != nil {
175                 c.failFast = !*mc.WaitForReady
176         }
177
178         // Possible context leak:
179         // The cancel function for the child context we create will only be called
180         // when RecvMsg returns a non-nil error, if the ClientConn is closed, or if
181         // an error is generated by SendMsg.
182         // https://github.com/grpc/grpc-go/issues/1818.
183         var cancel context.CancelFunc
184         if mc.Timeout != nil && *mc.Timeout >= 0 {
185                 ctx, cancel = context.WithTimeout(ctx, *mc.Timeout)
186         } else {
187                 ctx, cancel = context.WithCancel(ctx)
188         }
189         defer func() {
190                 if err != nil {
191                         cancel()
192                 }
193         }()
194
195         for _, o := range opts {
196                 if err := o.before(c); err != nil {
197                         return nil, toRPCErr(err)
198                 }
199         }
200         c.maxSendMessageSize = getMaxSize(mc.MaxReqSize, c.maxSendMessageSize, defaultClientMaxSendMessageSize)
201         c.maxReceiveMessageSize = getMaxSize(mc.MaxRespSize, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
202         if err := setCallInfoCodec(c); err != nil {
203                 return nil, err
204         }
205
206         callHdr := &transport.CallHdr{
207                 Host:           cc.authority,
208                 Method:         method,
209                 ContentSubtype: c.contentSubtype,
210         }
211
212         // Set our outgoing compression according to the UseCompressor CallOption, if
213         // set.  In that case, also find the compressor from the encoding package.
214         // Otherwise, use the compressor configured by the WithCompressor DialOption,
215         // if set.
216         var cp Compressor
217         var comp encoding.Compressor
218         if ct := c.compressorType; ct != "" {
219                 callHdr.SendCompress = ct
220                 if ct != encoding.Identity {
221                         comp = encoding.GetCompressor(ct)
222                         if comp == nil {
223                                 return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
224                         }
225                 }
226         } else if cc.dopts.cp != nil {
227                 callHdr.SendCompress = cc.dopts.cp.Type()
228                 cp = cc.dopts.cp
229         }
230         if c.creds != nil {
231                 callHdr.Creds = c.creds
232         }
233         var trInfo traceInfo
234         if EnableTracing {
235                 trInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method)
236                 trInfo.firstLine.client = true
237                 if deadline, ok := ctx.Deadline(); ok {
238                         trInfo.firstLine.deadline = time.Until(deadline)
239                 }
240                 trInfo.tr.LazyLog(&trInfo.firstLine, false)
241                 ctx = trace.NewContext(ctx, trInfo.tr)
242         }
243         ctx = newContextWithRPCInfo(ctx, c.failFast)
244         sh := cc.dopts.copts.StatsHandler
245         var beginTime time.Time
246         if sh != nil {
247                 ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: c.failFast})
248                 beginTime = time.Now()
249                 begin := &stats.Begin{
250                         Client:    true,
251                         BeginTime: beginTime,
252                         FailFast:  c.failFast,
253                 }
254                 sh.HandleRPC(ctx, begin)
255         }
256
257         cs := &clientStream{
258                 callHdr:      callHdr,
259                 ctx:          ctx,
260                 methodConfig: &mc,
261                 opts:         opts,
262                 callInfo:     c,
263                 cc:           cc,
264                 desc:         desc,
265                 codec:        c.codec,
266                 cp:           cp,
267                 comp:         comp,
268                 cancel:       cancel,
269                 beginTime:    beginTime,
270                 firstAttempt: true,
271         }
272         if !cc.dopts.disableRetry {
273                 cs.retryThrottler = cc.retryThrottler.Load().(*retryThrottler)
274         }
275         cs.binlog = binarylog.GetMethodLogger(method)
276
277         cs.callInfo.stream = cs
278         // Only this initial attempt has stats/tracing.
279         // TODO(dfawley): move to newAttempt when per-attempt stats are implemented.
280         if err := cs.newAttemptLocked(sh, trInfo); err != nil {
281                 cs.finish(err)
282                 return nil, err
283         }
284
285         op := func(a *csAttempt) error { return a.newStream() }
286         if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }); err != nil {
287                 cs.finish(err)
288                 return nil, err
289         }
290
291         if cs.binlog != nil {
292                 md, _ := metadata.FromOutgoingContext(ctx)
293                 logEntry := &binarylog.ClientHeader{
294                         OnClientSide: true,
295                         Header:       md,
296                         MethodName:   method,
297                         Authority:    cs.cc.authority,
298                 }
299                 if deadline, ok := ctx.Deadline(); ok {
300                         logEntry.Timeout = time.Until(deadline)
301                         if logEntry.Timeout < 0 {
302                                 logEntry.Timeout = 0
303                         }
304                 }
305                 cs.binlog.Log(logEntry)
306         }
307
308         if desc != unaryStreamDesc {
309                 // Listen on cc and stream contexts to cleanup when the user closes the
310                 // ClientConn or cancels the stream context.  In all other cases, an error
311                 // should already be injected into the recv buffer by the transport, which
312                 // the client will eventually receive, and then we will cancel the stream's
313                 // context in clientStream.finish.
314                 go func() {
315                         select {
316                         case <-cc.ctx.Done():
317                                 cs.finish(ErrClientConnClosing)
318                         case <-ctx.Done():
319                                 cs.finish(toRPCErr(ctx.Err()))
320                         }
321                 }()
322         }
323         return cs, nil
324 }
325
326 func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo traceInfo) error {
327         cs.attempt = &csAttempt{
328                 cs:           cs,
329                 dc:           cs.cc.dopts.dc,
330                 statsHandler: sh,
331                 trInfo:       trInfo,
332         }
333
334         if err := cs.ctx.Err(); err != nil {
335                 return toRPCErr(err)
336         }
337         t, done, err := cs.cc.getTransport(cs.ctx, cs.callInfo.failFast, cs.callHdr.Method)
338         if err != nil {
339                 return err
340         }
341         cs.attempt.t = t
342         cs.attempt.done = done
343         return nil
344 }
345
346 func (a *csAttempt) newStream() error {
347         cs := a.cs
348         cs.callHdr.PreviousAttempts = cs.numRetries
349         s, err := a.t.NewStream(cs.ctx, cs.callHdr)
350         if err != nil {
351                 return toRPCErr(err)
352         }
353         cs.attempt.s = s
354         cs.attempt.p = &parser{r: s}
355         return nil
356 }
357
358 // clientStream implements a client side Stream.
359 type clientStream struct {
360         callHdr  *transport.CallHdr
361         opts     []CallOption
362         callInfo *callInfo
363         cc       *ClientConn
364         desc     *StreamDesc
365
366         codec baseCodec
367         cp    Compressor
368         comp  encoding.Compressor
369
370         cancel context.CancelFunc // cancels all attempts
371
372         sentLast  bool // sent an end stream
373         beginTime time.Time
374
375         methodConfig *MethodConfig
376
377         ctx context.Context // the application's context, wrapped by stats/tracing
378
379         retryThrottler *retryThrottler // The throttler active when the RPC began.
380
381         binlog *binarylog.MethodLogger // Binary logger, can be nil.
382         // serverHeaderBinlogged is a boolean for whether server header has been
383         // logged. Server header will be logged when the first time one of those
384         // happens: stream.Header(), stream.Recv().
385         //
386         // It's only read and used by Recv() and Header(), so it doesn't need to be
387         // synchronized.
388         serverHeaderBinlogged bool
389
390         mu                      sync.Mutex
391         firstAttempt            bool       // if true, transparent retry is valid
392         numRetries              int        // exclusive of transparent retry attempt(s)
393         numRetriesSincePushback int        // retries since pushback; to reset backoff
394         finished                bool       // TODO: replace with atomic cmpxchg or sync.Once?
395         attempt                 *csAttempt // the active client stream attempt
396         // TODO(hedging): hedging will have multiple attempts simultaneously.
397         committed  bool                       // active attempt committed for retry?
398         buffer     []func(a *csAttempt) error // operations to replay on retry
399         bufferSize int                        // current size of buffer
400 }
401
402 // csAttempt implements a single transport stream attempt within a
403 // clientStream.
404 type csAttempt struct {
405         cs   *clientStream
406         t    transport.ClientTransport
407         s    *transport.Stream
408         p    *parser
409         done func(balancer.DoneInfo)
410
411         finished  bool
412         dc        Decompressor
413         decomp    encoding.Compressor
414         decompSet bool
415
416         mu sync.Mutex // guards trInfo.tr
417         // trInfo.tr is set when created (if EnableTracing is true),
418         // and cleared when the finish method is called.
419         trInfo traceInfo
420
421         statsHandler stats.Handler
422 }
423
424 func (cs *clientStream) commitAttemptLocked() {
425         cs.committed = true
426         cs.buffer = nil
427 }
428
429 func (cs *clientStream) commitAttempt() {
430         cs.mu.Lock()
431         cs.commitAttemptLocked()
432         cs.mu.Unlock()
433 }
434
435 // shouldRetry returns nil if the RPC should be retried; otherwise it returns
436 // the error that should be returned by the operation.
437 func (cs *clientStream) shouldRetry(err error) error {
438         if cs.attempt.s == nil && !cs.callInfo.failFast {
439                 // In the event of any error from NewStream (attempt.s == nil), we
440                 // never attempted to write anything to the wire, so we can retry
441                 // indefinitely for non-fail-fast RPCs.
442                 return nil
443         }
444         if cs.finished || cs.committed {
445                 // RPC is finished or committed; cannot retry.
446                 return err
447         }
448         // Wait for the trailers.
449         if cs.attempt.s != nil {
450                 <-cs.attempt.s.Done()
451         }
452         if cs.firstAttempt && !cs.callInfo.failFast && (cs.attempt.s == nil || cs.attempt.s.Unprocessed()) {
453                 // First attempt, wait-for-ready, stream unprocessed: transparently retry.
454                 cs.firstAttempt = false
455                 return nil
456         }
457         cs.firstAttempt = false
458         if cs.cc.dopts.disableRetry {
459                 return err
460         }
461
462         pushback := 0
463         hasPushback := false
464         if cs.attempt.s != nil {
465                 if to, toErr := cs.attempt.s.TrailersOnly(); toErr != nil || !to {
466                         return err
467                 }
468
469                 // TODO(retry): Move down if the spec changes to not check server pushback
470                 // before considering this a failure for throttling.
471                 sps := cs.attempt.s.Trailer()["grpc-retry-pushback-ms"]
472                 if len(sps) == 1 {
473                         var e error
474                         if pushback, e = strconv.Atoi(sps[0]); e != nil || pushback < 0 {
475                                 grpclog.Infof("Server retry pushback specified to abort (%q).", sps[0])
476                                 cs.retryThrottler.throttle() // This counts as a failure for throttling.
477                                 return err
478                         }
479                         hasPushback = true
480                 } else if len(sps) > 1 {
481                         grpclog.Warningf("Server retry pushback specified multiple values (%q); not retrying.", sps)
482                         cs.retryThrottler.throttle() // This counts as a failure for throttling.
483                         return err
484                 }
485         }
486
487         var code codes.Code
488         if cs.attempt.s != nil {
489                 code = cs.attempt.s.Status().Code()
490         } else {
491                 code = status.Convert(err).Code()
492         }
493
494         rp := cs.methodConfig.retryPolicy
495         if rp == nil || !rp.retryableStatusCodes[code] {
496                 return err
497         }
498
499         // Note: the ordering here is important; we count this as a failure
500         // only if the code matched a retryable code.
501         if cs.retryThrottler.throttle() {
502                 return err
503         }
504         if cs.numRetries+1 >= rp.maxAttempts {
505                 return err
506         }
507
508         var dur time.Duration
509         if hasPushback {
510                 dur = time.Millisecond * time.Duration(pushback)
511                 cs.numRetriesSincePushback = 0
512         } else {
513                 fact := math.Pow(rp.backoffMultiplier, float64(cs.numRetriesSincePushback))
514                 cur := float64(rp.initialBackoff) * fact
515                 if max := float64(rp.maxBackoff); cur > max {
516                         cur = max
517                 }
518                 dur = time.Duration(grpcrand.Int63n(int64(cur)))
519                 cs.numRetriesSincePushback++
520         }
521
522         // TODO(dfawley): we could eagerly fail here if dur puts us past the
523         // deadline, but unsure if it is worth doing.
524         t := time.NewTimer(dur)
525         select {
526         case <-t.C:
527                 cs.numRetries++
528                 return nil
529         case <-cs.ctx.Done():
530                 t.Stop()
531                 return status.FromContextError(cs.ctx.Err()).Err()
532         }
533 }
534
535 // Returns nil if a retry was performed and succeeded; error otherwise.
536 func (cs *clientStream) retryLocked(lastErr error) error {
537         for {
538                 cs.attempt.finish(lastErr)
539                 if err := cs.shouldRetry(lastErr); err != nil {
540                         cs.commitAttemptLocked()
541                         return err
542                 }
543                 if err := cs.newAttemptLocked(nil, traceInfo{}); err != nil {
544                         return err
545                 }
546                 if lastErr = cs.replayBufferLocked(); lastErr == nil {
547                         return nil
548                 }
549         }
550 }
551
552 func (cs *clientStream) Context() context.Context {
553         cs.commitAttempt()
554         // No need to lock before using attempt, since we know it is committed and
555         // cannot change.
556         return cs.attempt.s.Context()
557 }
558
559 func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) error {
560         cs.mu.Lock()
561         for {
562                 if cs.committed {
563                         cs.mu.Unlock()
564                         return op(cs.attempt)
565                 }
566                 a := cs.attempt
567                 cs.mu.Unlock()
568                 err := op(a)
569                 cs.mu.Lock()
570                 if a != cs.attempt {
571                         // We started another attempt already.
572                         continue
573                 }
574                 if err == io.EOF {
575                         <-a.s.Done()
576                 }
577                 if err == nil || (err == io.EOF && a.s.Status().Code() == codes.OK) {
578                         onSuccess()
579                         cs.mu.Unlock()
580                         return err
581                 }
582                 if err := cs.retryLocked(err); err != nil {
583                         cs.mu.Unlock()
584                         return err
585                 }
586         }
587 }
588
589 func (cs *clientStream) Header() (metadata.MD, error) {
590         var m metadata.MD
591         err := cs.withRetry(func(a *csAttempt) error {
592                 var err error
593                 m, err = a.s.Header()
594                 return toRPCErr(err)
595         }, cs.commitAttemptLocked)
596         if err != nil {
597                 cs.finish(err)
598                 return nil, err
599         }
600         if cs.binlog != nil && !cs.serverHeaderBinlogged {
601                 // Only log if binary log is on and header has not been logged.
602                 logEntry := &binarylog.ServerHeader{
603                         OnClientSide: true,
604                         Header:       m,
605                         PeerAddr:     nil,
606                 }
607                 if peer, ok := peer.FromContext(cs.Context()); ok {
608                         logEntry.PeerAddr = peer.Addr
609                 }
610                 cs.binlog.Log(logEntry)
611                 cs.serverHeaderBinlogged = true
612         }
613         return m, err
614 }
615
616 func (cs *clientStream) Trailer() metadata.MD {
617         // On RPC failure, we never need to retry, because usage requires that
618         // RecvMsg() returned a non-nil error before calling this function is valid.
619         // We would have retried earlier if necessary.
620         //
621         // Commit the attempt anyway, just in case users are not following those
622         // directions -- it will prevent races and should not meaningfully impact
623         // performance.
624         cs.commitAttempt()
625         if cs.attempt.s == nil {
626                 return nil
627         }
628         return cs.attempt.s.Trailer()
629 }
630
631 func (cs *clientStream) replayBufferLocked() error {
632         a := cs.attempt
633         for _, f := range cs.buffer {
634                 if err := f(a); err != nil {
635                         return err
636                 }
637         }
638         return nil
639 }
640
641 func (cs *clientStream) bufferForRetryLocked(sz int, op func(a *csAttempt) error) {
642         // Note: we still will buffer if retry is disabled (for transparent retries).
643         if cs.committed {
644                 return
645         }
646         cs.bufferSize += sz
647         if cs.bufferSize > cs.callInfo.maxRetryRPCBufferSize {
648                 cs.commitAttemptLocked()
649                 return
650         }
651         cs.buffer = append(cs.buffer, op)
652 }
653
654 func (cs *clientStream) SendMsg(m interface{}) (err error) {
655         defer func() {
656                 if err != nil && err != io.EOF {
657                         // Call finish on the client stream for errors generated by this SendMsg
658                         // call, as these indicate problems created by this client.  (Transport
659                         // errors are converted to an io.EOF error in csAttempt.sendMsg; the real
660                         // error will be returned from RecvMsg eventually in that case, or be
661                         // retried.)
662                         cs.finish(err)
663                 }
664         }()
665         if cs.sentLast {
666                 return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
667         }
668         if !cs.desc.ClientStreams {
669                 cs.sentLast = true
670         }
671         data, err := encode(cs.codec, m)
672         if err != nil {
673                 return err
674         }
675         compData, err := compress(data, cs.cp, cs.comp)
676         if err != nil {
677                 return err
678         }
679         hdr, payload := msgHeader(data, compData)
680         // TODO(dfawley): should we be checking len(data) instead?
681         if len(payload) > *cs.callInfo.maxSendMessageSize {
682                 return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), *cs.callInfo.maxSendMessageSize)
683         }
684         msgBytes := data // Store the pointer before setting to nil. For binary logging.
685         op := func(a *csAttempt) error {
686                 err := a.sendMsg(m, hdr, payload, data)
687                 // nil out the message and uncomp when replaying; they are only needed for
688                 // stats which is disabled for subsequent attempts.
689                 m, data = nil, nil
690                 return err
691         }
692         err = cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) })
693         if cs.binlog != nil && err == nil {
694                 cs.binlog.Log(&binarylog.ClientMessage{
695                         OnClientSide: true,
696                         Message:      msgBytes,
697                 })
698         }
699         return
700 }
701
702 func (cs *clientStream) RecvMsg(m interface{}) error {
703         if cs.binlog != nil && !cs.serverHeaderBinlogged {
704                 // Call Header() to binary log header if it's not already logged.
705                 cs.Header()
706         }
707         var recvInfo *payloadInfo
708         if cs.binlog != nil {
709                 recvInfo = &payloadInfo{}
710         }
711         err := cs.withRetry(func(a *csAttempt) error {
712                 return a.recvMsg(m, recvInfo)
713         }, cs.commitAttemptLocked)
714         if cs.binlog != nil && err == nil {
715                 cs.binlog.Log(&binarylog.ServerMessage{
716                         OnClientSide: true,
717                         Message:      recvInfo.uncompressedBytes,
718                 })
719         }
720         if err != nil || !cs.desc.ServerStreams {
721                 // err != nil or non-server-streaming indicates end of stream.
722                 cs.finish(err)
723
724                 if cs.binlog != nil {
725                         // finish will not log Trailer. Log Trailer here.
726                         logEntry := &binarylog.ServerTrailer{
727                                 OnClientSide: true,
728                                 Trailer:      cs.Trailer(),
729                                 Err:          err,
730                         }
731                         if logEntry.Err == io.EOF {
732                                 logEntry.Err = nil
733                         }
734                         if peer, ok := peer.FromContext(cs.Context()); ok {
735                                 logEntry.PeerAddr = peer.Addr
736                         }
737                         cs.binlog.Log(logEntry)
738                 }
739         }
740         return err
741 }
742
743 func (cs *clientStream) CloseSend() error {
744         if cs.sentLast {
745                 // TODO: return an error and finish the stream instead, due to API misuse?
746                 return nil
747         }
748         cs.sentLast = true
749         op := func(a *csAttempt) error {
750                 a.t.Write(a.s, nil, nil, &transport.Options{Last: true})
751                 // Always return nil; io.EOF is the only error that might make sense
752                 // instead, but there is no need to signal the client to call RecvMsg
753                 // as the only use left for the stream after CloseSend is to call
754                 // RecvMsg.  This also matches historical behavior.
755                 return nil
756         }
757         cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) })
758         if cs.binlog != nil {
759                 cs.binlog.Log(&binarylog.ClientHalfClose{
760                         OnClientSide: true,
761                 })
762         }
763         // We never returned an error here for reasons.
764         return nil
765 }
766
767 func (cs *clientStream) finish(err error) {
768         if err == io.EOF {
769                 // Ending a stream with EOF indicates a success.
770                 err = nil
771         }
772         cs.mu.Lock()
773         if cs.finished {
774                 cs.mu.Unlock()
775                 return
776         }
777         cs.finished = true
778         cs.commitAttemptLocked()
779         cs.mu.Unlock()
780         // For binary logging. only log cancel in finish (could be caused by RPC ctx
781         // canceled or ClientConn closed). Trailer will be logged in RecvMsg.
782         //
783         // Only one of cancel or trailer needs to be logged. In the cases where
784         // users don't call RecvMsg, users must have already canceled the RPC.
785         if cs.binlog != nil && status.Code(err) == codes.Canceled {
786                 cs.binlog.Log(&binarylog.Cancel{
787                         OnClientSide: true,
788                 })
789         }
790         if err == nil {
791                 cs.retryThrottler.successfulRPC()
792         }
793         if channelz.IsOn() {
794                 if err != nil {
795                         cs.cc.incrCallsFailed()
796                 } else {
797                         cs.cc.incrCallsSucceeded()
798                 }
799         }
800         if cs.attempt != nil {
801                 cs.attempt.finish(err)
802         }
803         // after functions all rely upon having a stream.
804         if cs.attempt.s != nil {
805                 for _, o := range cs.opts {
806                         o.after(cs.callInfo)
807                 }
808         }
809         cs.cancel()
810 }
811
812 func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error {
813         cs := a.cs
814         if EnableTracing {
815                 a.mu.Lock()
816                 if a.trInfo.tr != nil {
817                         a.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
818                 }
819                 a.mu.Unlock()
820         }
821         if err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams}); err != nil {
822                 if !cs.desc.ClientStreams {
823                         // For non-client-streaming RPCs, we return nil instead of EOF on error
824                         // because the generated code requires it.  finish is not called; RecvMsg()
825                         // will call it with the stream's status independently.
826                         return nil
827                 }
828                 return io.EOF
829         }
830         if a.statsHandler != nil {
831                 a.statsHandler.HandleRPC(cs.ctx, outPayload(true, m, data, payld, time.Now()))
832         }
833         if channelz.IsOn() {
834                 a.t.IncrMsgSent()
835         }
836         return nil
837 }
838
839 func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) {
840         cs := a.cs
841         if a.statsHandler != nil && payInfo == nil {
842                 payInfo = &payloadInfo{}
843         }
844
845         if !a.decompSet {
846                 // Block until we receive headers containing received message encoding.
847                 if ct := a.s.RecvCompress(); ct != "" && ct != encoding.Identity {
848                         if a.dc == nil || a.dc.Type() != ct {
849                                 // No configured decompressor, or it does not match the incoming
850                                 // message encoding; attempt to find a registered compressor that does.
851                                 a.dc = nil
852                                 a.decomp = encoding.GetCompressor(ct)
853                         }
854                 } else {
855                         // No compression is used; disable our decompressor.
856                         a.dc = nil
857                 }
858                 // Only initialize this state once per stream.
859                 a.decompSet = true
860         }
861         err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decomp)
862         if err != nil {
863                 if err == io.EOF {
864                         if statusErr := a.s.Status().Err(); statusErr != nil {
865                                 return statusErr
866                         }
867                         return io.EOF // indicates successful end of stream.
868                 }
869                 return toRPCErr(err)
870         }
871         if EnableTracing {
872                 a.mu.Lock()
873                 if a.trInfo.tr != nil {
874                         a.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
875                 }
876                 a.mu.Unlock()
877         }
878         if a.statsHandler != nil {
879                 a.statsHandler.HandleRPC(cs.ctx, &stats.InPayload{
880                         Client:   true,
881                         RecvTime: time.Now(),
882                         Payload:  m,
883                         // TODO truncate large payload.
884                         Data:   payInfo.uncompressedBytes,
885                         Length: len(payInfo.uncompressedBytes),
886                 })
887         }
888         if channelz.IsOn() {
889                 a.t.IncrMsgRecv()
890         }
891         if cs.desc.ServerStreams {
892                 // Subsequent messages should be received by subsequent RecvMsg calls.
893                 return nil
894         }
895         // Special handling for non-server-stream rpcs.
896         // This recv expects EOF or errors, so we don't collect inPayload.
897         err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, nil, a.decomp)
898         if err == nil {
899                 return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
900         }
901         if err == io.EOF {
902                 return a.s.Status().Err() // non-server streaming Recv returns nil on success
903         }
904         return toRPCErr(err)
905 }
906
907 func (a *csAttempt) finish(err error) {
908         a.mu.Lock()
909         if a.finished {
910                 a.mu.Unlock()
911                 return
912         }
913         a.finished = true
914         if err == io.EOF {
915                 // Ending a stream with EOF indicates a success.
916                 err = nil
917         }
918         if a.s != nil {
919                 a.t.CloseStream(a.s, err)
920         }
921
922         if a.done != nil {
923                 br := false
924                 var tr metadata.MD
925                 if a.s != nil {
926                         br = a.s.BytesReceived()
927                         tr = a.s.Trailer()
928                 }
929                 a.done(balancer.DoneInfo{
930                         Err:           err,
931                         Trailer:       tr,
932                         BytesSent:     a.s != nil,
933                         BytesReceived: br,
934                 })
935         }
936         if a.statsHandler != nil {
937                 end := &stats.End{
938                         Client:    true,
939                         BeginTime: a.cs.beginTime,
940                         EndTime:   time.Now(),
941                         Error:     err,
942                 }
943                 a.statsHandler.HandleRPC(a.cs.ctx, end)
944         }
945         if a.trInfo.tr != nil {
946                 if err == nil {
947                         a.trInfo.tr.LazyPrintf("RPC: [OK]")
948                 } else {
949                         a.trInfo.tr.LazyPrintf("RPC: [%v]", err)
950                         a.trInfo.tr.SetError()
951                 }
952                 a.trInfo.tr.Finish()
953                 a.trInfo.tr = nil
954         }
955         a.mu.Unlock()
956 }
957
958 func (ac *addrConn) newClientStream(ctx context.Context, desc *StreamDesc, method string, t transport.ClientTransport, opts ...CallOption) (_ ClientStream, err error) {
959         ac.mu.Lock()
960         if ac.transport != t {
961                 ac.mu.Unlock()
962                 return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use")
963         }
964         // transition to CONNECTING state when an attempt starts
965         if ac.state != connectivity.Connecting {
966                 ac.updateConnectivityState(connectivity.Connecting)
967                 ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
968         }
969         ac.mu.Unlock()
970
971         if t == nil {
972                 // TODO: return RPC error here?
973                 return nil, errors.New("transport provided is nil")
974         }
975         // defaultCallInfo contains unnecessary info(i.e. failfast, maxRetryRPCBufferSize), so we just initialize an empty struct.
976         c := &callInfo{}
977
978         for _, o := range opts {
979                 if err := o.before(c); err != nil {
980                         return nil, toRPCErr(err)
981                 }
982         }
983         c.maxReceiveMessageSize = getMaxSize(nil, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
984         c.maxSendMessageSize = getMaxSize(nil, c.maxSendMessageSize, defaultServerMaxSendMessageSize)
985
986         // Possible context leak:
987         // The cancel function for the child context we create will only be called
988         // when RecvMsg returns a non-nil error, if the ClientConn is closed, or if
989         // an error is generated by SendMsg.
990         // https://github.com/grpc/grpc-go/issues/1818.
991         ctx, cancel := context.WithCancel(ctx)
992         defer func() {
993                 if err != nil {
994                         cancel()
995                 }
996         }()
997
998         if err := setCallInfoCodec(c); err != nil {
999                 return nil, err
1000         }
1001
1002         callHdr := &transport.CallHdr{
1003                 Host:           ac.cc.authority,
1004                 Method:         method,
1005                 ContentSubtype: c.contentSubtype,
1006         }
1007
1008         // Set our outgoing compression according to the UseCompressor CallOption, if
1009         // set.  In that case, also find the compressor from the encoding package.
1010         // Otherwise, use the compressor configured by the WithCompressor DialOption,
1011         // if set.
1012         var cp Compressor
1013         var comp encoding.Compressor
1014         if ct := c.compressorType; ct != "" {
1015                 callHdr.SendCompress = ct
1016                 if ct != encoding.Identity {
1017                         comp = encoding.GetCompressor(ct)
1018                         if comp == nil {
1019                                 return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
1020                         }
1021                 }
1022         } else if ac.cc.dopts.cp != nil {
1023                 callHdr.SendCompress = ac.cc.dopts.cp.Type()
1024                 cp = ac.cc.dopts.cp
1025         }
1026         if c.creds != nil {
1027                 callHdr.Creds = c.creds
1028         }
1029
1030         as := &addrConnStream{
1031                 callHdr:  callHdr,
1032                 ac:       ac,
1033                 ctx:      ctx,
1034                 cancel:   cancel,
1035                 opts:     opts,
1036                 callInfo: c,
1037                 desc:     desc,
1038                 codec:    c.codec,
1039                 cp:       cp,
1040                 comp:     comp,
1041                 t:        t,
1042         }
1043
1044         as.callInfo.stream = as
1045         s, err := as.t.NewStream(as.ctx, as.callHdr)
1046         if err != nil {
1047                 err = toRPCErr(err)
1048                 return nil, err
1049         }
1050         as.s = s
1051         as.p = &parser{r: s}
1052         ac.incrCallsStarted()
1053         if desc != unaryStreamDesc {
1054                 // Listen on cc and stream contexts to cleanup when the user closes the
1055                 // ClientConn or cancels the stream context.  In all other cases, an error
1056                 // should already be injected into the recv buffer by the transport, which
1057                 // the client will eventually receive, and then we will cancel the stream's
1058                 // context in clientStream.finish.
1059                 go func() {
1060                         select {
1061                         case <-ac.ctx.Done():
1062                                 as.finish(status.Error(codes.Canceled, "grpc: the SubConn is closing"))
1063                         case <-ctx.Done():
1064                                 as.finish(toRPCErr(ctx.Err()))
1065                         }
1066                 }()
1067         }
1068         return as, nil
1069 }
1070
1071 type addrConnStream struct {
1072         s         *transport.Stream
1073         ac        *addrConn
1074         callHdr   *transport.CallHdr
1075         cancel    context.CancelFunc
1076         opts      []CallOption
1077         callInfo  *callInfo
1078         t         transport.ClientTransport
1079         ctx       context.Context
1080         sentLast  bool
1081         desc      *StreamDesc
1082         codec     baseCodec
1083         cp        Compressor
1084         comp      encoding.Compressor
1085         decompSet bool
1086         dc        Decompressor
1087         decomp    encoding.Compressor
1088         p         *parser
1089         mu        sync.Mutex
1090         finished  bool
1091 }
1092
1093 func (as *addrConnStream) Header() (metadata.MD, error) {
1094         m, err := as.s.Header()
1095         if err != nil {
1096                 as.finish(toRPCErr(err))
1097         }
1098         return m, err
1099 }
1100
1101 func (as *addrConnStream) Trailer() metadata.MD {
1102         return as.s.Trailer()
1103 }
1104
1105 func (as *addrConnStream) CloseSend() error {
1106         if as.sentLast {
1107                 // TODO: return an error and finish the stream instead, due to API misuse?
1108                 return nil
1109         }
1110         as.sentLast = true
1111
1112         as.t.Write(as.s, nil, nil, &transport.Options{Last: true})
1113         // Always return nil; io.EOF is the only error that might make sense
1114         // instead, but there is no need to signal the client to call RecvMsg
1115         // as the only use left for the stream after CloseSend is to call
1116         // RecvMsg.  This also matches historical behavior.
1117         return nil
1118 }
1119
1120 func (as *addrConnStream) Context() context.Context {
1121         return as.s.Context()
1122 }
1123
1124 func (as *addrConnStream) SendMsg(m interface{}) (err error) {
1125         defer func() {
1126                 if err != nil && err != io.EOF {
1127                         // Call finish on the client stream for errors generated by this SendMsg
1128                         // call, as these indicate problems created by this client.  (Transport
1129                         // errors are converted to an io.EOF error in csAttempt.sendMsg; the real
1130                         // error will be returned from RecvMsg eventually in that case, or be
1131                         // retried.)
1132                         as.finish(err)
1133                 }
1134         }()
1135         if as.sentLast {
1136                 return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
1137         }
1138         if !as.desc.ClientStreams {
1139                 as.sentLast = true
1140         }
1141         data, err := encode(as.codec, m)
1142         if err != nil {
1143                 return err
1144         }
1145         compData, err := compress(data, as.cp, as.comp)
1146         if err != nil {
1147                 return err
1148         }
1149         hdr, payld := msgHeader(data, compData)
1150         // TODO(dfawley): should we be checking len(data) instead?
1151         if len(payld) > *as.callInfo.maxSendMessageSize {
1152                 return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payld), *as.callInfo.maxSendMessageSize)
1153         }
1154
1155         if err := as.t.Write(as.s, hdr, payld, &transport.Options{Last: !as.desc.ClientStreams}); err != nil {
1156                 if !as.desc.ClientStreams {
1157                         // For non-client-streaming RPCs, we return nil instead of EOF on error
1158                         // because the generated code requires it.  finish is not called; RecvMsg()
1159                         // will call it with the stream's status independently.
1160                         return nil
1161                 }
1162                 return io.EOF
1163         }
1164
1165         if channelz.IsOn() {
1166                 as.t.IncrMsgSent()
1167         }
1168         return nil
1169 }
1170
1171 func (as *addrConnStream) RecvMsg(m interface{}) (err error) {
1172         defer func() {
1173                 if err != nil || !as.desc.ServerStreams {
1174                         // err != nil or non-server-streaming indicates end of stream.
1175                         as.finish(err)
1176                 }
1177         }()
1178
1179         if !as.decompSet {
1180                 // Block until we receive headers containing received message encoding.
1181                 if ct := as.s.RecvCompress(); ct != "" && ct != encoding.Identity {
1182                         if as.dc == nil || as.dc.Type() != ct {
1183                                 // No configured decompressor, or it does not match the incoming
1184                                 // message encoding; attempt to find a registered compressor that does.
1185                                 as.dc = nil
1186                                 as.decomp = encoding.GetCompressor(ct)
1187                         }
1188                 } else {
1189                         // No compression is used; disable our decompressor.
1190                         as.dc = nil
1191                 }
1192                 // Only initialize this state once per stream.
1193                 as.decompSet = true
1194         }
1195         err = recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp)
1196         if err != nil {
1197                 if err == io.EOF {
1198                         if statusErr := as.s.Status().Err(); statusErr != nil {
1199                                 return statusErr
1200                         }
1201                         return io.EOF // indicates successful end of stream.
1202                 }
1203                 return toRPCErr(err)
1204         }
1205
1206         if channelz.IsOn() {
1207                 as.t.IncrMsgRecv()
1208         }
1209         if as.desc.ServerStreams {
1210                 // Subsequent messages should be received by subsequent RecvMsg calls.
1211                 return nil
1212         }
1213
1214         // Special handling for non-server-stream rpcs.
1215         // This recv expects EOF or errors, so we don't collect inPayload.
1216         err = recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp)
1217         if err == nil {
1218                 return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
1219         }
1220         if err == io.EOF {
1221                 return as.s.Status().Err() // non-server streaming Recv returns nil on success
1222         }
1223         return toRPCErr(err)
1224 }
1225
1226 func (as *addrConnStream) finish(err error) {
1227         as.mu.Lock()
1228         if as.finished {
1229                 as.mu.Unlock()
1230                 return
1231         }
1232         as.finished = true
1233         if err == io.EOF {
1234                 // Ending a stream with EOF indicates a success.
1235                 err = nil
1236         }
1237         if as.s != nil {
1238                 as.t.CloseStream(as.s, err)
1239         }
1240
1241         if err != nil {
1242                 as.ac.incrCallsFailed()
1243         } else {
1244                 as.ac.incrCallsSucceeded()
1245         }
1246         as.cancel()
1247         as.mu.Unlock()
1248 }
1249
1250 // ServerStream defines the server-side behavior of a streaming RPC.
1251 //
1252 // All errors returned from ServerStream methods are compatible with the
1253 // status package.
1254 type ServerStream interface {
1255         // SetHeader sets the header metadata. It may be called multiple times.
1256         // When call multiple times, all the provided metadata will be merged.
1257         // All the metadata will be sent out when one of the following happens:
1258         //  - ServerStream.SendHeader() is called;
1259         //  - The first response is sent out;
1260         //  - An RPC status is sent out (error or success).
1261         SetHeader(metadata.MD) error
1262         // SendHeader sends the header metadata.
1263         // The provided md and headers set by SetHeader() will be sent.
1264         // It fails if called multiple times.
1265         SendHeader(metadata.MD) error
1266         // SetTrailer sets the trailer metadata which will be sent with the RPC status.
1267         // When called more than once, all the provided metadata will be merged.
1268         SetTrailer(metadata.MD)
1269         // Context returns the context for this stream.
1270         Context() context.Context
1271         // SendMsg sends a message. On error, SendMsg aborts the stream and the
1272         // error is returned directly.
1273         //
1274         // SendMsg blocks until:
1275         //   - There is sufficient flow control to schedule m with the transport, or
1276         //   - The stream is done, or
1277         //   - The stream breaks.
1278         //
1279         // SendMsg does not wait until the message is received by the client. An
1280         // untimely stream closure may result in lost messages.
1281         //
1282         // It is safe to have a goroutine calling SendMsg and another goroutine
1283         // calling RecvMsg on the same stream at the same time, but it is not safe
1284         // to call SendMsg on the same stream in different goroutines.
1285         SendMsg(m interface{}) error
1286         // RecvMsg blocks until it receives a message into m or the stream is
1287         // done. It returns io.EOF when the client has performed a CloseSend. On
1288         // any non-EOF error, the stream is aborted and the error contains the
1289         // RPC status.
1290         //
1291         // It is safe to have a goroutine calling SendMsg and another goroutine
1292         // calling RecvMsg on the same stream at the same time, but it is not
1293         // safe to call RecvMsg on the same stream in different goroutines.
1294         RecvMsg(m interface{}) error
1295 }
1296
1297 // serverStream implements a server side Stream.
1298 type serverStream struct {
1299         ctx   context.Context
1300         t     transport.ServerTransport
1301         s     *transport.Stream
1302         p     *parser
1303         codec baseCodec
1304
1305         cp     Compressor
1306         dc     Decompressor
1307         comp   encoding.Compressor
1308         decomp encoding.Compressor
1309
1310         maxReceiveMessageSize int
1311         maxSendMessageSize    int
1312         trInfo                *traceInfo
1313
1314         statsHandler stats.Handler
1315
1316         binlog *binarylog.MethodLogger
1317         // serverHeaderBinlogged indicates whether server header has been logged. It
1318         // will happen when one of the following two happens: stream.SendHeader(),
1319         // stream.Send().
1320         //
1321         // It's only checked in send and sendHeader, doesn't need to be
1322         // synchronized.
1323         serverHeaderBinlogged bool
1324
1325         mu sync.Mutex // protects trInfo.tr after the service handler runs.
1326 }
1327
1328 func (ss *serverStream) Context() context.Context {
1329         return ss.ctx
1330 }
1331
1332 func (ss *serverStream) SetHeader(md metadata.MD) error {
1333         if md.Len() == 0 {
1334                 return nil
1335         }
1336         return ss.s.SetHeader(md)
1337 }
1338
1339 func (ss *serverStream) SendHeader(md metadata.MD) error {
1340         err := ss.t.WriteHeader(ss.s, md)
1341         if ss.binlog != nil && !ss.serverHeaderBinlogged {
1342                 h, _ := ss.s.Header()
1343                 ss.binlog.Log(&binarylog.ServerHeader{
1344                         Header: h,
1345                 })
1346                 ss.serverHeaderBinlogged = true
1347         }
1348         return err
1349 }
1350
1351 func (ss *serverStream) SetTrailer(md metadata.MD) {
1352         if md.Len() == 0 {
1353                 return
1354         }
1355         ss.s.SetTrailer(md)
1356 }
1357
1358 func (ss *serverStream) SendMsg(m interface{}) (err error) {
1359         defer func() {
1360                 if ss.trInfo != nil {
1361                         ss.mu.Lock()
1362                         if ss.trInfo.tr != nil {
1363                                 if err == nil {
1364                                         ss.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
1365                                 } else {
1366                                         ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1367                                         ss.trInfo.tr.SetError()
1368                                 }
1369                         }
1370                         ss.mu.Unlock()
1371                 }
1372                 if err != nil && err != io.EOF {
1373                         st, _ := status.FromError(toRPCErr(err))
1374                         ss.t.WriteStatus(ss.s, st)
1375                         // Non-user specified status was sent out. This should be an error
1376                         // case (as a server side Cancel maybe).
1377                         //
1378                         // This is not handled specifically now. User will return a final
1379                         // status from the service handler, we will log that error instead.
1380                         // This behavior is similar to an interceptor.
1381                 }
1382                 if channelz.IsOn() && err == nil {
1383                         ss.t.IncrMsgSent()
1384                 }
1385         }()
1386         data, err := encode(ss.codec, m)
1387         if err != nil {
1388                 return err
1389         }
1390         compData, err := compress(data, ss.cp, ss.comp)
1391         if err != nil {
1392                 return err
1393         }
1394         hdr, payload := msgHeader(data, compData)
1395         // TODO(dfawley): should we be checking len(data) instead?
1396         if len(payload) > ss.maxSendMessageSize {
1397                 return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), ss.maxSendMessageSize)
1398         }
1399         if err := ss.t.Write(ss.s, hdr, payload, &transport.Options{Last: false}); err != nil {
1400                 return toRPCErr(err)
1401         }
1402         if ss.binlog != nil {
1403                 if !ss.serverHeaderBinlogged {
1404                         h, _ := ss.s.Header()
1405                         ss.binlog.Log(&binarylog.ServerHeader{
1406                                 Header: h,
1407                         })
1408                         ss.serverHeaderBinlogged = true
1409                 }
1410                 ss.binlog.Log(&binarylog.ServerMessage{
1411                         Message: data,
1412                 })
1413         }
1414         if ss.statsHandler != nil {
1415                 ss.statsHandler.HandleRPC(ss.s.Context(), outPayload(false, m, data, payload, time.Now()))
1416         }
1417         return nil
1418 }
1419
1420 func (ss *serverStream) RecvMsg(m interface{}) (err error) {
1421         defer func() {
1422                 if ss.trInfo != nil {
1423                         ss.mu.Lock()
1424                         if ss.trInfo.tr != nil {
1425                                 if err == nil {
1426                                         ss.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
1427                                 } else if err != io.EOF {
1428                                         ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1429                                         ss.trInfo.tr.SetError()
1430                                 }
1431                         }
1432                         ss.mu.Unlock()
1433                 }
1434                 if err != nil && err != io.EOF {
1435                         st, _ := status.FromError(toRPCErr(err))
1436                         ss.t.WriteStatus(ss.s, st)
1437                         // Non-user specified status was sent out. This should be an error
1438                         // case (as a server side Cancel maybe).
1439                         //
1440                         // This is not handled specifically now. User will return a final
1441                         // status from the service handler, we will log that error instead.
1442                         // This behavior is similar to an interceptor.
1443                 }
1444                 if channelz.IsOn() && err == nil {
1445                         ss.t.IncrMsgRecv()
1446                 }
1447         }()
1448         var payInfo *payloadInfo
1449         if ss.statsHandler != nil || ss.binlog != nil {
1450                 payInfo = &payloadInfo{}
1451         }
1452         if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxReceiveMessageSize, payInfo, ss.decomp); err != nil {
1453                 if err == io.EOF {
1454                         if ss.binlog != nil {
1455                                 ss.binlog.Log(&binarylog.ClientHalfClose{})
1456                         }
1457                         return err
1458                 }
1459                 if err == io.ErrUnexpectedEOF {
1460                         err = status.Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
1461                 }
1462                 return toRPCErr(err)
1463         }
1464         if ss.statsHandler != nil {
1465                 ss.statsHandler.HandleRPC(ss.s.Context(), &stats.InPayload{
1466                         RecvTime: time.Now(),
1467                         Payload:  m,
1468                         // TODO truncate large payload.
1469                         Data:   payInfo.uncompressedBytes,
1470                         Length: len(payInfo.uncompressedBytes),
1471                 })
1472         }
1473         if ss.binlog != nil {
1474                 ss.binlog.Log(&binarylog.ClientMessage{
1475                         Message: payInfo.uncompressedBytes,
1476                 })
1477         }
1478         return nil
1479 }
1480
1481 // MethodFromServerStream returns the method string for the input stream.
1482 // The returned string is in the format of "/service/method".
1483 func MethodFromServerStream(stream ServerStream) (string, bool) {
1484         return Method(stream.Context())
1485 }