2 Copyright 2014 The Kubernetes Authors.
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
8 http://www.apache.org/licenses/LICENSE-2.0
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
35 "golang.org/x/net/http2"
36 "k8s.io/apimachinery/pkg/api/errors"
37 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
38 "k8s.io/apimachinery/pkg/runtime"
39 "k8s.io/apimachinery/pkg/runtime/schema"
40 "k8s.io/apimachinery/pkg/runtime/serializer/streaming"
41 "k8s.io/apimachinery/pkg/util/net"
42 "k8s.io/apimachinery/pkg/watch"
43 restclientwatch "k8s.io/client-go/rest/watch"
44 "k8s.io/client-go/tools/metrics"
45 "k8s.io/client-go/util/flowcontrol"
// longThrottleLatency defines the threshold for logging requests. All requests being
// throttled for more than longThrottleLatency will be logged.
52 longThrottleLatency = 50 * time.Millisecond
55 // HTTPClient is an interface for testing a request object.
56 type HTTPClient interface {
57 Do(req *http.Request) (*http.Response, error)
// ResponseWrapper is an interface for getting a response.
// The response may be accessed either as raw data (the whole output is put into memory) or as a stream.
62 type ResponseWrapper interface {
63 DoRaw() ([]byte, error)
64 Stream() (io.ReadCloser, error)
67 // RequestConstructionError is returned when there's an error assembling a request.
68 type RequestConstructionError struct {
72 // Error returns a textual description of 'r'.
73 func (r *RequestConstructionError) Error() string {
74 return fmt.Sprintf("request construction error: '%v'", r.Err)
77 // Request allows for building up a request to a server in a chained fashion.
78 // Any errors are stored until the end of your call, so you only have to
87 serializers Serializers
89 // generic components accessible via method setters
95 // structural elements of the request that are part of the Kubernetes API conventions
101 timeout time.Duration
107 // This is only used for per-request timeouts, deadlines, and cancellations.
110 backoffMgr BackoffManager
111 throttle flowcontrol.RateLimiter
114 // NewRequest creates a new request helper object for accessing runtime.Objects on a server.
115 func NewRequest(client HTTPClient, verb string, baseURL *url.URL, versionedAPIPath string, content ContentConfig, serializers Serializers, backoff BackoffManager, throttle flowcontrol.RateLimiter, timeout time.Duration) *Request {
117 klog.V(2).Infof("Not implementing request backoff strategy.")
118 backoff = &NoBackoff{}
123 pathPrefix = path.Join(pathPrefix, baseURL.Path)
129 pathPrefix: path.Join(pathPrefix, versionedAPIPath),
131 serializers: serializers,
137 case len(content.AcceptContentTypes) > 0:
138 r.SetHeader("Accept", content.AcceptContentTypes)
139 case len(content.ContentType) > 0:
140 r.SetHeader("Accept", content.ContentType+", */*")
// Prefix adds segments to the relative beginning of the request path. These
// items will be placed before the optional Namespace, Resource, or Name sections.
// Setting AbsPath will clear any previously set Prefix segments.
148 func (r *Request) Prefix(segments ...string) *Request {
152 r.pathPrefix = path.Join(r.pathPrefix, path.Join(segments...))
156 // Suffix appends segments to the end of the path. These items will be placed after the prefix and optional
157 // Namespace, Resource, or Name sections.
158 func (r *Request) Suffix(segments ...string) *Request {
162 r.subpath = path.Join(r.subpath, path.Join(segments...))
166 // Resource sets the resource to access (<resource>/[ns/<namespace>/]<name>)
167 func (r *Request) Resource(resource string) *Request {
171 if len(r.resource) != 0 {
172 r.err = fmt.Errorf("resource already set to %q, cannot change to %q", r.resource, resource)
175 if msgs := IsValidPathSegmentName(resource); len(msgs) != 0 {
176 r.err = fmt.Errorf("invalid resource %q: %v", resource, msgs)
179 r.resource = resource
183 // BackOff sets the request's backoff manager to the one specified,
184 // or defaults to the stub implementation if nil is provided
185 func (r *Request) BackOff(manager BackoffManager) *Request {
187 r.backoffMgr = &NoBackoff{}
191 r.backoffMgr = manager
195 // Throttle receives a rate-limiter and sets or replaces an existing request limiter
196 func (r *Request) Throttle(limiter flowcontrol.RateLimiter) *Request {
201 // SubResource sets a sub-resource path which can be multiple segments after the resource
202 // name but before the suffix.
203 func (r *Request) SubResource(subresources ...string) *Request {
207 subresource := path.Join(subresources...)
208 if len(r.subresource) != 0 {
209 r.err = fmt.Errorf("subresource already set to %q, cannot change to %q", r.resource, subresource)
212 for _, s := range subresources {
213 if msgs := IsValidPathSegmentName(s); len(msgs) != 0 {
214 r.err = fmt.Errorf("invalid subresource %q: %v", s, msgs)
218 r.subresource = subresource
222 // Name sets the name of a resource to access (<resource>/[ns/<namespace>/]<name>)
223 func (r *Request) Name(resourceName string) *Request {
227 if len(resourceName) == 0 {
228 r.err = fmt.Errorf("resource name may not be empty")
231 if len(r.resourceName) != 0 {
232 r.err = fmt.Errorf("resource name already set to %q, cannot change to %q", r.resourceName, resourceName)
235 if msgs := IsValidPathSegmentName(resourceName); len(msgs) != 0 {
236 r.err = fmt.Errorf("invalid resource name %q: %v", resourceName, msgs)
239 r.resourceName = resourceName
243 // Namespace applies the namespace scope to a request (<resource>/[ns/<namespace>/]<name>)
244 func (r *Request) Namespace(namespace string) *Request {
249 r.err = fmt.Errorf("namespace already set to %q, cannot change to %q", r.namespace, namespace)
252 if msgs := IsValidPathSegmentName(namespace); len(msgs) != 0 {
253 r.err = fmt.Errorf("invalid namespace %q: %v", namespace, msgs)
256 r.namespaceSet = true
257 r.namespace = namespace
261 // NamespaceIfScoped is a convenience function to set a namespace if scoped is true
262 func (r *Request) NamespaceIfScoped(namespace string, scoped bool) *Request {
264 return r.Namespace(namespace)
269 // AbsPath overwrites an existing path with the segments provided. Trailing slashes are preserved
270 // when a single segment is passed.
271 func (r *Request) AbsPath(segments ...string) *Request {
275 r.pathPrefix = path.Join(r.baseURL.Path, path.Join(segments...))
276 if len(segments) == 1 && (len(r.baseURL.Path) > 1 || len(segments[0]) > 1) && strings.HasSuffix(segments[0], "/") {
277 // preserve any trailing slashes for legacy behavior
283 // RequestURI overwrites existing path and parameters with the value of the provided server relative
285 func (r *Request) RequestURI(uri string) *Request {
289 locator, err := url.Parse(uri)
294 r.pathPrefix = locator.Path
295 if len(locator.Query()) > 0 {
297 r.params = make(url.Values)
299 for k, v := range locator.Query() {
306 // Param creates a query parameter with the given string value.
307 func (r *Request) Param(paramName, s string) *Request {
311 return r.setParam(paramName, s)
314 // VersionedParams will take the provided object, serialize it to a map[string][]string using the
315 // implicit RESTClient API version and the default parameter codec, and then add those as parameters
316 // to the request. Use this to provide versioned query parameters from client libraries.
317 // VersionedParams will not write query parameters that have omitempty set and are empty. If a
318 // parameter has already been set it is appended to (Params and VersionedParams are additive).
319 func (r *Request) VersionedParams(obj runtime.Object, codec runtime.ParameterCodec) *Request {
320 return r.SpecificallyVersionedParams(obj, codec, *r.content.GroupVersion)
323 func (r *Request) SpecificallyVersionedParams(obj runtime.Object, codec runtime.ParameterCodec, version schema.GroupVersion) *Request {
327 params, err := codec.EncodeParameters(obj, version)
332 for k, v := range params {
334 r.params = make(url.Values)
336 r.params[k] = append(r.params[k], v...)
341 func (r *Request) setParam(paramName, value string) *Request {
343 r.params = make(url.Values)
345 r.params[paramName] = append(r.params[paramName], value)
349 func (r *Request) SetHeader(key string, values ...string) *Request {
350 if r.headers == nil {
351 r.headers = http.Header{}
354 for _, value := range values {
355 r.headers.Add(key, value)
// Timeout makes the request use the given duration as an overall timeout for the
// request. Additionally, if set, the value is passed as the "timeout" query parameter in the URL.
362 func (r *Request) Timeout(d time.Duration) *Request {
370 // Body makes the request use obj as the body. Optional.
371 // If obj is a string, try to read a file of that name.
372 // If obj is a []byte, send it directly.
373 // If obj is an io.Reader, use it directly.
374 // If obj is a runtime.Object, marshal it correctly, and set Content-Type header.
375 // If obj is a runtime.Object and nil, do nothing.
376 // Otherwise, set an error.
377 func (r *Request) Body(obj interface{}) *Request {
381 switch t := obj.(type) {
383 data, err := ioutil.ReadFile(t)
388 glogBody("Request Body", data)
389 r.body = bytes.NewReader(data)
391 glogBody("Request Body", t)
392 r.body = bytes.NewReader(t)
396 // callers may pass typed interface pointers, therefore we must check nil with reflection
397 if reflect.ValueOf(t).IsNil() {
400 data, err := runtime.Encode(r.serializers.Encoder, t)
405 glogBody("Request Body", data)
406 r.body = bytes.NewReader(data)
407 r.SetHeader("Content-Type", r.content.ContentType)
409 r.err = fmt.Errorf("unknown type used for body: %+v", obj)
414 // Context adds a context to the request. Contexts are only used for
415 // timeouts, deadlines, and cancellations.
416 func (r *Request) Context(ctx context.Context) *Request {
421 // URL returns the current working URL.
422 func (r *Request) URL() *url.URL {
424 if r.namespaceSet && len(r.namespace) > 0 {
425 p = path.Join(p, "namespaces", r.namespace)
427 if len(r.resource) != 0 {
428 p = path.Join(p, strings.ToLower(r.resource))
430 // Join trims trailing slashes, so preserve r.pathPrefix's trailing slash for backwards compatibility if nothing was changed
431 if len(r.resourceName) != 0 || len(r.subpath) != 0 || len(r.subresource) != 0 {
432 p = path.Join(p, r.resourceName, r.subresource, r.subpath)
435 finalURL := &url.URL{}
436 if r.baseURL != nil {
437 *finalURL = *r.baseURL
441 query := url.Values{}
442 for key, values := range r.params {
443 for _, value := range values {
444 query.Add(key, value)
448 // timeout is handled specially here.
450 query.Set("timeout", r.timeout.String())
452 finalURL.RawQuery = query.Encode()
456 // finalURLTemplate is similar to URL(), but will make all specific parameter values equal
457 // - instead of name or namespace, "{name}" and "{namespace}" will be used, and all query
458 // parameters will be reset. This creates a copy of the url so as not to change the
459 // underlying object.
460 func (r Request) finalURLTemplate() url.URL {
461 newParams := url.Values{}
462 v := []string{"{value}"}
463 for k := range r.params {
468 segments := strings.Split(r.URL().Path, "/")
471 if r.URL() != nil && r.baseURL != nil && strings.Contains(r.URL().Path, r.baseURL.Path) {
472 groupIndex += len(strings.Split(r.baseURL.Path, "/"))
474 if groupIndex >= len(segments) {
478 const CoreGroupPrefix = "api"
479 const NamedGroupPrefix = "apis"
480 isCoreGroup := segments[groupIndex] == CoreGroupPrefix
481 isNamedGroup := segments[groupIndex] == NamedGroupPrefix
483 // checking the case of core group with /api/v1/... format
484 index = groupIndex + 2
485 } else if isNamedGroup {
486 // checking the case of named group with /apis/apps/v1/... format
487 index = groupIndex + 3
489 // this should not happen that the only two possibilities are /api... and /apis..., just want to put an
490 // outlet here in case more API groups are added in future if ever possible:
491 // https://kubernetes.io/docs/concepts/overview/kubernetes-api/#api-groups
492 // if a wrong API groups name is encountered, return the {prefix} for url.Path
493 url.Path = "/{prefix}"
497 //switch segLength := len(segments) - index; segLength {
499 // case len(segments) - index == 1:
500 // resource (with no name) do nothing
501 case len(segments)-index == 2:
502 // /$RESOURCE/$NAME: replace $NAME with {name}
503 segments[index+1] = "{name}"
504 case len(segments)-index == 3:
505 if segments[index+2] == "finalize" || segments[index+2] == "status" {
506 // /$RESOURCE/$NAME/$SUBRESOURCE: replace $NAME with {name}
507 segments[index+1] = "{name}"
509 // /namespace/$NAMESPACE/$RESOURCE: replace $NAMESPACE with {namespace}
510 segments[index+1] = "{namespace}"
512 case len(segments)-index >= 4:
513 segments[index+1] = "{namespace}"
514 // /namespace/$NAMESPACE/$RESOURCE/$NAME: replace $NAMESPACE with {namespace}, $NAME with {name}
515 if segments[index+3] != "finalize" && segments[index+3] != "status" {
516 // /$RESOURCE/$NAME/$SUBRESOURCE: replace $NAME with {name}
517 segments[index+3] = "{name}"
520 url.Path = path.Join(segments...)
524 func (r *Request) tryThrottle() {
526 if r.throttle != nil {
529 if latency := time.Since(now); latency > longThrottleLatency {
530 klog.V(4).Infof("Throttling request took %v, request: %s:%s", latency, r.verb, r.URL().String())
534 // Watch attempts to begin watching the requested location.
535 // Returns a watch.Interface, or an error.
536 func (r *Request) Watch() (watch.Interface, error) {
537 return r.WatchWithSpecificDecoders(
538 func(body io.ReadCloser) streaming.Decoder {
539 framer := r.serializers.Framer.NewFrameReader(body)
540 return streaming.NewDecoder(framer, r.serializers.StreamingSerializer)
542 r.serializers.Decoder,
546 // WatchWithSpecificDecoders attempts to begin watching the requested location with a *different* decoder.
547 // Turns out that you want one "standard" decoder for the watch event and one "personal" decoder for the content
548 // Returns a watch.Interface, or an error.
549 func (r *Request) WatchWithSpecificDecoders(wrapperDecoderFn func(io.ReadCloser) streaming.Decoder, embeddedDecoder runtime.Decoder) (watch.Interface, error) {
550 // We specifically don't want to rate limit watches, so we
551 // don't use r.throttle here.
555 if r.serializers.Framer == nil {
556 return nil, fmt.Errorf("watching resources is not possible with this client (content-type: %s)", r.content.ContentType)
559 url := r.URL().String()
560 req, err := http.NewRequest(r.verb, url, r.body)
565 req = req.WithContext(r.ctx)
567 req.Header = r.headers
570 client = http.DefaultClient
572 r.backoffMgr.Sleep(r.backoffMgr.CalculateBackoff(r.URL()))
573 resp, err := client.Do(req)
574 updateURLMetrics(r, resp, err)
575 if r.baseURL != nil {
577 r.backoffMgr.UpdateBackoff(r.baseURL, err, 0)
579 r.backoffMgr.UpdateBackoff(r.baseURL, err, resp.StatusCode)
583 // The watch stream mechanism handles many common partial data errors, so closed
584 // connections can be retried in many cases.
585 if net.IsProbableEOF(err) {
586 return watch.NewEmptyWatch(), nil
590 if resp.StatusCode != http.StatusOK {
591 defer resp.Body.Close()
592 if result := r.transformResponse(resp, req); result.err != nil {
593 return nil, result.err
595 return nil, fmt.Errorf("for request '%+v', got status: %v", url, resp.StatusCode)
597 wrapperDecoder := wrapperDecoderFn(resp.Body)
598 return watch.NewStreamWatcher(restclientwatch.NewDecoder(wrapperDecoder, embeddedDecoder)), nil
601 // updateURLMetrics is a convenience function for pushing metrics.
602 // It also handles corner cases for incomplete/invalid request data.
603 func updateURLMetrics(req *Request, resp *http.Response, err error) {
605 if req.baseURL != nil {
606 url = req.baseURL.Host
609 // Errors can be arbitrary strings. Unbound label cardinality is not suitable for a metric
610 // system so we just report them as `<error>`.
612 metrics.RequestResult.Increment("<error>", req.verb, url)
614 //Metrics for failure codes
615 metrics.RequestResult.Increment(strconv.Itoa(resp.StatusCode), req.verb, url)
619 // Stream formats and executes the request, and offers streaming of the response.
620 // Returns io.ReadCloser which could be used for streaming of the response, or an error
621 // Any non-2xx http status code causes an error. If we get a non-2xx code, we try to convert the body into an APIStatus object.
622 // If we can, we return that as an error. Otherwise, we create an error that lists the http status and the content of the response.
623 func (r *Request) Stream() (io.ReadCloser, error) {
630 url := r.URL().String()
631 req, err := http.NewRequest(r.verb, url, nil)
636 req = req.WithContext(r.ctx)
638 req.Header = r.headers
641 client = http.DefaultClient
643 r.backoffMgr.Sleep(r.backoffMgr.CalculateBackoff(r.URL()))
644 resp, err := client.Do(req)
645 updateURLMetrics(r, resp, err)
646 if r.baseURL != nil {
648 r.backoffMgr.UpdateBackoff(r.URL(), err, 0)
650 r.backoffMgr.UpdateBackoff(r.URL(), err, resp.StatusCode)
658 case (resp.StatusCode >= 200) && (resp.StatusCode < 300):
659 return resp.Body, nil
662 // ensure we close the body before returning the error
663 defer resp.Body.Close()
665 result := r.transformResponse(resp, req)
666 err := result.Error()
668 err = fmt.Errorf("%d while accessing %v: %s", result.statusCode, url, string(result.body))
674 // request connects to the server and invokes the provided function when a server response is
675 // received. It handles retry behavior and up front validation of requests. It will invoke
676 // fn at most once. It will return an error if a problem occurred prior to connecting to the
677 // server - the provided function is responsible for handling server errors.
678 func (r *Request) request(fn func(*http.Request, *http.Response)) error {
679 //Metrics for total request latency
682 metrics.RequestLatency.Observe(r.verb, r.finalURLTemplate(), time.Since(start))
686 klog.V(4).Infof("Error in request: %v", r.err)
690 // TODO: added to catch programmer errors (invoking operations with an object with an empty namespace)
691 if (r.verb == "GET" || r.verb == "PUT" || r.verb == "DELETE") && r.namespaceSet && len(r.resourceName) > 0 && len(r.namespace) == 0 {
692 return fmt.Errorf("an empty namespace may not be set when a resource name is provided")
694 if (r.verb == "POST") && r.namespaceSet && len(r.namespace) == 0 {
695 return fmt.Errorf("an empty namespace may not be set during creation")
700 client = http.DefaultClient
703 // Right now we make about ten retry attempts if we get a Retry-After response.
707 url := r.URL().String()
708 req, err := http.NewRequest(r.verb, url, r.body)
714 r.ctx = context.Background()
716 var cancelFn context.CancelFunc
717 r.ctx, cancelFn = context.WithTimeout(r.ctx, r.timeout)
721 req = req.WithContext(r.ctx)
723 req.Header = r.headers
725 r.backoffMgr.Sleep(r.backoffMgr.CalculateBackoff(r.URL()))
727 // We are retrying the request that we already send to apiserver
728 // at least once before.
729 // This request should also be throttled with the client-internal throttler.
732 resp, err := client.Do(req)
733 updateURLMetrics(r, resp, err)
735 r.backoffMgr.UpdateBackoff(r.URL(), err, 0)
737 r.backoffMgr.UpdateBackoff(r.URL(), err, resp.StatusCode)
740 // "Connection reset by peer" is usually a transient error.
741 // Thus in case of "GET" operations, we simply retry it.
742 // We are not automatically retrying "write" operations, as
743 // they are not idempotent.
744 if !net.IsConnectionReset(err) || r.verb != "GET" {
747 // For the purpose of retry, we set the artificial "retry-after" response.
748 // TODO: Should we clean the original response if it exists?
749 resp = &http.Response{
750 StatusCode: http.StatusInternalServerError,
751 Header: http.Header{"Retry-After": []string{"1"}},
752 Body: ioutil.NopCloser(bytes.NewReader([]byte{})),
756 done := func() bool {
757 // Ensure the response body is fully read and closed
758 // before we reconnect, so that we reuse the same TCP
761 const maxBodySlurpSize = 2 << 10
762 if resp.ContentLength <= maxBodySlurpSize {
763 io.Copy(ioutil.Discard, &io.LimitedReader{R: resp.Body, N: maxBodySlurpSize})
769 if seconds, wait := checkWait(resp); wait && retries < maxRetries {
770 if seeker, ok := r.body.(io.Seeker); ok && r.body != nil {
771 _, err := seeker.Seek(0, 0)
773 klog.V(4).Infof("Could not retry request, can't Seek() back to beginning of body for %T", r.body)
779 klog.V(4).Infof("Got a Retry-After %ds response for attempt %d to %v", seconds, retries, url)
780 r.backoffMgr.Sleep(time.Duration(seconds) * time.Second)
792 // Do formats and executes the request. Returns a Result object for easy response
796 // * If the request can't be constructed, or an error happened earlier while building its
797 // arguments: *RequestConstructionError
798 // * If the server responds with a status: *errors.StatusError or *errors.UnexpectedObjectError
799 // * http.Client.Do errors are returned directly.
800 func (r *Request) Do() Result {
804 err := r.request(func(req *http.Request, resp *http.Response) {
805 result = r.transformResponse(resp, req)
808 return Result{err: err}
813 // DoRaw executes the request but does not process the response body.
814 func (r *Request) DoRaw() ([]byte, error) {
818 err := r.request(func(req *http.Request, resp *http.Response) {
819 result.body, result.err = ioutil.ReadAll(resp.Body)
820 glogBody("Response Body", result.body)
821 if resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent {
822 result.err = r.transformUnstructuredResponseError(resp, req, result.body)
828 return result.body, result.err
831 // transformResponse converts an API response into a structured API object
832 func (r *Request) transformResponse(resp *http.Response, req *http.Request) Result {
834 if resp.Body != nil {
835 data, err := ioutil.ReadAll(resp.Body)
839 case http2.StreamError:
840 // This is trying to catch the scenario that the server may close the connection when sending the
841 // response body. This can be caused by server timeout due to a slow network connection.
842 // TODO: Add test for this. Steps may be:
843 // 1. client-go (or kubectl) sends a GET request.
844 // 2. Apiserver sends back the headers and then part of the body
845 // 3. Apiserver closes connection.
846 // 4. client-go should catch this and return an error.
847 klog.V(2).Infof("Stream error %#v when reading response body, may be caused by closed connection.", err)
848 streamErr := fmt.Errorf("Stream error %#v when reading response body, may be caused by closed connection. Please retry.", err)
853 klog.Errorf("Unexpected error when reading response body: %#v", err)
854 unexpectedErr := fmt.Errorf("Unexpected error %#v when reading response body. Please retry.", err)
861 glogBody("Response Body", body)
863 // verify the content type is accurate
864 contentType := resp.Header.Get("Content-Type")
865 decoder := r.serializers.Decoder
866 if len(contentType) > 0 && (decoder == nil || (len(r.content.ContentType) > 0 && contentType != r.content.ContentType)) {
867 mediaType, params, err := mime.ParseMediaType(contentType)
869 return Result{err: errors.NewInternalError(err)}
871 decoder, err = r.serializers.RenegotiatedDecoder(mediaType, params)
873 // if we fail to negotiate a decoder, treat this as an unstructured error
875 case resp.StatusCode == http.StatusSwitchingProtocols:
876 // no-op, we've been upgraded
877 case resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent:
878 return Result{err: r.transformUnstructuredResponseError(resp, req, body)}
882 contentType: contentType,
883 statusCode: resp.StatusCode,
889 case resp.StatusCode == http.StatusSwitchingProtocols:
890 // no-op, we've been upgraded
891 case resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent:
892 // calculate an unstructured error from the response which the Result object may use if the caller
893 // did not return a structured error.
894 retryAfter, _ := retryAfterSeconds(resp)
895 err := r.newUnstructuredResponseError(body, isTextResponse(resp), resp.StatusCode, req.Method, retryAfter)
898 contentType: contentType,
899 statusCode: resp.StatusCode,
907 contentType: contentType,
908 statusCode: resp.StatusCode,
913 // truncateBody decides if the body should be truncated, based on the glog Verbosity.
914 func truncateBody(body string) string {
917 case bool(klog.V(10)):
919 case bool(klog.V(9)):
921 case bool(klog.V(8)):
925 if len(body) <= max {
929 return body[:max] + fmt.Sprintf(" [truncated %d chars]", len(body)-max)
932 // glogBody logs a body output that could be either JSON or protobuf. It explicitly guards against
933 // allocating a new string for the body output unless necessary. Uses a simple heuristic to determine
934 // whether the body is printable.
935 func glogBody(prefix string, body []byte) {
937 if bytes.IndexFunc(body, func(r rune) bool {
940 klog.Infof("%s:\n%s", prefix, truncateBody(hex.Dump(body)))
942 klog.Infof("%s: %s", prefix, truncateBody(string(body)))
947 // maxUnstructuredResponseTextBytes is an upper bound on how much output to include in the unstructured error.
948 const maxUnstructuredResponseTextBytes = 2048
950 // transformUnstructuredResponseError handles an error from the server that is not in a structured form.
951 // It is expected to transform any response that is not recognizable as a clear server sent error from the
952 // K8S API using the information provided with the request. In practice, HTTP proxies and client libraries
953 // introduce a level of uncertainty to the responses returned by servers that in common use result in
954 // unexpected responses. The rough structure is:
956 // 1. Assume the server sends you something sane - JSON + well defined error objects + proper codes
957 // - this is the happy path
958 // - when you get this output, trust what the server sends
959 // 2. Guard against empty fields / bodies in received JSON and attempt to cull sufficient info from them to
960 // generate a reasonable facsimile of the original failure.
961 // - Be sure to use a distinct error type or flag that allows a client to distinguish between this and error 1 above
962 // 3. Handle true disconnect failures / completely malformed data by moving up to a more generic client error
963 // 4. Distinguish between various connection failures like SSL certificates, timeouts, proxy errors, unexpected
964 // initial contact, the presence of mismatched body contents from posted content types
965 // - Give these a separate distinct error type and capture as much as possible of the original message
967 // TODO: introduce transformation of generic http.Client.Do() errors that separates 4.
968 func (r *Request) transformUnstructuredResponseError(resp *http.Response, req *http.Request, body []byte) error {
969 if body == nil && resp.Body != nil {
970 if data, err := ioutil.ReadAll(&io.LimitedReader{R: resp.Body, N: maxUnstructuredResponseTextBytes}); err == nil {
974 retryAfter, _ := retryAfterSeconds(resp)
975 return r.newUnstructuredResponseError(body, isTextResponse(resp), resp.StatusCode, req.Method, retryAfter)
978 // newUnstructuredResponseError instantiates the appropriate generic error for the provided input. It also logs the body.
979 func (r *Request) newUnstructuredResponseError(body []byte, isTextResponse bool, statusCode int, method string, retryAfter int) error {
980 // cap the amount of output we create
981 if len(body) > maxUnstructuredResponseTextBytes {
982 body = body[:maxUnstructuredResponseTextBytes]
987 message = strings.TrimSpace(string(body))
989 var groupResource schema.GroupResource
990 if len(r.resource) > 0 {
991 groupResource.Group = r.content.GroupVersion.Group
992 groupResource.Resource = r.resource
994 return errors.NewGenericServerResponse(
1005 // isTextResponse returns true if the response appears to be a textual media type.
1006 func isTextResponse(resp *http.Response) bool {
1007 contentType := resp.Header.Get("Content-Type")
1008 if len(contentType) == 0 {
1011 media, _, err := mime.ParseMediaType(contentType)
1015 return strings.HasPrefix(media, "text/")
1018 // checkWait returns true along with a number of seconds if the server instructed us to wait
1020 func checkWait(resp *http.Response) (int, bool) {
1021 switch r := resp.StatusCode; {
1022 // any 500 error code and 429 can trigger a wait
1023 case r == http.StatusTooManyRequests, r >= 500:
1027 i, ok := retryAfterSeconds(resp)
// retryAfterSeconds returns the value of the Retry-After header and true, or 0 and false if
// the header was missing or not a valid number.
func retryAfterSeconds(resp *http.Response) (int, bool) {
	header := resp.Header.Get("Retry-After")
	if len(header) == 0 {
		return 0, false
	}
	// Only the integer form of the header is understood here.
	seconds, err := strconv.Atoi(header)
	if err != nil {
		return 0, false
	}
	return seconds, true
}
1043 type Result struct {
1049 decoder runtime.Decoder
1052 // Raw returns the raw result.
1053 func (r Result) Raw() ([]byte, error) {
1054 return r.body, r.err
1057 // Get returns the result as an object, which means it passes through the decoder.
1058 // If the returned object is of type Status and has .Status != StatusSuccess, the
1059 // additional information in Status will be used to enrich the error.
1060 func (r Result) Get() (runtime.Object, error) {
1062 // Check whether the result has a Status object in the body and prefer that.
1063 return nil, r.Error()
1065 if r.decoder == nil {
1066 return nil, fmt.Errorf("serializer for %s doesn't exist", r.contentType)
1069 // decode, but if the result is Status return that as an error instead.
1070 out, _, err := r.decoder.Decode(r.body, nil, nil)
1074 switch t := out.(type) {
1075 case *metav1.Status:
1076 // any status besides StatusSuccess is considered an error.
1077 if t.Status != metav1.StatusSuccess {
1078 return nil, errors.FromObject(t)
// StatusCode stores the HTTP status code of the response in the provided int pointer.
// (Only valid if no error was returned.)
1086 func (r Result) StatusCode(statusCode *int) Result {
1087 *statusCode = r.statusCode
1091 // Into stores the result into obj, if possible. If obj is nil it is ignored.
1092 // If the returned object is of type Status and has .Status != StatusSuccess, the
1093 // additional information in Status will be used to enrich the error.
1094 func (r Result) Into(obj runtime.Object) error {
1096 // Check whether the result has a Status object in the body and prefer that.
1099 if r.decoder == nil {
1100 return fmt.Errorf("serializer for %s doesn't exist", r.contentType)
1102 if len(r.body) == 0 {
1103 return fmt.Errorf("0-length response")
1106 out, _, err := r.decoder.Decode(r.body, nil, obj)
1107 if err != nil || out == obj {
1110 // if a different object is returned, see if it is Status and avoid double decoding
1112 switch t := out.(type) {
1113 case *metav1.Status:
1114 // any status besides StatusSuccess is considered an error.
1115 if t.Status != metav1.StatusSuccess {
1116 return errors.FromObject(t)
1122 // WasCreated updates the provided bool pointer to whether the server returned
1123 // 201 created or a different response.
1124 func (r Result) WasCreated(wasCreated *bool) Result {
1125 *wasCreated = r.statusCode == http.StatusCreated
1129 // Error returns the error executing the request, nil if no error occurred.
1130 // If the returned object is of type Status and has Status != StatusSuccess, the
1131 // additional information in Status will be used to enrich the error.
1132 // See the Request.Do() comment for what errors you might get.
1133 func (r Result) Error() error {
1134 // if we have received an unexpected server error, and we have a body and decoder, we can try to extract
1136 if r.err == nil || !errors.IsUnexpectedServerError(r.err) || len(r.body) == 0 || r.decoder == nil {
1140 // attempt to convert the body into a Status object
1141 // to be backwards compatible with old servers that do not return a version, default to "v1"
1142 out, _, err := r.decoder.Decode(r.body, &schema.GroupVersionKind{Version: "v1"}, nil)
1144 klog.V(5).Infof("body was not decodable (unable to check for Status): %v", err)
1147 switch t := out.(type) {
1148 case *metav1.Status:
1149 // because we default the kind, we *must* check for StatusFailure
1150 if t.Status == metav1.StatusFailure {
1151 return errors.FromObject(t)
// NameMayNotBe specifies strings that cannot be used as names specified as path segments (like the REST API or etcd store)
var NameMayNotBe = []string{".", ".."}

// NameMayNotContain specifies substrings that cannot be used in names specified as path segments (like the REST API or etcd store)
var NameMayNotContain = []string{"/", "%"}

// IsValidPathSegmentName validates the name can be safely encoded as a path segment.
// An exact match against NameMayNotBe yields a single message and stops further
// checking; otherwise one message is returned per NameMayNotContain substring found
// in name. A nil result means the name is acceptable.
func IsValidPathSegmentName(name string) []string {
	for i := range NameMayNotBe {
		if forbidden := NameMayNotBe[i]; forbidden == name {
			return []string{fmt.Sprintf(`may not be '%s'`, forbidden)}
		}
	}
	// The substring rules are exactly the ones applied to prefixes.
	return IsValidPathSegmentPrefix(name)
}

// IsValidPathSegmentPrefix validates the name can be used as a prefix for a name which will be encoded as a path segment.
// It does not check for exact matches with disallowed names, since an arbitrary suffix might make the name valid.
// A nil result means no disallowed substring was found.
func IsValidPathSegmentPrefix(name string) []string {
	var msgs []string
	for _, fragment := range NameMayNotContain {
		if !strings.Contains(name, fragment) {
			continue
		}
		msgs = append(msgs, fmt.Sprintf(`may not contain '%s'`, fragment))
	}
	return msgs
}

// ValidatePathSegmentName validates the name can be safely encoded as a path segment.
// When prefix is true the name is checked as a prefix (IsValidPathSegmentPrefix),
// otherwise as a complete segment name (IsValidPathSegmentName).
func ValidatePathSegmentName(name string, prefix bool) []string {
	check := IsValidPathSegmentName
	if prefix {
		check = IsValidPathSegmentPrefix
	}
	return check(name)
}