2 Copyright 2015 The Kubernetes Authors.
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
8 http://www.apache.org/licenses/LICENSE-2.0
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
26 "github.com/gogo/protobuf/proto"
28 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29 "k8s.io/apimachinery/pkg/runtime"
30 "k8s.io/apimachinery/pkg/runtime/schema"
31 "k8s.io/apimachinery/pkg/runtime/serializer/recognizer"
32 "k8s.io/apimachinery/pkg/util/framer"
var (
	// protoEncodingPrefix serves as a magic number for an encoded protobuf message on this serializer. All
	// proto messages serialized by this schema will be preceded by the bytes 0x6b 0x38 0x73 (ASCII "k8s"),
	// with the fourth byte being reserved for the encoding style. The only encoding style defined is 0x00,
	// which means that the rest of the byte stream is a message of type
	// k8s.io.kubernetes.pkg.runtime.Unknown (proto2).
	//
	// See k8s.io/apimachinery/pkg/runtime/generated.proto for details of the runtime.Unknown message.
	//
	// This encoding scheme is experimental, and is subject to change at any time.
	protoEncodingPrefix = []byte{0x6b, 0x38, 0x73, 0x00}
)
// errNotMarshalable is the error produced by the serializers in this file when an object
// does not implement the protobuf interfaces needed to encode or decode it.
type errNotMarshalable struct {
	// t is the dynamic type of the offending object; it is interpolated into the error message.
	t reflect.Type
}
51 func (e errNotMarshalable) Error() string {
52 return fmt.Sprintf("object %v does not implement the protobuf marshalling interface and cannot be encoded to a protobuf message", e.t)
55 func (e errNotMarshalable) Status() metav1.Status {
57 Status: metav1.StatusFailure,
58 Code: http.StatusNotAcceptable,
59 Reason: metav1.StatusReason("NotAcceptable"),
64 func IsNotMarshalable(err error) bool {
65 _, ok := err.(errNotMarshalable)
66 return err != nil && ok
69 // NewSerializer creates a Protobuf serializer that handles encoding versioned objects into the proper wire form. If a typer
70 // is passed, the encoded object will have group, version, and kind fields set. If typer is nil, the objects will be written
71 // as-is (any type info passed with the object will be used).
73 // This encoding scheme is experimental, and is subject to change at any time.
74 func NewSerializer(creater runtime.ObjectCreater, typer runtime.ObjectTyper, defaultContentType string) *Serializer {
76 prefix: protoEncodingPrefix,
79 contentType: defaultContentType,
83 type Serializer struct {
85 creater runtime.ObjectCreater
86 typer runtime.ObjectTyper
90 var _ runtime.Serializer = &Serializer{}
91 var _ recognizer.RecognizingDecoder = &Serializer{}
93 // Decode attempts to convert the provided data into a protobuf message, extract the stored schema kind, apply the provided default
94 // gvk, and then load that data into an object matching the desired schema kind or the provided into. If into is *runtime.Unknown,
95 // the raw data will be extracted and no decoding will be performed. If into is not registered with the typer, then the object will
96 // be straight decoded using normal protobuf unmarshalling (the MarshalTo interface). If into is provided and the original data is
97 // not fully qualified with kind/version/group, the type of the into will be used to alter the returned gvk. On success or most
98 // errors, the method will return the calculated schema kind.
99 func (s *Serializer) Decode(originalData []byte, gvk *schema.GroupVersionKind, into runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) {
100 if versioned, ok := into.(*runtime.VersionedObjects); ok {
101 into = versioned.Last()
102 obj, actual, err := s.Decode(originalData, gvk, into)
104 return nil, actual, err
106 // the last item in versioned becomes into, so if versioned was not originally empty we reset the object
107 // array so the first position is the decoded object and the second position is the outermost object.
108 // if there were no objects in the versioned list passed to us, only add ourselves.
109 if into != nil && into != obj {
110 versioned.Objects = []runtime.Object{obj, into}
112 versioned.Objects = []runtime.Object{obj}
114 return versioned, actual, err
117 prefixLen := len(s.prefix)
119 case len(originalData) == 0:
120 // TODO: treat like decoding {} from JSON with defaulting
121 return nil, nil, fmt.Errorf("empty data")
122 case len(originalData) < prefixLen || !bytes.Equal(s.prefix, originalData[:prefixLen]):
123 return nil, nil, fmt.Errorf("provided data does not appear to be a protobuf message, expected prefix %v", s.prefix)
124 case len(originalData) == prefixLen:
125 // TODO: treat like decoding {} from JSON with defaulting
126 return nil, nil, fmt.Errorf("empty body")
129 data := originalData[prefixLen:]
130 unk := runtime.Unknown{}
131 if err := unk.Unmarshal(data); err != nil {
135 actual := unk.GroupVersionKind()
136 copyKindDefaults(&actual, gvk)
138 if intoUnknown, ok := into.(*runtime.Unknown); ok && intoUnknown != nil {
140 if ok, _, _ := s.RecognizesData(bytes.NewBuffer(unk.Raw)); ok {
141 intoUnknown.ContentType = s.contentType
143 return intoUnknown, &actual, nil
147 types, _, err := s.typer.ObjectKinds(into)
149 case runtime.IsNotRegisteredError(err):
150 pb, ok := into.(proto.Message)
152 return nil, &actual, errNotMarshalable{reflect.TypeOf(into)}
154 if err := proto.Unmarshal(unk.Raw, pb); err != nil {
155 return nil, &actual, err
157 return into, &actual, nil
159 return nil, &actual, err
161 copyKindDefaults(&actual, &types[0])
162 // if the result of defaulting did not set a version or group, ensure that at least group is set
163 // (copyKindDefaults will not assign Group if version is already set). This guarantees that the group
164 // of into is set if there is no better information from the caller or object.
165 if len(actual.Version) == 0 && len(actual.Group) == 0 {
166 actual.Group = types[0].Group
171 if len(actual.Kind) == 0 {
172 return nil, &actual, runtime.NewMissingKindErr(fmt.Sprintf("%#v", unk.TypeMeta))
174 if len(actual.Version) == 0 {
175 return nil, &actual, runtime.NewMissingVersionErr(fmt.Sprintf("%#v", unk.TypeMeta))
178 return unmarshalToObject(s.typer, s.creater, &actual, into, unk.Raw)
181 // Encode serializes the provided object to the given writer.
182 func (s *Serializer) Encode(obj runtime.Object, w io.Writer) error {
183 prefixSize := uint64(len(s.prefix))
185 var unk runtime.Unknown
186 switch t := obj.(type) {
187 case *runtime.Unknown:
188 estimatedSize := prefixSize + uint64(t.Size())
189 data := make([]byte, estimatedSize)
190 i, err := t.MarshalTo(data[prefixSize:])
195 _, err = w.Write(data[:prefixSize+uint64(i)])
198 kind := obj.GetObjectKind().GroupVersionKind()
199 unk = runtime.Unknown{
200 TypeMeta: runtime.TypeMeta{
202 APIVersion: kind.GroupVersion().String(),
207 switch t := obj.(type) {
208 case bufferedMarshaller:
209 // this path performs a single allocation during write but requires the caller to implement
210 // the more efficient Size and MarshalTo methods
211 encodedSize := uint64(t.Size())
212 estimatedSize := prefixSize + estimateUnknownSize(&unk, encodedSize)
213 data := make([]byte, estimatedSize)
215 i, err := unk.NestedMarshalTo(data[prefixSize:], t, encodedSize)
222 _, err = w.Write(data[:prefixSize+uint64(i)])
225 case proto.Marshaler:
226 // this path performs extra allocations
227 data, err := t.Marshal()
233 estimatedSize := prefixSize + uint64(unk.Size())
234 data = make([]byte, estimatedSize)
236 i, err := unk.MarshalTo(data[prefixSize:])
243 _, err = w.Write(data[:prefixSize+uint64(i)])
247 // TODO: marshal with a different content type and serializer (JSON for third party objects)
248 return errNotMarshalable{reflect.TypeOf(obj)}
252 // RecognizesData implements the RecognizingDecoder interface.
253 func (s *Serializer) RecognizesData(peek io.Reader) (bool, bool, error) {
254 prefix := make([]byte, 4)
255 n, err := peek.Read(prefix)
258 return false, false, nil
260 return false, false, err
263 return false, false, nil
265 return bytes.Equal(s.prefix, prefix), false, nil
268 // copyKindDefaults defaults dst to the value in src if dst does not have a value set.
269 func copyKindDefaults(dst, src *schema.GroupVersionKind) {
273 // apply kind and version defaulting from provided default
274 if len(dst.Kind) == 0 {
277 if len(dst.Version) == 0 && len(src.Version) > 0 {
278 dst.Group = src.Group
279 dst.Version = src.Version
283 // bufferedMarshaller describes a more efficient marshalling interface that can avoid allocating multiple
284 // byte buffers by pre-calculating the size of the final buffer needed.
285 type bufferedMarshaller interface {
287 runtime.ProtobufMarshaller
290 // estimateUnknownSize returns the expected bytes consumed by a given runtime.Unknown
291 // object with a nil RawJSON struct and the expected size of the provided buffer. The
292 // returned size will not be correct if RawJSOn is set on unk.
293 func estimateUnknownSize(unk *runtime.Unknown, byteSize uint64) uint64 {
294 size := uint64(unk.Size())
295 // protobuf uses 1 byte for the tag, a varint for the length of the array (at most 8 bytes - uint64 - here),
296 // and the size of the array.
297 size += 1 + 8 + byteSize
301 // NewRawSerializer creates a Protobuf serializer that handles encoding versioned objects into the proper wire form. If typer
302 // is not nil, the object has the group, version, and kind fields set. This serializer does not provide type information for the
303 // encoded object, and thus is not self describing (callers must know what type is being described in order to decode).
305 // This encoding scheme is experimental, and is subject to change at any time.
306 func NewRawSerializer(creater runtime.ObjectCreater, typer runtime.ObjectTyper, defaultContentType string) *RawSerializer {
307 return &RawSerializer{
310 contentType: defaultContentType,
314 // RawSerializer encodes and decodes objects without adding a runtime.Unknown wrapper (objects are encoded without identifying
316 type RawSerializer struct {
317 creater runtime.ObjectCreater
318 typer runtime.ObjectTyper
322 var _ runtime.Serializer = &RawSerializer{}
324 // Decode attempts to convert the provided data into a protobuf message, extract the stored schema kind, apply the provided default
325 // gvk, and then load that data into an object matching the desired schema kind or the provided into. If into is *runtime.Unknown,
326 // the raw data will be extracted and no decoding will be performed. If into is not registered with the typer, then the object will
327 // be straight decoded using normal protobuf unmarshalling (the MarshalTo interface). If into is provided and the original data is
328 // not fully qualified with kind/version/group, the type of the into will be used to alter the returned gvk. On success or most
329 // errors, the method will return the calculated schema kind.
330 func (s *RawSerializer) Decode(originalData []byte, gvk *schema.GroupVersionKind, into runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) {
332 return nil, nil, fmt.Errorf("this serializer requires an object to decode into: %#v", s)
335 if versioned, ok := into.(*runtime.VersionedObjects); ok {
336 into = versioned.Last()
337 obj, actual, err := s.Decode(originalData, gvk, into)
339 return nil, actual, err
341 if into != nil && into != obj {
342 versioned.Objects = []runtime.Object{obj, into}
344 versioned.Objects = []runtime.Object{obj}
346 return versioned, actual, err
349 if len(originalData) == 0 {
350 // TODO: treat like decoding {} from JSON with defaulting
351 return nil, nil, fmt.Errorf("empty data")
355 actual := &schema.GroupVersionKind{}
356 copyKindDefaults(actual, gvk)
358 if intoUnknown, ok := into.(*runtime.Unknown); ok && intoUnknown != nil {
359 intoUnknown.Raw = data
360 intoUnknown.ContentEncoding = ""
361 intoUnknown.ContentType = s.contentType
362 intoUnknown.SetGroupVersionKind(*actual)
363 return intoUnknown, actual, nil
366 types, _, err := s.typer.ObjectKinds(into)
368 case runtime.IsNotRegisteredError(err):
369 pb, ok := into.(proto.Message)
371 return nil, actual, errNotMarshalable{reflect.TypeOf(into)}
373 if err := proto.Unmarshal(data, pb); err != nil {
374 return nil, actual, err
376 return into, actual, nil
378 return nil, actual, err
380 copyKindDefaults(actual, &types[0])
381 // if the result of defaulting did not set a version or group, ensure that at least group is set
382 // (copyKindDefaults will not assign Group if version is already set). This guarantees that the group
383 // of into is set if there is no better information from the caller or object.
384 if len(actual.Version) == 0 && len(actual.Group) == 0 {
385 actual.Group = types[0].Group
389 if len(actual.Kind) == 0 {
390 return nil, actual, runtime.NewMissingKindErr("<protobuf encoded body - must provide default type>")
392 if len(actual.Version) == 0 {
393 return nil, actual, runtime.NewMissingVersionErr("<protobuf encoded body - must provide default type>")
396 return unmarshalToObject(s.typer, s.creater, actual, into, data)
399 // unmarshalToObject is the common code between decode in the raw and normal serializer.
400 func unmarshalToObject(typer runtime.ObjectTyper, creater runtime.ObjectCreater, actual *schema.GroupVersionKind, into runtime.Object, data []byte) (runtime.Object, *schema.GroupVersionKind, error) {
401 // use the target if necessary
402 obj, err := runtime.UseOrCreateObject(typer, creater, *actual, into)
404 return nil, actual, err
407 pb, ok := obj.(proto.Message)
409 return nil, actual, errNotMarshalable{reflect.TypeOf(obj)}
411 if err := proto.Unmarshal(data, pb); err != nil {
412 return nil, actual, err
414 return obj, actual, nil
417 // Encode serializes the provided object to the given writer. Overrides is ignored.
418 func (s *RawSerializer) Encode(obj runtime.Object, w io.Writer) error {
419 switch t := obj.(type) {
420 case bufferedMarshaller:
421 // this path performs a single allocation during write but requires the caller to implement
422 // the more efficient Size and MarshalTo methods
423 encodedSize := uint64(t.Size())
424 data := make([]byte, encodedSize)
426 n, err := t.MarshalTo(data)
430 _, err = w.Write(data[:n])
433 case proto.Marshaler:
434 // this path performs extra allocations
435 data, err := t.Marshal()
439 _, err = w.Write(data)
443 return errNotMarshalable{reflect.TypeOf(obj)}
// LengthDelimitedFramer is the package's ready-to-use framer value; its methods wrap readers
// and writers with the length-delimited framing from the framer package.
var LengthDelimitedFramer = lengthDelimitedFramer{}

// lengthDelimitedFramer is the stateless type behind LengthDelimitedFramer.
type lengthDelimitedFramer struct{}
451 // NewFrameWriter implements stream framing for this serializer
452 func (lengthDelimitedFramer) NewFrameWriter(w io.Writer) io.Writer {
453 return framer.NewLengthDelimitedFrameWriter(w)
456 // NewFrameReader implements stream framing for this serializer
457 func (lengthDelimitedFramer) NewFrameReader(r io.ReadCloser) io.ReadCloser {
458 return framer.NewLengthDelimitedFrameReader(r)