Remove BPA from Makefile
[icn.git] / cmd / bpa-operator / vendor / google.golang.org / grpc / internal / transport / http2_server.go
1 /*
2  *
3  * Copyright 2014 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 package transport
20
21 import (
22         "bytes"
23         "context"
24         "errors"
25         "fmt"
26         "io"
27         "math"
28         "net"
29         "strconv"
30         "sync"
31         "sync/atomic"
32         "time"
33
34         "github.com/golang/protobuf/proto"
35         "golang.org/x/net/http2"
36         "golang.org/x/net/http2/hpack"
37
38         "google.golang.org/grpc/codes"
39         "google.golang.org/grpc/credentials"
40         "google.golang.org/grpc/grpclog"
41         "google.golang.org/grpc/internal/channelz"
42         "google.golang.org/grpc/internal/grpcrand"
43         "google.golang.org/grpc/keepalive"
44         "google.golang.org/grpc/metadata"
45         "google.golang.org/grpc/peer"
46         "google.golang.org/grpc/stats"
47         "google.golang.org/grpc/status"
48         "google.golang.org/grpc/tap"
49 )
50
51 var (
52         // ErrIllegalHeaderWrite indicates that setting header is illegal because of
53         // the stream's state.
54         ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called")
55         // ErrHeaderListSizeLimitViolation indicates that the header list size is larger
56         // than the limit set by peer.
57         ErrHeaderListSizeLimitViolation = errors.New("transport: trying to send header list size larger than the limit set by peer")
58 )
59
60 // http2Server implements the ServerTransport interface with HTTP2.
61 type http2Server struct {
62         ctx         context.Context
63         ctxDone     <-chan struct{} // Cache the context.Done() chan
64         cancel      context.CancelFunc
65         conn        net.Conn
66         loopy       *loopyWriter
67         readerDone  chan struct{} // sync point to enable testing.
68         writerDone  chan struct{} // sync point to enable testing.
69         remoteAddr  net.Addr
70         localAddr   net.Addr
71         maxStreamID uint32               // max stream ID ever seen
72         authInfo    credentials.AuthInfo // auth info about the connection
73         inTapHandle tap.ServerInHandle
74         framer      *framer
75         // The max number of concurrent streams.
76         maxStreams uint32
77         // controlBuf delivers all the control related tasks (e.g., window
78         // updates, reset streams, and various settings) to the controller.
79         controlBuf *controlBuffer
80         fc         *trInFlow
81         stats      stats.Handler
82         // Flag to keep track of reading activity on transport.
83         // 1 is true and 0 is false.
84         activity uint32 // Accessed atomically.
85         // Keepalive and max-age parameters for the server.
86         kp keepalive.ServerParameters
87
88         // Keepalive enforcement policy.
89         kep keepalive.EnforcementPolicy
90         // The time instance last ping was received.
91         lastPingAt time.Time
92         // Number of times the client has violated keepalive ping policy so far.
93         pingStrikes uint8
94         // Flag to signify that number of ping strikes should be reset to 0.
95         // This is set whenever data or header frames are sent.
96         // 1 means yes.
97         resetPingStrikes      uint32 // Accessed atomically.
98         initialWindowSize     int32
99         bdpEst                *bdpEstimator
100         maxSendHeaderListSize *uint32
101
102         mu sync.Mutex // guard the following
103
104         // drainChan is initialized when drain(...) is called the first time.
105         // After which the server writes out the first GoAway(with ID 2^31-1) frame.
106         // Then an independent goroutine will be launched to later send the second GoAway.
107         // During this time we don't want to write another first GoAway(with ID 2^31 -1) frame.
108         // Thus call to drain(...) will be a no-op if drainChan is already initialized since draining is
109         // already underway.
110         drainChan     chan struct{}
111         state         transportState
112         activeStreams map[uint32]*Stream
113         // idle is the time instant when the connection went idle.
114         // This is either the beginning of the connection or when the number of
115         // RPCs go down to 0.
116         // When the connection is busy, this value is set to 0.
117         idle time.Time
118
119         // Fields below are for channelz metric collection.
120         channelzID int64 // channelz unique identification number
121         czData     *channelzData
122 }
123
124 // newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
125 // returned if something goes wrong.
126 func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
127         writeBufSize := config.WriteBufferSize
128         readBufSize := config.ReadBufferSize
129         maxHeaderListSize := defaultServerMaxHeaderListSize
130         if config.MaxHeaderListSize != nil {
131                 maxHeaderListSize = *config.MaxHeaderListSize
132         }
133         framer := newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize)
134         // Send initial settings as connection preface to client.
135         var isettings []http2.Setting
136         // TODO(zhaoq): Have a better way to signal "no limit" because 0 is
137         // permitted in the HTTP2 spec.
138         maxStreams := config.MaxStreams
139         if maxStreams == 0 {
140                 maxStreams = math.MaxUint32
141         } else {
142                 isettings = append(isettings, http2.Setting{
143                         ID:  http2.SettingMaxConcurrentStreams,
144                         Val: maxStreams,
145                 })
146         }
147         dynamicWindow := true
148         iwz := int32(initialWindowSize)
149         if config.InitialWindowSize >= defaultWindowSize {
150                 iwz = config.InitialWindowSize
151                 dynamicWindow = false
152         }
153         icwz := int32(initialWindowSize)
154         if config.InitialConnWindowSize >= defaultWindowSize {
155                 icwz = config.InitialConnWindowSize
156                 dynamicWindow = false
157         }
158         if iwz != defaultWindowSize {
159                 isettings = append(isettings, http2.Setting{
160                         ID:  http2.SettingInitialWindowSize,
161                         Val: uint32(iwz)})
162         }
163         if config.MaxHeaderListSize != nil {
164                 isettings = append(isettings, http2.Setting{
165                         ID:  http2.SettingMaxHeaderListSize,
166                         Val: *config.MaxHeaderListSize,
167                 })
168         }
169         if err := framer.fr.WriteSettings(isettings...); err != nil {
170                 return nil, connectionErrorf(false, err, "transport: %v", err)
171         }
172         // Adjust the connection flow control window if needed.
173         if delta := uint32(icwz - defaultWindowSize); delta > 0 {
174                 if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
175                         return nil, connectionErrorf(false, err, "transport: %v", err)
176                 }
177         }
178         kp := config.KeepaliveParams
179         if kp.MaxConnectionIdle == 0 {
180                 kp.MaxConnectionIdle = defaultMaxConnectionIdle
181         }
182         if kp.MaxConnectionAge == 0 {
183                 kp.MaxConnectionAge = defaultMaxConnectionAge
184         }
185         // Add a jitter to MaxConnectionAge.
186         kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
187         if kp.MaxConnectionAgeGrace == 0 {
188                 kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
189         }
190         if kp.Time == 0 {
191                 kp.Time = defaultServerKeepaliveTime
192         }
193         if kp.Timeout == 0 {
194                 kp.Timeout = defaultServerKeepaliveTimeout
195         }
196         kep := config.KeepalivePolicy
197         if kep.MinTime == 0 {
198                 kep.MinTime = defaultKeepalivePolicyMinTime
199         }
200         ctx, cancel := context.WithCancel(context.Background())
201         t := &http2Server{
202                 ctx:               ctx,
203                 cancel:            cancel,
204                 ctxDone:           ctx.Done(),
205                 conn:              conn,
206                 remoteAddr:        conn.RemoteAddr(),
207                 localAddr:         conn.LocalAddr(),
208                 authInfo:          config.AuthInfo,
209                 framer:            framer,
210                 readerDone:        make(chan struct{}),
211                 writerDone:        make(chan struct{}),
212                 maxStreams:        maxStreams,
213                 inTapHandle:       config.InTapHandle,
214                 fc:                &trInFlow{limit: uint32(icwz)},
215                 state:             reachable,
216                 activeStreams:     make(map[uint32]*Stream),
217                 stats:             config.StatsHandler,
218                 kp:                kp,
219                 idle:              time.Now(),
220                 kep:               kep,
221                 initialWindowSize: iwz,
222                 czData:            new(channelzData),
223         }
224         t.controlBuf = newControlBuffer(t.ctxDone)
225         if dynamicWindow {
226                 t.bdpEst = &bdpEstimator{
227                         bdp:               initialWindowSize,
228                         updateFlowControl: t.updateFlowControl,
229                 }
230         }
231         if t.stats != nil {
232                 t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
233                         RemoteAddr: t.remoteAddr,
234                         LocalAddr:  t.localAddr,
235                 })
236                 connBegin := &stats.ConnBegin{}
237                 t.stats.HandleConn(t.ctx, connBegin)
238         }
239         if channelz.IsOn() {
240                 t.channelzID = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
241         }
242         t.framer.writer.Flush()
243
244         defer func() {
245                 if err != nil {
246                         t.Close()
247                 }
248         }()
249
250         // Check the validity of client preface.
251         preface := make([]byte, len(clientPreface))
252         if _, err := io.ReadFull(t.conn, preface); err != nil {
253                 return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
254         }
255         if !bytes.Equal(preface, clientPreface) {
256                 return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
257         }
258
259         frame, err := t.framer.fr.ReadFrame()
260         if err == io.EOF || err == io.ErrUnexpectedEOF {
261                 return nil, err
262         }
263         if err != nil {
264                 return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
265         }
266         atomic.StoreUint32(&t.activity, 1)
267         sf, ok := frame.(*http2.SettingsFrame)
268         if !ok {
269                 return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
270         }
271         t.handleSettings(sf)
272
273         go func() {
274                 t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
275                 t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
276                 if err := t.loopy.run(); err != nil {
277                         errorf("transport: loopyWriter.run returning. Err: %v", err)
278                 }
279                 t.conn.Close()
280                 close(t.writerDone)
281         }()
282         go t.keepalive()
283         return t, nil
284 }
285
286 // operateHeader takes action on the decoded headers.
287 func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {
288         streamID := frame.Header().StreamID
289         state := decodeState{serverSide: true}
290         if err := state.decodeHeader(frame); err != nil {
291                 if se, ok := status.FromError(err); ok {
292                         t.controlBuf.put(&cleanupStream{
293                                 streamID: streamID,
294                                 rst:      true,
295                                 rstCode:  statusCodeConvTab[se.Code()],
296                                 onWrite:  func() {},
297                         })
298                 }
299                 return false
300         }
301
302         buf := newRecvBuffer()
303         s := &Stream{
304                 id:             streamID,
305                 st:             t,
306                 buf:            buf,
307                 fc:             &inFlow{limit: uint32(t.initialWindowSize)},
308                 recvCompress:   state.encoding,
309                 method:         state.method,
310                 contentSubtype: state.contentSubtype,
311         }
312         if frame.StreamEnded() {
313                 // s is just created by the caller. No lock needed.
314                 s.state = streamReadDone
315         }
316         if state.timeoutSet {
317                 s.ctx, s.cancel = context.WithTimeout(t.ctx, state.timeout)
318         } else {
319                 s.ctx, s.cancel = context.WithCancel(t.ctx)
320         }
321         pr := &peer.Peer{
322                 Addr: t.remoteAddr,
323         }
324         // Attach Auth info if there is any.
325         if t.authInfo != nil {
326                 pr.AuthInfo = t.authInfo
327         }
328         s.ctx = peer.NewContext(s.ctx, pr)
329         // Attach the received metadata to the context.
330         if len(state.mdata) > 0 {
331                 s.ctx = metadata.NewIncomingContext(s.ctx, state.mdata)
332         }
333         if state.statsTags != nil {
334                 s.ctx = stats.SetIncomingTags(s.ctx, state.statsTags)
335         }
336         if state.statsTrace != nil {
337                 s.ctx = stats.SetIncomingTrace(s.ctx, state.statsTrace)
338         }
339         if t.inTapHandle != nil {
340                 var err error
341                 info := &tap.Info{
342                         FullMethodName: state.method,
343                 }
344                 s.ctx, err = t.inTapHandle(s.ctx, info)
345                 if err != nil {
346                         warningf("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err)
347                         t.controlBuf.put(&cleanupStream{
348                                 streamID: s.id,
349                                 rst:      true,
350                                 rstCode:  http2.ErrCodeRefusedStream,
351                                 onWrite:  func() {},
352                         })
353                         return false
354                 }
355         }
356         t.mu.Lock()
357         if t.state != reachable {
358                 t.mu.Unlock()
359                 return false
360         }
361         if uint32(len(t.activeStreams)) >= t.maxStreams {
362                 t.mu.Unlock()
363                 t.controlBuf.put(&cleanupStream{
364                         streamID: streamID,
365                         rst:      true,
366                         rstCode:  http2.ErrCodeRefusedStream,
367                         onWrite:  func() {},
368                 })
369                 return false
370         }
371         if streamID%2 != 1 || streamID <= t.maxStreamID {
372                 t.mu.Unlock()
373                 // illegal gRPC stream id.
374                 errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
375                 return true
376         }
377         t.maxStreamID = streamID
378         t.activeStreams[streamID] = s
379         if len(t.activeStreams) == 1 {
380                 t.idle = time.Time{}
381         }
382         t.mu.Unlock()
383         if channelz.IsOn() {
384                 atomic.AddInt64(&t.czData.streamsStarted, 1)
385                 atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
386         }
387         s.requestRead = func(n int) {
388                 t.adjustWindow(s, uint32(n))
389         }
390         s.ctx = traceCtx(s.ctx, s.method)
391         if t.stats != nil {
392                 s.ctx = t.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
393                 inHeader := &stats.InHeader{
394                         FullMethod:  s.method,
395                         RemoteAddr:  t.remoteAddr,
396                         LocalAddr:   t.localAddr,
397                         Compression: s.recvCompress,
398                         WireLength:  int(frame.Header().Length),
399                 }
400                 t.stats.HandleRPC(s.ctx, inHeader)
401         }
402         s.ctxDone = s.ctx.Done()
403         s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
404         s.trReader = &transportReader{
405                 reader: &recvBufferReader{
406                         ctx:     s.ctx,
407                         ctxDone: s.ctxDone,
408                         recv:    s.buf,
409                 },
410                 windowHandler: func(n int) {
411                         t.updateWindow(s, uint32(n))
412                 },
413         }
414         // Register the stream with loopy.
415         t.controlBuf.put(&registerStream{
416                 streamID: s.id,
417                 wq:       s.wq,
418         })
419         handle(s)
420         return false
421 }
422
423 // HandleStreams receives incoming streams using the given handler. This is
424 // typically run in a separate goroutine.
425 // traceCtx attaches trace to ctx and returns the new context.
426 func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
427         defer close(t.readerDone)
428         for {
429                 frame, err := t.framer.fr.ReadFrame()
430                 atomic.StoreUint32(&t.activity, 1)
431                 if err != nil {
432                         if se, ok := err.(http2.StreamError); ok {
433                                 warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se)
434                                 t.mu.Lock()
435                                 s := t.activeStreams[se.StreamID]
436                                 t.mu.Unlock()
437                                 if s != nil {
438                                         t.closeStream(s, true, se.Code, nil, false)
439                                 } else {
440                                         t.controlBuf.put(&cleanupStream{
441                                                 streamID: se.StreamID,
442                                                 rst:      true,
443                                                 rstCode:  se.Code,
444                                                 onWrite:  func() {},
445                                         })
446                                 }
447                                 continue
448                         }
449                         if err == io.EOF || err == io.ErrUnexpectedEOF {
450                                 t.Close()
451                                 return
452                         }
453                         warningf("transport: http2Server.HandleStreams failed to read frame: %v", err)
454                         t.Close()
455                         return
456                 }
457                 switch frame := frame.(type) {
458                 case *http2.MetaHeadersFrame:
459                         if t.operateHeaders(frame, handle, traceCtx) {
460                                 t.Close()
461                                 break
462                         }
463                 case *http2.DataFrame:
464                         t.handleData(frame)
465                 case *http2.RSTStreamFrame:
466                         t.handleRSTStream(frame)
467                 case *http2.SettingsFrame:
468                         t.handleSettings(frame)
469                 case *http2.PingFrame:
470                         t.handlePing(frame)
471                 case *http2.WindowUpdateFrame:
472                         t.handleWindowUpdate(frame)
473                 case *http2.GoAwayFrame:
474                         // TODO: Handle GoAway from the client appropriately.
475                 default:
476                         errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
477                 }
478         }
479 }
480
481 func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
482         t.mu.Lock()
483         defer t.mu.Unlock()
484         if t.activeStreams == nil {
485                 // The transport is closing.
486                 return nil, false
487         }
488         s, ok := t.activeStreams[f.Header().StreamID]
489         if !ok {
490                 // The stream is already done.
491                 return nil, false
492         }
493         return s, true
494 }
495
496 // adjustWindow sends out extra window update over the initial window size
497 // of stream if the application is requesting data larger in size than
498 // the window.
499 func (t *http2Server) adjustWindow(s *Stream, n uint32) {
500         if w := s.fc.maybeAdjust(n); w > 0 {
501                 t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
502         }
503
504 }
505
506 // updateWindow adjusts the inbound quota for the stream and the transport.
507 // Window updates will deliver to the controller for sending when
508 // the cumulative quota exceeds the corresponding threshold.
509 func (t *http2Server) updateWindow(s *Stream, n uint32) {
510         if w := s.fc.onRead(n); w > 0 {
511                 t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id,
512                         increment: w,
513                 })
514         }
515 }
516
517 // updateFlowControl updates the incoming flow control windows
518 // for the transport and the stream based on the current bdp
519 // estimation.
520 func (t *http2Server) updateFlowControl(n uint32) {
521         t.mu.Lock()
522         for _, s := range t.activeStreams {
523                 s.fc.newLimit(n)
524         }
525         t.initialWindowSize = int32(n)
526         t.mu.Unlock()
527         t.controlBuf.put(&outgoingWindowUpdate{
528                 streamID:  0,
529                 increment: t.fc.newLimit(n),
530         })
531         t.controlBuf.put(&outgoingSettings{
532                 ss: []http2.Setting{
533                         {
534                                 ID:  http2.SettingInitialWindowSize,
535                                 Val: n,
536                         },
537                 },
538         })
539
540 }
541
542 func (t *http2Server) handleData(f *http2.DataFrame) {
543         size := f.Header().Length
544         var sendBDPPing bool
545         if t.bdpEst != nil {
546                 sendBDPPing = t.bdpEst.add(size)
547         }
548         // Decouple connection's flow control from application's read.
549         // An update on connection's flow control should not depend on
550         // whether user application has read the data or not. Such a
551         // restriction is already imposed on the stream's flow control,
552         // and therefore the sender will be blocked anyways.
553         // Decoupling the connection flow control will prevent other
554         // active(fast) streams from starving in presence of slow or
555         // inactive streams.
556         if w := t.fc.onData(size); w > 0 {
557                 t.controlBuf.put(&outgoingWindowUpdate{
558                         streamID:  0,
559                         increment: w,
560                 })
561         }
562         if sendBDPPing {
563                 // Avoid excessive ping detection (e.g. in an L7 proxy)
564                 // by sending a window update prior to the BDP ping.
565                 if w := t.fc.reset(); w > 0 {
566                         t.controlBuf.put(&outgoingWindowUpdate{
567                                 streamID:  0,
568                                 increment: w,
569                         })
570                 }
571                 t.controlBuf.put(bdpPing)
572         }
573         // Select the right stream to dispatch.
574         s, ok := t.getStream(f)
575         if !ok {
576                 return
577         }
578         if size > 0 {
579                 if err := s.fc.onData(size); err != nil {
580                         t.closeStream(s, true, http2.ErrCodeFlowControl, nil, false)
581                         return
582                 }
583                 if f.Header().Flags.Has(http2.FlagDataPadded) {
584                         if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
585                                 t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
586                         }
587                 }
588                 // TODO(bradfitz, zhaoq): A copy is required here because there is no
589                 // guarantee f.Data() is consumed before the arrival of next frame.
590                 // Can this copy be eliminated?
591                 if len(f.Data()) > 0 {
592                         data := make([]byte, len(f.Data()))
593                         copy(data, f.Data())
594                         s.write(recvMsg{data: data})
595                 }
596         }
597         if f.Header().Flags.Has(http2.FlagDataEndStream) {
598                 // Received the end of stream from the client.
599                 s.compareAndSwapState(streamActive, streamReadDone)
600                 s.write(recvMsg{err: io.EOF})
601         }
602 }
603
604 func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
605         s, ok := t.getStream(f)
606         if !ok {
607                 return
608         }
609         t.closeStream(s, false, 0, nil, false)
610 }
611
612 func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
613         if f.IsAck() {
614                 return
615         }
616         var ss []http2.Setting
617         var updateFuncs []func()
618         f.ForeachSetting(func(s http2.Setting) error {
619                 switch s.ID {
620                 case http2.SettingMaxHeaderListSize:
621                         updateFuncs = append(updateFuncs, func() {
622                                 t.maxSendHeaderListSize = new(uint32)
623                                 *t.maxSendHeaderListSize = s.Val
624                         })
625                 default:
626                         ss = append(ss, s)
627                 }
628                 return nil
629         })
630         t.controlBuf.executeAndPut(func(interface{}) bool {
631                 for _, f := range updateFuncs {
632                         f()
633                 }
634                 return true
635         }, &incomingSettings{
636                 ss: ss,
637         })
638 }
639
640 const (
641         maxPingStrikes     = 2
642         defaultPingTimeout = 2 * time.Hour
643 )
644
645 func (t *http2Server) handlePing(f *http2.PingFrame) {
646         if f.IsAck() {
647                 if f.Data == goAwayPing.data && t.drainChan != nil {
648                         close(t.drainChan)
649                         return
650                 }
651                 // Maybe it's a BDP ping.
652                 if t.bdpEst != nil {
653                         t.bdpEst.calculate(f.Data)
654                 }
655                 return
656         }
657         pingAck := &ping{ack: true}
658         copy(pingAck.data[:], f.Data[:])
659         t.controlBuf.put(pingAck)
660
661         now := time.Now()
662         defer func() {
663                 t.lastPingAt = now
664         }()
665         // A reset ping strikes means that we don't need to check for policy
666         // violation for this ping and the pingStrikes counter should be set
667         // to 0.
668         if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) {
669                 t.pingStrikes = 0
670                 return
671         }
672         t.mu.Lock()
673         ns := len(t.activeStreams)
674         t.mu.Unlock()
675         if ns < 1 && !t.kep.PermitWithoutStream {
676                 // Keepalive shouldn't be active thus, this new ping should
677                 // have come after at least defaultPingTimeout.
678                 if t.lastPingAt.Add(defaultPingTimeout).After(now) {
679                         t.pingStrikes++
680                 }
681         } else {
682                 // Check if keepalive policy is respected.
683                 if t.lastPingAt.Add(t.kep.MinTime).After(now) {
684                         t.pingStrikes++
685                 }
686         }
687
688         if t.pingStrikes > maxPingStrikes {
689                 // Send goaway and close the connection.
690                 errorf("transport: Got too many pings from the client, closing the connection.")
691                 t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true})
692         }
693 }
694
695 func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
696         t.controlBuf.put(&incomingWindowUpdate{
697                 streamID:  f.Header().StreamID,
698                 increment: f.Increment,
699         })
700 }
701
702 func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD) []hpack.HeaderField {
703         for k, vv := range md {
704                 if isReservedHeader(k) {
705                         // Clients don't tolerate reading restricted headers after some non restricted ones were sent.
706                         continue
707                 }
708                 for _, v := range vv {
709                         headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
710                 }
711         }
712         return headerFields
713 }
714
715 func (t *http2Server) checkForHeaderListSize(it interface{}) bool {
716         if t.maxSendHeaderListSize == nil {
717                 return true
718         }
719         hdrFrame := it.(*headerFrame)
720         var sz int64
721         for _, f := range hdrFrame.hf {
722                 if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
723                         errorf("header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize)
724                         return false
725                 }
726         }
727         return true
728 }
729
730 // WriteHeader sends the header metedata md back to the client.
731 func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
732         if s.updateHeaderSent() || s.getState() == streamDone {
733                 return ErrIllegalHeaderWrite
734         }
735         s.hdrMu.Lock()
736         if md.Len() > 0 {
737                 if s.header.Len() > 0 {
738                         s.header = metadata.Join(s.header, md)
739                 } else {
740                         s.header = md
741                 }
742         }
743         if err := t.writeHeaderLocked(s); err != nil {
744                 s.hdrMu.Unlock()
745                 return err
746         }
747         s.hdrMu.Unlock()
748         return nil
749 }
750
751 func (t *http2Server) writeHeaderLocked(s *Stream) error {
752         // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
753         // first and create a slice of that exact size.
754         headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else.
755         headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
756         headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
757         if s.sendCompress != "" {
758                 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
759         }
760         headerFields = appendHeaderFieldsFromMD(headerFields, s.header)
761         success, err := t.controlBuf.executeAndPut(t.checkForHeaderListSize, &headerFrame{
762                 streamID:  s.id,
763                 hf:        headerFields,
764                 endStream: false,
765                 onWrite: func() {
766                         atomic.StoreUint32(&t.resetPingStrikes, 1)
767                 },
768         })
769         if !success {
770                 if err != nil {
771                         return err
772                 }
773                 t.closeStream(s, true, http2.ErrCodeInternal, nil, false)
774                 return ErrHeaderListSizeLimitViolation
775         }
776         if t.stats != nil {
777                 // Note: WireLength is not set in outHeader.
778                 // TODO(mmukhi): Revisit this later, if needed.
779                 outHeader := &stats.OutHeader{}
780                 t.stats.HandleRPC(s.Context(), outHeader)
781         }
782         return nil
783 }
784
785 // WriteStatus sends stream status to the client and terminates the stream.
786 // There is no further I/O operations being able to perform on this stream.
787 // TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
788 // OK is adopted.
789 func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
790         if s.getState() == streamDone {
791                 return nil
792         }
793         s.hdrMu.Lock()
794         // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
795         // first and create a slice of that exact size.
796         headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else.
797         if !s.updateHeaderSent() {                      // No headers have been sent.
798                 if len(s.header) > 0 { // Send a separate header frame.
799                         if err := t.writeHeaderLocked(s); err != nil {
800                                 s.hdrMu.Unlock()
801                                 return err
802                         }
803                 } else { // Send a trailer only response.
804                         headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
805                         headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
806                 }
807         }
808         headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
809         headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})
810
811         if p := st.Proto(); p != nil && len(p.Details) > 0 {
812                 stBytes, err := proto.Marshal(p)
813                 if err != nil {
814                         // TODO: return error instead, when callers are able to handle it.
815                         grpclog.Errorf("transport: failed to marshal rpc status: %v, error: %v", p, err)
816                 } else {
817                         headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)})
818                 }
819         }
820
821         // Attach the trailer metadata.
822         headerFields = appendHeaderFieldsFromMD(headerFields, s.trailer)
823         trailingHeader := &headerFrame{
824                 streamID:  s.id,
825                 hf:        headerFields,
826                 endStream: true,
827                 onWrite: func() {
828                         atomic.StoreUint32(&t.resetPingStrikes, 1)
829                 },
830         }
831         s.hdrMu.Unlock()
832         success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader)
833         if !success {
834                 if err != nil {
835                         return err
836                 }
837                 t.closeStream(s, true, http2.ErrCodeInternal, nil, false)
838                 return ErrHeaderListSizeLimitViolation
839         }
840         t.closeStream(s, false, 0, trailingHeader, true)
841         if t.stats != nil {
842                 t.stats.HandleRPC(s.Context(), &stats.OutTrailer{})
843         }
844         return nil
845 }
846
847 // Write converts the data into HTTP2 data frame and sends it out. Non-nil error
848 // is returns if it fails (e.g., framing error, transport error).
849 func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
850         if !s.isHeaderSent() { // Headers haven't been written yet.
851                 if err := t.WriteHeader(s, nil); err != nil {
852                         // TODO(mmukhi, dfawley): Make sure this is the right code to return.
853                         return status.Errorf(codes.Internal, "transport: %v", err)
854                 }
855         } else {
856                 // Writing headers checks for this condition.
857                 if s.getState() == streamDone {
858                         // TODO(mmukhi, dfawley): Should the server write also return io.EOF?
859                         s.cancel()
860                         select {
861                         case <-t.ctx.Done():
862                                 return ErrConnClosing
863                         default:
864                         }
865                         return ContextErr(s.ctx.Err())
866                 }
867         }
868         // Add some data to header frame so that we can equally distribute bytes across frames.
869         emptyLen := http2MaxFrameLen - len(hdr)
870         if emptyLen > len(data) {
871                 emptyLen = len(data)
872         }
873         hdr = append(hdr, data[:emptyLen]...)
874         data = data[emptyLen:]
875         df := &dataFrame{
876                 streamID: s.id,
877                 h:        hdr,
878                 d:        data,
879                 onEachWrite: func() {
880                         atomic.StoreUint32(&t.resetPingStrikes, 1)
881                 },
882         }
883         if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
884                 select {
885                 case <-t.ctx.Done():
886                         return ErrConnClosing
887                 default:
888                 }
889                 return ContextErr(s.ctx.Err())
890         }
891         return t.controlBuf.put(df)
892 }
893
894 // keepalive running in a separate goroutine does the following:
895 // 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle.
896 // 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge.
897 // 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge.
898 // 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection
899 // after an additional duration of keepalive.Timeout.
900 func (t *http2Server) keepalive() {
901         p := &ping{}
902         var pingSent bool
903         maxIdle := time.NewTimer(t.kp.MaxConnectionIdle)
904         maxAge := time.NewTimer(t.kp.MaxConnectionAge)
905         keepalive := time.NewTimer(t.kp.Time)
906         // NOTE: All exit paths of this function should reset their
907         // respective timers. A failure to do so will cause the
908         // following clean-up to deadlock and eventually leak.
909         defer func() {
910                 if !maxIdle.Stop() {
911                         <-maxIdle.C
912                 }
913                 if !maxAge.Stop() {
914                         <-maxAge.C
915                 }
916                 if !keepalive.Stop() {
917                         <-keepalive.C
918                 }
919         }()
920         for {
921                 select {
922                 case <-maxIdle.C:
923                         t.mu.Lock()
924                         idle := t.idle
925                         if idle.IsZero() { // The connection is non-idle.
926                                 t.mu.Unlock()
927                                 maxIdle.Reset(t.kp.MaxConnectionIdle)
928                                 continue
929                         }
930                         val := t.kp.MaxConnectionIdle - time.Since(idle)
931                         t.mu.Unlock()
932                         if val <= 0 {
933                                 // The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
934                                 // Gracefully close the connection.
935                                 t.drain(http2.ErrCodeNo, []byte{})
936                                 // Resetting the timer so that the clean-up doesn't deadlock.
937                                 maxIdle.Reset(infinity)
938                                 return
939                         }
940                         maxIdle.Reset(val)
941                 case <-maxAge.C:
942                         t.drain(http2.ErrCodeNo, []byte{})
943                         maxAge.Reset(t.kp.MaxConnectionAgeGrace)
944                         select {
945                         case <-maxAge.C:
946                                 // Close the connection after grace period.
947                                 t.Close()
948                                 // Resetting the timer so that the clean-up doesn't deadlock.
949                                 maxAge.Reset(infinity)
950                         case <-t.ctx.Done():
951                         }
952                         return
953                 case <-keepalive.C:
954                         if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
955                                 pingSent = false
956                                 keepalive.Reset(t.kp.Time)
957                                 continue
958                         }
959                         if pingSent {
960                                 t.Close()
961                                 // Resetting the timer so that the clean-up doesn't deadlock.
962                                 keepalive.Reset(infinity)
963                                 return
964                         }
965                         pingSent = true
966                         if channelz.IsOn() {
967                                 atomic.AddInt64(&t.czData.kpCount, 1)
968                         }
969                         t.controlBuf.put(p)
970                         keepalive.Reset(t.kp.Timeout)
971                 case <-t.ctx.Done():
972                         return
973                 }
974         }
975 }
976
977 // Close starts shutting down the http2Server transport.
978 // TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
979 // could cause some resource issue. Revisit this later.
980 func (t *http2Server) Close() error {
981         t.mu.Lock()
982         if t.state == closing {
983                 t.mu.Unlock()
984                 return errors.New("transport: Close() was already called")
985         }
986         t.state = closing
987         streams := t.activeStreams
988         t.activeStreams = nil
989         t.mu.Unlock()
990         t.controlBuf.finish()
991         t.cancel()
992         err := t.conn.Close()
993         if channelz.IsOn() {
994                 channelz.RemoveEntry(t.channelzID)
995         }
996         // Cancel all active streams.
997         for _, s := range streams {
998                 s.cancel()
999         }
1000         if t.stats != nil {
1001                 connEnd := &stats.ConnEnd{}
1002                 t.stats.HandleConn(t.ctx, connEnd)
1003         }
1004         return err
1005 }
1006
1007 // deleteStream deletes the stream s from transport's active streams.
1008 func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
1009         t.mu.Lock()
1010         if _, ok := t.activeStreams[s.id]; !ok {
1011                 t.mu.Unlock()
1012                 return
1013         }
1014
1015         delete(t.activeStreams, s.id)
1016         if len(t.activeStreams) == 0 {
1017                 t.idle = time.Now()
1018         }
1019         t.mu.Unlock()
1020
1021         if channelz.IsOn() {
1022                 if eosReceived {
1023                         atomic.AddInt64(&t.czData.streamsSucceeded, 1)
1024                 } else {
1025                         atomic.AddInt64(&t.czData.streamsFailed, 1)
1026                 }
1027         }
1028 }
1029
1030 // closeStream clears the footprint of a stream when the stream is not needed
1031 // any more.
1032 func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
1033         // Mark the stream as done
1034         oldState := s.swapState(streamDone)
1035
1036         // In case stream sending and receiving are invoked in separate
1037         // goroutines (e.g., bi-directional streaming), cancel needs to be
1038         // called to interrupt the potential blocking on other goroutines.
1039         s.cancel()
1040
1041         // Deletes the stream from active streams
1042         t.deleteStream(s, eosReceived)
1043
1044         cleanup := &cleanupStream{
1045                 streamID: s.id,
1046                 rst:      rst,
1047                 rstCode:  rstCode,
1048                 onWrite:  func() {},
1049         }
1050
1051         // No trailer. Puts cleanupFrame into transport's control buffer.
1052         if hdr == nil {
1053                 t.controlBuf.put(cleanup)
1054                 return
1055         }
1056
1057         // We do the check here, because of the following scenario:
1058         // 1. closeStream is called first with a trailer. A trailer item with a piggybacked cleanup item
1059         // is put to control buffer.
1060         // 2. Loopy writer is waiting on a stream quota. It will never get it because client errored at
1061         // some point. So loopy can't act on trailer
1062         // 3. Client sends a RST_STREAM due to the error. Then closeStream is called without a trailer as
1063         // the result of the received RST_STREAM.
1064         // If we do this check at the beginning of the closeStream, then we won't put a cleanup item in
1065         // response to received RST_STREAM into the control buffer and outStream in loopy writer will
1066         // never get cleaned up.
1067
1068         // If the stream is already done, don't send the trailer.
1069         if oldState == streamDone {
1070                 return
1071         }
1072
1073         hdr.cleanup = cleanup
1074         t.controlBuf.put(hdr)
1075 }
1076
1077 func (t *http2Server) RemoteAddr() net.Addr {
1078         return t.remoteAddr
1079 }
1080
1081 func (t *http2Server) Drain() {
1082         t.drain(http2.ErrCodeNo, []byte{})
1083 }
1084
1085 func (t *http2Server) drain(code http2.ErrCode, debugData []byte) {
1086         t.mu.Lock()
1087         defer t.mu.Unlock()
1088         if t.drainChan != nil {
1089                 return
1090         }
1091         t.drainChan = make(chan struct{})
1092         t.controlBuf.put(&goAway{code: code, debugData: debugData, headsUp: true})
1093 }
1094
1095 var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
1096
1097 // Handles outgoing GoAway and returns true if loopy needs to put itself
1098 // in draining mode.
1099 func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
1100         t.mu.Lock()
1101         if t.state == closing { // TODO(mmukhi): This seems unnecessary.
1102                 t.mu.Unlock()
1103                 // The transport is closing.
1104                 return false, ErrConnClosing
1105         }
1106         sid := t.maxStreamID
1107         if !g.headsUp {
1108                 // Stop accepting more streams now.
1109                 t.state = draining
1110                 if len(t.activeStreams) == 0 {
1111                         g.closeConn = true
1112                 }
1113                 t.mu.Unlock()
1114                 if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
1115                         return false, err
1116                 }
1117                 if g.closeConn {
1118                         // Abruptly close the connection following the GoAway (via
1119                         // loopywriter).  But flush out what's inside the buffer first.
1120                         t.framer.writer.Flush()
1121                         return false, fmt.Errorf("transport: Connection closing")
1122                 }
1123                 return true, nil
1124         }
1125         t.mu.Unlock()
1126         // For a graceful close, send out a GoAway with stream ID of MaxUInt32,
1127         // Follow that with a ping and wait for the ack to come back or a timer
1128         // to expire. During this time accept new streams since they might have
1129         // originated before the GoAway reaches the client.
1130         // After getting the ack or timer expiration send out another GoAway this
1131         // time with an ID of the max stream server intends to process.
1132         if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, []byte{}); err != nil {
1133                 return false, err
1134         }
1135         if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil {
1136                 return false, err
1137         }
1138         go func() {
1139                 timer := time.NewTimer(time.Minute)
1140                 defer timer.Stop()
1141                 select {
1142                 case <-t.drainChan:
1143                 case <-timer.C:
1144                 case <-t.ctx.Done():
1145                         return
1146                 }
1147                 t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData})
1148         }()
1149         return false, nil
1150 }
1151
1152 func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric {
1153         s := channelz.SocketInternalMetric{
1154                 StreamsStarted:                   atomic.LoadInt64(&t.czData.streamsStarted),
1155                 StreamsSucceeded:                 atomic.LoadInt64(&t.czData.streamsSucceeded),
1156                 StreamsFailed:                    atomic.LoadInt64(&t.czData.streamsFailed),
1157                 MessagesSent:                     atomic.LoadInt64(&t.czData.msgSent),
1158                 MessagesReceived:                 atomic.LoadInt64(&t.czData.msgRecv),
1159                 KeepAlivesSent:                   atomic.LoadInt64(&t.czData.kpCount),
1160                 LastRemoteStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
1161                 LastMessageSentTimestamp:         time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
1162                 LastMessageReceivedTimestamp:     time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
1163                 LocalFlowControlWindow:           int64(t.fc.getSize()),
1164                 SocketOptions:                    channelz.GetSocketOption(t.conn),
1165                 LocalAddr:                        t.localAddr,
1166                 RemoteAddr:                       t.remoteAddr,
1167                 // RemoteName :
1168         }
1169         if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
1170                 s.Security = au.GetSecurityValue()
1171         }
1172         s.RemoteFlowControlWindow = t.getOutFlowWindow()
1173         return &s
1174 }
1175
1176 func (t *http2Server) IncrMsgSent() {
1177         atomic.AddInt64(&t.czData.msgSent, 1)
1178         atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
1179 }
1180
1181 func (t *http2Server) IncrMsgRecv() {
1182         atomic.AddInt64(&t.czData.msgRecv, 1)
1183         atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
1184 }
1185
1186 func (t *http2Server) getOutFlowWindow() int64 {
1187         resp := make(chan uint32, 1)
1188         timer := time.NewTimer(time.Second)
1189         defer timer.Stop()
1190         t.controlBuf.put(&outFlowControlSizeRequest{resp})
1191         select {
1192         case sz := <-resp:
1193                 return int64(sz)
1194         case <-t.ctxDone:
1195                 return -1
1196         case <-timer.C:
1197                 return -2
1198         }
1199 }
1200
1201 func getJitter(v time.Duration) time.Duration {
1202         if v == infinity {
1203                 return 0
1204         }
1205         // Generate a jitter between +/- 10% of the value.
1206         r := int64(v / 10)
1207         j := grpcrand.Int63n(2*r) - r
1208         return time.Duration(j)
1209 }