3 * Copyright 2014 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
27 "golang.org/x/net/http2"
28 "golang.org/x/net/http2/hpack"
31 var updateHeaderTblSize = func(e *hpack.Encoder, v uint32) {
32 e.SetMaxDynamicTableSizeLimit(v)
35 type itemNode struct {
40 type itemList struct {
45 func (il *itemList) enqueue(i interface{}) {
48 il.head, il.tail = n, n
55 // peek returns the first item in the list without removing it from the
57 func (il *itemList) peek() interface{} {
61 func (il *itemList) dequeue() interface{} {
66 il.head = il.head.next
73 func (il *itemList) dequeueAll() *itemNode {
75 il.head, il.tail = nil, nil
79 func (il *itemList) isEmpty() bool {
83 // The following defines various control items which could flow through
84 // the control buffer of transport. They represent different aspects of
85 // control tasks, e.g., flow control, settings, streaming resetting, etc.
87 // registerStream is used to register an incoming stream with loopy writer.
88 type registerStream struct {
93 // headerFrame is also used to register stream on the client-side.
94 type headerFrame struct {
96 hf []hpack.HeaderField
97 endStream bool // Valid on server side.
98 initStream func(uint32) (bool, error) // Used only on the client side.
100 wq *writeQuota // write quota for the stream created.
101 cleanup *cleanupStream // Valid on the server side.
102 onOrphaned func(error) // Valid on client-side
105 type cleanupStream struct {
108 rstCode http2.ErrCode
112 type dataFrame struct {
117 // onEachWrite is called every time
118 // a part of d is written out.
122 type incomingWindowUpdate struct {
127 type outgoingWindowUpdate struct {
132 type incomingSettings struct {
136 type outgoingSettings struct {
140 type incomingGoAway struct {
155 type outFlowControlSizeRequest struct {
159 type outStreamState int
162 active outStreamState = iota
167 type outStream struct {
178 func (s *outStream) deleteSelf() {
185 s.next, s.prev = nil, nil
188 type outStreamList struct {
189 // Following are sentinel objects that mark the
190 // beginning and end of the list. They do not
191 // contain any item lists. All valid objects are
192 // inserted in between them.
193 // This is needed so that an outStream object can
194 // deleteSelf() in O(1) time without knowing which
195 // list it belongs to.
200 func newOutStreamList() *outStreamList {
201 head, tail := new(outStream), new(outStream)
204 return &outStreamList{
210 func (l *outStreamList) enqueue(s *outStream) {
218 // remove from the beginning of the list.
219 func (l *outStreamList) dequeue() *outStream {
228 // controlBuffer is a way to pass information to loopy.
229 // Information is passed as specific struct types called control frames.
230 // A control frame not only represents data, messages or headers to be sent out
231 // but can also be used to instruct loopy to update its internal state.
232 // It shouldn't be confused with an HTTP2 frame, although some of the control frames
233 // like dataFrame and headerFrame do go out on wire as HTTP2 frames.
234 type controlBuffer struct {
243 func newControlBuffer(done <-chan struct{}) *controlBuffer {
244 return &controlBuffer{
245 ch: make(chan struct{}, 1),
251 func (c *controlBuffer) put(it interface{}) error {
252 _, err := c.executeAndPut(nil, it)
256 func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it interface{}) (bool, error) {
264 if !f(it) { // f wasn't successful
269 if c.consumerWaiting {
271 c.consumerWaiting = false
277 case c.ch <- struct{}{}:
284 // Note argument f should never be nil.
285 func (c *controlBuffer) execute(f func(it interface{}) bool, it interface{}) (bool, error) {
291 if !f(it) { // f wasn't successful
299 func (c *controlBuffer) get(block bool) (interface{}, error) {
306 if !c.list.isEmpty() {
307 h := c.list.dequeue()
315 c.consumerWaiting = true
321 return nil, ErrConnClosing
326 func (c *controlBuffer) finish() {
332 c.err = ErrConnClosing
333 // There may be headers for streams in the control buffer.
334 // These streams need to be cleaned out since the transport
335 // is still not aware of these yet.
336 for head := c.list.dequeueAll(); head != nil; head = head.next {
337 hdr, ok := head.it.(*headerFrame)
341 if hdr.onOrphaned != nil { // It will be nil on the server-side.
342 hdr.onOrphaned(ErrConnClosing)
351 clientSide side = iota
355 // Loopy receives frames from the control buffer.
356 // Each frame is handled individually; most of the work done by loopy goes
357 // into handling data frames. Loopy maintains a queue of active streams, and each
358 // stream maintains a queue of data frames; as loopy receives data frames
359 // it gets added to the queue of the relevant stream.
360 // Loopy goes over this list of active streams by processing one node every iteration,
361 // thereby closely resembling round-robin scheduling over all streams. While
362 // processing a stream, loopy writes out data bytes from this stream capped by the min
363 // of http2MaxFrameLen, connection-level flow control and stream-level flow control.
364 type loopyWriter struct {
368 oiws uint32 // outbound initial window size.
369 // estdStreams is map of all established streams that are not cleaned-up yet.
370 // On client-side, this is all streams whose headers were sent out.
371 // On server-side, this is all streams whose headers were received.
372 estdStreams map[uint32]*outStream // Established streams.
373 // activeStreams is a linked-list of all streams that have data to send and some
374 // stream-level flow control quota.
375 // Each of these streams internally have a list of data items(and perhaps trailers
376 // on the server-side) to be sent out.
377 activeStreams *outStreamList
379 hBuf *bytes.Buffer // The buffer for HPACK encoding.
380 hEnc *hpack.Encoder // HPACK encoder.
384 // Side-specific handlers
385 ssGoAwayHandler func(*goAway) (bool, error)
388 func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator) *loopyWriter {
393 sendQuota: defaultWindowSize,
394 oiws: defaultWindowSize,
395 estdStreams: make(map[uint32]*outStream),
396 activeStreams: newOutStreamList(),
399 hEnc: hpack.NewEncoder(&buf),
405 const minBatchSize = 1000
407 // run should be run in a separate goroutine.
408 // It reads control frames from controlBuf and processes them by:
409 // 1. Updating loopy's internal state, or/and
410 // 2. Writing out HTTP2 frames on the wire.
412 // Loopy keeps all active streams with data to send in a linked-list.
413 // All streams in the activeStreams linked-list must have both:
414 // 1. Data to send, and
415 // 2. Stream level flow control quota available.
417 // In each iteration of run loop, other than processing the incoming control
418 // frame, loopy calls processData, which processes one node from the activeStreams linked-list.
419 // This results in writing of HTTP2 frames into an underlying write buffer.
420 // When there's no more control frames to read from controlBuf, loopy flushes the write buffer.
421 // As an optimization, to increase the batch size for each flush, loopy yields the processor
422 // once if the batch size is too low, to give stream goroutines a chance to fill it up.
423 func (l *loopyWriter) run() (err error) {
425 if err == ErrConnClosing {
426 // Don't log ErrConnClosing as error since it happens
427 // 1. When the connection is closed by some other known issue.
428 // 2. User closed the connection.
429 // 3. A graceful close of connection.
430 infof("transport: loopyWriter.run returning. %v", err)
435 it, err := l.cbuf.get(true)
439 if err = l.handle(it); err != nil {
442 if _, err = l.processData(); err != nil {
448 it, err := l.cbuf.get(false)
453 if err = l.handle(it); err != nil {
456 if _, err = l.processData(); err != nil {
461 isEmpty, err := l.processData()
470 if l.framer.writer.offset < minBatchSize {
475 l.framer.writer.Flush()
482 func (l *loopyWriter) outgoingWindowUpdateHandler(w *outgoingWindowUpdate) error {
483 return l.framer.fr.WriteWindowUpdate(w.streamID, w.increment)
486 func (l *loopyWriter) incomingWindowUpdateHandler(w *incomingWindowUpdate) error {
487 // Otherwise update the quota.
489 l.sendQuota += w.increment
492 // Find the stream and update it.
493 if str, ok := l.estdStreams[w.streamID]; ok {
494 str.bytesOutStanding -= int(w.increment)
495 if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota > 0 && str.state == waitingOnStreamQuota {
497 l.activeStreams.enqueue(str)
504 func (l *loopyWriter) outgoingSettingsHandler(s *outgoingSettings) error {
505 return l.framer.fr.WriteSettings(s.ss...)
508 func (l *loopyWriter) incomingSettingsHandler(s *incomingSettings) error {
509 if err := l.applySettings(s.ss); err != nil {
512 return l.framer.fr.WriteSettingsAck()
515 func (l *loopyWriter) registerStreamHandler(h *registerStream) error {
522 l.estdStreams[h.streamID] = str
526 func (l *loopyWriter) headerHandler(h *headerFrame) error {
527 if l.side == serverSide {
528 str, ok := l.estdStreams[h.streamID]
530 warningf("transport: loopy doesn't recognize the stream: %d", h.streamID)
533 // Case 1.A: Server is responding back with headers.
535 return l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite)
537 // else: Case 1.B: Server wants to close stream.
539 if str.state != empty { // either active or waiting on stream quota.
540 // add it to str's list of items.
544 if err := l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite); err != nil {
547 return l.cleanupStreamHandler(h.cleanup)
549 // Case 2: Client wants to originate stream.
557 return l.originateStream(str)
560 func (l *loopyWriter) originateStream(str *outStream) error {
561 hdr := str.itl.dequeue().(*headerFrame)
562 sendPing, err := hdr.initStream(str.id)
564 if err == ErrConnClosing {
567 // Other errors(errStreamDrain) need not close transport.
570 if err = l.writeHeader(str.id, hdr.endStream, hdr.hf, hdr.onWrite); err != nil {
573 l.estdStreams[str.id] = str
575 return l.pingHandler(&ping{data: [8]byte{}})
580 func (l *loopyWriter) writeHeader(streamID uint32, endStream bool, hf []hpack.HeaderField, onWrite func()) error {
585 for _, f := range hf {
586 if err := l.hEnc.WriteField(f); err != nil {
587 warningf("transport: loopyWriter.writeHeader encountered error while encoding headers:", err)
592 endHeaders, first bool
597 if size > http2MaxFrameLen {
598 size = http2MaxFrameLen
604 err = l.framer.fr.WriteHeaders(http2.HeadersFrameParam{
606 BlockFragment: l.hBuf.Next(size),
607 EndStream: endStream,
608 EndHeaders: endHeaders,
611 err = l.framer.fr.WriteContinuation(
624 func (l *loopyWriter) preprocessData(df *dataFrame) error {
625 str, ok := l.estdStreams[df.streamID]
629 // If we got data for a stream it means that
630 // stream was originated and the headers were sent out.
632 if str.state == empty {
634 l.activeStreams.enqueue(str)
639 func (l *loopyWriter) pingHandler(p *ping) error {
641 l.bdpEst.timesnap(p.data)
643 return l.framer.fr.WritePing(p.ack, p.data)
647 func (l *loopyWriter) outFlowControlSizeRequestHandler(o *outFlowControlSizeRequest) error {
648 o.resp <- l.sendQuota
652 func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
654 if str, ok := l.estdStreams[c.streamID]; ok {
655 // On the server side it could be a trailers-only response or
656 // a RST_STREAM before stream initialization thus the stream might
657 // not be established yet.
658 delete(l.estdStreams, c.streamID)
661 if c.rst { // If RST_STREAM needs to be sent.
662 if err := l.framer.fr.WriteRSTStream(c.streamID, c.rstCode); err != nil {
666 if l.side == clientSide && l.draining && len(l.estdStreams) == 0 {
667 return ErrConnClosing
672 func (l *loopyWriter) incomingGoAwayHandler(*incomingGoAway) error {
673 if l.side == clientSide {
675 if len(l.estdStreams) == 0 {
676 return ErrConnClosing
682 func (l *loopyWriter) goAwayHandler(g *goAway) error {
683 // Handling of outgoing GoAway is very specific to side.
684 if l.ssGoAwayHandler != nil {
685 draining, err := l.ssGoAwayHandler(g)
689 l.draining = draining
694 func (l *loopyWriter) handle(i interface{}) error {
695 switch i := i.(type) {
696 case *incomingWindowUpdate:
697 return l.incomingWindowUpdateHandler(i)
698 case *outgoingWindowUpdate:
699 return l.outgoingWindowUpdateHandler(i)
700 case *incomingSettings:
701 return l.incomingSettingsHandler(i)
702 case *outgoingSettings:
703 return l.outgoingSettingsHandler(i)
705 return l.headerHandler(i)
706 case *registerStream:
707 return l.registerStreamHandler(i)
709 return l.cleanupStreamHandler(i)
710 case *incomingGoAway:
711 return l.incomingGoAwayHandler(i)
713 return l.preprocessData(i)
715 return l.pingHandler(i)
717 return l.goAwayHandler(i)
718 case *outFlowControlSizeRequest:
719 return l.outFlowControlSizeRequestHandler(i)
721 return fmt.Errorf("transport: unknown control message type %T", i)
725 func (l *loopyWriter) applySettings(ss []http2.Setting) error {
726 for _, s := range ss {
728 case http2.SettingInitialWindowSize:
732 // If the new limit is greater make all depleted streams active.
733 for _, stream := range l.estdStreams {
734 if stream.state == waitingOnStreamQuota {
735 stream.state = active
736 l.activeStreams.enqueue(stream)
740 case http2.SettingHeaderTableSize:
741 updateHeaderTblSize(l.hEnc, s.Val)
747 // processData removes the first stream from active streams, writes out at most 16KB
748 // of its data and then puts it at the end of activeStreams if there's still more data
749 // to be sent and stream has some stream-level flow control.
750 func (l *loopyWriter) processData() (bool, error) {
751 if l.sendQuota == 0 {
754 str := l.activeStreams.dequeue() // Remove the first stream.
758 dataItem := str.itl.peek().(*dataFrame) // Peek at the first data item this stream.
759 // A data item is represented by a dataFrame, since it later translates into
760 // multiple HTTP2 data frames.
761 // Every dataFrame has two buffers; h that keeps grpc-message header and d that is actual data.
762 // As an optimization to keep wire traffic low, data from d is copied to h to make it as big as the
763 // maximum possible HTTP2 frame size.
765 if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // Empty data frame
766 // Client sends out empty data frame with endStream = true
767 if err := l.framer.fr.WriteData(dataItem.streamID, dataItem.endStream, nil); err != nil {
770 str.itl.dequeue() // remove the empty data item from stream
771 if str.itl.isEmpty() {
773 } else if trailer, ok := str.itl.peek().(*headerFrame); ok { // the next item is trailers.
774 if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil {
777 if err := l.cleanupStreamHandler(trailer.cleanup); err != nil {
781 l.activeStreams.enqueue(str)
789 if len(dataItem.h) != 0 { // data header has not been written out yet.
795 size := http2MaxFrameLen
799 if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota <= 0 { // stream-level flow control.
800 str.state = waitingOnStreamQuota
802 } else if strQuota < size {
806 if l.sendQuota < uint32(size) { // connection-level flow control.
807 size = int(l.sendQuota)
809 // Now that outgoing flow controls are checked we can replenish str's write quota
810 str.wq.replenish(size)
812 // If this is the last data message on this stream and all of it can be written in this iteration.
813 if dataItem.endStream && size == len(buf) {
814 // buf contains either data or it contains header but data is empty.
815 if idx == 1 || len(dataItem.d) == 0 {
819 if dataItem.onEachWrite != nil {
820 dataItem.onEachWrite()
822 if err := l.framer.fr.WriteData(dataItem.streamID, endStream, buf[:size]); err != nil {
826 str.bytesOutStanding += size
827 l.sendQuota -= uint32(size)
834 if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // All the data from that message was written out.
837 if str.itl.isEmpty() {
839 } else if trailer, ok := str.itl.peek().(*headerFrame); ok { // The next item is trailers.
840 if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil {
843 if err := l.cleanupStreamHandler(trailer.cleanup); err != nil {
846 } else if int(l.oiws)-str.bytesOutStanding <= 0 { // Ran out of stream quota.
847 str.state = waitingOnStreamQuota
848 } else { // Otherwise add it back to the list of active streams.
849 l.activeStreams.enqueue(str)