2 Copyright 2015 The Kubernetes Authors.
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
8 http://www.apache.org/licenses/LICENSE-2.0
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
17 // Package framer implements simple frame decoding techniques for an io.ReadCloser
26 type lengthDelimitedFrameWriter struct {
31 func NewLengthDelimitedFrameWriter(w io.Writer) io.Writer {
32 return &lengthDelimitedFrameWriter{w: w}
35 // Write writes a single frame to the nested writer, prepending it with the length in
36 // bytes of data (as a 4 byte, big-endian uint32).
37 func (w *lengthDelimitedFrameWriter) Write(data []byte) (int, error) {
38 binary.BigEndian.PutUint32(w.h[:], uint32(len(data)))
39 n, err := w.w.Write(w.h[:])
44 return 0, io.ErrShortWrite
46 return w.w.Write(data)
49 type lengthDelimitedFrameReader struct {
54 // NewLengthDelimitedFrameReader returns an io.ReadCloser that will decode length-prefixed
55 // frames off of a stream.
59 // stream: message ...
60 // message: prefix body
61 // prefix: 4 byte uint32 in BigEndian order, denotes length of body
62 // body: bytes (0..prefix)
64 // If the buffer passed to Read is not long enough to contain an entire frame, io.ErrShortBuffer
65 // will be returned along with the number of bytes read.
66 func NewLengthDelimitedFrameReader(r io.ReadCloser) io.ReadCloser {
67 return &lengthDelimitedFrameReader{r: r}
70 // Read attempts to read an entire frame into data. If that is not possible, io.ErrShortBuffer
71 // is returned and subsequent calls will attempt to read the rest of that frame. A frame is complete when
73 func (r *lengthDelimitedFrameReader) Read(data []byte) (int, error) {
76 n, err := io.ReadAtLeast(r.r, header[:4], 4)
81 return 0, io.ErrUnexpectedEOF
83 frameLength := int(binary.BigEndian.Uint32(header[:]))
84 r.remaining = frameLength
92 n, err := io.ReadAtLeast(r.r, data[:max], int(max))
94 if err == io.ErrShortBuffer || r.remaining > 0 {
95 return n, io.ErrShortBuffer
101 return n, io.ErrUnexpectedEOF
107 func (r *lengthDelimitedFrameReader) Close() error {
111 type jsonFrameReader struct {
113 decoder *json.Decoder
117 // NewJSONFramedReader returns an io.ReadCloser that will decode individual JSON objects off
120 // The boundaries between each frame are valid JSON objects. A JSON parsing error will terminate
122 func NewJSONFramedReader(r io.ReadCloser) io.ReadCloser {
123 return &jsonFrameReader{
125 decoder: json.NewDecoder(r),
129 // Read decodes the next JSON object in the stream into data, or returns an error. If data is
130 // too small to hold the whole object, io.ErrShortBuffer is returned and later calls return the rest.
131 func (r *jsonFrameReader) Read(data []byte) (int, error) {
132 // Return whatever remaining data exists from an in progress frame
133 if n := len(r.remaining); n > 0 {
135 data = append(data[0:0], r.remaining...)
141 data = append(data[0:0], r.remaining[:n]...)
142 r.remaining = r.remaining[n:]
143 return n, io.ErrShortBuffer
146 // RawMessage#Unmarshal appends to data - we reset the slice down to 0 and will either see the
147 // message written into data, or a message larger than data held in a different array.
149 m := json.RawMessage(data[:0])
150 if err := r.decoder.Decode(&m); err != nil {
154 // If capacity of data is less than length of the message, decoder will allocate a new slice
155 // and set m to it, which means we need to copy the partial result back into data and preserve
156 // the remaining result for subsequent reads.
158 data = append(data[0:0], m[:n]...)
160 return n, io.ErrShortBuffer
165 func (r *jsonFrameReader) Close() error {