Code refactoring for bpa operator
[icn.git] / cmd / bpa-operator / vendor / k8s.io / apimachinery / pkg / runtime / serializer / streaming / streaming.go
1 /*
2 Copyright 2015 The Kubernetes Authors.
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8     http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 // Package streaming implements encoder and decoder for streams
18 // of runtime.Objects over io.Writer/Readers.
19 package streaming
20
21 import (
22         "bytes"
23         "fmt"
24         "io"
25
26         "k8s.io/apimachinery/pkg/runtime"
27         "k8s.io/apimachinery/pkg/runtime/schema"
28 )
29
30 // Encoder is a runtime.Encoder on a stream.
31 type Encoder interface {
32         // Encode will write the provided object to the stream or return an error. It obeys the same
33         // contract as runtime.VersionedEncoder.
34         Encode(obj runtime.Object) error
35 }
36
37 // Decoder is a runtime.Decoder from a stream.
38 type Decoder interface {
39         // Decode will return io.EOF when no more objects are available.
40         Decode(defaults *schema.GroupVersionKind, into runtime.Object) (runtime.Object, *schema.GroupVersionKind, error)
41         // Close closes the underlying stream.
42         Close() error
43 }
44
45 // Serializer is a factory for creating encoders and decoders that work over streams.
46 type Serializer interface {
47         NewEncoder(w io.Writer) Encoder
48         NewDecoder(r io.ReadCloser) Decoder
49 }
50
51 type decoder struct {
52         reader    io.ReadCloser
53         decoder   runtime.Decoder
54         buf       []byte
55         maxBytes  int
56         resetRead bool
57 }
58
59 // NewDecoder creates a streaming decoder that reads object chunks from r and decodes them with d.
60 // The reader is expected to return ErrShortRead if the provided buffer is not large enough to read
61 // an entire object.
62 func NewDecoder(r io.ReadCloser, d runtime.Decoder) Decoder {
63         return &decoder{
64                 reader:   r,
65                 decoder:  d,
66                 buf:      make([]byte, 1024),
67                 maxBytes: 16 * 1024 * 1024,
68         }
69 }
70
71 var ErrObjectTooLarge = fmt.Errorf("object to decode was longer than maximum allowed size")
72
73 // Decode reads the next object from the stream and decodes it.
74 func (d *decoder) Decode(defaults *schema.GroupVersionKind, into runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) {
75         base := 0
76         for {
77                 n, err := d.reader.Read(d.buf[base:])
78                 if err == io.ErrShortBuffer {
79                         if n == 0 {
80                                 return nil, nil, fmt.Errorf("got short buffer with n=0, base=%d, cap=%d", base, cap(d.buf))
81                         }
82                         if d.resetRead {
83                                 continue
84                         }
85                         // double the buffer size up to maxBytes
86                         if len(d.buf) < d.maxBytes {
87                                 base += n
88                                 d.buf = append(d.buf, make([]byte, len(d.buf))...)
89                                 continue
90                         }
91                         // must read the rest of the frame (until we stop getting ErrShortBuffer)
92                         d.resetRead = true
93                         base = 0
94                         return nil, nil, ErrObjectTooLarge
95                 }
96                 if err != nil {
97                         return nil, nil, err
98                 }
99                 if d.resetRead {
100                         // now that we have drained the large read, continue
101                         d.resetRead = false
102                         continue
103                 }
104                 base += n
105                 break
106         }
107         return d.decoder.Decode(d.buf[:base], defaults, into)
108 }
109
110 func (d *decoder) Close() error {
111         return d.reader.Close()
112 }
113
114 type encoder struct {
115         writer  io.Writer
116         encoder runtime.Encoder
117         buf     *bytes.Buffer
118 }
119
120 // NewEncoder returns a new streaming encoder.
121 func NewEncoder(w io.Writer, e runtime.Encoder) Encoder {
122         return &encoder{
123                 writer:  w,
124                 encoder: e,
125                 buf:     &bytes.Buffer{},
126         }
127 }
128
129 // Encode writes the provided object to the nested writer.
130 func (e *encoder) Encode(obj runtime.Object) error {
131         if err := e.encoder.Encode(obj, e.buf); err != nil {
132                 return err
133         }
134         _, err := e.writer.Write(e.buf.Bytes())
135         e.buf.Reset()
136         return err
137 }