Code refactoring for bpa operator
[icn.git] / cmd / bpa-operator / vendor / google.golang.org / grpc / internal / transport / controlbuf.go
1 /*
2  *
3  * Copyright 2014 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 package transport
20
21 import (
22         "bytes"
23         "fmt"
24         "runtime"
25         "sync"
26
27         "golang.org/x/net/http2"
28         "golang.org/x/net/http2/hpack"
29 )
30
31 var updateHeaderTblSize = func(e *hpack.Encoder, v uint32) {
32         e.SetMaxDynamicTableSizeLimit(v)
33 }
34
35 type itemNode struct {
36         it   interface{}
37         next *itemNode
38 }
39
40 type itemList struct {
41         head *itemNode
42         tail *itemNode
43 }
44
45 func (il *itemList) enqueue(i interface{}) {
46         n := &itemNode{it: i}
47         if il.tail == nil {
48                 il.head, il.tail = n, n
49                 return
50         }
51         il.tail.next = n
52         il.tail = n
53 }
54
55 // peek returns the first item in the list without removing it from the
56 // list.
57 func (il *itemList) peek() interface{} {
58         return il.head.it
59 }
60
61 func (il *itemList) dequeue() interface{} {
62         if il.head == nil {
63                 return nil
64         }
65         i := il.head.it
66         il.head = il.head.next
67         if il.head == nil {
68                 il.tail = nil
69         }
70         return i
71 }
72
73 func (il *itemList) dequeueAll() *itemNode {
74         h := il.head
75         il.head, il.tail = nil, nil
76         return h
77 }
78
79 func (il *itemList) isEmpty() bool {
80         return il.head == nil
81 }
82
83 // The following defines various control items which could flow through
84 // the control buffer of transport. They represent different aspects of
85 // control tasks, e.g., flow control, settings, streaming resetting, etc.
86
87 // registerStream is used to register an incoming stream with loopy writer.
88 type registerStream struct {
89         streamID uint32
90         wq       *writeQuota
91 }
92
93 // headerFrame is also used to register stream on the client-side.
94 type headerFrame struct {
95         streamID   uint32
96         hf         []hpack.HeaderField
97         endStream  bool                       // Valid on server side.
98         initStream func(uint32) (bool, error) // Used only on the client side.
99         onWrite    func()
100         wq         *writeQuota    // write quota for the stream created.
101         cleanup    *cleanupStream // Valid on the server side.
102         onOrphaned func(error)    // Valid on client-side
103 }
104
105 type cleanupStream struct {
106         streamID uint32
107         rst      bool
108         rstCode  http2.ErrCode
109         onWrite  func()
110 }
111
112 type dataFrame struct {
113         streamID  uint32
114         endStream bool
115         h         []byte
116         d         []byte
117         // onEachWrite is called every time
118         // a part of d is written out.
119         onEachWrite func()
120 }
121
122 type incomingWindowUpdate struct {
123         streamID  uint32
124         increment uint32
125 }
126
127 type outgoingWindowUpdate struct {
128         streamID  uint32
129         increment uint32
130 }
131
132 type incomingSettings struct {
133         ss []http2.Setting
134 }
135
136 type outgoingSettings struct {
137         ss []http2.Setting
138 }
139
140 type incomingGoAway struct {
141 }
142
143 type goAway struct {
144         code      http2.ErrCode
145         debugData []byte
146         headsUp   bool
147         closeConn bool
148 }
149
150 type ping struct {
151         ack  bool
152         data [8]byte
153 }
154
155 type outFlowControlSizeRequest struct {
156         resp chan uint32
157 }
158
159 type outStreamState int
160
161 const (
162         active outStreamState = iota
163         empty
164         waitingOnStreamQuota
165 )
166
167 type outStream struct {
168         id               uint32
169         state            outStreamState
170         itl              *itemList
171         bytesOutStanding int
172         wq               *writeQuota
173
174         next *outStream
175         prev *outStream
176 }
177
178 func (s *outStream) deleteSelf() {
179         if s.prev != nil {
180                 s.prev.next = s.next
181         }
182         if s.next != nil {
183                 s.next.prev = s.prev
184         }
185         s.next, s.prev = nil, nil
186 }
187
188 type outStreamList struct {
189         // Following are sentinel objects that mark the
190         // beginning and end of the list. They do not
191         // contain any item lists. All valid objects are
192         // inserted in between them.
193         // This is needed so that an outStream object can
194         // deleteSelf() in O(1) time without knowing which
195         // list it belongs to.
196         head *outStream
197         tail *outStream
198 }
199
200 func newOutStreamList() *outStreamList {
201         head, tail := new(outStream), new(outStream)
202         head.next = tail
203         tail.prev = head
204         return &outStreamList{
205                 head: head,
206                 tail: tail,
207         }
208 }
209
210 func (l *outStreamList) enqueue(s *outStream) {
211         e := l.tail.prev
212         e.next = s
213         s.prev = e
214         s.next = l.tail
215         l.tail.prev = s
216 }
217
218 // remove from the beginning of the list.
219 func (l *outStreamList) dequeue() *outStream {
220         b := l.head.next
221         if b == l.tail {
222                 return nil
223         }
224         b.deleteSelf()
225         return b
226 }
227
228 // controlBuffer is a way to pass information to loopy.
229 // Information is passed as specific struct types called control frames.
230 // A control frame not only represents data, messages or headers to be sent out
231 // but can also be used to instruct loopy to update its internal state.
232 // It shouldn't be confused with an HTTP2 frame, although some of the control frames
233 // like dataFrame and headerFrame do go out on wire as HTTP2 frames.
234 type controlBuffer struct {
235         ch              chan struct{}
236         done            <-chan struct{}
237         mu              sync.Mutex
238         consumerWaiting bool
239         list            *itemList
240         err             error
241 }
242
243 func newControlBuffer(done <-chan struct{}) *controlBuffer {
244         return &controlBuffer{
245                 ch:   make(chan struct{}, 1),
246                 list: &itemList{},
247                 done: done,
248         }
249 }
250
251 func (c *controlBuffer) put(it interface{}) error {
252         _, err := c.executeAndPut(nil, it)
253         return err
254 }
255
256 func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it interface{}) (bool, error) {
257         var wakeUp bool
258         c.mu.Lock()
259         if c.err != nil {
260                 c.mu.Unlock()
261                 return false, c.err
262         }
263         if f != nil {
264                 if !f(it) { // f wasn't successful
265                         c.mu.Unlock()
266                         return false, nil
267                 }
268         }
269         if c.consumerWaiting {
270                 wakeUp = true
271                 c.consumerWaiting = false
272         }
273         c.list.enqueue(it)
274         c.mu.Unlock()
275         if wakeUp {
276                 select {
277                 case c.ch <- struct{}{}:
278                 default:
279                 }
280         }
281         return true, nil
282 }
283
284 // Note argument f should never be nil.
285 func (c *controlBuffer) execute(f func(it interface{}) bool, it interface{}) (bool, error) {
286         c.mu.Lock()
287         if c.err != nil {
288                 c.mu.Unlock()
289                 return false, c.err
290         }
291         if !f(it) { // f wasn't successful
292                 c.mu.Unlock()
293                 return false, nil
294         }
295         c.mu.Unlock()
296         return true, nil
297 }
298
299 func (c *controlBuffer) get(block bool) (interface{}, error) {
300         for {
301                 c.mu.Lock()
302                 if c.err != nil {
303                         c.mu.Unlock()
304                         return nil, c.err
305                 }
306                 if !c.list.isEmpty() {
307                         h := c.list.dequeue()
308                         c.mu.Unlock()
309                         return h, nil
310                 }
311                 if !block {
312                         c.mu.Unlock()
313                         return nil, nil
314                 }
315                 c.consumerWaiting = true
316                 c.mu.Unlock()
317                 select {
318                 case <-c.ch:
319                 case <-c.done:
320                         c.finish()
321                         return nil, ErrConnClosing
322                 }
323         }
324 }
325
326 func (c *controlBuffer) finish() {
327         c.mu.Lock()
328         if c.err != nil {
329                 c.mu.Unlock()
330                 return
331         }
332         c.err = ErrConnClosing
333         // There may be headers for streams in the control buffer.
334         // These streams need to be cleaned out since the transport
335         // is still not aware of these yet.
336         for head := c.list.dequeueAll(); head != nil; head = head.next {
337                 hdr, ok := head.it.(*headerFrame)
338                 if !ok {
339                         continue
340                 }
341                 if hdr.onOrphaned != nil { // It will be nil on the server-side.
342                         hdr.onOrphaned(ErrConnClosing)
343                 }
344         }
345         c.mu.Unlock()
346 }
347
348 type side int
349
350 const (
351         clientSide side = iota
352         serverSide
353 )
354
355 // Loopy receives frames from the control buffer.
356 // Each frame is handled individually; most of the work done by loopy goes
357 // into handling data frames. Loopy maintains a queue of active streams, and each
358 // stream maintains a queue of data frames; as loopy receives data frames
359 // it gets added to the queue of the relevant stream.
360 // Loopy goes over this list of active streams by processing one node every iteration,
361 // thereby closely resemebling to a round-robin scheduling over all streams. While
362 // processing a stream, loopy writes out data bytes from this stream capped by the min
363 // of http2MaxFrameLen, connection-level flow control and stream-level flow control.
364 type loopyWriter struct {
365         side      side
366         cbuf      *controlBuffer
367         sendQuota uint32
368         oiws      uint32 // outbound initial window size.
369         // estdStreams is map of all established streams that are not cleaned-up yet.
370         // On client-side, this is all streams whose headers were sent out.
371         // On server-side, this is all streams whose headers were received.
372         estdStreams map[uint32]*outStream // Established streams.
373         // activeStreams is a linked-list of all streams that have data to send and some
374         // stream-level flow control quota.
375         // Each of these streams internally have a list of data items(and perhaps trailers
376         // on the server-side) to be sent out.
377         activeStreams *outStreamList
378         framer        *framer
379         hBuf          *bytes.Buffer  // The buffer for HPACK encoding.
380         hEnc          *hpack.Encoder // HPACK encoder.
381         bdpEst        *bdpEstimator
382         draining      bool
383
384         // Side-specific handlers
385         ssGoAwayHandler func(*goAway) (bool, error)
386 }
387
388 func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator) *loopyWriter {
389         var buf bytes.Buffer
390         l := &loopyWriter{
391                 side:          s,
392                 cbuf:          cbuf,
393                 sendQuota:     defaultWindowSize,
394                 oiws:          defaultWindowSize,
395                 estdStreams:   make(map[uint32]*outStream),
396                 activeStreams: newOutStreamList(),
397                 framer:        fr,
398                 hBuf:          &buf,
399                 hEnc:          hpack.NewEncoder(&buf),
400                 bdpEst:        bdpEst,
401         }
402         return l
403 }
404
405 const minBatchSize = 1000
406
407 // run should be run in a separate goroutine.
408 // It reads control frames from controlBuf and processes them by:
409 // 1. Updating loopy's internal state, or/and
410 // 2. Writing out HTTP2 frames on the wire.
411 //
412 // Loopy keeps all active streams with data to send in a linked-list.
413 // All streams in the activeStreams linked-list must have both:
414 // 1. Data to send, and
415 // 2. Stream level flow control quota available.
416 //
417 // In each iteration of run loop, other than processing the incoming control
418 // frame, loopy calls processData, which processes one node from the activeStreams linked-list.
419 // This results in writing of HTTP2 frames into an underlying write buffer.
420 // When there's no more control frames to read from controlBuf, loopy flushes the write buffer.
421 // As an optimization, to increase the batch size for each flush, loopy yields the processor, once
422 // if the batch size is too low to give stream goroutines a chance to fill it up.
423 func (l *loopyWriter) run() (err error) {
424         defer func() {
425                 if err == ErrConnClosing {
426                         // Don't log ErrConnClosing as error since it happens
427                         // 1. When the connection is closed by some other known issue.
428                         // 2. User closed the connection.
429                         // 3. A graceful close of connection.
430                         infof("transport: loopyWriter.run returning. %v", err)
431                         err = nil
432                 }
433         }()
434         for {
435                 it, err := l.cbuf.get(true)
436                 if err != nil {
437                         return err
438                 }
439                 if err = l.handle(it); err != nil {
440                         return err
441                 }
442                 if _, err = l.processData(); err != nil {
443                         return err
444                 }
445                 gosched := true
446         hasdata:
447                 for {
448                         it, err := l.cbuf.get(false)
449                         if err != nil {
450                                 return err
451                         }
452                         if it != nil {
453                                 if err = l.handle(it); err != nil {
454                                         return err
455                                 }
456                                 if _, err = l.processData(); err != nil {
457                                         return err
458                                 }
459                                 continue hasdata
460                         }
461                         isEmpty, err := l.processData()
462                         if err != nil {
463                                 return err
464                         }
465                         if !isEmpty {
466                                 continue hasdata
467                         }
468                         if gosched {
469                                 gosched = false
470                                 if l.framer.writer.offset < minBatchSize {
471                                         runtime.Gosched()
472                                         continue hasdata
473                                 }
474                         }
475                         l.framer.writer.Flush()
476                         break hasdata
477
478                 }
479         }
480 }
481
482 func (l *loopyWriter) outgoingWindowUpdateHandler(w *outgoingWindowUpdate) error {
483         return l.framer.fr.WriteWindowUpdate(w.streamID, w.increment)
484 }
485
486 func (l *loopyWriter) incomingWindowUpdateHandler(w *incomingWindowUpdate) error {
487         // Otherwise update the quota.
488         if w.streamID == 0 {
489                 l.sendQuota += w.increment
490                 return nil
491         }
492         // Find the stream and update it.
493         if str, ok := l.estdStreams[w.streamID]; ok {
494                 str.bytesOutStanding -= int(w.increment)
495                 if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota > 0 && str.state == waitingOnStreamQuota {
496                         str.state = active
497                         l.activeStreams.enqueue(str)
498                         return nil
499                 }
500         }
501         return nil
502 }
503
504 func (l *loopyWriter) outgoingSettingsHandler(s *outgoingSettings) error {
505         return l.framer.fr.WriteSettings(s.ss...)
506 }
507
508 func (l *loopyWriter) incomingSettingsHandler(s *incomingSettings) error {
509         if err := l.applySettings(s.ss); err != nil {
510                 return err
511         }
512         return l.framer.fr.WriteSettingsAck()
513 }
514
515 func (l *loopyWriter) registerStreamHandler(h *registerStream) error {
516         str := &outStream{
517                 id:    h.streamID,
518                 state: empty,
519                 itl:   &itemList{},
520                 wq:    h.wq,
521         }
522         l.estdStreams[h.streamID] = str
523         return nil
524 }
525
526 func (l *loopyWriter) headerHandler(h *headerFrame) error {
527         if l.side == serverSide {
528                 str, ok := l.estdStreams[h.streamID]
529                 if !ok {
530                         warningf("transport: loopy doesn't recognize the stream: %d", h.streamID)
531                         return nil
532                 }
533                 // Case 1.A: Server is responding back with headers.
534                 if !h.endStream {
535                         return l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite)
536                 }
537                 // else:  Case 1.B: Server wants to close stream.
538
539                 if str.state != empty { // either active or waiting on stream quota.
540                         // add it str's list of items.
541                         str.itl.enqueue(h)
542                         return nil
543                 }
544                 if err := l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite); err != nil {
545                         return err
546                 }
547                 return l.cleanupStreamHandler(h.cleanup)
548         }
549         // Case 2: Client wants to originate stream.
550         str := &outStream{
551                 id:    h.streamID,
552                 state: empty,
553                 itl:   &itemList{},
554                 wq:    h.wq,
555         }
556         str.itl.enqueue(h)
557         return l.originateStream(str)
558 }
559
560 func (l *loopyWriter) originateStream(str *outStream) error {
561         hdr := str.itl.dequeue().(*headerFrame)
562         sendPing, err := hdr.initStream(str.id)
563         if err != nil {
564                 if err == ErrConnClosing {
565                         return err
566                 }
567                 // Other errors(errStreamDrain) need not close transport.
568                 return nil
569         }
570         if err = l.writeHeader(str.id, hdr.endStream, hdr.hf, hdr.onWrite); err != nil {
571                 return err
572         }
573         l.estdStreams[str.id] = str
574         if sendPing {
575                 return l.pingHandler(&ping{data: [8]byte{}})
576         }
577         return nil
578 }
579
580 func (l *loopyWriter) writeHeader(streamID uint32, endStream bool, hf []hpack.HeaderField, onWrite func()) error {
581         if onWrite != nil {
582                 onWrite()
583         }
584         l.hBuf.Reset()
585         for _, f := range hf {
586                 if err := l.hEnc.WriteField(f); err != nil {
587                         warningf("transport: loopyWriter.writeHeader encountered error while encoding headers:", err)
588                 }
589         }
590         var (
591                 err               error
592                 endHeaders, first bool
593         )
594         first = true
595         for !endHeaders {
596                 size := l.hBuf.Len()
597                 if size > http2MaxFrameLen {
598                         size = http2MaxFrameLen
599                 } else {
600                         endHeaders = true
601                 }
602                 if first {
603                         first = false
604                         err = l.framer.fr.WriteHeaders(http2.HeadersFrameParam{
605                                 StreamID:      streamID,
606                                 BlockFragment: l.hBuf.Next(size),
607                                 EndStream:     endStream,
608                                 EndHeaders:    endHeaders,
609                         })
610                 } else {
611                         err = l.framer.fr.WriteContinuation(
612                                 streamID,
613                                 endHeaders,
614                                 l.hBuf.Next(size),
615                         )
616                 }
617                 if err != nil {
618                         return err
619                 }
620         }
621         return nil
622 }
623
624 func (l *loopyWriter) preprocessData(df *dataFrame) error {
625         str, ok := l.estdStreams[df.streamID]
626         if !ok {
627                 return nil
628         }
629         // If we got data for a stream it means that
630         // stream was originated and the headers were sent out.
631         str.itl.enqueue(df)
632         if str.state == empty {
633                 str.state = active
634                 l.activeStreams.enqueue(str)
635         }
636         return nil
637 }
638
639 func (l *loopyWriter) pingHandler(p *ping) error {
640         if !p.ack {
641                 l.bdpEst.timesnap(p.data)
642         }
643         return l.framer.fr.WritePing(p.ack, p.data)
644
645 }
646
647 func (l *loopyWriter) outFlowControlSizeRequestHandler(o *outFlowControlSizeRequest) error {
648         o.resp <- l.sendQuota
649         return nil
650 }
651
652 func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
653         c.onWrite()
654         if str, ok := l.estdStreams[c.streamID]; ok {
655                 // On the server side it could be a trailers-only response or
656                 // a RST_STREAM before stream initialization thus the stream might
657                 // not be established yet.
658                 delete(l.estdStreams, c.streamID)
659                 str.deleteSelf()
660         }
661         if c.rst { // If RST_STREAM needs to be sent.
662                 if err := l.framer.fr.WriteRSTStream(c.streamID, c.rstCode); err != nil {
663                         return err
664                 }
665         }
666         if l.side == clientSide && l.draining && len(l.estdStreams) == 0 {
667                 return ErrConnClosing
668         }
669         return nil
670 }
671
672 func (l *loopyWriter) incomingGoAwayHandler(*incomingGoAway) error {
673         if l.side == clientSide {
674                 l.draining = true
675                 if len(l.estdStreams) == 0 {
676                         return ErrConnClosing
677                 }
678         }
679         return nil
680 }
681
682 func (l *loopyWriter) goAwayHandler(g *goAway) error {
683         // Handling of outgoing GoAway is very specific to side.
684         if l.ssGoAwayHandler != nil {
685                 draining, err := l.ssGoAwayHandler(g)
686                 if err != nil {
687                         return err
688                 }
689                 l.draining = draining
690         }
691         return nil
692 }
693
694 func (l *loopyWriter) handle(i interface{}) error {
695         switch i := i.(type) {
696         case *incomingWindowUpdate:
697                 return l.incomingWindowUpdateHandler(i)
698         case *outgoingWindowUpdate:
699                 return l.outgoingWindowUpdateHandler(i)
700         case *incomingSettings:
701                 return l.incomingSettingsHandler(i)
702         case *outgoingSettings:
703                 return l.outgoingSettingsHandler(i)
704         case *headerFrame:
705                 return l.headerHandler(i)
706         case *registerStream:
707                 return l.registerStreamHandler(i)
708         case *cleanupStream:
709                 return l.cleanupStreamHandler(i)
710         case *incomingGoAway:
711                 return l.incomingGoAwayHandler(i)
712         case *dataFrame:
713                 return l.preprocessData(i)
714         case *ping:
715                 return l.pingHandler(i)
716         case *goAway:
717                 return l.goAwayHandler(i)
718         case *outFlowControlSizeRequest:
719                 return l.outFlowControlSizeRequestHandler(i)
720         default:
721                 return fmt.Errorf("transport: unknown control message type %T", i)
722         }
723 }
724
725 func (l *loopyWriter) applySettings(ss []http2.Setting) error {
726         for _, s := range ss {
727                 switch s.ID {
728                 case http2.SettingInitialWindowSize:
729                         o := l.oiws
730                         l.oiws = s.Val
731                         if o < l.oiws {
732                                 // If the new limit is greater make all depleted streams active.
733                                 for _, stream := range l.estdStreams {
734                                         if stream.state == waitingOnStreamQuota {
735                                                 stream.state = active
736                                                 l.activeStreams.enqueue(stream)
737                                         }
738                                 }
739                         }
740                 case http2.SettingHeaderTableSize:
741                         updateHeaderTblSize(l.hEnc, s.Val)
742                 }
743         }
744         return nil
745 }
746
747 // processData removes the first stream from active streams, writes out at most 16KB
748 // of its data and then puts it at the end of activeStreams if there's still more data
749 // to be sent and stream has some stream-level flow control.
750 func (l *loopyWriter) processData() (bool, error) {
751         if l.sendQuota == 0 {
752                 return true, nil
753         }
754         str := l.activeStreams.dequeue() // Remove the first stream.
755         if str == nil {
756                 return true, nil
757         }
758         dataItem := str.itl.peek().(*dataFrame) // Peek at the first data item this stream.
759         // A data item is represented by a dataFrame, since it later translates into
760         // multiple HTTP2 data frames.
761         // Every dataFrame has two buffers; h that keeps grpc-message header and d that is acutal data.
762         // As an optimization to keep wire traffic low, data from d is copied to h to make as big as the
763         // maximum possilbe HTTP2 frame size.
764
765         if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // Empty data frame
766                 // Client sends out empty data frame with endStream = true
767                 if err := l.framer.fr.WriteData(dataItem.streamID, dataItem.endStream, nil); err != nil {
768                         return false, err
769                 }
770                 str.itl.dequeue() // remove the empty data item from stream
771                 if str.itl.isEmpty() {
772                         str.state = empty
773                 } else if trailer, ok := str.itl.peek().(*headerFrame); ok { // the next item is trailers.
774                         if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil {
775                                 return false, err
776                         }
777                         if err := l.cleanupStreamHandler(trailer.cleanup); err != nil {
778                                 return false, nil
779                         }
780                 } else {
781                         l.activeStreams.enqueue(str)
782                 }
783                 return false, nil
784         }
785         var (
786                 idx int
787                 buf []byte
788         )
789         if len(dataItem.h) != 0 { // data header has not been written out yet.
790                 buf = dataItem.h
791         } else {
792                 idx = 1
793                 buf = dataItem.d
794         }
795         size := http2MaxFrameLen
796         if len(buf) < size {
797                 size = len(buf)
798         }
799         if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota <= 0 { // stream-level flow control.
800                 str.state = waitingOnStreamQuota
801                 return false, nil
802         } else if strQuota < size {
803                 size = strQuota
804         }
805
806         if l.sendQuota < uint32(size) { // connection-level flow control.
807                 size = int(l.sendQuota)
808         }
809         // Now that outgoing flow controls are checked we can replenish str's write quota
810         str.wq.replenish(size)
811         var endStream bool
812         // If this is the last data message on this stream and all of it can be written in this iteration.
813         if dataItem.endStream && size == len(buf) {
814                 // buf contains either data or it contains header but data is empty.
815                 if idx == 1 || len(dataItem.d) == 0 {
816                         endStream = true
817                 }
818         }
819         if dataItem.onEachWrite != nil {
820                 dataItem.onEachWrite()
821         }
822         if err := l.framer.fr.WriteData(dataItem.streamID, endStream, buf[:size]); err != nil {
823                 return false, err
824         }
825         buf = buf[size:]
826         str.bytesOutStanding += size
827         l.sendQuota -= uint32(size)
828         if idx == 0 {
829                 dataItem.h = buf
830         } else {
831                 dataItem.d = buf
832         }
833
834         if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // All the data from that message was written out.
835                 str.itl.dequeue()
836         }
837         if str.itl.isEmpty() {
838                 str.state = empty
839         } else if trailer, ok := str.itl.peek().(*headerFrame); ok { // The next item is trailers.
840                 if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil {
841                         return false, err
842                 }
843                 if err := l.cleanupStreamHandler(trailer.cleanup); err != nil {
844                         return false, err
845                 }
846         } else if int(l.oiws)-str.bytesOutStanding <= 0 { // Ran out of stream quota.
847                 str.state = waitingOnStreamQuota
848         } else { // Otherwise add it back to the list of active streams.
849                 l.activeStreams.enqueue(str)
850         }
851         return false, nil
852 }