3 * Copyright 2014 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 // Package transport defines and implements message oriented communication
20 // channel to complete various transactions (e.g., an RPC). It is meant for
21 // grpc-internal usage and is not intended to be imported directly by users.
33 "google.golang.org/grpc/codes"
34 "google.golang.org/grpc/credentials"
35 "google.golang.org/grpc/keepalive"
36 "google.golang.org/grpc/metadata"
37 "google.golang.org/grpc/stats"
38 "google.golang.org/grpc/status"
39 "google.golang.org/grpc/tap"
42 // recvMsg represents the received msg from the transport. All transport
43 // protocol specific info has been removed.
46 // nil: received some data
47 // io.EOF: stream is completed. data is nil.
48 // other non-nil error: transport failure. data is nil.
52 // recvBuffer is an unbounded channel of recvMsg structs.
53 // Note recvBuffer differs from controlBuffer only in that recvBuffer
54 // holds a channel of only recvMsg structs instead of objects implementing "item" interface.
55 // recvBuffer is written to much more often than
56 // controlBuffer and using strict recvMsg structs helps avoid allocation in "recvBuffer.put"
57 type recvBuffer struct {
64 func newRecvBuffer() *recvBuffer {
66 c: make(chan recvMsg, 1),
71 func (b *recvBuffer) put(r recvMsg) {
75 // An error had occurred earlier, don't accept more
80 if len(b.backlog) == 0 {
88 b.backlog = append(b.backlog, r)
92 func (b *recvBuffer) load() {
94 if len(b.backlog) > 0 {
96 case b.c <- b.backlog[0]:
97 b.backlog[0] = recvMsg{}
98 b.backlog = b.backlog[1:]
105 // get returns the channel that receives a recvMsg in the buffer.
107 // Upon receipt of a recvMsg, the caller should call load to send another
108 // recvMsg onto the channel if there is any.
109 func (b *recvBuffer) get() <-chan recvMsg {
113 // recvBufferReader implements io.Reader interface to read the data from
115 type recvBufferReader struct {
116 closeStream func(error) // Closes the client transport stream with the given error and nil trailer metadata.
118 ctxDone <-chan struct{} // cache of ctx.Done() (for performance).
120 last []byte // Stores the remaining data in the previous calls.
124 // Read reads the next len(p) bytes from last. If last is drained, it tries to
125 // read additional data from recv. It blocks if there is no additional data available
126 // in recv. If Read returns any non-nil error, it will continue to return that error.
127 func (r *recvBufferReader) Read(p []byte) (n int, err error) {
131 if r.last != nil && len(r.last) > 0 {
132 // Read remaining data left in last call.
133 copied := copy(p, r.last)
134 r.last = r.last[copied:]
137 if r.closeStream != nil {
138 n, r.err = r.readClient(p)
145 func (r *recvBufferReader) read(p []byte) (n int, err error) {
148 return 0, ContextErr(r.ctx.Err())
149 case m := <-r.recv.get():
150 return r.readAdditional(m, p)
154 func (r *recvBufferReader) readClient(p []byte) (n int, err error) {
155 // If the context is canceled, then closes the stream with nil metadata.
156 // closeStream writes its error parameter to r.recv as a recvMsg.
157 // r.readAdditional acts on that message and returns the necessary error.
160 r.closeStream(ContextErr(r.ctx.Err()))
162 return r.readAdditional(m, p)
163 case m := <-r.recv.get():
164 return r.readAdditional(m, p)
168 func (r *recvBufferReader) readAdditional(m recvMsg, p []byte) (n int, err error) {
173 copied := copy(p, m.data)
174 r.last = m.data[copied:]
178 type streamState uint32
181 streamActive streamState = iota
182 streamWriteDone // EndStream sent
183 streamReadDone // EndStream received
184 streamDone // the entire stream is finished.
187 // Stream represents an RPC in the transport layer.
190 st ServerTransport // nil for client side Stream
191 ctx context.Context // the associated context of the stream
192 cancel context.CancelFunc // always nil for client side Stream
193 done chan struct{} // closed at the end of stream to unblock writers. On the client side.
194 ctxDone <-chan struct{} // same as done chan but for server side. Cache of ctx.Done() (for performance)
195 method string // the associated RPC method of the stream
203 // Callback to state application's intentions to read data. This
204 // is used to adjust flow control, if needed.
205 requestRead func(int)
207 headerChan chan struct{} // closed to indicate the end of header metadata.
208 headerDone uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times.
210 // hdrMu protects header and trailer metadata on the server-side.
212 // On client side, header keeps the received header metadata.
214 // On server side, header keeps the header set by SetHeader(). The complete
215 // header will be merged into this after t.WriteHeader() is called.
217 trailer metadata.MD // the key-value map of trailer metadata.
219 noHeaders bool // set if the client never received headers (set only after the stream is done).
221 // On the server-side, headerSent is atomically set to 1 when the headers are sent out.
226 // On client-side it is the status error received from the server.
227 // On server-side it is unused.
228 status *status.Status
230 bytesReceived uint32 // indicates whether any bytes have been received on this stream
231 unprocessed uint32 // set if the server sends a refused stream or GOAWAY including this stream
233 // contentSubtype is the content-subtype for requests.
234 // this must be lowercase or the behavior is undefined.
235 contentSubtype string
238 // isHeaderSent is only valid on the server-side.
239 func (s *Stream) isHeaderSent() bool {
240 return atomic.LoadUint32(&s.headerSent) == 1
243 // updateHeaderSent updates headerSent and returns true
244 // if it was already set. It is valid only on server-side.
245 func (s *Stream) updateHeaderSent() bool {
246 return atomic.SwapUint32(&s.headerSent, 1) == 1
249 func (s *Stream) swapState(st streamState) streamState {
250 return streamState(atomic.SwapUint32((*uint32)(&s.state), uint32(st)))
253 func (s *Stream) compareAndSwapState(oldState, newState streamState) bool {
254 return atomic.CompareAndSwapUint32((*uint32)(&s.state), uint32(oldState), uint32(newState))
257 func (s *Stream) getState() streamState {
258 return streamState(atomic.LoadUint32((*uint32)(&s.state)))
261 func (s *Stream) waitOnHeader() error {
262 if s.headerChan == nil {
263 // On the server headerChan is always nil since a stream originates
264 // only after having received headers.
269 return ContextErr(s.ctx.Err())
275 // RecvCompress returns the compression algorithm applied to the inbound
276 // message. It is empty string if there is no compression applied.
277 func (s *Stream) RecvCompress() string {
278 if err := s.waitOnHeader(); err != nil {
281 return s.recvCompress
284 // SetSendCompress sets the compression algorithm to the stream.
285 func (s *Stream) SetSendCompress(str string) {
289 // Done returns a channel which is closed when it receives the final status
291 func (s *Stream) Done() <-chan struct{} {
295 // Header returns the header metadata of the stream.
297 // On client side, it acquires the key-value pairs of header metadata once it is
298 // available. It blocks until i) the metadata is ready or ii) there is no header
299 // metadata or iii) the stream is canceled/expired.
301 // On server side, it returns the out header after t.WriteHeader is called.
302 func (s *Stream) Header() (metadata.MD, error) {
303 if s.headerChan == nil && s.header != nil {
304 // On server side, return the header in stream. It will be the out
305 // header after t.WriteHeader is called.
306 return s.header.Copy(), nil
308 err := s.waitOnHeader()
309 // Even if the stream is closed, header is returned if available.
315 return s.header.Copy(), nil
321 // TrailersOnly blocks until a header or trailers-only frame is received and
322 // then returns true if the stream was trailers-only. If the stream ends
323 // before headers are received, returns true, nil. If a context error happens
324 // first, returns it as a status error. Client-side only.
325 func (s *Stream) TrailersOnly() (bool, error) {
326 err := s.waitOnHeader()
330 // if !headerDone, some other connection error occurred.
331 return s.noHeaders && atomic.LoadUint32(&s.headerDone) == 1, nil
334 // Trailer returns the cached trailer metadata. Note that if it is not called
335 // after the entire stream is done, it could return an empty MD. Client
337 // It can be safely read only after the stream has ended, that is, after
338 // either read or write has returned io.EOF.
339 func (s *Stream) Trailer() metadata.MD {
340 c := s.trailer.Copy()
344 // ContentSubtype returns the content-subtype for a request. For example, a
345 // content-subtype of "proto" will result in a content-type of
346 // "application/grpc+proto". This will always be lowercase. See
347 // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
349 func (s *Stream) ContentSubtype() string {
350 return s.contentSubtype
353 // Context returns the context of the stream.
354 func (s *Stream) Context() context.Context {
358 // Method returns the method for the stream.
359 func (s *Stream) Method() string {
363 // Status returns the status received from the server.
364 // Status can be read safely only after the stream has ended,
365 // that is, after Done() is closed.
366 func (s *Stream) Status() *status.Status {
370 // SetHeader sets the header metadata. This can be called multiple times.
372 // This should not be called in parallel to other data writes.
373 func (s *Stream) SetHeader(md metadata.MD) error {
377 if s.isHeaderSent() || s.getState() == streamDone {
378 return ErrIllegalHeaderWrite
381 s.header = metadata.Join(s.header, md)
386 // SendHeader sends the given header metadata. The given metadata is
387 // combined with any metadata set by previous calls to SetHeader and
388 // then written to the transport stream.
389 func (s *Stream) SendHeader(md metadata.MD) error {
390 return s.st.WriteHeader(s, md)
393 // SetTrailer sets the trailer metadata which will be sent with the RPC status
394 // by the server. This can be called multiple times. Server side only.
395 // This should not be called parallel to other data writes.
396 func (s *Stream) SetTrailer(md metadata.MD) error {
400 if s.getState() == streamDone {
401 return ErrIllegalHeaderWrite
404 s.trailer = metadata.Join(s.trailer, md)
409 func (s *Stream) write(m recvMsg) {
413 // Read reads all p bytes from the wire for this stream.
414 func (s *Stream) Read(p []byte) (n int, err error) {
415 // Don't request a read if there was an error earlier
416 if er := s.trReader.(*transportReader).er; er != nil {
419 s.requestRead(len(p))
420 return io.ReadFull(s.trReader, p)
423 // transportReader reads all the data available for this Stream from the transport and
424 // passes them into the decoder, which converts them into a gRPC message stream.
425 // The error is io.EOF when the stream is done or another non-nil error if
427 type transportReader struct {
429 // The handler to control the window update procedure for both this
430 // particular stream and the associated transport.
431 windowHandler func(int)
435 func (t *transportReader) Read(p []byte) (n int, err error) {
436 n, err = t.reader.Read(p)
445 // BytesReceived indicates whether any bytes have been received on this stream.
446 func (s *Stream) BytesReceived() bool {
447 return atomic.LoadUint32(&s.bytesReceived) == 1
450 // Unprocessed indicates whether the server did not process this stream --
451 // i.e. it sent a refused stream or GOAWAY including this stream ID.
452 func (s *Stream) Unprocessed() bool {
453 return atomic.LoadUint32(&s.unprocessed) == 1
456 // GoString is implemented by Stream so context.String() won't
457 // race when printing %#v.
458 func (s *Stream) GoString() string {
459 return fmt.Sprintf("<stream: %p, %v>", s, s.method)
462 // state of transport
463 type transportState int
466 reachable transportState = iota
471 // ServerConfig consists of all the configurations to establish a server transport.
472 type ServerConfig struct {
474 AuthInfo credentials.AuthInfo
475 InTapHandle tap.ServerInHandle
476 StatsHandler stats.Handler
477 KeepaliveParams keepalive.ServerParameters
478 KeepalivePolicy keepalive.EnforcementPolicy
479 InitialWindowSize int32
480 InitialConnWindowSize int32
483 ChannelzParentID int64
484 MaxHeaderListSize *uint32
487 // NewServerTransport creates a ServerTransport with conn or non-nil error
489 func NewServerTransport(protocol string, conn net.Conn, config *ServerConfig) (ServerTransport, error) {
490 return newHTTP2Server(conn, config)
493 // ConnectOptions covers all relevant options for communicating with the server.
494 type ConnectOptions struct {
495 // UserAgent is the application user agent.
497 // Dialer specifies how to dial a network address.
498 Dialer func(context.Context, string) (net.Conn, error)
499 // FailOnNonTempDialError specifies if gRPC fails on non-temporary dial errors.
500 FailOnNonTempDialError bool
501 // PerRPCCredentials stores the PerRPCCredentials required to issue RPCs.
502 PerRPCCredentials []credentials.PerRPCCredentials
503 // TransportCredentials stores the Authenticator required to setup a client
504 // connection. Only one of TransportCredentials and CredsBundle is non-nil.
505 TransportCredentials credentials.TransportCredentials
506 // CredsBundle is the credentials bundle to be used. Only one of
507 // TransportCredentials and CredsBundle is non-nil.
508 CredsBundle credentials.Bundle
509 // KeepaliveParams stores the keepalive parameters.
510 KeepaliveParams keepalive.ClientParameters
511 // StatsHandler stores the handler for stats.
512 StatsHandler stats.Handler
513 // InitialWindowSize sets the initial window size for a stream.
514 InitialWindowSize int32
515 // InitialConnWindowSize sets the initial window size for a connection.
516 InitialConnWindowSize int32
517 // WriteBufferSize sets the size of write buffer which in turn determines how much data can be batched before it's written on the wire.
519 // ReadBufferSize sets the size of read buffer, which in turn determines how much data can be read at most for one read syscall.
521 // ChannelzParentID sets the addrConn id which initiate the creation of this client transport.
522 ChannelzParentID int64
523 // MaxHeaderListSize sets the max (uncompressed) size of header list that is prepared to be received.
524 MaxHeaderListSize *uint32
527 // TargetInfo contains the information of the target such as network address and metadata.
528 type TargetInfo struct {
534 // NewClientTransport establishes the transport with the required ConnectOptions
535 // and returns it to the caller.
536 func NewClientTransport(connectCtx, ctx context.Context, target TargetInfo, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (ClientTransport, error) {
537 return newHTTP2Client(connectCtx, ctx, target, opts, onPrefaceReceipt, onGoAway, onClose)
540 // Options provides additional hints and information for message
542 type Options struct {
543 // Last indicates whether this write is the last piece for
548 // CallHdr carries the information of a particular RPC.
549 type CallHdr struct {
550 // Host specifies the peer's host.
553 // Method specifies the operation to perform.
556 // SendCompress specifies the compression algorithm applied on
560 // Creds specifies credentials.PerRPCCredentials for a call.
561 Creds credentials.PerRPCCredentials
563 // ContentSubtype specifies the content-subtype for a request. For example, a
564 // content-subtype of "proto" will result in a content-type of
565 // "application/grpc+proto". The value of ContentSubtype must be all
566 // lowercase, otherwise the behavior is undefined. See
567 // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
569 ContentSubtype string
571 PreviousAttempts int // value of grpc-previous-rpc-attempts header to set
574 // ClientTransport is the common interface for all gRPC client-side transport
576 type ClientTransport interface {
577 // Close tears down this transport. Once it returns, the transport
578 // should not be accessed any more. The caller must make sure this
579 // is called only once.
582 // GracefulClose starts to tear down the transport. It stops accepting
583 // new RPCs and waits for the completion of the pending RPCs.
584 GracefulClose() error
586 // Write sends the data for the given stream. A nil stream indicates
587 // the write is to be performed on the transport as a whole.
588 Write(s *Stream, hdr []byte, data []byte, opts *Options) error
590 // NewStream creates a Stream for an RPC.
591 NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error)
593 // CloseStream clears the footprint of a stream when the stream is
594 // not needed any more. The err indicates the error incurred when
595 // CloseStream is called. Must be called when a stream is finished
596 // unless the associated transport is closing.
597 CloseStream(stream *Stream, err error)
599 // Error returns a channel that is closed when some I/O error
600 // happens. Typically the caller should have a goroutine to monitor
601 // this in order to take action (e.g., close the current transport
602 // and create a new one) in error case. It should not return nil
603 // once the transport is initiated.
604 Error() <-chan struct{}
606 // GoAway returns a channel that is closed when ClientTransport
607 // receives the draining signal from the server (e.g., GOAWAY frame in
609 GoAway() <-chan struct{}
611 // GetGoAwayReason returns the reason why GoAway frame was received.
612 GetGoAwayReason() GoAwayReason
614 // IncrMsgSent increments the number of messages sent through this transport.
617 // IncrMsgRecv increments the number of messages received through this transport.
621 // ServerTransport is the common interface for all gRPC server-side transport
624 // Methods may be called concurrently from multiple goroutines, but
625 // Write methods for a given Stream will be called serially.
626 type ServerTransport interface {
627 // HandleStreams receives incoming streams using the given handler.
628 HandleStreams(func(*Stream), func(context.Context, string) context.Context)
630 // WriteHeader sends the header metadata for the given stream.
631 // WriteHeader may not be called on all streams.
632 WriteHeader(s *Stream, md metadata.MD) error
634 // Write sends the data for the given stream.
635 // Write may not be called on all streams.
636 Write(s *Stream, hdr []byte, data []byte, opts *Options) error
638 // WriteStatus sends the status of a stream to the client. WriteStatus is
639 // the final call made on a stream and always occurs.
640 WriteStatus(s *Stream, st *status.Status) error
642 // Close tears down the transport. Once it is called, the transport
643 // should not be accessed any more. All the pending streams and their
644 // handlers will be terminated asynchronously.
647 // RemoteAddr returns the remote network address.
648 RemoteAddr() net.Addr
650 // Drain notifies the client this ServerTransport stops accepting new RPCs.
653 // IncrMsgSent increments the number of messages sent through this transport.
656 // IncrMsgRecv increments the number of messages received through this transport.
660 // connectionErrorf creates a ConnectionError with the specified error description.
661 func connectionErrorf(temp bool, e error, format string, a ...interface{}) ConnectionError {
662 return ConnectionError{
663 Desc: fmt.Sprintf(format, a...),
669 // ConnectionError is an error that results in the termination of the
670 // entire connection and the retry of all the active streams.
671 type ConnectionError struct {
677 func (e ConnectionError) Error() string {
678 return fmt.Sprintf("connection error: desc = %q", e.Desc)
681 // Temporary indicates if this connection error is temporary or fatal.
682 func (e ConnectionError) Temporary() bool {
686 // Origin returns the original error of this connection error.
687 func (e ConnectionError) Origin() error {
688 // Never return nil error here.
689 // If the original error is nil, return itself.
697 // ErrConnClosing indicates that the transport is closing.
698 ErrConnClosing = connectionErrorf(true, nil, "transport is closing")
699 // errStreamDrain indicates that the stream is rejected because the
700 // connection is draining. This could be caused by goaway or balancer
701 // removing the address.
702 errStreamDrain = status.Error(codes.Unavailable, "the connection is draining")
703 // errStreamDone is returned from write at the client side to indicate an
704 // error to the application layer.
705 errStreamDone = errors.New("the stream is done")
706 // statusGoAway indicates that the server sent a GOAWAY that included this
707 // stream's ID in unprocessed RPCs.
708 statusGoAway = status.New(codes.Unavailable, "the stream is rejected because server is draining the connection")
711 // GoAwayReason contains the reason for the GoAway frame received.
712 type GoAwayReason uint8
715 // GoAwayInvalid indicates that no GoAway frame is received.
716 GoAwayInvalid GoAwayReason = 0
717 // GoAwayNoReason is the default value when GoAway frame is received.
718 GoAwayNoReason GoAwayReason = 1
719 // GoAwayTooManyPings indicates that a GoAway frame with
720 // ErrCodeEnhanceYourCalm was received and that the debug data said
722 GoAwayTooManyPings GoAwayReason = 2
725 // channelzData is used to store channelz related data for http2Client and http2Server.
726 // These fields cannot be embedded in the original structs (e.g. http2Client), since to do atomic
727 // operation on int64 variable on 32-bit machine, user is responsible to enforce memory alignment.
728 // Here, by grouping those int64 fields inside a struct, we are enforcing the alignment.
729 type channelzData struct {
731 // The number of streams that have started, including already finished ones.
733 // Client side: The number of streams that have ended successfully by receiving
734 // EoS bit set frame from server.
735 // Server side: The number of streams that have ended successfully by sending
736 // frame with EoS bit set.
737 streamsSucceeded int64
739 // lastStreamCreatedTime stores the timestamp that the last stream gets created. It is of int64 type
740 // instead of time.Time since it's more costly to atomically update time.Time variable than int64
741 // variable. The same goes for lastMsgSentTime and lastMsgRecvTime.
742 lastStreamCreatedTime int64
745 lastMsgSentTime int64
746 lastMsgRecvTime int64
749 // ContextErr converts the error from context package into a status error.
750 func ContextErr(err error) error {
752 case context.DeadlineExceeded:
753 return status.Error(codes.DeadlineExceeded, err.Error())
754 case context.Canceled:
755 return status.Error(codes.Canceled, err.Error())
757 return status.Errorf(codes.Internal, "Unexpected error from context packet: %v", err)