Code refactoring for bpa operator
[icn.git] / cmd / bpa-operator / vendor / k8s.io / client-go / rest / request.go
1 /*
2 Copyright 2014 The Kubernetes Authors.
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8     http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 package rest
18
19 import (
20         "bytes"
21         "context"
22         "encoding/hex"
23         "fmt"
24         "io"
25         "io/ioutil"
26         "mime"
27         "net/http"
28         "net/url"
29         "path"
30         "reflect"
31         "strconv"
32         "strings"
33         "time"
34
35         "golang.org/x/net/http2"
36         "k8s.io/apimachinery/pkg/api/errors"
37         metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
38         "k8s.io/apimachinery/pkg/runtime"
39         "k8s.io/apimachinery/pkg/runtime/schema"
40         "k8s.io/apimachinery/pkg/runtime/serializer/streaming"
41         "k8s.io/apimachinery/pkg/util/net"
42         "k8s.io/apimachinery/pkg/watch"
43         restclientwatch "k8s.io/client-go/rest/watch"
44         "k8s.io/client-go/tools/metrics"
45         "k8s.io/client-go/util/flowcontrol"
46         "k8s.io/klog"
47 )
48
49 var (
50         // longThrottleLatency defines threshold for logging requests. All requests being
51         // throttle for more than longThrottleLatency will be logged.
52         longThrottleLatency = 50 * time.Millisecond
53 )
54
55 // HTTPClient is an interface for testing a request object.
56 type HTTPClient interface {
57         Do(req *http.Request) (*http.Response, error)
58 }
59
60 // ResponseWrapper is an interface for getting a response.
61 // The response may be either accessed as a raw data (the whole output is put into memory) or as a stream.
62 type ResponseWrapper interface {
63         DoRaw() ([]byte, error)
64         Stream() (io.ReadCloser, error)
65 }
66
67 // RequestConstructionError is returned when there's an error assembling a request.
68 type RequestConstructionError struct {
69         Err error
70 }
71
72 // Error returns a textual description of 'r'.
73 func (r *RequestConstructionError) Error() string {
74         return fmt.Sprintf("request construction error: '%v'", r.Err)
75 }
76
77 // Request allows for building up a request to a server in a chained fashion.
78 // Any errors are stored until the end of your call, so you only have to
79 // check once.
80 type Request struct {
81         // required
82         client HTTPClient
83         verb   string
84
85         baseURL     *url.URL
86         content     ContentConfig
87         serializers Serializers
88
89         // generic components accessible via method setters
90         pathPrefix string
91         subpath    string
92         params     url.Values
93         headers    http.Header
94
95         // structural elements of the request that are part of the Kubernetes API conventions
96         namespace    string
97         namespaceSet bool
98         resource     string
99         resourceName string
100         subresource  string
101         timeout      time.Duration
102
103         // output
104         err  error
105         body io.Reader
106
107         // This is only used for per-request timeouts, deadlines, and cancellations.
108         ctx context.Context
109
110         backoffMgr BackoffManager
111         throttle   flowcontrol.RateLimiter
112 }
113
114 // NewRequest creates a new request helper object for accessing runtime.Objects on a server.
115 func NewRequest(client HTTPClient, verb string, baseURL *url.URL, versionedAPIPath string, content ContentConfig, serializers Serializers, backoff BackoffManager, throttle flowcontrol.RateLimiter, timeout time.Duration) *Request {
116         if backoff == nil {
117                 klog.V(2).Infof("Not implementing request backoff strategy.")
118                 backoff = &NoBackoff{}
119         }
120
121         pathPrefix := "/"
122         if baseURL != nil {
123                 pathPrefix = path.Join(pathPrefix, baseURL.Path)
124         }
125         r := &Request{
126                 client:      client,
127                 verb:        verb,
128                 baseURL:     baseURL,
129                 pathPrefix:  path.Join(pathPrefix, versionedAPIPath),
130                 content:     content,
131                 serializers: serializers,
132                 backoffMgr:  backoff,
133                 throttle:    throttle,
134                 timeout:     timeout,
135         }
136         switch {
137         case len(content.AcceptContentTypes) > 0:
138                 r.SetHeader("Accept", content.AcceptContentTypes)
139         case len(content.ContentType) > 0:
140                 r.SetHeader("Accept", content.ContentType+", */*")
141         }
142         return r
143 }
144
145 // Prefix adds segments to the relative beginning to the request path. These
146 // items will be placed before the optional Namespace, Resource, or Name sections.
147 // Setting AbsPath will clear any previously set Prefix segments
148 func (r *Request) Prefix(segments ...string) *Request {
149         if r.err != nil {
150                 return r
151         }
152         r.pathPrefix = path.Join(r.pathPrefix, path.Join(segments...))
153         return r
154 }
155
156 // Suffix appends segments to the end of the path. These items will be placed after the prefix and optional
157 // Namespace, Resource, or Name sections.
158 func (r *Request) Suffix(segments ...string) *Request {
159         if r.err != nil {
160                 return r
161         }
162         r.subpath = path.Join(r.subpath, path.Join(segments...))
163         return r
164 }
165
166 // Resource sets the resource to access (<resource>/[ns/<namespace>/]<name>)
167 func (r *Request) Resource(resource string) *Request {
168         if r.err != nil {
169                 return r
170         }
171         if len(r.resource) != 0 {
172                 r.err = fmt.Errorf("resource already set to %q, cannot change to %q", r.resource, resource)
173                 return r
174         }
175         if msgs := IsValidPathSegmentName(resource); len(msgs) != 0 {
176                 r.err = fmt.Errorf("invalid resource %q: %v", resource, msgs)
177                 return r
178         }
179         r.resource = resource
180         return r
181 }
182
183 // BackOff sets the request's backoff manager to the one specified,
184 // or defaults to the stub implementation if nil is provided
185 func (r *Request) BackOff(manager BackoffManager) *Request {
186         if manager == nil {
187                 r.backoffMgr = &NoBackoff{}
188                 return r
189         }
190
191         r.backoffMgr = manager
192         return r
193 }
194
195 // Throttle receives a rate-limiter and sets or replaces an existing request limiter
196 func (r *Request) Throttle(limiter flowcontrol.RateLimiter) *Request {
197         r.throttle = limiter
198         return r
199 }
200
201 // SubResource sets a sub-resource path which can be multiple segments after the resource
202 // name but before the suffix.
203 func (r *Request) SubResource(subresources ...string) *Request {
204         if r.err != nil {
205                 return r
206         }
207         subresource := path.Join(subresources...)
208         if len(r.subresource) != 0 {
209                 r.err = fmt.Errorf("subresource already set to %q, cannot change to %q", r.resource, subresource)
210                 return r
211         }
212         for _, s := range subresources {
213                 if msgs := IsValidPathSegmentName(s); len(msgs) != 0 {
214                         r.err = fmt.Errorf("invalid subresource %q: %v", s, msgs)
215                         return r
216                 }
217         }
218         r.subresource = subresource
219         return r
220 }
221
222 // Name sets the name of a resource to access (<resource>/[ns/<namespace>/]<name>)
223 func (r *Request) Name(resourceName string) *Request {
224         if r.err != nil {
225                 return r
226         }
227         if len(resourceName) == 0 {
228                 r.err = fmt.Errorf("resource name may not be empty")
229                 return r
230         }
231         if len(r.resourceName) != 0 {
232                 r.err = fmt.Errorf("resource name already set to %q, cannot change to %q", r.resourceName, resourceName)
233                 return r
234         }
235         if msgs := IsValidPathSegmentName(resourceName); len(msgs) != 0 {
236                 r.err = fmt.Errorf("invalid resource name %q: %v", resourceName, msgs)
237                 return r
238         }
239         r.resourceName = resourceName
240         return r
241 }
242
243 // Namespace applies the namespace scope to a request (<resource>/[ns/<namespace>/]<name>)
244 func (r *Request) Namespace(namespace string) *Request {
245         if r.err != nil {
246                 return r
247         }
248         if r.namespaceSet {
249                 r.err = fmt.Errorf("namespace already set to %q, cannot change to %q", r.namespace, namespace)
250                 return r
251         }
252         if msgs := IsValidPathSegmentName(namespace); len(msgs) != 0 {
253                 r.err = fmt.Errorf("invalid namespace %q: %v", namespace, msgs)
254                 return r
255         }
256         r.namespaceSet = true
257         r.namespace = namespace
258         return r
259 }
260
261 // NamespaceIfScoped is a convenience function to set a namespace if scoped is true
262 func (r *Request) NamespaceIfScoped(namespace string, scoped bool) *Request {
263         if scoped {
264                 return r.Namespace(namespace)
265         }
266         return r
267 }
268
269 // AbsPath overwrites an existing path with the segments provided. Trailing slashes are preserved
270 // when a single segment is passed.
271 func (r *Request) AbsPath(segments ...string) *Request {
272         if r.err != nil {
273                 return r
274         }
275         r.pathPrefix = path.Join(r.baseURL.Path, path.Join(segments...))
276         if len(segments) == 1 && (len(r.baseURL.Path) > 1 || len(segments[0]) > 1) && strings.HasSuffix(segments[0], "/") {
277                 // preserve any trailing slashes for legacy behavior
278                 r.pathPrefix += "/"
279         }
280         return r
281 }
282
283 // RequestURI overwrites existing path and parameters with the value of the provided server relative
284 // URI.
285 func (r *Request) RequestURI(uri string) *Request {
286         if r.err != nil {
287                 return r
288         }
289         locator, err := url.Parse(uri)
290         if err != nil {
291                 r.err = err
292                 return r
293         }
294         r.pathPrefix = locator.Path
295         if len(locator.Query()) > 0 {
296                 if r.params == nil {
297                         r.params = make(url.Values)
298                 }
299                 for k, v := range locator.Query() {
300                         r.params[k] = v
301                 }
302         }
303         return r
304 }
305
306 // Param creates a query parameter with the given string value.
307 func (r *Request) Param(paramName, s string) *Request {
308         if r.err != nil {
309                 return r
310         }
311         return r.setParam(paramName, s)
312 }
313
314 // VersionedParams will take the provided object, serialize it to a map[string][]string using the
315 // implicit RESTClient API version and the default parameter codec, and then add those as parameters
316 // to the request. Use this to provide versioned query parameters from client libraries.
317 // VersionedParams will not write query parameters that have omitempty set and are empty. If a
318 // parameter has already been set it is appended to (Params and VersionedParams are additive).
319 func (r *Request) VersionedParams(obj runtime.Object, codec runtime.ParameterCodec) *Request {
320         return r.SpecificallyVersionedParams(obj, codec, *r.content.GroupVersion)
321 }
322
323 func (r *Request) SpecificallyVersionedParams(obj runtime.Object, codec runtime.ParameterCodec, version schema.GroupVersion) *Request {
324         if r.err != nil {
325                 return r
326         }
327         params, err := codec.EncodeParameters(obj, version)
328         if err != nil {
329                 r.err = err
330                 return r
331         }
332         for k, v := range params {
333                 if r.params == nil {
334                         r.params = make(url.Values)
335                 }
336                 r.params[k] = append(r.params[k], v...)
337         }
338         return r
339 }
340
341 func (r *Request) setParam(paramName, value string) *Request {
342         if r.params == nil {
343                 r.params = make(url.Values)
344         }
345         r.params[paramName] = append(r.params[paramName], value)
346         return r
347 }
348
349 func (r *Request) SetHeader(key string, values ...string) *Request {
350         if r.headers == nil {
351                 r.headers = http.Header{}
352         }
353         r.headers.Del(key)
354         for _, value := range values {
355                 r.headers.Add(key, value)
356         }
357         return r
358 }
359
360 // Timeout makes the request use the given duration as an overall timeout for the
361 // request. Additionally, if set passes the value as "timeout" parameter in URL.
362 func (r *Request) Timeout(d time.Duration) *Request {
363         if r.err != nil {
364                 return r
365         }
366         r.timeout = d
367         return r
368 }
369
370 // Body makes the request use obj as the body. Optional.
371 // If obj is a string, try to read a file of that name.
372 // If obj is a []byte, send it directly.
373 // If obj is an io.Reader, use it directly.
374 // If obj is a runtime.Object, marshal it correctly, and set Content-Type header.
375 // If obj is a runtime.Object and nil, do nothing.
376 // Otherwise, set an error.
377 func (r *Request) Body(obj interface{}) *Request {
378         if r.err != nil {
379                 return r
380         }
381         switch t := obj.(type) {
382         case string:
383                 data, err := ioutil.ReadFile(t)
384                 if err != nil {
385                         r.err = err
386                         return r
387                 }
388                 glogBody("Request Body", data)
389                 r.body = bytes.NewReader(data)
390         case []byte:
391                 glogBody("Request Body", t)
392                 r.body = bytes.NewReader(t)
393         case io.Reader:
394                 r.body = t
395         case runtime.Object:
396                 // callers may pass typed interface pointers, therefore we must check nil with reflection
397                 if reflect.ValueOf(t).IsNil() {
398                         return r
399                 }
400                 data, err := runtime.Encode(r.serializers.Encoder, t)
401                 if err != nil {
402                         r.err = err
403                         return r
404                 }
405                 glogBody("Request Body", data)
406                 r.body = bytes.NewReader(data)
407                 r.SetHeader("Content-Type", r.content.ContentType)
408         default:
409                 r.err = fmt.Errorf("unknown type used for body: %+v", obj)
410         }
411         return r
412 }
413
414 // Context adds a context to the request. Contexts are only used for
415 // timeouts, deadlines, and cancellations.
416 func (r *Request) Context(ctx context.Context) *Request {
417         r.ctx = ctx
418         return r
419 }
420
421 // URL returns the current working URL.
422 func (r *Request) URL() *url.URL {
423         p := r.pathPrefix
424         if r.namespaceSet && len(r.namespace) > 0 {
425                 p = path.Join(p, "namespaces", r.namespace)
426         }
427         if len(r.resource) != 0 {
428                 p = path.Join(p, strings.ToLower(r.resource))
429         }
430         // Join trims trailing slashes, so preserve r.pathPrefix's trailing slash for backwards compatibility if nothing was changed
431         if len(r.resourceName) != 0 || len(r.subpath) != 0 || len(r.subresource) != 0 {
432                 p = path.Join(p, r.resourceName, r.subresource, r.subpath)
433         }
434
435         finalURL := &url.URL{}
436         if r.baseURL != nil {
437                 *finalURL = *r.baseURL
438         }
439         finalURL.Path = p
440
441         query := url.Values{}
442         for key, values := range r.params {
443                 for _, value := range values {
444                         query.Add(key, value)
445                 }
446         }
447
448         // timeout is handled specially here.
449         if r.timeout != 0 {
450                 query.Set("timeout", r.timeout.String())
451         }
452         finalURL.RawQuery = query.Encode()
453         return finalURL
454 }
455
456 // finalURLTemplate is similar to URL(), but will make all specific parameter values equal
457 // - instead of name or namespace, "{name}" and "{namespace}" will be used, and all query
458 // parameters will be reset. This creates a copy of the url so as not to change the
459 // underlying object.
460 func (r Request) finalURLTemplate() url.URL {
461         newParams := url.Values{}
462         v := []string{"{value}"}
463         for k := range r.params {
464                 newParams[k] = v
465         }
466         r.params = newParams
467         url := r.URL()
468         segments := strings.Split(r.URL().Path, "/")
469         groupIndex := 0
470         index := 0
471         if r.URL() != nil && r.baseURL != nil && strings.Contains(r.URL().Path, r.baseURL.Path) {
472                 groupIndex += len(strings.Split(r.baseURL.Path, "/"))
473         }
474         if groupIndex >= len(segments) {
475                 return *url
476         }
477
478         const CoreGroupPrefix = "api"
479         const NamedGroupPrefix = "apis"
480         isCoreGroup := segments[groupIndex] == CoreGroupPrefix
481         isNamedGroup := segments[groupIndex] == NamedGroupPrefix
482         if isCoreGroup {
483                 // checking the case of core group with /api/v1/... format
484                 index = groupIndex + 2
485         } else if isNamedGroup {
486                 // checking the case of named group with /apis/apps/v1/... format
487                 index = groupIndex + 3
488         } else {
489                 // this should not happen that the only two possibilities are /api... and /apis..., just want to put an
490                 // outlet here in case more API groups are added in future if ever possible:
491                 // https://kubernetes.io/docs/concepts/overview/kubernetes-api/#api-groups
492                 // if a wrong API groups name is encountered, return the {prefix} for url.Path
493                 url.Path = "/{prefix}"
494                 url.RawQuery = ""
495                 return *url
496         }
497         //switch segLength := len(segments) - index; segLength {
498         switch {
499         // case len(segments) - index == 1:
500         // resource (with no name) do nothing
501         case len(segments)-index == 2:
502                 // /$RESOURCE/$NAME: replace $NAME with {name}
503                 segments[index+1] = "{name}"
504         case len(segments)-index == 3:
505                 if segments[index+2] == "finalize" || segments[index+2] == "status" {
506                         // /$RESOURCE/$NAME/$SUBRESOURCE: replace $NAME with {name}
507                         segments[index+1] = "{name}"
508                 } else {
509                         // /namespace/$NAMESPACE/$RESOURCE: replace $NAMESPACE with {namespace}
510                         segments[index+1] = "{namespace}"
511                 }
512         case len(segments)-index >= 4:
513                 segments[index+1] = "{namespace}"
514                 // /namespace/$NAMESPACE/$RESOURCE/$NAME: replace $NAMESPACE with {namespace},  $NAME with {name}
515                 if segments[index+3] != "finalize" && segments[index+3] != "status" {
516                         // /$RESOURCE/$NAME/$SUBRESOURCE: replace $NAME with {name}
517                         segments[index+3] = "{name}"
518                 }
519         }
520         url.Path = path.Join(segments...)
521         return *url
522 }
523
524 func (r *Request) tryThrottle() {
525         now := time.Now()
526         if r.throttle != nil {
527                 r.throttle.Accept()
528         }
529         if latency := time.Since(now); latency > longThrottleLatency {
530                 klog.V(4).Infof("Throttling request took %v, request: %s:%s", latency, r.verb, r.URL().String())
531         }
532 }
533
534 // Watch attempts to begin watching the requested location.
535 // Returns a watch.Interface, or an error.
536 func (r *Request) Watch() (watch.Interface, error) {
537         return r.WatchWithSpecificDecoders(
538                 func(body io.ReadCloser) streaming.Decoder {
539                         framer := r.serializers.Framer.NewFrameReader(body)
540                         return streaming.NewDecoder(framer, r.serializers.StreamingSerializer)
541                 },
542                 r.serializers.Decoder,
543         )
544 }
545
546 // WatchWithSpecificDecoders attempts to begin watching the requested location with a *different* decoder.
547 // Turns out that you want one "standard" decoder for the watch event and one "personal" decoder for the content
548 // Returns a watch.Interface, or an error.
549 func (r *Request) WatchWithSpecificDecoders(wrapperDecoderFn func(io.ReadCloser) streaming.Decoder, embeddedDecoder runtime.Decoder) (watch.Interface, error) {
550         // We specifically don't want to rate limit watches, so we
551         // don't use r.throttle here.
552         if r.err != nil {
553                 return nil, r.err
554         }
555         if r.serializers.Framer == nil {
556                 return nil, fmt.Errorf("watching resources is not possible with this client (content-type: %s)", r.content.ContentType)
557         }
558
559         url := r.URL().String()
560         req, err := http.NewRequest(r.verb, url, r.body)
561         if err != nil {
562                 return nil, err
563         }
564         if r.ctx != nil {
565                 req = req.WithContext(r.ctx)
566         }
567         req.Header = r.headers
568         client := r.client
569         if client == nil {
570                 client = http.DefaultClient
571         }
572         r.backoffMgr.Sleep(r.backoffMgr.CalculateBackoff(r.URL()))
573         resp, err := client.Do(req)
574         updateURLMetrics(r, resp, err)
575         if r.baseURL != nil {
576                 if err != nil {
577                         r.backoffMgr.UpdateBackoff(r.baseURL, err, 0)
578                 } else {
579                         r.backoffMgr.UpdateBackoff(r.baseURL, err, resp.StatusCode)
580                 }
581         }
582         if err != nil {
583                 // The watch stream mechanism handles many common partial data errors, so closed
584                 // connections can be retried in many cases.
585                 if net.IsProbableEOF(err) {
586                         return watch.NewEmptyWatch(), nil
587                 }
588                 return nil, err
589         }
590         if resp.StatusCode != http.StatusOK {
591                 defer resp.Body.Close()
592                 if result := r.transformResponse(resp, req); result.err != nil {
593                         return nil, result.err
594                 }
595                 return nil, fmt.Errorf("for request '%+v', got status: %v", url, resp.StatusCode)
596         }
597         wrapperDecoder := wrapperDecoderFn(resp.Body)
598         return watch.NewStreamWatcher(restclientwatch.NewDecoder(wrapperDecoder, embeddedDecoder)), nil
599 }
600
601 // updateURLMetrics is a convenience function for pushing metrics.
602 // It also handles corner cases for incomplete/invalid request data.
603 func updateURLMetrics(req *Request, resp *http.Response, err error) {
604         url := "none"
605         if req.baseURL != nil {
606                 url = req.baseURL.Host
607         }
608
609         // Errors can be arbitrary strings. Unbound label cardinality is not suitable for a metric
610         // system so we just report them as `<error>`.
611         if err != nil {
612                 metrics.RequestResult.Increment("<error>", req.verb, url)
613         } else {
614                 //Metrics for failure codes
615                 metrics.RequestResult.Increment(strconv.Itoa(resp.StatusCode), req.verb, url)
616         }
617 }
618
619 // Stream formats and executes the request, and offers streaming of the response.
620 // Returns io.ReadCloser which could be used for streaming of the response, or an error
621 // Any non-2xx http status code causes an error.  If we get a non-2xx code, we try to convert the body into an APIStatus object.
622 // If we can, we return that as an error.  Otherwise, we create an error that lists the http status and the content of the response.
623 func (r *Request) Stream() (io.ReadCloser, error) {
624         if r.err != nil {
625                 return nil, r.err
626         }
627
628         r.tryThrottle()
629
630         url := r.URL().String()
631         req, err := http.NewRequest(r.verb, url, nil)
632         if err != nil {
633                 return nil, err
634         }
635         if r.ctx != nil {
636                 req = req.WithContext(r.ctx)
637         }
638         req.Header = r.headers
639         client := r.client
640         if client == nil {
641                 client = http.DefaultClient
642         }
643         r.backoffMgr.Sleep(r.backoffMgr.CalculateBackoff(r.URL()))
644         resp, err := client.Do(req)
645         updateURLMetrics(r, resp, err)
646         if r.baseURL != nil {
647                 if err != nil {
648                         r.backoffMgr.UpdateBackoff(r.URL(), err, 0)
649                 } else {
650                         r.backoffMgr.UpdateBackoff(r.URL(), err, resp.StatusCode)
651                 }
652         }
653         if err != nil {
654                 return nil, err
655         }
656
657         switch {
658         case (resp.StatusCode >= 200) && (resp.StatusCode < 300):
659                 return resp.Body, nil
660
661         default:
662                 // ensure we close the body before returning the error
663                 defer resp.Body.Close()
664
665                 result := r.transformResponse(resp, req)
666                 err := result.Error()
667                 if err == nil {
668                         err = fmt.Errorf("%d while accessing %v: %s", result.statusCode, url, string(result.body))
669                 }
670                 return nil, err
671         }
672 }
673
674 // request connects to the server and invokes the provided function when a server response is
675 // received. It handles retry behavior and up front validation of requests. It will invoke
676 // fn at most once. It will return an error if a problem occurred prior to connecting to the
677 // server - the provided function is responsible for handling server errors.
678 func (r *Request) request(fn func(*http.Request, *http.Response)) error {
679         //Metrics for total request latency
680         start := time.Now()
681         defer func() {
682                 metrics.RequestLatency.Observe(r.verb, r.finalURLTemplate(), time.Since(start))
683         }()
684
685         if r.err != nil {
686                 klog.V(4).Infof("Error in request: %v", r.err)
687                 return r.err
688         }
689
690         // TODO: added to catch programmer errors (invoking operations with an object with an empty namespace)
691         if (r.verb == "GET" || r.verb == "PUT" || r.verb == "DELETE") && r.namespaceSet && len(r.resourceName) > 0 && len(r.namespace) == 0 {
692                 return fmt.Errorf("an empty namespace may not be set when a resource name is provided")
693         }
694         if (r.verb == "POST") && r.namespaceSet && len(r.namespace) == 0 {
695                 return fmt.Errorf("an empty namespace may not be set during creation")
696         }
697
698         client := r.client
699         if client == nil {
700                 client = http.DefaultClient
701         }
702
703         // Right now we make about ten retry attempts if we get a Retry-After response.
704         maxRetries := 10
705         retries := 0
706         for {
707                 url := r.URL().String()
708                 req, err := http.NewRequest(r.verb, url, r.body)
709                 if err != nil {
710                         return err
711                 }
712                 if r.timeout > 0 {
713                         if r.ctx == nil {
714                                 r.ctx = context.Background()
715                         }
716                         var cancelFn context.CancelFunc
717                         r.ctx, cancelFn = context.WithTimeout(r.ctx, r.timeout)
718                         defer cancelFn()
719                 }
720                 if r.ctx != nil {
721                         req = req.WithContext(r.ctx)
722                 }
723                 req.Header = r.headers
724
725                 r.backoffMgr.Sleep(r.backoffMgr.CalculateBackoff(r.URL()))
726                 if retries > 0 {
727                         // We are retrying the request that we already send to apiserver
728                         // at least once before.
729                         // This request should also be throttled with the client-internal throttler.
730                         r.tryThrottle()
731                 }
732                 resp, err := client.Do(req)
733                 updateURLMetrics(r, resp, err)
734                 if err != nil {
735                         r.backoffMgr.UpdateBackoff(r.URL(), err, 0)
736                 } else {
737                         r.backoffMgr.UpdateBackoff(r.URL(), err, resp.StatusCode)
738                 }
739                 if err != nil {
740                         // "Connection reset by peer" is usually a transient error.
741                         // Thus in case of "GET" operations, we simply retry it.
742                         // We are not automatically retrying "write" operations, as
743                         // they are not idempotent.
744                         if !net.IsConnectionReset(err) || r.verb != "GET" {
745                                 return err
746                         }
747                         // For the purpose of retry, we set the artificial "retry-after" response.
748                         // TODO: Should we clean the original response if it exists?
749                         resp = &http.Response{
750                                 StatusCode: http.StatusInternalServerError,
751                                 Header:     http.Header{"Retry-After": []string{"1"}},
752                                 Body:       ioutil.NopCloser(bytes.NewReader([]byte{})),
753                         }
754                 }
755
756                 done := func() bool {
757                         // Ensure the response body is fully read and closed
758                         // before we reconnect, so that we reuse the same TCP
759                         // connection.
760                         defer func() {
761                                 const maxBodySlurpSize = 2 << 10
762                                 if resp.ContentLength <= maxBodySlurpSize {
763                                         io.Copy(ioutil.Discard, &io.LimitedReader{R: resp.Body, N: maxBodySlurpSize})
764                                 }
765                                 resp.Body.Close()
766                         }()
767
768                         retries++
769                         if seconds, wait := checkWait(resp); wait && retries < maxRetries {
770                                 if seeker, ok := r.body.(io.Seeker); ok && r.body != nil {
771                                         _, err := seeker.Seek(0, 0)
772                                         if err != nil {
773                                                 klog.V(4).Infof("Could not retry request, can't Seek() back to beginning of body for %T", r.body)
774                                                 fn(req, resp)
775                                                 return true
776                                         }
777                                 }
778
779                                 klog.V(4).Infof("Got a Retry-After %ds response for attempt %d to %v", seconds, retries, url)
780                                 r.backoffMgr.Sleep(time.Duration(seconds) * time.Second)
781                                 return false
782                         }
783                         fn(req, resp)
784                         return true
785                 }()
786                 if done {
787                         return nil
788                 }
789         }
790 }
791
792 // Do formats and executes the request. Returns a Result object for easy response
793 // processing.
794 //
795 // Error type:
796 //  * If the request can't be constructed, or an error happened earlier while building its
797 //    arguments: *RequestConstructionError
798 //  * If the server responds with a status: *errors.StatusError or *errors.UnexpectedObjectError
799 //  * http.Client.Do errors are returned directly.
800 func (r *Request) Do() Result {
801         r.tryThrottle()
802
803         var result Result
804         err := r.request(func(req *http.Request, resp *http.Response) {
805                 result = r.transformResponse(resp, req)
806         })
807         if err != nil {
808                 return Result{err: err}
809         }
810         return result
811 }
812
813 // DoRaw executes the request but does not process the response body.
814 func (r *Request) DoRaw() ([]byte, error) {
815         r.tryThrottle()
816
817         var result Result
818         err := r.request(func(req *http.Request, resp *http.Response) {
819                 result.body, result.err = ioutil.ReadAll(resp.Body)
820                 glogBody("Response Body", result.body)
821                 if resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent {
822                         result.err = r.transformUnstructuredResponseError(resp, req, result.body)
823                 }
824         })
825         if err != nil {
826                 return nil, err
827         }
828         return result.body, result.err
829 }
830
831 // transformResponse converts an API response into a structured API object
832 func (r *Request) transformResponse(resp *http.Response, req *http.Request) Result {
833         var body []byte
834         if resp.Body != nil {
835                 data, err := ioutil.ReadAll(resp.Body)
836                 switch err.(type) {
837                 case nil:
838                         body = data
839                 case http2.StreamError:
840                         // This is trying to catch the scenario that the server may close the connection when sending the
841                         // response body. This can be caused by server timeout due to a slow network connection.
842                         // TODO: Add test for this. Steps may be:
843                         // 1. client-go (or kubectl) sends a GET request.
844                         // 2. Apiserver sends back the headers and then part of the body
845                         // 3. Apiserver closes connection.
846                         // 4. client-go should catch this and return an error.
847                         klog.V(2).Infof("Stream error %#v when reading response body, may be caused by closed connection.", err)
848                         streamErr := fmt.Errorf("Stream error %#v when reading response body, may be caused by closed connection. Please retry.", err)
849                         return Result{
850                                 err: streamErr,
851                         }
852                 default:
853                         klog.Errorf("Unexpected error when reading response body: %#v", err)
854                         unexpectedErr := fmt.Errorf("Unexpected error %#v when reading response body. Please retry.", err)
855                         return Result{
856                                 err: unexpectedErr,
857                         }
858                 }
859         }
860
861         glogBody("Response Body", body)
862
863         // verify the content type is accurate
864         contentType := resp.Header.Get("Content-Type")
865         decoder := r.serializers.Decoder
866         if len(contentType) > 0 && (decoder == nil || (len(r.content.ContentType) > 0 && contentType != r.content.ContentType)) {
867                 mediaType, params, err := mime.ParseMediaType(contentType)
868                 if err != nil {
869                         return Result{err: errors.NewInternalError(err)}
870                 }
871                 decoder, err = r.serializers.RenegotiatedDecoder(mediaType, params)
872                 if err != nil {
873                         // if we fail to negotiate a decoder, treat this as an unstructured error
874                         switch {
875                         case resp.StatusCode == http.StatusSwitchingProtocols:
876                                 // no-op, we've been upgraded
877                         case resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent:
878                                 return Result{err: r.transformUnstructuredResponseError(resp, req, body)}
879                         }
880                         return Result{
881                                 body:        body,
882                                 contentType: contentType,
883                                 statusCode:  resp.StatusCode,
884                         }
885                 }
886         }
887
888         switch {
889         case resp.StatusCode == http.StatusSwitchingProtocols:
890                 // no-op, we've been upgraded
891         case resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent:
892                 // calculate an unstructured error from the response which the Result object may use if the caller
893                 // did not return a structured error.
894                 retryAfter, _ := retryAfterSeconds(resp)
895                 err := r.newUnstructuredResponseError(body, isTextResponse(resp), resp.StatusCode, req.Method, retryAfter)
896                 return Result{
897                         body:        body,
898                         contentType: contentType,
899                         statusCode:  resp.StatusCode,
900                         decoder:     decoder,
901                         err:         err,
902                 }
903         }
904
905         return Result{
906                 body:        body,
907                 contentType: contentType,
908                 statusCode:  resp.StatusCode,
909                 decoder:     decoder,
910         }
911 }
912
913 // truncateBody decides if the body should be truncated, based on the glog Verbosity.
914 func truncateBody(body string) string {
915         max := 0
916         switch {
917         case bool(klog.V(10)):
918                 return body
919         case bool(klog.V(9)):
920                 max = 10240
921         case bool(klog.V(8)):
922                 max = 1024
923         }
924
925         if len(body) <= max {
926                 return body
927         }
928
929         return body[:max] + fmt.Sprintf(" [truncated %d chars]", len(body)-max)
930 }
931
932 // glogBody logs a body output that could be either JSON or protobuf. It explicitly guards against
933 // allocating a new string for the body output unless necessary. Uses a simple heuristic to determine
934 // whether the body is printable.
935 func glogBody(prefix string, body []byte) {
936         if klog.V(8) {
937                 if bytes.IndexFunc(body, func(r rune) bool {
938                         return r < 0x0a
939                 }) != -1 {
940                         klog.Infof("%s:\n%s", prefix, truncateBody(hex.Dump(body)))
941                 } else {
942                         klog.Infof("%s: %s", prefix, truncateBody(string(body)))
943                 }
944         }
945 }
946
947 // maxUnstructuredResponseTextBytes is an upper bound on how much output to include in the unstructured error.
948 const maxUnstructuredResponseTextBytes = 2048
949
950 // transformUnstructuredResponseError handles an error from the server that is not in a structured form.
951 // It is expected to transform any response that is not recognizable as a clear server sent error from the
952 // K8S API using the information provided with the request. In practice, HTTP proxies and client libraries
953 // introduce a level of uncertainty to the responses returned by servers that in common use result in
954 // unexpected responses. The rough structure is:
955 //
956 // 1. Assume the server sends you something sane - JSON + well defined error objects + proper codes
957 //    - this is the happy path
958 //    - when you get this output, trust what the server sends
959 // 2. Guard against empty fields / bodies in received JSON and attempt to cull sufficient info from them to
960 //    generate a reasonable facsimile of the original failure.
961 //    - Be sure to use a distinct error type or flag that allows a client to distinguish between this and error 1 above
962 // 3. Handle true disconnect failures / completely malformed data by moving up to a more generic client error
963 // 4. Distinguish between various connection failures like SSL certificates, timeouts, proxy errors, unexpected
964 //    initial contact, the presence of mismatched body contents from posted content types
965 //    - Give these a separate distinct error type and capture as much as possible of the original message
966 //
967 // TODO: introduce transformation of generic http.Client.Do() errors that separates 4.
968 func (r *Request) transformUnstructuredResponseError(resp *http.Response, req *http.Request, body []byte) error {
969         if body == nil && resp.Body != nil {
970                 if data, err := ioutil.ReadAll(&io.LimitedReader{R: resp.Body, N: maxUnstructuredResponseTextBytes}); err == nil {
971                         body = data
972                 }
973         }
974         retryAfter, _ := retryAfterSeconds(resp)
975         return r.newUnstructuredResponseError(body, isTextResponse(resp), resp.StatusCode, req.Method, retryAfter)
976 }
977
978 // newUnstructuredResponseError instantiates the appropriate generic error for the provided input. It also logs the body.
979 func (r *Request) newUnstructuredResponseError(body []byte, isTextResponse bool, statusCode int, method string, retryAfter int) error {
980         // cap the amount of output we create
981         if len(body) > maxUnstructuredResponseTextBytes {
982                 body = body[:maxUnstructuredResponseTextBytes]
983         }
984
985         message := "unknown"
986         if isTextResponse {
987                 message = strings.TrimSpace(string(body))
988         }
989         var groupResource schema.GroupResource
990         if len(r.resource) > 0 {
991                 groupResource.Group = r.content.GroupVersion.Group
992                 groupResource.Resource = r.resource
993         }
994         return errors.NewGenericServerResponse(
995                 statusCode,
996                 method,
997                 groupResource,
998                 r.resourceName,
999                 message,
1000                 retryAfter,
1001                 true,
1002         )
1003 }
1004
1005 // isTextResponse returns true if the response appears to be a textual media type.
1006 func isTextResponse(resp *http.Response) bool {
1007         contentType := resp.Header.Get("Content-Type")
1008         if len(contentType) == 0 {
1009                 return true
1010         }
1011         media, _, err := mime.ParseMediaType(contentType)
1012         if err != nil {
1013                 return false
1014         }
1015         return strings.HasPrefix(media, "text/")
1016 }
1017
1018 // checkWait returns true along with a number of seconds if the server instructed us to wait
1019 // before retrying.
1020 func checkWait(resp *http.Response) (int, bool) {
1021         switch r := resp.StatusCode; {
1022         // any 500 error code and 429 can trigger a wait
1023         case r == http.StatusTooManyRequests, r >= 500:
1024         default:
1025                 return 0, false
1026         }
1027         i, ok := retryAfterSeconds(resp)
1028         return i, ok
1029 }
1030
1031 // retryAfterSeconds returns the value of the Retry-After header and true, or 0 and false if
1032 // the header was missing or not a valid number.
1033 func retryAfterSeconds(resp *http.Response) (int, bool) {
1034         if h := resp.Header.Get("Retry-After"); len(h) > 0 {
1035                 if i, err := strconv.Atoi(h); err == nil {
1036                         return i, true
1037                 }
1038         }
1039         return 0, false
1040 }
1041
1042 // Result contains the result of calling Request.Do().
1043 type Result struct {
1044         body        []byte
1045         contentType string
1046         err         error
1047         statusCode  int
1048
1049         decoder runtime.Decoder
1050 }
1051
1052 // Raw returns the raw result.
1053 func (r Result) Raw() ([]byte, error) {
1054         return r.body, r.err
1055 }
1056
1057 // Get returns the result as an object, which means it passes through the decoder.
1058 // If the returned object is of type Status and has .Status != StatusSuccess, the
1059 // additional information in Status will be used to enrich the error.
1060 func (r Result) Get() (runtime.Object, error) {
1061         if r.err != nil {
1062                 // Check whether the result has a Status object in the body and prefer that.
1063                 return nil, r.Error()
1064         }
1065         if r.decoder == nil {
1066                 return nil, fmt.Errorf("serializer for %s doesn't exist", r.contentType)
1067         }
1068
1069         // decode, but if the result is Status return that as an error instead.
1070         out, _, err := r.decoder.Decode(r.body, nil, nil)
1071         if err != nil {
1072                 return nil, err
1073         }
1074         switch t := out.(type) {
1075         case *metav1.Status:
1076                 // any status besides StatusSuccess is considered an error.
1077                 if t.Status != metav1.StatusSuccess {
1078                         return nil, errors.FromObject(t)
1079                 }
1080         }
1081         return out, nil
1082 }
1083
1084 // StatusCode returns the HTTP status code of the request. (Only valid if no
1085 // error was returned.)
1086 func (r Result) StatusCode(statusCode *int) Result {
1087         *statusCode = r.statusCode
1088         return r
1089 }
1090
1091 // Into stores the result into obj, if possible. If obj is nil it is ignored.
1092 // If the returned object is of type Status and has .Status != StatusSuccess, the
1093 // additional information in Status will be used to enrich the error.
1094 func (r Result) Into(obj runtime.Object) error {
1095         if r.err != nil {
1096                 // Check whether the result has a Status object in the body and prefer that.
1097                 return r.Error()
1098         }
1099         if r.decoder == nil {
1100                 return fmt.Errorf("serializer for %s doesn't exist", r.contentType)
1101         }
1102         if len(r.body) == 0 {
1103                 return fmt.Errorf("0-length response")
1104         }
1105
1106         out, _, err := r.decoder.Decode(r.body, nil, obj)
1107         if err != nil || out == obj {
1108                 return err
1109         }
1110         // if a different object is returned, see if it is Status and avoid double decoding
1111         // the object.
1112         switch t := out.(type) {
1113         case *metav1.Status:
1114                 // any status besides StatusSuccess is considered an error.
1115                 if t.Status != metav1.StatusSuccess {
1116                         return errors.FromObject(t)
1117                 }
1118         }
1119         return nil
1120 }
1121
1122 // WasCreated updates the provided bool pointer to whether the server returned
1123 // 201 created or a different response.
1124 func (r Result) WasCreated(wasCreated *bool) Result {
1125         *wasCreated = r.statusCode == http.StatusCreated
1126         return r
1127 }
1128
1129 // Error returns the error executing the request, nil if no error occurred.
1130 // If the returned object is of type Status and has Status != StatusSuccess, the
1131 // additional information in Status will be used to enrich the error.
1132 // See the Request.Do() comment for what errors you might get.
1133 func (r Result) Error() error {
1134         // if we have received an unexpected server error, and we have a body and decoder, we can try to extract
1135         // a Status object.
1136         if r.err == nil || !errors.IsUnexpectedServerError(r.err) || len(r.body) == 0 || r.decoder == nil {
1137                 return r.err
1138         }
1139
1140         // attempt to convert the body into a Status object
1141         // to be backwards compatible with old servers that do not return a version, default to "v1"
1142         out, _, err := r.decoder.Decode(r.body, &schema.GroupVersionKind{Version: "v1"}, nil)
1143         if err != nil {
1144                 klog.V(5).Infof("body was not decodable (unable to check for Status): %v", err)
1145                 return r.err
1146         }
1147         switch t := out.(type) {
1148         case *metav1.Status:
1149                 // because we default the kind, we *must* check for StatusFailure
1150                 if t.Status == metav1.StatusFailure {
1151                         return errors.FromObject(t)
1152                 }
1153         }
1154         return r.err
1155 }
1156
1157 // NameMayNotBe specifies strings that cannot be used as names specified as path segments (like the REST API or etcd store)
1158 var NameMayNotBe = []string{".", ".."}
1159
1160 // NameMayNotContain specifies substrings that cannot be used in names specified as path segments (like the REST API or etcd store)
1161 var NameMayNotContain = []string{"/", "%"}
1162
1163 // IsValidPathSegmentName validates the name can be safely encoded as a path segment
1164 func IsValidPathSegmentName(name string) []string {
1165         for _, illegalName := range NameMayNotBe {
1166                 if name == illegalName {
1167                         return []string{fmt.Sprintf(`may not be '%s'`, illegalName)}
1168                 }
1169         }
1170
1171         var errors []string
1172         for _, illegalContent := range NameMayNotContain {
1173                 if strings.Contains(name, illegalContent) {
1174                         errors = append(errors, fmt.Sprintf(`may not contain '%s'`, illegalContent))
1175                 }
1176         }
1177
1178         return errors
1179 }
1180
1181 // IsValidPathSegmentPrefix validates the name can be used as a prefix for a name which will be encoded as a path segment
1182 // It does not check for exact matches with disallowed names, since an arbitrary suffix might make the name valid
1183 func IsValidPathSegmentPrefix(name string) []string {
1184         var errors []string
1185         for _, illegalContent := range NameMayNotContain {
1186                 if strings.Contains(name, illegalContent) {
1187                         errors = append(errors, fmt.Sprintf(`may not contain '%s'`, illegalContent))
1188                 }
1189         }
1190
1191         return errors
1192 }
1193
1194 // ValidatePathSegmentName validates the name can be safely encoded as a path segment
1195 func ValidatePathSegmentName(name string, prefix bool) []string {
1196         if prefix {
1197                 return IsValidPathSegmentPrefix(name)
1198         } else {
1199                 return IsValidPathSegmentName(name)
1200         }
1201 }