Code refactoring for bpa operator
[icn.git] / cmd / bpa-operator / vendor / k8s.io / client-go / plugin / pkg / client / auth / exec / exec.go
1 /*
2 Copyright 2018 The Kubernetes Authors.
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8     http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 package exec
18
19 import (
20         "bytes"
21         "context"
22         "crypto/tls"
23         "errors"
24         "fmt"
25         "io"
26         "net"
27         "net/http"
28         "os"
29         "os/exec"
30         "reflect"
31         "sync"
32         "time"
33
34         "golang.org/x/crypto/ssh/terminal"
35         "k8s.io/apimachinery/pkg/apis/meta/v1"
36         "k8s.io/apimachinery/pkg/runtime"
37         "k8s.io/apimachinery/pkg/runtime/schema"
38         "k8s.io/apimachinery/pkg/runtime/serializer"
39         utilruntime "k8s.io/apimachinery/pkg/util/runtime"
40         "k8s.io/client-go/pkg/apis/clientauthentication"
41         "k8s.io/client-go/pkg/apis/clientauthentication/v1alpha1"
42         "k8s.io/client-go/pkg/apis/clientauthentication/v1beta1"
43         "k8s.io/client-go/tools/clientcmd/api"
44         "k8s.io/client-go/transport"
45         "k8s.io/client-go/util/connrotation"
46         "k8s.io/klog"
47 )
48
49 const execInfoEnv = "KUBERNETES_EXEC_INFO"
50
51 var scheme = runtime.NewScheme()
52 var codecs = serializer.NewCodecFactory(scheme)
53
54 func init() {
55         v1.AddToGroupVersion(scheme, schema.GroupVersion{Version: "v1"})
56         utilruntime.Must(v1alpha1.AddToScheme(scheme))
57         utilruntime.Must(v1beta1.AddToScheme(scheme))
58         utilruntime.Must(clientauthentication.AddToScheme(scheme))
59 }
60
61 var (
62         // Since transports can be constantly re-initialized by programs like kubectl,
63         // keep a cache of initialized authenticators keyed by a hash of their config.
64         globalCache = newCache()
65         // The list of API versions we accept.
66         apiVersions = map[string]schema.GroupVersion{
67                 v1alpha1.SchemeGroupVersion.String(): v1alpha1.SchemeGroupVersion,
68                 v1beta1.SchemeGroupVersion.String():  v1beta1.SchemeGroupVersion,
69         }
70 )
71
72 func newCache() *cache {
73         return &cache{m: make(map[string]*Authenticator)}
74 }
75
76 func cacheKey(c *api.ExecConfig) string {
77         return fmt.Sprintf("%#v", c)
78 }
79
80 type cache struct {
81         mu sync.Mutex
82         m  map[string]*Authenticator
83 }
84
85 func (c *cache) get(s string) (*Authenticator, bool) {
86         c.mu.Lock()
87         defer c.mu.Unlock()
88         a, ok := c.m[s]
89         return a, ok
90 }
91
92 // put inserts an authenticator into the cache. If an authenticator is already
93 // associated with the key, the first one is returned instead.
94 func (c *cache) put(s string, a *Authenticator) *Authenticator {
95         c.mu.Lock()
96         defer c.mu.Unlock()
97         existing, ok := c.m[s]
98         if ok {
99                 return existing
100         }
101         c.m[s] = a
102         return a
103 }
104
105 // GetAuthenticator returns an exec-based plugin for providing client credentials.
106 func GetAuthenticator(config *api.ExecConfig) (*Authenticator, error) {
107         return newAuthenticator(globalCache, config)
108 }
109
110 func newAuthenticator(c *cache, config *api.ExecConfig) (*Authenticator, error) {
111         key := cacheKey(config)
112         if a, ok := c.get(key); ok {
113                 return a, nil
114         }
115
116         gv, ok := apiVersions[config.APIVersion]
117         if !ok {
118                 return nil, fmt.Errorf("exec plugin: invalid apiVersion %q", config.APIVersion)
119         }
120
121         a := &Authenticator{
122                 cmd:   config.Command,
123                 args:  config.Args,
124                 group: gv,
125
126                 stdin:       os.Stdin,
127                 stderr:      os.Stderr,
128                 interactive: terminal.IsTerminal(int(os.Stdout.Fd())),
129                 now:         time.Now,
130                 environ:     os.Environ,
131         }
132
133         for _, env := range config.Env {
134                 a.env = append(a.env, env.Name+"="+env.Value)
135         }
136
137         return c.put(key, a), nil
138 }
139
140 // Authenticator is a client credential provider that rotates credentials by executing a plugin.
141 // The plugin input and output are defined by the API group client.authentication.k8s.io.
142 type Authenticator struct {
143         // Set by the config
144         cmd   string
145         args  []string
146         group schema.GroupVersion
147         env   []string
148
149         // Stubbable for testing
150         stdin       io.Reader
151         stderr      io.Writer
152         interactive bool
153         now         func() time.Time
154         environ     func() []string
155
156         // Cached results.
157         //
158         // The mutex also guards calling the plugin. Since the plugin could be
159         // interactive we want to make sure it's only called once.
160         mu          sync.Mutex
161         cachedCreds *credentials
162         exp         time.Time
163
164         onRotate func()
165 }
166
167 type credentials struct {
168         token string
169         cert  *tls.Certificate
170 }
171
172 // UpdateTransportConfig updates the transport.Config to use credentials
173 // returned by the plugin.
174 func (a *Authenticator) UpdateTransportConfig(c *transport.Config) error {
175         wt := c.WrapTransport
176         c.WrapTransport = func(rt http.RoundTripper) http.RoundTripper {
177                 if wt != nil {
178                         rt = wt(rt)
179                 }
180                 return &roundTripper{a, rt}
181         }
182
183         if c.TLS.GetCert != nil {
184                 return errors.New("can't add TLS certificate callback: transport.Config.TLS.GetCert already set")
185         }
186         c.TLS.GetCert = a.cert
187
188         var dial func(ctx context.Context, network, addr string) (net.Conn, error)
189         if c.Dial != nil {
190                 dial = c.Dial
191         } else {
192                 dial = (&net.Dialer{Timeout: 30 * time.Second, KeepAlive: 30 * time.Second}).DialContext
193         }
194         d := connrotation.NewDialer(dial)
195         a.onRotate = d.CloseAll
196         c.Dial = d.DialContext
197
198         return nil
199 }
200
201 type roundTripper struct {
202         a    *Authenticator
203         base http.RoundTripper
204 }
205
206 func (r *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
207         // If a user has already set credentials, use that. This makes commands like
208         // "kubectl get --token (token) pods" work.
209         if req.Header.Get("Authorization") != "" {
210                 return r.base.RoundTrip(req)
211         }
212
213         creds, err := r.a.getCreds()
214         if err != nil {
215                 return nil, fmt.Errorf("getting credentials: %v", err)
216         }
217         if creds.token != "" {
218                 req.Header.Set("Authorization", "Bearer "+creds.token)
219         }
220
221         res, err := r.base.RoundTrip(req)
222         if err != nil {
223                 return nil, err
224         }
225         if res.StatusCode == http.StatusUnauthorized {
226                 resp := &clientauthentication.Response{
227                         Header: res.Header,
228                         Code:   int32(res.StatusCode),
229                 }
230                 if err := r.a.maybeRefreshCreds(creds, resp); err != nil {
231                         klog.Errorf("refreshing credentials: %v", err)
232                 }
233         }
234         return res, nil
235 }
236
237 func (a *Authenticator) credsExpired() bool {
238         if a.exp.IsZero() {
239                 return false
240         }
241         return a.now().After(a.exp)
242 }
243
244 func (a *Authenticator) cert() (*tls.Certificate, error) {
245         creds, err := a.getCreds()
246         if err != nil {
247                 return nil, err
248         }
249         return creds.cert, nil
250 }
251
252 func (a *Authenticator) getCreds() (*credentials, error) {
253         a.mu.Lock()
254         defer a.mu.Unlock()
255         if a.cachedCreds != nil && !a.credsExpired() {
256                 return a.cachedCreds, nil
257         }
258
259         if err := a.refreshCredsLocked(nil); err != nil {
260                 return nil, err
261         }
262         return a.cachedCreds, nil
263 }
264
265 // maybeRefreshCreds executes the plugin to force a rotation of the
266 // credentials, unless they were rotated already.
267 func (a *Authenticator) maybeRefreshCreds(creds *credentials, r *clientauthentication.Response) error {
268         a.mu.Lock()
269         defer a.mu.Unlock()
270
271         // Since we're not making a new pointer to a.cachedCreds in getCreds, no
272         // need to do deep comparison.
273         if creds != a.cachedCreds {
274                 // Credentials already rotated.
275                 return nil
276         }
277
278         return a.refreshCredsLocked(r)
279 }
280
281 // refreshCredsLocked executes the plugin and reads the credentials from
282 // stdout. It must be called while holding the Authenticator's mutex.
283 func (a *Authenticator) refreshCredsLocked(r *clientauthentication.Response) error {
284         cred := &clientauthentication.ExecCredential{
285                 Spec: clientauthentication.ExecCredentialSpec{
286                         Response:    r,
287                         Interactive: a.interactive,
288                 },
289         }
290
291         env := append(a.environ(), a.env...)
292         if a.group == v1alpha1.SchemeGroupVersion {
293                 // Input spec disabled for beta due to lack of use. Possibly re-enable this later if
294                 // someone wants it back.
295                 //
296                 // See: https://github.com/kubernetes/kubernetes/issues/61796
297                 data, err := runtime.Encode(codecs.LegacyCodec(a.group), cred)
298                 if err != nil {
299                         return fmt.Errorf("encode ExecCredentials: %v", err)
300                 }
301                 env = append(env, fmt.Sprintf("%s=%s", execInfoEnv, data))
302         }
303
304         stdout := &bytes.Buffer{}
305         cmd := exec.Command(a.cmd, a.args...)
306         cmd.Env = env
307         cmd.Stderr = a.stderr
308         cmd.Stdout = stdout
309         if a.interactive {
310                 cmd.Stdin = a.stdin
311         }
312
313         if err := cmd.Run(); err != nil {
314                 return fmt.Errorf("exec: %v", err)
315         }
316
317         _, gvk, err := codecs.UniversalDecoder(a.group).Decode(stdout.Bytes(), nil, cred)
318         if err != nil {
319                 return fmt.Errorf("decoding stdout: %v", err)
320         }
321         if gvk.Group != a.group.Group || gvk.Version != a.group.Version {
322                 return fmt.Errorf("exec plugin is configured to use API version %s, plugin returned version %s",
323                         a.group, schema.GroupVersion{Group: gvk.Group, Version: gvk.Version})
324         }
325
326         if cred.Status == nil {
327                 return fmt.Errorf("exec plugin didn't return a status field")
328         }
329         if cred.Status.Token == "" && cred.Status.ClientCertificateData == "" && cred.Status.ClientKeyData == "" {
330                 return fmt.Errorf("exec plugin didn't return a token or cert/key pair")
331         }
332         if (cred.Status.ClientCertificateData == "") != (cred.Status.ClientKeyData == "") {
333                 return fmt.Errorf("exec plugin returned only certificate or key, not both")
334         }
335
336         if cred.Status.ExpirationTimestamp != nil {
337                 a.exp = cred.Status.ExpirationTimestamp.Time
338         } else {
339                 a.exp = time.Time{}
340         }
341
342         newCreds := &credentials{
343                 token: cred.Status.Token,
344         }
345         if cred.Status.ClientKeyData != "" && cred.Status.ClientCertificateData != "" {
346                 cert, err := tls.X509KeyPair([]byte(cred.Status.ClientCertificateData), []byte(cred.Status.ClientKeyData))
347                 if err != nil {
348                         return fmt.Errorf("failed parsing client key/certificate: %v", err)
349                 }
350                 newCreds.cert = &cert
351         }
352
353         oldCreds := a.cachedCreds
354         a.cachedCreds = newCreds
355         // Only close all connections when TLS cert rotates. Token rotation doesn't
356         // need the extra noise.
357         if a.onRotate != nil && oldCreds != nil && !reflect.DeepEqual(oldCreds.cert, a.cachedCreds.cert) {
358                 a.onRotate()
359         }
360         return nil
361 }