Code refactoring for bpa operator
[icn.git] / cmd / bpa-operator / vendor / google.golang.org / grpc / internal / transport / transport.go
1 /*
2  *
3  * Copyright 2014 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 // Package transport defines and implements message oriented communication
20 // channel to complete various transactions (e.g., an RPC).  It is meant for
21 // grpc-internal usage and is not intended to be imported directly by users.
22 package transport
23
24 import (
25         "context"
26         "errors"
27         "fmt"
28         "io"
29         "net"
30         "sync"
31         "sync/atomic"
32
33         "google.golang.org/grpc/codes"
34         "google.golang.org/grpc/credentials"
35         "google.golang.org/grpc/keepalive"
36         "google.golang.org/grpc/metadata"
37         "google.golang.org/grpc/stats"
38         "google.golang.org/grpc/status"
39         "google.golang.org/grpc/tap"
40 )
41
42 // recvMsg represents the received msg from the transport. All transport
43 // protocol specific info has been removed.
44 type recvMsg struct {
45         data []byte
46         // nil: received some data
47         // io.EOF: stream is completed. data is nil.
48         // other non-nil error: transport failure. data is nil.
49         err error
50 }
51
52 // recvBuffer is an unbounded channel of recvMsg structs.
53 // Note recvBuffer differs from controlBuffer only in that recvBuffer
54 // holds a channel of only recvMsg structs instead of objects implementing "item" interface.
55 // recvBuffer is written to much more often than
56 // controlBuffer and using strict recvMsg structs helps avoid allocation in "recvBuffer.put"
57 type recvBuffer struct {
58         c       chan recvMsg
59         mu      sync.Mutex
60         backlog []recvMsg
61         err     error
62 }
63
64 func newRecvBuffer() *recvBuffer {
65         b := &recvBuffer{
66                 c: make(chan recvMsg, 1),
67         }
68         return b
69 }
70
71 func (b *recvBuffer) put(r recvMsg) {
72         b.mu.Lock()
73         if b.err != nil {
74                 b.mu.Unlock()
75                 // An error had occurred earlier, don't accept more
76                 // data or errors.
77                 return
78         }
79         b.err = r.err
80         if len(b.backlog) == 0 {
81                 select {
82                 case b.c <- r:
83                         b.mu.Unlock()
84                         return
85                 default:
86                 }
87         }
88         b.backlog = append(b.backlog, r)
89         b.mu.Unlock()
90 }
91
92 func (b *recvBuffer) load() {
93         b.mu.Lock()
94         if len(b.backlog) > 0 {
95                 select {
96                 case b.c <- b.backlog[0]:
97                         b.backlog[0] = recvMsg{}
98                         b.backlog = b.backlog[1:]
99                 default:
100                 }
101         }
102         b.mu.Unlock()
103 }
104
105 // get returns the channel that receives a recvMsg in the buffer.
106 //
107 // Upon receipt of a recvMsg, the caller should call load to send another
108 // recvMsg onto the channel if there is any.
109 func (b *recvBuffer) get() <-chan recvMsg {
110         return b.c
111 }
112
113 // recvBufferReader implements io.Reader interface to read the data from
114 // recvBuffer.
115 type recvBufferReader struct {
116         closeStream func(error) // Closes the client transport stream with the given error and nil trailer metadata.
117         ctx         context.Context
118         ctxDone     <-chan struct{} // cache of ctx.Done() (for performance).
119         recv        *recvBuffer
120         last        []byte // Stores the remaining data in the previous calls.
121         err         error
122 }
123
124 // Read reads the next len(p) bytes from last. If last is drained, it tries to
125 // read additional data from recv. It blocks if there no additional data available
126 // in recv. If Read returns any non-nil error, it will continue to return that error.
127 func (r *recvBufferReader) Read(p []byte) (n int, err error) {
128         if r.err != nil {
129                 return 0, r.err
130         }
131         if r.last != nil && len(r.last) > 0 {
132                 // Read remaining data left in last call.
133                 copied := copy(p, r.last)
134                 r.last = r.last[copied:]
135                 return copied, nil
136         }
137         if r.closeStream != nil {
138                 n, r.err = r.readClient(p)
139         } else {
140                 n, r.err = r.read(p)
141         }
142         return n, r.err
143 }
144
145 func (r *recvBufferReader) read(p []byte) (n int, err error) {
146         select {
147         case <-r.ctxDone:
148                 return 0, ContextErr(r.ctx.Err())
149         case m := <-r.recv.get():
150                 return r.readAdditional(m, p)
151         }
152 }
153
154 func (r *recvBufferReader) readClient(p []byte) (n int, err error) {
155         // If the context is canceled, then closes the stream with nil metadata.
156         // closeStream writes its error parameter to r.recv as a recvMsg.
157         // r.readAdditional acts on that message and returns the necessary error.
158         select {
159         case <-r.ctxDone:
160                 r.closeStream(ContextErr(r.ctx.Err()))
161                 m := <-r.recv.get()
162                 return r.readAdditional(m, p)
163         case m := <-r.recv.get():
164                 return r.readAdditional(m, p)
165         }
166 }
167
168 func (r *recvBufferReader) readAdditional(m recvMsg, p []byte) (n int, err error) {
169         r.recv.load()
170         if m.err != nil {
171                 return 0, m.err
172         }
173         copied := copy(p, m.data)
174         r.last = m.data[copied:]
175         return copied, nil
176 }
177
178 type streamState uint32
179
180 const (
181         streamActive    streamState = iota
182         streamWriteDone             // EndStream sent
183         streamReadDone              // EndStream received
184         streamDone                  // the entire stream is finished.
185 )
186
187 // Stream represents an RPC in the transport layer.
188 type Stream struct {
189         id           uint32
190         st           ServerTransport    // nil for client side Stream
191         ctx          context.Context    // the associated context of the stream
192         cancel       context.CancelFunc // always nil for client side Stream
193         done         chan struct{}      // closed at the end of stream to unblock writers. On the client side.
194         ctxDone      <-chan struct{}    // same as done chan but for server side. Cache of ctx.Done() (for performance)
195         method       string             // the associated RPC method of the stream
196         recvCompress string
197         sendCompress string
198         buf          *recvBuffer
199         trReader     io.Reader
200         fc           *inFlow
201         wq           *writeQuota
202
203         // Callback to state application's intentions to read data. This
204         // is used to adjust flow control, if needed.
205         requestRead func(int)
206
207         headerChan chan struct{} // closed to indicate the end of header metadata.
208         headerDone uint32        // set when headerChan is closed. Used to avoid closing headerChan multiple times.
209
210         // hdrMu protects header and trailer metadata on the server-side.
211         hdrMu sync.Mutex
212         // On client side, header keeps the received header metadata.
213         //
214         // On server side, header keeps the header set by SetHeader(). The complete
215         // header will merged into this after t.WriteHeader() is called.
216         header  metadata.MD
217         trailer metadata.MD // the key-value map of trailer metadata.
218
219         noHeaders bool // set if the client never received headers (set only after the stream is done).
220
221         // On the server-side, headerSent is atomically set to 1 when the headers are sent out.
222         headerSent uint32
223
224         state streamState
225
226         // On client-side it is the status error received from the server.
227         // On server-side it is unused.
228         status *status.Status
229
230         bytesReceived uint32 // indicates whether any bytes have been received on this stream
231         unprocessed   uint32 // set if the server sends a refused stream or GOAWAY including this stream
232
233         // contentSubtype is the content-subtype for requests.
234         // this must be lowercase or the behavior is undefined.
235         contentSubtype string
236 }
237
238 // isHeaderSent is only valid on the server-side.
239 func (s *Stream) isHeaderSent() bool {
240         return atomic.LoadUint32(&s.headerSent) == 1
241 }
242
243 // updateHeaderSent updates headerSent and returns true
244 // if it was alreay set. It is valid only on server-side.
245 func (s *Stream) updateHeaderSent() bool {
246         return atomic.SwapUint32(&s.headerSent, 1) == 1
247 }
248
249 func (s *Stream) swapState(st streamState) streamState {
250         return streamState(atomic.SwapUint32((*uint32)(&s.state), uint32(st)))
251 }
252
253 func (s *Stream) compareAndSwapState(oldState, newState streamState) bool {
254         return atomic.CompareAndSwapUint32((*uint32)(&s.state), uint32(oldState), uint32(newState))
255 }
256
257 func (s *Stream) getState() streamState {
258         return streamState(atomic.LoadUint32((*uint32)(&s.state)))
259 }
260
261 func (s *Stream) waitOnHeader() error {
262         if s.headerChan == nil {
263                 // On the server headerChan is always nil since a stream originates
264                 // only after having received headers.
265                 return nil
266         }
267         select {
268         case <-s.ctx.Done():
269                 return ContextErr(s.ctx.Err())
270         case <-s.headerChan:
271                 return nil
272         }
273 }
274
275 // RecvCompress returns the compression algorithm applied to the inbound
276 // message. It is empty string if there is no compression applied.
277 func (s *Stream) RecvCompress() string {
278         if err := s.waitOnHeader(); err != nil {
279                 return ""
280         }
281         return s.recvCompress
282 }
283
284 // SetSendCompress sets the compression algorithm to the stream.
285 func (s *Stream) SetSendCompress(str string) {
286         s.sendCompress = str
287 }
288
289 // Done returns a channel which is closed when it receives the final status
290 // from the server.
291 func (s *Stream) Done() <-chan struct{} {
292         return s.done
293 }
294
295 // Header returns the header metadata of the stream.
296 //
297 // On client side, it acquires the key-value pairs of header metadata once it is
298 // available. It blocks until i) the metadata is ready or ii) there is no header
299 // metadata or iii) the stream is canceled/expired.
300 //
301 // On server side, it returns the out header after t.WriteHeader is called.
302 func (s *Stream) Header() (metadata.MD, error) {
303         if s.headerChan == nil && s.header != nil {
304                 // On server side, return the header in stream. It will be the out
305                 // header after t.WriteHeader is called.
306                 return s.header.Copy(), nil
307         }
308         err := s.waitOnHeader()
309         // Even if the stream is closed, header is returned if available.
310         select {
311         case <-s.headerChan:
312                 if s.header == nil {
313                         return nil, nil
314                 }
315                 return s.header.Copy(), nil
316         default:
317         }
318         return nil, err
319 }
320
321 // TrailersOnly blocks until a header or trailers-only frame is received and
322 // then returns true if the stream was trailers-only.  If the stream ends
323 // before headers are received, returns true, nil.  If a context error happens
324 // first, returns it as a status error.  Client-side only.
325 func (s *Stream) TrailersOnly() (bool, error) {
326         err := s.waitOnHeader()
327         if err != nil {
328                 return false, err
329         }
330         // if !headerDone, some other connection error occurred.
331         return s.noHeaders && atomic.LoadUint32(&s.headerDone) == 1, nil
332 }
333
334 // Trailer returns the cached trailer metedata. Note that if it is not called
335 // after the entire stream is done, it could return an empty MD. Client
336 // side only.
337 // It can be safely read only after stream has ended that is either read
338 // or write have returned io.EOF.
339 func (s *Stream) Trailer() metadata.MD {
340         c := s.trailer.Copy()
341         return c
342 }
343
344 // ContentSubtype returns the content-subtype for a request. For example, a
345 // content-subtype of "proto" will result in a content-type of
346 // "application/grpc+proto". This will always be lowercase.  See
347 // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
348 // more details.
349 func (s *Stream) ContentSubtype() string {
350         return s.contentSubtype
351 }
352
353 // Context returns the context of the stream.
354 func (s *Stream) Context() context.Context {
355         return s.ctx
356 }
357
358 // Method returns the method for the stream.
359 func (s *Stream) Method() string {
360         return s.method
361 }
362
363 // Status returns the status received from the server.
364 // Status can be read safely only after the stream has ended,
365 // that is, after Done() is closed.
366 func (s *Stream) Status() *status.Status {
367         return s.status
368 }
369
370 // SetHeader sets the header metadata. This can be called multiple times.
371 // Server side only.
372 // This should not be called in parallel to other data writes.
373 func (s *Stream) SetHeader(md metadata.MD) error {
374         if md.Len() == 0 {
375                 return nil
376         }
377         if s.isHeaderSent() || s.getState() == streamDone {
378                 return ErrIllegalHeaderWrite
379         }
380         s.hdrMu.Lock()
381         s.header = metadata.Join(s.header, md)
382         s.hdrMu.Unlock()
383         return nil
384 }
385
386 // SendHeader sends the given header metadata. The given metadata is
387 // combined with any metadata set by previous calls to SetHeader and
388 // then written to the transport stream.
389 func (s *Stream) SendHeader(md metadata.MD) error {
390         return s.st.WriteHeader(s, md)
391 }
392
393 // SetTrailer sets the trailer metadata which will be sent with the RPC status
394 // by the server. This can be called multiple times. Server side only.
395 // This should not be called parallel to other data writes.
396 func (s *Stream) SetTrailer(md metadata.MD) error {
397         if md.Len() == 0 {
398                 return nil
399         }
400         if s.getState() == streamDone {
401                 return ErrIllegalHeaderWrite
402         }
403         s.hdrMu.Lock()
404         s.trailer = metadata.Join(s.trailer, md)
405         s.hdrMu.Unlock()
406         return nil
407 }
408
409 func (s *Stream) write(m recvMsg) {
410         s.buf.put(m)
411 }
412
413 // Read reads all p bytes from the wire for this stream.
414 func (s *Stream) Read(p []byte) (n int, err error) {
415         // Don't request a read if there was an error earlier
416         if er := s.trReader.(*transportReader).er; er != nil {
417                 return 0, er
418         }
419         s.requestRead(len(p))
420         return io.ReadFull(s.trReader, p)
421 }
422
423 // tranportReader reads all the data available for this Stream from the transport and
424 // passes them into the decoder, which converts them into a gRPC message stream.
425 // The error is io.EOF when the stream is done or another non-nil error if
426 // the stream broke.
427 type transportReader struct {
428         reader io.Reader
429         // The handler to control the window update procedure for both this
430         // particular stream and the associated transport.
431         windowHandler func(int)
432         er            error
433 }
434
435 func (t *transportReader) Read(p []byte) (n int, err error) {
436         n, err = t.reader.Read(p)
437         if err != nil {
438                 t.er = err
439                 return
440         }
441         t.windowHandler(n)
442         return
443 }
444
445 // BytesReceived indicates whether any bytes have been received on this stream.
446 func (s *Stream) BytesReceived() bool {
447         return atomic.LoadUint32(&s.bytesReceived) == 1
448 }
449
450 // Unprocessed indicates whether the server did not process this stream --
451 // i.e. it sent a refused stream or GOAWAY including this stream ID.
452 func (s *Stream) Unprocessed() bool {
453         return atomic.LoadUint32(&s.unprocessed) == 1
454 }
455
456 // GoString is implemented by Stream so context.String() won't
457 // race when printing %#v.
458 func (s *Stream) GoString() string {
459         return fmt.Sprintf("<stream: %p, %v>", s, s.method)
460 }
461
462 // state of transport
463 type transportState int
464
465 const (
466         reachable transportState = iota
467         closing
468         draining
469 )
470
471 // ServerConfig consists of all the configurations to establish a server transport.
472 type ServerConfig struct {
473         MaxStreams            uint32
474         AuthInfo              credentials.AuthInfo
475         InTapHandle           tap.ServerInHandle
476         StatsHandler          stats.Handler
477         KeepaliveParams       keepalive.ServerParameters
478         KeepalivePolicy       keepalive.EnforcementPolicy
479         InitialWindowSize     int32
480         InitialConnWindowSize int32
481         WriteBufferSize       int
482         ReadBufferSize        int
483         ChannelzParentID      int64
484         MaxHeaderListSize     *uint32
485 }
486
487 // NewServerTransport creates a ServerTransport with conn or non-nil error
488 // if it fails.
489 func NewServerTransport(protocol string, conn net.Conn, config *ServerConfig) (ServerTransport, error) {
490         return newHTTP2Server(conn, config)
491 }
492
493 // ConnectOptions covers all relevant options for communicating with the server.
494 type ConnectOptions struct {
495         // UserAgent is the application user agent.
496         UserAgent string
497         // Dialer specifies how to dial a network address.
498         Dialer func(context.Context, string) (net.Conn, error)
499         // FailOnNonTempDialError specifies if gRPC fails on non-temporary dial errors.
500         FailOnNonTempDialError bool
501         // PerRPCCredentials stores the PerRPCCredentials required to issue RPCs.
502         PerRPCCredentials []credentials.PerRPCCredentials
503         // TransportCredentials stores the Authenticator required to setup a client
504         // connection. Only one of TransportCredentials and CredsBundle is non-nil.
505         TransportCredentials credentials.TransportCredentials
506         // CredsBundle is the credentials bundle to be used. Only one of
507         // TransportCredentials and CredsBundle is non-nil.
508         CredsBundle credentials.Bundle
509         // KeepaliveParams stores the keepalive parameters.
510         KeepaliveParams keepalive.ClientParameters
511         // StatsHandler stores the handler for stats.
512         StatsHandler stats.Handler
513         // InitialWindowSize sets the initial window size for a stream.
514         InitialWindowSize int32
515         // InitialConnWindowSize sets the initial window size for a connection.
516         InitialConnWindowSize int32
517         // WriteBufferSize sets the size of write buffer which in turn determines how much data can be batched before it's written on the wire.
518         WriteBufferSize int
519         // ReadBufferSize sets the size of read buffer, which in turn determines how much data can be read at most for one read syscall.
520         ReadBufferSize int
521         // ChannelzParentID sets the addrConn id which initiate the creation of this client transport.
522         ChannelzParentID int64
523         // MaxHeaderListSize sets the max (uncompressed) size of header list that is prepared to be received.
524         MaxHeaderListSize *uint32
525 }
526
527 // TargetInfo contains the information of the target such as network address and metadata.
528 type TargetInfo struct {
529         Addr      string
530         Metadata  interface{}
531         Authority string
532 }
533
534 // NewClientTransport establishes the transport with the required ConnectOptions
535 // and returns it to the caller.
536 func NewClientTransport(connectCtx, ctx context.Context, target TargetInfo, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (ClientTransport, error) {
537         return newHTTP2Client(connectCtx, ctx, target, opts, onPrefaceReceipt, onGoAway, onClose)
538 }
539
540 // Options provides additional hints and information for message
541 // transmission.
542 type Options struct {
543         // Last indicates whether this write is the last piece for
544         // this stream.
545         Last bool
546 }
547
548 // CallHdr carries the information of a particular RPC.
549 type CallHdr struct {
550         // Host specifies the peer's host.
551         Host string
552
553         // Method specifies the operation to perform.
554         Method string
555
556         // SendCompress specifies the compression algorithm applied on
557         // outbound message.
558         SendCompress string
559
560         // Creds specifies credentials.PerRPCCredentials for a call.
561         Creds credentials.PerRPCCredentials
562
563         // ContentSubtype specifies the content-subtype for a request. For example, a
564         // content-subtype of "proto" will result in a content-type of
565         // "application/grpc+proto". The value of ContentSubtype must be all
566         // lowercase, otherwise the behavior is undefined. See
567         // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
568         // for more details.
569         ContentSubtype string
570
571         PreviousAttempts int // value of grpc-previous-rpc-attempts header to set
572 }
573
574 // ClientTransport is the common interface for all gRPC client-side transport
575 // implementations.
576 type ClientTransport interface {
577         // Close tears down this transport. Once it returns, the transport
578         // should not be accessed any more. The caller must make sure this
579         // is called only once.
580         Close() error
581
582         // GracefulClose starts to tear down the transport. It stops accepting
583         // new RPCs and wait the completion of the pending RPCs.
584         GracefulClose() error
585
586         // Write sends the data for the given stream. A nil stream indicates
587         // the write is to be performed on the transport as a whole.
588         Write(s *Stream, hdr []byte, data []byte, opts *Options) error
589
590         // NewStream creates a Stream for an RPC.
591         NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error)
592
593         // CloseStream clears the footprint of a stream when the stream is
594         // not needed any more. The err indicates the error incurred when
595         // CloseStream is called. Must be called when a stream is finished
596         // unless the associated transport is closing.
597         CloseStream(stream *Stream, err error)
598
599         // Error returns a channel that is closed when some I/O error
600         // happens. Typically the caller should have a goroutine to monitor
601         // this in order to take action (e.g., close the current transport
602         // and create a new one) in error case. It should not return nil
603         // once the transport is initiated.
604         Error() <-chan struct{}
605
606         // GoAway returns a channel that is closed when ClientTransport
607         // receives the draining signal from the server (e.g., GOAWAY frame in
608         // HTTP/2).
609         GoAway() <-chan struct{}
610
611         // GetGoAwayReason returns the reason why GoAway frame was received.
612         GetGoAwayReason() GoAwayReason
613
614         // IncrMsgSent increments the number of message sent through this transport.
615         IncrMsgSent()
616
617         // IncrMsgRecv increments the number of message received through this transport.
618         IncrMsgRecv()
619 }
620
621 // ServerTransport is the common interface for all gRPC server-side transport
622 // implementations.
623 //
624 // Methods may be called concurrently from multiple goroutines, but
625 // Write methods for a given Stream will be called serially.
626 type ServerTransport interface {
627         // HandleStreams receives incoming streams using the given handler.
628         HandleStreams(func(*Stream), func(context.Context, string) context.Context)
629
630         // WriteHeader sends the header metadata for the given stream.
631         // WriteHeader may not be called on all streams.
632         WriteHeader(s *Stream, md metadata.MD) error
633
634         // Write sends the data for the given stream.
635         // Write may not be called on all streams.
636         Write(s *Stream, hdr []byte, data []byte, opts *Options) error
637
638         // WriteStatus sends the status of a stream to the client.  WriteStatus is
639         // the final call made on a stream and always occurs.
640         WriteStatus(s *Stream, st *status.Status) error
641
642         // Close tears down the transport. Once it is called, the transport
643         // should not be accessed any more. All the pending streams and their
644         // handlers will be terminated asynchronously.
645         Close() error
646
647         // RemoteAddr returns the remote network address.
648         RemoteAddr() net.Addr
649
650         // Drain notifies the client this ServerTransport stops accepting new RPCs.
651         Drain()
652
653         // IncrMsgSent increments the number of message sent through this transport.
654         IncrMsgSent()
655
656         // IncrMsgRecv increments the number of message received through this transport.
657         IncrMsgRecv()
658 }
659
660 // connectionErrorf creates an ConnectionError with the specified error description.
661 func connectionErrorf(temp bool, e error, format string, a ...interface{}) ConnectionError {
662         return ConnectionError{
663                 Desc: fmt.Sprintf(format, a...),
664                 temp: temp,
665                 err:  e,
666         }
667 }
668
669 // ConnectionError is an error that results in the termination of the
670 // entire connection and the retry of all the active streams.
671 type ConnectionError struct {
672         Desc string
673         temp bool
674         err  error
675 }
676
677 func (e ConnectionError) Error() string {
678         return fmt.Sprintf("connection error: desc = %q", e.Desc)
679 }
680
681 // Temporary indicates if this connection error is temporary or fatal.
682 func (e ConnectionError) Temporary() bool {
683         return e.temp
684 }
685
686 // Origin returns the original error of this connection error.
687 func (e ConnectionError) Origin() error {
688         // Never return nil error here.
689         // If the original error is nil, return itself.
690         if e.err == nil {
691                 return e
692         }
693         return e.err
694 }
695
696 var (
697         // ErrConnClosing indicates that the transport is closing.
698         ErrConnClosing = connectionErrorf(true, nil, "transport is closing")
699         // errStreamDrain indicates that the stream is rejected because the
700         // connection is draining. This could be caused by goaway or balancer
701         // removing the address.
702         errStreamDrain = status.Error(codes.Unavailable, "the connection is draining")
703         // errStreamDone is returned from write at the client side to indiacte application
704         // layer of an error.
705         errStreamDone = errors.New("the stream is done")
706         // StatusGoAway indicates that the server sent a GOAWAY that included this
707         // stream's ID in unprocessed RPCs.
708         statusGoAway = status.New(codes.Unavailable, "the stream is rejected because server is draining the connection")
709 )
710
711 // GoAwayReason contains the reason for the GoAway frame received.
712 type GoAwayReason uint8
713
714 const (
715         // GoAwayInvalid indicates that no GoAway frame is received.
716         GoAwayInvalid GoAwayReason = 0
717         // GoAwayNoReason is the default value when GoAway frame is received.
718         GoAwayNoReason GoAwayReason = 1
719         // GoAwayTooManyPings indicates that a GoAway frame with
720         // ErrCodeEnhanceYourCalm was received and that the debug data said
721         // "too_many_pings".
722         GoAwayTooManyPings GoAwayReason = 2
723 )
724
725 // channelzData is used to store channelz related data for http2Client and http2Server.
726 // These fields cannot be embedded in the original structs (e.g. http2Client), since to do atomic
727 // operation on int64 variable on 32-bit machine, user is responsible to enforce memory alignment.
728 // Here, by grouping those int64 fields inside a struct, we are enforcing the alignment.
729 type channelzData struct {
730         kpCount int64
731         // The number of streams that have started, including already finished ones.
732         streamsStarted int64
733         // Client side: The number of streams that have ended successfully by receiving
734         // EoS bit set frame from server.
735         // Server side: The number of streams that have ended successfully by sending
736         // frame with EoS bit set.
737         streamsSucceeded int64
738         streamsFailed    int64
739         // lastStreamCreatedTime stores the timestamp that the last stream gets created. It is of int64 type
740         // instead of time.Time since it's more costly to atomically update time.Time variable than int64
741         // variable. The same goes for lastMsgSentTime and lastMsgRecvTime.
742         lastStreamCreatedTime int64
743         msgSent               int64
744         msgRecv               int64
745         lastMsgSentTime       int64
746         lastMsgRecvTime       int64
747 }
748
749 // ContextErr converts the error from context package into a status error.
750 func ContextErr(err error) error {
751         switch err {
752         case context.DeadlineExceeded:
753                 return status.Error(codes.DeadlineExceeded, err.Error())
754         case context.Canceled:
755                 return status.Error(codes.Canceled, err.Error())
756         }
757         return status.Errorf(codes.Internal, "Unexpected error from context packet: %v", err)
758 }