1 // Copyright 2014 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
13 // pipe is a goroutine-safe io.Reader/io.Writer pair. It's like
14 // io.Pipe except there are no PipeReader/PipeWriter halves, and the
15 // underlying buffer is an interface. (io.Pipe is always unbuffered)
18 c sync.Cond // c.L lazily initialized to &p.mu
19 b pipeBuffer // nil when done reading
20 err error // read error once empty. non-nil means closed.
21 breakErr error // immediate read error (caller doesn't see rest of b)
22 donec chan struct{} // closed on error
23 readFn func() // optional code to run in Read before error
26 type pipeBuffer interface {
32 func (p *pipe) Len() int {
41 // Read waits until data is available and copies bytes
42 // from the buffer into d.
43 func (p *pipe) Read(d []byte) (n int, err error) {
50 if p.breakErr != nil {
53 if p.b != nil && p.b.Len() > 0 {
58 p.readFn() // e.g. copy trailers
59 p.readFn = nil // not sticky like p.err
68 var errClosedPipeWrite = errors.New("write on closed buffer") // returned by Write, with n == 0
70 // Write copies bytes from d into the buffer and wakes a reader.
71 // It is an error to write more data than the buffer can hold.
72 func (p *pipe) Write(d []byte) (n int, err error) {
80 return 0, errClosedPipeWrite
82 if p.breakErr != nil {
83 return len(d), nil // discard when there is no reader
88 // CloseWithError causes the next Read (waking up a current blocked
89 // Read if needed) to return the provided err after all data has been
92 // read. The error must be non-nil.
93 func (p *pipe) CloseWithError(err error) { p.closeWithError(&p.err, err, nil) }
95 // BreakWithError causes the next Read (waking up a current blocked
96 // Read if needed) to return the provided err immediately, without
97 // waiting for unread data. Like CloseWithError, err must be non-nil.
98 func (p *pipe) BreakWithError(err error) { p.closeWithError(&p.breakErr, err, nil) }
100 // closeWithErrorAndCode is like CloseWithError (which passes a nil fn) but also
101 // sets some code, fn, to run in the caller's goroutine before returning the error.
102 func (p *pipe) closeWithErrorAndCode(err error, fn func()) { p.closeWithError(&p.err, err, fn) }
104 func (p *pipe) closeWithError(dst *error, err error, fn func()) {
106 panic("err must be non-nil")
115 // Already been done.
119 if dst == &p.breakErr {
126 // requires p.mu be held.
127 func (p *pipe) closeDoneLocked() {
131 // Close if unclosed. This isn't racy since we always
132 // hold p.mu while closing.
140 // Err returns the error (if any) first set by BreakWithError or CloseWithError.
141 func (p *pipe) Err() error {
144 if p.breakErr != nil {
150 // Done returns a channel which is closed if and when this pipe is closed
151 // with CloseWithError or BreakWithError.
152 func (p *pipe) Done() <-chan struct{} {
156 p.donec = make(chan struct{})
157 if p.err != nil || p.breakErr != nil {
158 // Already hit an error.