cmd/bpa-operator/vendor/google.golang.org/grpc/internal/transport/http2_client.go
1 /*
2  *
3  * Copyright 2014 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 package transport
20
21 import (
22         "context"
23         "fmt"
24         "io"
25         "math"
26         "net"
27         "strconv"
28         "strings"
29         "sync"
30         "sync/atomic"
31         "time"
32
33         "golang.org/x/net/http2"
34         "golang.org/x/net/http2/hpack"
35
36         "google.golang.org/grpc/codes"
37         "google.golang.org/grpc/credentials"
38         "google.golang.org/grpc/internal/channelz"
39         "google.golang.org/grpc/internal/syscall"
40         "google.golang.org/grpc/keepalive"
41         "google.golang.org/grpc/metadata"
42         "google.golang.org/grpc/peer"
43         "google.golang.org/grpc/stats"
44         "google.golang.org/grpc/status"
45 )
46
47 // http2Client implements the ClientTransport interface with HTTP2.
48 type http2Client struct {
49         ctx        context.Context
50         cancel     context.CancelFunc
51         ctxDone    <-chan struct{} // Cache the ctx.Done() chan.
52         userAgent  string
53         md         interface{}
54         conn       net.Conn // underlying communication channel
55         loopy      *loopyWriter
56         remoteAddr net.Addr
57         localAddr  net.Addr
58         authInfo   credentials.AuthInfo // auth info about the connection
59
60         readerDone chan struct{} // sync point to enable testing.
61         writerDone chan struct{} // sync point to enable testing.
62         // goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
63         // that the server sent GoAway on this transport.
64         goAway chan struct{}
65         // awakenKeepalive is used to wake up keepalive when after it has gone dormant.
66         awakenKeepalive chan struct{}
67
68         framer *framer
69         // controlBuf delivers all the control related tasks (e.g., window
70         // updates, reset streams, and various settings) to the controller.
71         controlBuf *controlBuffer
72         fc         *trInFlow
73         // The scheme used: https if TLS is on, http otherwise.
74         scheme string
75
76         isSecure bool
77
78         perRPCCreds []credentials.PerRPCCredentials
79
80         // Boolean to keep track of reading activity on transport.
81         // 1 is true and 0 is false.
82         activity         uint32 // Accessed atomically.
83         kp               keepalive.ClientParameters
84         keepaliveEnabled bool
85
86         statsHandler stats.Handler
87
88         initialWindowSize int32
89
90         // configured by peer through SETTINGS_MAX_HEADER_LIST_SIZE
91         maxSendHeaderListSize *uint32
92
93         bdpEst *bdpEstimator
94         // onPrefaceReceipt is a callback that the client transport calls upon
95         // receiving the server preface, to signal that a successful HTTP2
96         // connection was established.
97         onPrefaceReceipt func()
98
99         maxConcurrentStreams  uint32
100         streamQuota           int64
101         streamsQuotaAvailable chan struct{}
102         waitingStreams        uint32
103         nextID                uint32
104
105         mu            sync.Mutex // guard the following variables
106         state         transportState
107         activeStreams map[uint32]*Stream
108         // prevGoAwayID records the Last-Stream-ID in the previous GoAway frame.
109         prevGoAwayID uint32
110         // goAwayReason records the http2.ErrCode and debug data received with the
111         // GoAway frame.
112         goAwayReason GoAwayReason
113
114         // Fields below are for channelz metric collection.
115         channelzID int64 // channelz unique identification number
116         czData     *channelzData
117
118         onGoAway func(GoAwayReason)
119         onClose  func()
120 }
121
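// dial opens the connection to addr, using the user-supplied dialer fn when
// one is provided and falling back to a plain TCP DialContext otherwise.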
122 func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) {
123         if fn != nil {
124                 return fn(ctx, addr)
125         }
126         return (&net.Dialer{}).DialContext(ctx, "tcp", addr)
127 }
128
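// isTemporary reports whether err is worth retrying: errors exposing a
// Temporary or Timeout method are classified by that method, and all other
// errors are conservatively treated as temporary.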
129 func isTemporary(err error) bool {
130         switch err := err.(type) {
131         case interface {
132                 Temporary() bool
133         }:
134                 return err.Temporary()
135         case interface {
136                 Timeout() bool
137         }:
138                 // Timeouts may be resolved upon retry, and are thus treated as
139                 // temporary.
140                 return err.Timeout()
141         }
142         return true
143 }
144
145 // newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
146 // and starts to receive messages on it. A non-nil error is returned if
147 // construction fails.
148 func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (_ *http2Client, err error) {
149         scheme := "http"
150         ctx, cancel := context.WithCancel(ctx)
151         defer func() {
152                 if err != nil {
153                         cancel()
154                 }
155         }()
156
157         conn, err := dial(connectCtx, opts.Dialer, addr.Addr)
158         if err != nil {
159                 if opts.FailOnNonTempDialError {
160                         return nil, connectionErrorf(isTemporary(err), err, "transport: error while dialing: %v", err)
161                 }
162                 return nil, connectionErrorf(true, err, "transport: Error while dialing %v", err)
163         }
164         // Any further errors will close the underlying connection
165         defer func(conn net.Conn) {
166                 if err != nil {
167                         conn.Close()
168                 }
169         }(conn)
170         kp := opts.KeepaliveParams
171         // Validate keepalive parameters.
172         if kp.Time == 0 {
173                 kp.Time = defaultClientKeepaliveTime
174         }
175         if kp.Timeout == 0 {
176                 kp.Timeout = defaultClientKeepaliveTimeout
177         }
178         keepaliveEnabled := false
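        // When keepalive is in use, also bound how long written data may stay
        // unacknowledged by setting TCP_USER_TIMEOUT to the keepalive timeout.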
179         if kp.Time != infinity {
180                 if err = syscall.SetTCPUserTimeout(conn, kp.Timeout); err != nil {
181                         return nil, connectionErrorf(false, err, "transport: failed to set TCP_USER_TIMEOUT: %v", err)
182                 }
183                 keepaliveEnabled = true
184         }
185         var (
186                 isSecure bool
187                 authInfo credentials.AuthInfo
188         )
189         transportCreds := opts.TransportCredentials
190         perRPCCreds := opts.PerRPCCredentials
191
192         if b := opts.CredsBundle; b != nil {
193                 if t := b.TransportCredentials(); t != nil {
194                         transportCreds = t
195                 }
196                 if t := b.PerRPCCredentials(); t != nil {
197                         perRPCCreds = append(perRPCCreds, t)
198                 }
199         }
200         if transportCreds != nil {
201                 scheme = "https"
202                 conn, authInfo, err = transportCreds.ClientHandshake(connectCtx, addr.Authority, conn)
203                 if err != nil {
204                         return nil, connectionErrorf(isTemporary(err), err, "transport: authentication handshake failed: %v", err)
205                 }
206                 isSecure = true
207         }
208         dynamicWindow := true
209         icwz := int32(initialWindowSize)
210         if opts.InitialConnWindowSize >= defaultWindowSize {
211                 icwz = opts.InitialConnWindowSize
212                 dynamicWindow = false
213         }
214         writeBufSize := opts.WriteBufferSize
215         readBufSize := opts.ReadBufferSize
216         maxHeaderListSize := defaultClientMaxHeaderListSize
217         if opts.MaxHeaderListSize != nil {
218                 maxHeaderListSize = *opts.MaxHeaderListSize
219         }
220         t := &http2Client{
221                 ctx:                   ctx,
222                 ctxDone:               ctx.Done(), // Cache Done chan.
223                 cancel:                cancel,
224                 userAgent:             opts.UserAgent,
225                 md:                    addr.Metadata,
226                 conn:                  conn,
227                 remoteAddr:            conn.RemoteAddr(),
228                 localAddr:             conn.LocalAddr(),
229                 authInfo:              authInfo,
230                 readerDone:            make(chan struct{}),
231                 writerDone:            make(chan struct{}),
232                 goAway:                make(chan struct{}),
233                 awakenKeepalive:       make(chan struct{}, 1),
234                 framer:                newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize),
235                 fc:                    &trInFlow{limit: uint32(icwz)},
236                 scheme:                scheme,
237                 activeStreams:         make(map[uint32]*Stream),
238                 isSecure:              isSecure,
239                 perRPCCreds:           perRPCCreds,
240                 kp:                    kp,
241                 statsHandler:          opts.StatsHandler,
242                 initialWindowSize:     initialWindowSize,
243                 onPrefaceReceipt:      onPrefaceReceipt,
244                 nextID:                1,
245                 maxConcurrentStreams:  defaultMaxStreamsClient,
246                 streamQuota:           defaultMaxStreamsClient,
247                 streamsQuotaAvailable: make(chan struct{}, 1),
248                 czData:                new(channelzData),
249                 onGoAway:              onGoAway,
250                 onClose:               onClose,
251                 keepaliveEnabled:      keepaliveEnabled,
252         }
253         t.controlBuf = newControlBuffer(t.ctxDone)
254         if opts.InitialWindowSize >= defaultWindowSize {
255                 t.initialWindowSize = opts.InitialWindowSize
256                 dynamicWindow = false
257         }
258         if dynamicWindow {
259                 t.bdpEst = &bdpEstimator{
260                         bdp:               initialWindowSize,
261                         updateFlowControl: t.updateFlowControl,
262                 }
263         }
264         // Make sure awakenKeepalive can't be written upon.
265         // keepalive routine will make it writable, if need be.
266         t.awakenKeepalive <- struct{}{}
267         if t.statsHandler != nil {
268                 t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{
269                         RemoteAddr: t.remoteAddr,
270                         LocalAddr:  t.localAddr,
271                 })
272                 connBegin := &stats.ConnBegin{
273                         Client: true,
274                 }
275                 t.statsHandler.HandleConn(t.ctx, connBegin)
276         }
277         if channelz.IsOn() {
278                 t.channelzID = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr))
279         }
280         if t.keepaliveEnabled {
281                 go t.keepalive()
282         }
283         // Start the reader goroutine for incoming message. Each transport has
284         // a dedicated goroutine which reads HTTP2 frame from network. Then it
285         // dispatches the frame to the corresponding stream entity.
286         go t.reader()
287
288         // Send connection preface to server.
289         n, err := t.conn.Write(clientPreface)
290         if err != nil {
291                 t.Close()
292                 return nil, connectionErrorf(true, err, "transport: failed to write client preface: %v", err)
293         }
294         if n != len(clientPreface) {
295                 t.Close()
296                 return nil, connectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
297         }
298         var ss []http2.Setting
299
300         if t.initialWindowSize != defaultWindowSize {
301                 ss = append(ss, http2.Setting{
302                         ID:  http2.SettingInitialWindowSize,
303                         Val: uint32(t.initialWindowSize),
304                 })
305         }
306         if opts.MaxHeaderListSize != nil {
307                 ss = append(ss, http2.Setting{
308                         ID:  http2.SettingMaxHeaderListSize,
309                         Val: *opts.MaxHeaderListSize,
310                 })
311         }
312         err = t.framer.fr.WriteSettings(ss...)
313         if err != nil {
314                 t.Close()
315                 return nil, connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
316         }
317         // Adjust the connection flow control window if needed.
318         if delta := uint32(icwz - defaultWindowSize); delta > 0 {
319                 if err := t.framer.fr.WriteWindowUpdate(0, delta); err != nil {
320                         t.Close()
321                         return nil, connectionErrorf(true, err, "transport: failed to write window update: %v", err)
322                 }
323         }
324
325         if err := t.framer.writer.Flush(); err != nil {
326                 return nil, err
327         }
328         go func() {
329                 t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst)
330                 err := t.loopy.run()
331                 if err != nil {
332                         errorf("transport: loopyWriter.run returning. Err: %v", err)
333                 }
334                 // If it's a connection error, let reader goroutine handle it
335                 // since there might be data in the buffers.
336                 if _, ok := err.(net.Error); !ok {
337                         t.conn.Close()
338                 }
339                 close(t.writerDone)
340         }()
341         return t, nil
342 }
343
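// newStream builds the client-side Stream object for callHdr. The stream is
// not yet registered with the transport; that happens in NewStream once the
// headers frame is queued.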
344 func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
345         // TODO(zhaoq): Handle uint32 overflow of Stream.id.
346         s := &Stream{
347                 done:           make(chan struct{}),
348                 method:         callHdr.Method,
349                 sendCompress:   callHdr.SendCompress,
350                 buf:            newRecvBuffer(),
351                 headerChan:     make(chan struct{}),
352                 contentSubtype: callHdr.ContentSubtype,
353         }
354         s.wq = newWriteQuota(defaultWriteQuota, s.done)
355         s.requestRead = func(n int) {
356                 t.adjustWindow(s, uint32(n))
357         }
358         // The client-side stream context should have exactly the same lifecycle as the user-provided context:
359         // s.ctx should be read-only, and s.ctx is done iff ctx is done.
360         // So we use the original context here instead of creating a copy.
361         s.ctx = ctx
362         s.trReader = &transportReader{
363                 reader: &recvBufferReader{
364                         ctx:     s.ctx,
365                         ctxDone: s.ctx.Done(),
366                         recv:    s.buf,
367                         closeStream: func(err error) {
368                                 t.CloseStream(s, err)
369                         },
370                 },
371                 windowHandler: func(n int) {
372                         t.updateWindow(s, uint32(n))
373                 },
374         }
375         return s
376 }
377
378 func (t *http2Client) getPeer() *peer.Peer {
379         pr := &peer.Peer{
380                 Addr: t.remoteAddr,
381         }
382         // Attach Auth info if there is any.
383         if t.authInfo != nil {
384                 pr.AuthInfo = t.authInfo
385         }
386         return pr
387 }
388
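// createHeaderFields assembles the HTTP/2 pseudo-headers and gRPC request
// headers for callHdr: method, scheme, path, authority, content-type,
// user-agent, te, optional retry/compression/timeout headers, per-RPC auth
// metadata, stats tags and trace data, and user metadata from the outgoing
// context.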
389 func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) ([]hpack.HeaderField, error) {
390         aud := t.createAudience(callHdr)
391         authData, err := t.getTrAuthData(ctx, aud)
392         if err != nil {
393                 return nil, err
394         }
395         callAuthData, err := t.getCallAuthData(ctx, aud, callHdr)
396         if err != nil {
397                 return nil, err
398         }
399         // TODO(mmukhi): Benchmark whether performance improves if we count the metadata and other header
400         // fields first and create a slice of that exact size.
401         // Make the slice a predictable size to reduce allocations made by append.
402         hfLen := 7 // :method, :scheme, :path, :authority, content-type, user-agent, te
403         hfLen += len(authData) + len(callAuthData)
404         headerFields := make([]hpack.HeaderField, 0, hfLen)
405         headerFields = append(headerFields, hpack.HeaderField{Name: ":method", Value: "POST"})
406         headerFields = append(headerFields, hpack.HeaderField{Name: ":scheme", Value: t.scheme})
407         headerFields = append(headerFields, hpack.HeaderField{Name: ":path", Value: callHdr.Method})
408         headerFields = append(headerFields, hpack.HeaderField{Name: ":authority", Value: callHdr.Host})
409         headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(callHdr.ContentSubtype)})
410         headerFields = append(headerFields, hpack.HeaderField{Name: "user-agent", Value: t.userAgent})
411         headerFields = append(headerFields, hpack.HeaderField{Name: "te", Value: "trailers"})
412         if callHdr.PreviousAttempts > 0 {
413                 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-previous-rpc-attempts", Value: strconv.Itoa(callHdr.PreviousAttempts)})
414         }
415
416         if callHdr.SendCompress != "" {
417                 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress})
418         }
419         if dl, ok := ctx.Deadline(); ok {
420                 // Send out the timeout regardless of its value. The server can detect a timed-out context by itself.
421                 // TODO(mmukhi): Perhaps this field should be updated when actually writing out to the wire.
422                 timeout := time.Until(dl)
423                 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-timeout", Value: encodeTimeout(timeout)})
424         }
425         for k, v := range authData {
426                 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
427         }
428         for k, v := range callAuthData {
429                 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
430         }
431         if b := stats.OutgoingTags(ctx); b != nil {
432                 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-tags-bin", Value: encodeBinHeader(b)})
433         }
434         if b := stats.OutgoingTrace(ctx); b != nil {
435                 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-trace-bin", Value: encodeBinHeader(b)})
436         }
437
438         if md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok {
439                 var k string
440                 for _, vv := range added {
441                         for i, v := range vv {
442                                 if i%2 == 0 {
443                                         k = v
444                                         continue
445                                 }
446                                 // HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
447                                 if isReservedHeader(k) {
448                                         continue
449                                 }
450                                 headerFields = append(headerFields, hpack.HeaderField{Name: strings.ToLower(k), Value: encodeMetadataHeader(k, v)})
451                         }
452                 }
453                 for k, vv := range md {
454                         // HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
455                         if isReservedHeader(k) {
456                                 continue
457                         }
458                         for _, v := range vv {
459                                 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
460                         }
461                 }
462         }
463         if md, ok := t.md.(*metadata.MD); ok {
464                 for k, vv := range *md {
465                         if isReservedHeader(k) {
466                                 continue
467                         }
468                         for _, v := range vv {
469                                 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
470                         }
471                 }
472         }
473         return headerFields, nil
474 }
475
476 func (t *http2Client) createAudience(callHdr *CallHdr) string {
477         // Create an audience string only if needed.
478         if len(t.perRPCCreds) == 0 && callHdr.Creds == nil {
479                 return ""
480         }
481         // Construct URI required to get auth request metadata.
482         // Omit port if it is the default one.
483         host := strings.TrimSuffix(callHdr.Host, ":443")
484         pos := strings.LastIndex(callHdr.Method, "/")
485         if pos == -1 {
486                 pos = len(callHdr.Method)
487         }
488         return "https://" + host + callHdr.Method[:pos]
489 }
490
491 func (t *http2Client) getTrAuthData(ctx context.Context, audience string) (map[string]string, error) {
492         authData := map[string]string{}
493         for _, c := range t.perRPCCreds {
494                 data, err := c.GetRequestMetadata(ctx, audience)
495                 if err != nil {
496                         if _, ok := status.FromError(err); ok {
497                                 return nil, err
498                         }
499
500                         return nil, status.Errorf(codes.Unauthenticated, "transport: %v", err)
501                 }
502                 for k, v := range data {
503                         // Capital header names are illegal in HTTP/2.
504                         k = strings.ToLower(k)
505                         authData[k] = v
506                 }
507         }
508         return authData, nil
509 }
510
511 func (t *http2Client) getCallAuthData(ctx context.Context, audience string, callHdr *CallHdr) (map[string]string, error) {
512         callAuthData := map[string]string{}
513         // Check if credentials.PerRPCCredentials were provided via call options.
514         // Note: if these credentials are provided both via dial options and call
515         // options, then both sets of credentials will be applied.
516         if callCreds := callHdr.Creds; callCreds != nil {
517                 if !t.isSecure && callCreds.RequireTransportSecurity() {
518                         return nil, status.Error(codes.Unauthenticated, "transport: cannot send secure credentials on an insecure connection")
519                 }
520                 data, err := callCreds.GetRequestMetadata(ctx, audience)
521                 if err != nil {
522                         return nil, status.Errorf(codes.Internal, "transport: %v", err)
523                 }
524                 for k, v := range data {
525                         // Capital header names are illegal in HTTP/2
526                         k = strings.ToLower(k)
527                         callAuthData[k] = v
528                 }
529         }
530         return callAuthData, nil
531 }
532
533 // NewStream creates a stream and registers it with the transport as an
534 // "active" stream.
535 func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) {
536         ctx = peer.NewContext(ctx, t.getPeer())
537         headerFields, err := t.createHeaderFields(ctx, callHdr)
538         if err != nil {
539                 return nil, err
540         }
541         s := t.newStream(ctx, callHdr)
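        // cleanup marks the stream as done and records that the server never
        // processed it, then unblocks any pending reads and header waiters.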
542         cleanup := func(err error) {
543                 if s.swapState(streamDone) == streamDone {
544                         // If it was already done, return.
545                         return
546                 }
547                 // The stream was unprocessed by the server.
548                 atomic.StoreUint32(&s.unprocessed, 1)
549                 s.write(recvMsg{err: err})
550                 close(s.done)
551                 // If headerChan isn't closed, then close it.
552                 if atomic.SwapUint32(&s.headerDone, 1) == 0 {
553                         close(s.headerChan)
554                 }
555
556         }
557         hdr := &headerFrame{
558                 hf:        headerFields,
559                 endStream: false,
560                 initStream: func(id uint32) (bool, error) {
561                         t.mu.Lock()
562                         if state := t.state; state != reachable {
563                                 t.mu.Unlock()
564                                 // Do a quick cleanup.
565                                 err := error(errStreamDrain)
566                                 if state == closing {
567                                         err = ErrConnClosing
568                                 }
569                                 cleanup(err)
570                                 return false, err
571                         }
572                         t.activeStreams[id] = s
573                         if channelz.IsOn() {
574                                 atomic.AddInt64(&t.czData.streamsStarted, 1)
575                                 atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
576                         }
577                         var sendPing bool
578                         // If the number of active streams changes from 0 to 1, then check if keepalive
579                         // has gone dormant. If so, wake it up.
580                         if len(t.activeStreams) == 1 && t.keepaliveEnabled {
581                                 select {
582                                 case t.awakenKeepalive <- struct{}{}:
583                                         sendPing = true
584                                         // Fill the awakenKeepalive channel again as this channel must be
585                                         // kept non-writable except at the point that the keepalive()
586                                         // goroutine is waiting either to be awakened or shut down.
587                                         t.awakenKeepalive <- struct{}{}
588                                 default:
589                                 }
590                         }
591                         t.mu.Unlock()
592                         return sendPing, nil
593                 },
594                 onOrphaned: cleanup,
595                 wq:         s.wq,
596         }
597         firstTry := true
598         var ch chan struct{}
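        // checkForStreamQuota runs under the control buffer's lock: it reserves one
        // unit of MAX_CONCURRENT_STREAMS quota and assigns the next odd stream ID,
        // or returns false (with ch pointing at streamsQuotaAvailable) when no quota
        // is currently available.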
599         checkForStreamQuota := func(it interface{}) bool {
600                 if t.streamQuota <= 0 { // Can go negative if server decreases it.
601                         if firstTry {
602                                 t.waitingStreams++
603                         }
604                         ch = t.streamsQuotaAvailable
605                         return false
606                 }
607                 if !firstTry {
608                         t.waitingStreams--
609                 }
610                 t.streamQuota--
611                 h := it.(*headerFrame)
612                 h.streamID = t.nextID
613                 t.nextID += 2
614                 s.id = h.streamID
615                 s.fc = &inFlow{limit: uint32(t.initialWindowSize)}
616                 if t.streamQuota > 0 && t.waitingStreams > 0 {
617                         select {
618                         case t.streamsQuotaAvailable <- struct{}{}:
619                         default:
620                         }
621                 }
622                 return true
623         }
624         var hdrListSizeErr error
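        // checkForHeaderListSize enforces the peer's SETTINGS_MAX_HEADER_LIST_SIZE,
        // if one was received, by summing the HPACK size of every header field.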
625         checkForHeaderListSize := func(it interface{}) bool {
626                 if t.maxSendHeaderListSize == nil {
627                         return true
628                 }
629                 hdrFrame := it.(*headerFrame)
630                 var sz int64
631                 for _, f := range hdrFrame.hf {
632                         if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
633                                 hdrListSizeErr = status.Errorf(codes.Internal, "header list size to send violates the maximum size (%d bytes) set by server", *t.maxSendHeaderListSize)
634                                 return false
635                         }
636                 }
637                 return true
638         }
639         for {
640                 success, err := t.controlBuf.executeAndPut(func(it interface{}) bool {
641                         if !checkForStreamQuota(it) {
642                                 return false
643                         }
644                         if !checkForHeaderListSize(it) {
645                                 return false
646                         }
647                         return true
648                 }, hdr)
649                 if err != nil {
650                         return nil, err
651                 }
652                 if success {
653                         break
654                 }
655                 if hdrListSizeErr != nil {
656                         return nil, hdrListSizeErr
657                 }
658                 firstTry = false
659                 select {
660                 case <-ch:
661                 case <-s.ctx.Done():
662                         return nil, ContextErr(s.ctx.Err())
663                 case <-t.goAway:
664                         return nil, errStreamDrain
665                 case <-t.ctx.Done():
666                         return nil, ErrConnClosing
667                 }
668         }
669         if t.statsHandler != nil {
670                 outHeader := &stats.OutHeader{
671                         Client:      true,
672                         FullMethod:  callHdr.Method,
673                         RemoteAddr:  t.remoteAddr,
674                         LocalAddr:   t.localAddr,
675                         Compression: callHdr.SendCompress,
676                 }
677                 t.statsHandler.HandleRPC(s.ctx, outHeader)
678         }
679         return s, nil
680 }
681
682 // CloseStream clears the footprint of a stream when the stream is not needed any more.
683 // This must not be executed in the reader's goroutine.
684 func (t *http2Client) CloseStream(s *Stream, err error) {
685         var (
686                 rst     bool
687                 rstCode http2.ErrCode
688         )
689         if err != nil {
690                 rst = true
691                 rstCode = http2.ErrCodeCancel
692         }
693         t.closeStream(s, err, rst, rstCode, status.Convert(err), nil, false)
694 }
695
696 func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.ErrCode, st *status.Status, mdata map[string][]string, eosReceived bool) {
697         // Set stream status to done.
698         if s.swapState(streamDone) == streamDone {
699                 // If it was already done, return.  If multiple closeStream calls
700                 // happen simultaneously, wait for the first to finish.
701                 <-s.done
702                 return
703         }
704         // status and trailers can be updated here without any synchronization because the stream goroutine will
705         // only read them after it sees an io.EOF error from read or write, and we'll write those errors
706         // only after updating this.
707         s.status = st
708         if len(mdata) > 0 {
709                 s.trailer = mdata
710         }
711         if err != nil {
712                 // This will unblock reads eventually.
713                 s.write(recvMsg{err: err})
714         }
715         // If headerChan isn't closed, then close it.
716         if atomic.SwapUint32(&s.headerDone, 1) == 0 {
717                 s.noHeaders = true
718                 close(s.headerChan)
719         }
720         cleanup := &cleanupStream{
721                 streamID: s.id,
722                 onWrite: func() {
723                         t.mu.Lock()
724                         if t.activeStreams != nil {
725                                 delete(t.activeStreams, s.id)
726                         }
727                         t.mu.Unlock()
728                         if channelz.IsOn() {
729                                 if eosReceived {
730                                         atomic.AddInt64(&t.czData.streamsSucceeded, 1)
731                                 } else {
732                                         atomic.AddInt64(&t.czData.streamsFailed, 1)
733                                 }
734                         }
735                 },
736                 rst:     rst,
737                 rstCode: rstCode,
738         }
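        // Return this stream's concurrency quota and wake up one goroutine, if any,
        // that is blocked in NewStream waiting for quota.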
739         addBackStreamQuota := func(interface{}) bool {
740                 t.streamQuota++
741                 if t.streamQuota > 0 && t.waitingStreams > 0 {
742                         select {
743                         case t.streamsQuotaAvailable <- struct{}{}:
744                         default:
745                         }
746                 }
747                 return true
748         }
749         t.controlBuf.executeAndPut(addBackStreamQuota, cleanup)
750         // This will unblock write.
751         close(s.done)
752 }
753
754 // Close kicks off the shutdown process of the transport. This should be called
755 // only once on a transport. Once it is called, the transport should not be
756 // accessed any more.
757 //
758 // This method blocks until the addrConn that initiated this transport is
759 // re-connected. This happens because t.onClose() begins reconnect logic at the
760 // addrConn level and blocks until the addrConn is successfully connected.
761 func (t *http2Client) Close() error {
762         t.mu.Lock()
763         // Make sure we only Close once.
764         if t.state == closing {
765                 t.mu.Unlock()
766                 return nil
767         }
768         t.state = closing
769         streams := t.activeStreams
770         t.activeStreams = nil
771         t.mu.Unlock()
772         t.controlBuf.finish()
773         t.cancel()
774         err := t.conn.Close()
775         if channelz.IsOn() {
776                 channelz.RemoveEntry(t.channelzID)
777         }
778         // Notify all active streams.
779         for _, s := range streams {
780                 t.closeStream(s, ErrConnClosing, false, http2.ErrCodeNo, status.New(codes.Unavailable, ErrConnClosing.Desc), nil, false)
781         }
782         if t.statsHandler != nil {
783                 connEnd := &stats.ConnEnd{
784                         Client: true,
785                 }
786                 t.statsHandler.HandleConn(t.ctx, connEnd)
787         }
788         t.onClose()
789         return err
790 }
791
792 // GracefulClose sets the state to draining, which prevents new streams from
793 // being created and causes the transport to be closed when the last active
794 // stream is closed.  If there are no active streams, the transport is closed
795 // immediately.  This does nothing if the transport is already draining or
796 // closing.
797 func (t *http2Client) GracefulClose() error {
798         t.mu.Lock()
799         // Make sure we move to draining only from active.
800         if t.state == draining || t.state == closing {
801                 t.mu.Unlock()
802                 return nil
803         }
804         t.state = draining
805         active := len(t.activeStreams)
806         t.mu.Unlock()
807         if active == 0 {
808                 return t.Close()
809         }
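        // Signal draining to loopyWriter; as described above, the transport is then
        // closed when the last active stream completes.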
810         t.controlBuf.put(&incomingGoAway{})
811         return nil
812 }
813
814 // Write formats the data into HTTP2 data frame(s) and sends it out. The caller
815 // should proceed only if Write returns nil.
816 func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
817         if opts.Last {
818                 // If it's the last message, update stream state.
819                 if !s.compareAndSwapState(streamActive, streamWriteDone) {
820                         return errStreamDone
821                 }
822         } else if s.getState() != streamActive {
823                 return errStreamDone
824         }
825         df := &dataFrame{
826                 streamID:  s.id,
827                 endStream: opts.Last,
828         }
829         if hdr != nil || data != nil { // If it's not an empty data frame.
830                 // Add some data to grpc message header so that we can equally
831                 // distribute bytes across frames.
832                 emptyLen := http2MaxFrameLen - len(hdr)
833                 if emptyLen > len(data) {
834                         emptyLen = len(data)
835                 }
836                 hdr = append(hdr, data[:emptyLen]...)
837                 data = data[emptyLen:]
838                 df.h, df.d = hdr, data
839                 // TODO(mmukhi): The above logic in this if can be moved to loopyWriter's data handler.
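                // Acquire stream-level write quota for the entire payload before
                // queueing it on the control buffer; get blocks until quota is
                // available or the stream is done.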
840                 if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
841                         return err
842                 }
843         }
844         return t.controlBuf.put(df)
845 }
846
847 func (t *http2Client) getStream(f http2.Frame) (*Stream, bool) {
848         t.mu.Lock()
849         defer t.mu.Unlock()
850         s, ok := t.activeStreams[f.Header().StreamID]
851         return s, ok
852 }
853
854 // adjustWindow sends out an extra window update over the initial window size
855 // of the stream if the application is requesting data larger in size than
856 // the window.
857 func (t *http2Client) adjustWindow(s *Stream, n uint32) {
858         if w := s.fc.maybeAdjust(n); w > 0 {
859                 t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
860         }
861 }
862
863 // updateWindow adjusts the inbound quota for the stream.
864 // Window updates will be sent out when the cumulative quota
865 // exceeds the corresponding threshold.
866 func (t *http2Client) updateWindow(s *Stream, n uint32) {
867         if w := s.fc.onRead(n); w > 0 {
868                 t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
869         }
870 }
871
872 // updateFlowControl updates the incoming flow control windows
873 // for the transport and the stream based on the current bdp
874 // estimation.
875 func (t *http2Client) updateFlowControl(n uint32) {
876         t.mu.Lock()
877         for _, s := range t.activeStreams {
878                 s.fc.newLimit(n)
879         }
880         t.mu.Unlock()
881         updateIWS := func(interface{}) bool {
882                 t.initialWindowSize = int32(n)
883                 return true
884         }
885         t.controlBuf.executeAndPut(updateIWS, &outgoingWindowUpdate{streamID: 0, increment: t.fc.newLimit(n)})
886         t.controlBuf.put(&outgoingSettings{
887                 ss: []http2.Setting{
888                         {
889                                 ID:  http2.SettingInitialWindowSize,
890                                 Val: n,
891                         },
892                 },
893         })
894 }
895
896 func (t *http2Client) handleData(f *http2.DataFrame) {
897         size := f.Header().Length
898         var sendBDPPing bool
899         if t.bdpEst != nil {
900                 sendBDPPing = t.bdpEst.add(size)
901         }
902         // Decouple connection's flow control from application's read.
903         // An update on connection's flow control should not depend on
904         // whether the user application has read the data or not. Such a
905         // restriction is already imposed on the stream's flow control,
906         // and therefore the sender will be blocked anyway.
907         // Decoupling the connection flow control will prevent other
908         // active (fast) streams from starving in the presence of slow or
909         // inactive streams.
910         //
911         if w := t.fc.onData(size); w > 0 {
912                 t.controlBuf.put(&outgoingWindowUpdate{
913                         streamID:  0,
914                         increment: w,
915                 })
916         }
917         if sendBDPPing {
918                 // Avoid excessive ping detection (e.g. in an L7 proxy)
919                 // by sending a window update prior to the BDP ping.
920
921                 if w := t.fc.reset(); w > 0 {
922                         t.controlBuf.put(&outgoingWindowUpdate{
923                                 streamID:  0,
924                                 increment: w,
925                         })
926                 }
927
928                 t.controlBuf.put(bdpPing)
929         }
930         // Select the right stream to dispatch.
931         s, ok := t.getStream(f)
932         if !ok {
933                 return
934         }
935         if size > 0 {
936                 if err := s.fc.onData(size); err != nil {
937                         t.closeStream(s, io.EOF, true, http2.ErrCodeFlowControl, status.New(codes.Internal, err.Error()), nil, false)
938                         return
939                 }
940                 if f.Header().Flags.Has(http2.FlagDataPadded) {
941                         if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
942                                 t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
943                         }
944                 }
945                 // TODO(bradfitz, zhaoq): A copy is required here because there is no
946                 // guarantee f.Data() is consumed before the arrival of next frame.
947                 // Can this copy be eliminated?
948                 if len(f.Data()) > 0 {
949                         data := make([]byte, len(f.Data()))
950                         copy(data, f.Data())
951                         s.write(recvMsg{data: data})
952                 }
953         }
954         // The server has closed the stream without sending trailers.  Record that
955         // the read direction is closed, and set the status appropriately.
956         if f.FrameHeader.Flags.Has(http2.FlagDataEndStream) {
957                 t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.New(codes.Internal, "server closed the stream without sending trailers"), nil, true)
958         }
959 }
960
961 func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
962         s, ok := t.getStream(f)
963         if !ok {
964                 return
965         }
966         if f.ErrCode == http2.ErrCodeRefusedStream {
967                 // The stream was unprocessed by the server.
968                 atomic.StoreUint32(&s.unprocessed, 1)
969         }
970         statusCode, ok := http2ErrConvTab[f.ErrCode]
971         if !ok {
972                 warningf("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error %v", f.ErrCode)
973                 statusCode = codes.Unknown
974         }
975         if statusCode == codes.Canceled {
976                 // Our deadline was already exceeded, and that was likely the cause of
977                 // this cancelation. Alter the status code accordingly.
978                 if d, ok := s.ctx.Deadline(); ok && !d.After(time.Now()) {
979                         statusCode = codes.DeadlineExceeded
980                 }
981         }
982         t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.Newf(statusCode, "stream terminated by RST_STREAM with error code: %v", f.ErrCode), nil, false)
983 }
984
985 func (t *http2Client) handleSettings(f *http2.SettingsFrame, isFirst bool) {
986         if f.IsAck() {
987                 return
988         }
989         var maxStreams *uint32
990         var ss []http2.Setting
991         var updateFuncs []func()
992         f.ForeachSetting(func(s http2.Setting) error {
993                 switch s.ID {
994                 case http2.SettingMaxConcurrentStreams:
995                         maxStreams = new(uint32)
996                         *maxStreams = s.Val
997                 case http2.SettingMaxHeaderListSize:
998                         updateFuncs = append(updateFuncs, func() {
999                                 t.maxSendHeaderListSize = new(uint32)
1000                                 *t.maxSendHeaderListSize = s.Val
1001                         })
1002                 default:
1003                         ss = append(ss, s)
1004                 }
1005                 return nil
1006         })
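        // If the server's first SETTINGS frame does not advertise
        // MAX_CONCURRENT_STREAMS, treat the limit as effectively unbounded.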
1007         if isFirst && maxStreams == nil {
1008                 maxStreams = new(uint32)
1009                 *maxStreams = math.MaxUint32
1010         }
1011         sf := &incomingSettings{
1012                 ss: ss,
1013         }
1014         if maxStreams != nil {
1015                 updateStreamQuota := func() {
1016                         delta := int64(*maxStreams) - int64(t.maxConcurrentStreams)
1017                         t.maxConcurrentStreams = *maxStreams
1018                         t.streamQuota += delta
1019                         if delta > 0 && t.waitingStreams > 0 {
1020                                 close(t.streamsQuotaAvailable) // wake all of them up.
1021                                 t.streamsQuotaAvailable = make(chan struct{}, 1)
1022                         }
1023                 }
1024                 updateFuncs = append(updateFuncs, updateStreamQuota)
1025         }
1026         t.controlBuf.executeAndPut(func(interface{}) bool {
1027                 for _, f := range updateFuncs {
1028                         f()
1029                 }
1030                 return true
1031         }, sf)
1032 }
1033
1034 func (t *http2Client) handlePing(f *http2.PingFrame) {
1035         if f.IsAck() {
1036                 // Maybe it's a BDP ping.
1037                 if t.bdpEst != nil {
1038                         t.bdpEst.calculate(f.Data)
1039                 }
1040                 return
1041         }
1042         pingAck := &ping{ack: true}
1043         copy(pingAck.data[:], f.Data[:])
1044         t.controlBuf.put(pingAck)
1045 }
1046
1047 func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
1048         t.mu.Lock()
1049         if t.state == closing {
1050                 t.mu.Unlock()
1051                 return
1052         }
1053         if f.ErrCode == http2.ErrCodeEnhanceYourCalm {
1054                 infof("Client received GoAway with http2.ErrCodeEnhanceYourCalm.")
1055         }
1056         id := f.LastStreamID
1057         if id > 0 && id%2 != 1 {
1058                 t.mu.Unlock()
1059                 t.Close()
1060                 return
1061         }
1062         // A client can receive multiple GoAways from the server (see
1063         // https://github.com/grpc/grpc-go/issues/1387).  The idea is that the first
1064         // GoAway will be sent with an ID of MaxInt32 and the second GoAway will be
1065         // sent after an RTT delay with the ID of the last stream the server will
1066         // process.
1067         //
1068         // Therefore, when we get the first GoAway we don't necessarily close any
1069         // streams. While in case of second GoAway we close all streams created after
1070         // the GoAwayId. This way streams that were in-flight while the GoAway from
1071         // server was being sent don't get killed.
1072         select {
1073         case <-t.goAway: // t.goAway has been closed (i.e., multiple GoAways).
1074                 // If there are multiple GoAways, the first one should always have an ID greater than the following ones.
1075                 if id > t.prevGoAwayID {
1076                         t.mu.Unlock()
1077                         t.Close()
1078                         return
1079                 }
1080         default:
1081                 t.setGoAwayReason(f)
1082                 close(t.goAway)
1083                 t.state = draining
1084                 t.controlBuf.put(&incomingGoAway{})
1085
1086                 // This has to be a new goroutine because we're still using the current goroutine to read in the transport.
1087                 t.onGoAway(t.goAwayReason)
1088         }
1089         // All streams with IDs greater than the GoAway ID
1090         // and smaller than the previous GoAway ID should be killed.
1091         upperLimit := t.prevGoAwayID
1092         if upperLimit == 0 { // This is the first GoAway Frame.
1093                 upperLimit = math.MaxUint32 // Kill all streams after the GoAway ID.
1094         }
1095         for streamID, stream := range t.activeStreams {
1096                 if streamID > id && streamID <= upperLimit {
1097                         // The stream was unprocessed by the server.
1098                         atomic.StoreUint32(&stream.unprocessed, 1)
1099                         t.closeStream(stream, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false)
1100                 }
1101         }
1102         t.prevGoAwayID = id
1103         active := len(t.activeStreams)
1104         t.mu.Unlock()
1105         if active == 0 {
1106                 t.Close()
1107         }
1108 }
1109
1110 // setGoAwayReason sets the value of t.goAwayReason based
1111 // on the GoAway frame received.
1112 // It expects the transport's mutex to be held by
1113 // the caller.
1114 func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) {
1115         t.goAwayReason = GoAwayNoReason
1116         switch f.ErrCode {
1117         case http2.ErrCodeEnhanceYourCalm:
1118                 if string(f.DebugData()) == "too_many_pings" {
1119                         t.goAwayReason = GoAwayTooManyPings
1120                 }
1121         }
1122 }
1123
1124 func (t *http2Client) GetGoAwayReason() GoAwayReason {
1125         t.mu.Lock()
1126         defer t.mu.Unlock()
1127         return t.goAwayReason
1128 }
1129
1130 func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) {
1131         t.controlBuf.put(&incomingWindowUpdate{
1132                 streamID:  f.Header().StreamID,
1133                 increment: f.Increment,
1134         })
1135 }
1136
1137 // operateHeaders takes action on the decoded headers.
1138 func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
1139         s, ok := t.getStream(frame)
1140         if !ok {
1141                 return
1142         }
1143         atomic.StoreUint32(&s.bytesReceived, 1)
1144         var state decodeState
1145         if err := state.decodeHeader(frame); err != nil {
1146                 t.closeStream(s, err, true, http2.ErrCodeProtocol, status.New(codes.Internal, err.Error()), nil, false)
1147                 // Something went wrong; stop processing this frame even though data may remain.
1148                 return
1149         }
1150
1151         endStream := frame.StreamEnded()
1152         var isHeader bool
1153         defer func() {
1154                 if t.statsHandler != nil {
1155                         if isHeader {
1156                                 inHeader := &stats.InHeader{
1157                                         Client:     true,
1158                                         WireLength: int(frame.Header().Length),
1159                                 }
1160                                 t.statsHandler.HandleRPC(s.ctx, inHeader)
1161                         } else {
1162                                 inTrailer := &stats.InTrailer{
1163                                         Client:     true,
1164                                         WireLength: int(frame.Header().Length),
1165                                 }
1166                                 t.statsHandler.HandleRPC(s.ctx, inTrailer)
1167                         }
1168                 }
1169         }()
1170         // If headers haven't been received yet.
1171         if atomic.SwapUint32(&s.headerDone, 1) == 0 {
1172                 if !endStream {
1173                         // Headers frame is not actually a trailers-only frame.
1174                         isHeader = true
1175                         // These values can be set without any synchronization because
1176                         // stream goroutine will read it only after seeing a closed
1177                         // headerChan which we'll close after setting this.
1178                         s.recvCompress = state.encoding
1179                         if len(state.mdata) > 0 {
1180                                 s.header = state.mdata
1181                         }
1182                 } else {
1183                         s.noHeaders = true
1184                 }
1185                 close(s.headerChan)
1186         }
1187         if !endStream {
1188                 return
1189         }
1190         // If the client received END_STREAM from the server while the stream was still active, send RST_STREAM.
1191         rst := s.getState() == streamActive
1192         t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, state.status(), state.mdata, true)
1193 }
1194
1195 // reader runs as a separate goroutine in charge of reading data from network
1196 // connection.
1197 //
1198 // TODO(zhaoq): currently one reader per transport. Investigate whether this is
1199 // optimal.
1200 // TODO(zhaoq): Check the validity of the incoming frame sequence.
1201 func (t *http2Client) reader() {
1202         defer close(t.readerDone)
1203         // Check the validity of server preface.
1204         frame, err := t.framer.fr.ReadFrame()
1205         if err != nil {
1206                 t.Close() // this kicks off resetTransport, so must be last before return
1207                 return
1208         }
1209         t.conn.SetReadDeadline(time.Time{}) // reset deadline once we get the settings frame (we didn't time out, yay!)
1210         if t.keepaliveEnabled {
1211                 atomic.CompareAndSwapUint32(&t.activity, 0, 1)
1212         }
1213         sf, ok := frame.(*http2.SettingsFrame)
1214         if !ok {
1215                 t.Close() // this kicks off resetTransport, so must be last before return
1216                 return
1217         }
1218         t.onPrefaceReceipt()
1219         t.handleSettings(sf, true)
1220
1221         // loop to keep reading incoming messages on this transport.
1222         for {
1223                 frame, err := t.framer.fr.ReadFrame()
1224                 if t.keepaliveEnabled {
1225                         atomic.CompareAndSwapUint32(&t.activity, 0, 1)
1226                 }
1227                 if err != nil {
1228                         // Abort an active stream if the http2.Framer returns an
1229                         // http2.StreamError. This can happen only if the server's response
1230                         // is malformed HTTP/2.
1231                         if se, ok := err.(http2.StreamError); ok {
1232                                 t.mu.Lock()
1233                                 s := t.activeStreams[se.StreamID]
1234                                 t.mu.Unlock()
1235                                 if s != nil {
1236                                         // Use the error detail to provide a better error message.
1237                                         code := http2ErrConvTab[se.Code]
1238                                         msg := t.framer.fr.ErrorDetail().Error()
1239                                         t.closeStream(s, status.Error(code, msg), true, http2.ErrCodeProtocol, status.New(code, msg), nil, false)
1240                                 }
1241                                 continue
1242                         } else {
1243                                 // Transport error.
1244                                 t.Close()
1245                                 return
1246                         }
1247                 }
1248                 switch frame := frame.(type) {
1249                 case *http2.MetaHeadersFrame:
1250                         t.operateHeaders(frame)
1251                 case *http2.DataFrame:
1252                         t.handleData(frame)
1253                 case *http2.RSTStreamFrame:
1254                         t.handleRSTStream(frame)
1255                 case *http2.SettingsFrame:
1256                         t.handleSettings(frame, false)
1257                 case *http2.PingFrame:
1258                         t.handlePing(frame)
1259                 case *http2.GoAwayFrame:
1260                         t.handleGoAway(frame)
1261                 case *http2.WindowUpdateFrame:
1262                         t.handleWindowUpdate(frame)
1263                 default:
1264                         errorf("transport: http2Client.reader got unhandled frame type %v.", frame)
1265                 }
1266         }
1267 }
1268
1269 // keepalive runs in a separate goroutine and makes sure the connection is alive by sending pings.
1270 func (t *http2Client) keepalive() {
1271         p := &ping{data: [8]byte{}}
1272         timer := time.NewTimer(t.kp.Time)
1273         for {
1274                 select {
1275                 case <-timer.C:
1276                         if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
1277                                 timer.Reset(t.kp.Time)
1278                                 continue
1279                         }
1280                         // Check if keepalive should go dormant.
1281                         t.mu.Lock()
1282                         if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
1283                                 // Make awakenKeepalive writable.
1284                                 <-t.awakenKeepalive
1285                                 t.mu.Unlock()
1286                                 select {
1287                                 case <-t.awakenKeepalive:
1288                                         // If control gets here, a ping has been sent, so the
1289                                         // timer needs to be reset with keepalive.Timeout.
1290                                 case <-t.ctx.Done():
1291                                         return
1292                                 }
1293                         } else {
1294                                 t.mu.Unlock()
1295                                 if channelz.IsOn() {
1296                                         atomic.AddInt64(&t.czData.kpCount, 1)
1297                                 }
1298                                 // Send ping.
1299                                 t.controlBuf.put(p)
1300                         }
1301
1302                         // By the time control gets here a ping has been sent one way or the other.
1303                         timer.Reset(t.kp.Timeout)
1304                         select {
1305                         case <-timer.C:
1306                                 if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
1307                                         timer.Reset(t.kp.Time)
1308                                         continue
1309                                 }
1310                                 t.Close()
1311                                 return
1312                         case <-t.ctx.Done():
1313                                 if !timer.Stop() {
1314                                         <-timer.C
1315                                 }
1316                                 return
1317                         }
1318                 case <-t.ctx.Done():
1319                         if !timer.Stop() {
1320                                 <-timer.C
1321                         }
1322                         return
1323                 }
1324         }
1325 }
1326
1327 func (t *http2Client) Error() <-chan struct{} {
1328         return t.ctx.Done()
1329 }
1330
1331 func (t *http2Client) GoAway() <-chan struct{} {
1332         return t.goAway
1333 }
1334
1335 func (t *http2Client) ChannelzMetric() *channelz.SocketInternalMetric {
1336         s := channelz.SocketInternalMetric{
1337                 StreamsStarted:                  atomic.LoadInt64(&t.czData.streamsStarted),
1338                 StreamsSucceeded:                atomic.LoadInt64(&t.czData.streamsSucceeded),
1339                 StreamsFailed:                   atomic.LoadInt64(&t.czData.streamsFailed),
1340                 MessagesSent:                    atomic.LoadInt64(&t.czData.msgSent),
1341                 MessagesReceived:                atomic.LoadInt64(&t.czData.msgRecv),
1342                 KeepAlivesSent:                  atomic.LoadInt64(&t.czData.kpCount),
1343                 LastLocalStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
1344                 LastMessageSentTimestamp:        time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
1345                 LastMessageReceivedTimestamp:    time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
1346                 LocalFlowControlWindow:          int64(t.fc.getSize()),
1347                 SocketOptions:                   channelz.GetSocketOption(t.conn),
1348                 LocalAddr:                       t.localAddr,
1349                 RemoteAddr:                      t.remoteAddr,
1350                 // RemoteName :
1351         }
1352         if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
1353                 s.Security = au.GetSecurityValue()
1354         }
1355         s.RemoteFlowControlWindow = t.getOutFlowWindow()
1356         return &s
1357 }
1358
1359 func (t *http2Client) IncrMsgSent() {
1360         atomic.AddInt64(&t.czData.msgSent, 1)
1361         atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
1362 }
1363
1364 func (t *http2Client) IncrMsgRecv() {
1365         atomic.AddInt64(&t.czData.msgRecv, 1)
1366         atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
1367 }
1368
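// getOutFlowWindow asks loopyWriter, via the control buffer, for the current
// outbound flow control window. It returns -1 if the transport is closing and
// -2 if the request times out after one second.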
1369 func (t *http2Client) getOutFlowWindow() int64 {
1370         resp := make(chan uint32, 1)
1371         timer := time.NewTimer(time.Second)
1372         defer timer.Stop()
1373         t.controlBuf.put(&outFlowControlSizeRequest{resp})
1374         select {
1375         case sz := <-resp:
1376                 return int64(sz)
1377         case <-t.ctxDone:
1378                 return -1
1379         case <-timer.C:
1380                 return -2
1381         }
1382 }