Code refactoring for bpa operator
[icn.git] / cmd / bpa-operator / vendor / k8s.io / client-go / tools / leaderelection / leaderelection.go
1 /*
2 Copyright 2015 The Kubernetes Authors.
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8     http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 // Package leaderelection implements leader election of a set of endpoints.
18 // It uses an annotation in the endpoints object to store the record of the
19 // election state.
20 //
21 // This implementation does not guarantee that only one client is acting as a
22 // leader (a.k.a. fencing). A client observes timestamps captured locally to
23 // infer the state of the leader election. Thus the implementation is tolerant
24 // to arbitrary clock skew, but is not tolerant to arbitrary clock skew rate.
25 //
26 // However the level of tolerance to skew rate can be configured by setting
27 // RenewDeadline and LeaseDuration appropriately. The tolerance expressed as a
28 // maximum tolerated ratio of time passed on the fastest node to time passed on
29 // the slowest node can be approximately achieved with a configuration that sets
30 // the same ratio of LeaseDuration to RenewDeadline. For example if a user wanted
31 // to tolerate some nodes progressing forward in time twice as fast as other nodes,
32 // the user could set LeaseDuration to 60 seconds and RenewDeadline to 30 seconds.
33 //
34 // While not required, some method of clock synchronization between nodes in the
35 // cluster is highly recommended. It's important to keep in mind when configuring
36 // this client that the tolerance to skew rate varies inversely to master
37 // availability.
38 //
39 // Larger clusters often have a more lenient SLA for API latency. This should be
40 // taken into account when configuring the client. The rate of leader transitions
41 // should be monitored and RetryPeriod and LeaseDuration should be increased
42 // until the rate is stable and acceptably low. It's important to keep in mind
43 // when configuring this client that the tolerance to API latency varies inversely
44 // to master availability.
45 //
46 // DISCLAIMER: this is an alpha API. This library will likely change significantly
47 // or even be removed entirely in subsequent releases. Depend on this API at
48 // your own risk.
49 package leaderelection
50
51 import (
52         "context"
53         "fmt"
54         "reflect"
55         "time"
56
57         "k8s.io/apimachinery/pkg/api/errors"
58         metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
59         "k8s.io/apimachinery/pkg/util/clock"
60         "k8s.io/apimachinery/pkg/util/runtime"
61         "k8s.io/apimachinery/pkg/util/wait"
62         rl "k8s.io/client-go/tools/leaderelection/resourcelock"
63
64         "k8s.io/klog"
65 )
66
67 const (
68         JitterFactor = 1.2
69 )
70
71 // NewLeaderElector creates a LeaderElector from a LeaderElectionConfig
72 func NewLeaderElector(lec LeaderElectionConfig) (*LeaderElector, error) {
73         if lec.LeaseDuration <= lec.RenewDeadline {
74                 return nil, fmt.Errorf("leaseDuration must be greater than renewDeadline")
75         }
76         if lec.RenewDeadline <= time.Duration(JitterFactor*float64(lec.RetryPeriod)) {
77                 return nil, fmt.Errorf("renewDeadline must be greater than retryPeriod*JitterFactor")
78         }
79         if lec.LeaseDuration < 1 {
80                 return nil, fmt.Errorf("leaseDuration must be greater than zero")
81         }
82         if lec.RenewDeadline < 1 {
83                 return nil, fmt.Errorf("renewDeadline must be greater than zero")
84         }
85         if lec.RetryPeriod < 1 {
86                 return nil, fmt.Errorf("retryPeriod must be greater than zero")
87         }
88
89         if lec.Lock == nil {
90                 return nil, fmt.Errorf("Lock must not be nil.")
91         }
92         return &LeaderElector{
93                 config: lec,
94                 clock:  clock.RealClock{},
95         }, nil
96 }
97
98 type LeaderElectionConfig struct {
99         // Lock is the resource that will be used for locking
100         Lock rl.Interface
101
102         // LeaseDuration is the duration that non-leader candidates will
103         // wait to force acquire leadership. This is measured against time of
104         // last observed ack.
105         LeaseDuration time.Duration
106         // RenewDeadline is the duration that the acting master will retry
107         // refreshing leadership before giving up.
108         RenewDeadline time.Duration
109         // RetryPeriod is the duration the LeaderElector clients should wait
110         // between tries of actions.
111         RetryPeriod time.Duration
112
113         // Callbacks are callbacks that are triggered during certain lifecycle
114         // events of the LeaderElector
115         Callbacks LeaderCallbacks
116
117         // WatchDog is the associated health checker
118         // WatchDog may be null if its not needed/configured.
119         WatchDog *HealthzAdaptor
120
121         // Name is the name of the resource lock for debugging
122         Name string
123 }
124
125 // LeaderCallbacks are callbacks that are triggered during certain
126 // lifecycle events of the LeaderElector. These are invoked asynchronously.
127 //
128 // possible future callbacks:
129 //  * OnChallenge()
130 type LeaderCallbacks struct {
131         // OnStartedLeading is called when a LeaderElector client starts leading
132         OnStartedLeading func(context.Context)
133         // OnStoppedLeading is called when a LeaderElector client stops leading
134         OnStoppedLeading func()
135         // OnNewLeader is called when the client observes a leader that is
136         // not the previously observed leader. This includes the first observed
137         // leader when the client starts.
138         OnNewLeader func(identity string)
139 }
140
141 // LeaderElector is a leader election client.
142 type LeaderElector struct {
143         config LeaderElectionConfig
144         // internal bookkeeping
145         observedRecord rl.LeaderElectionRecord
146         observedTime   time.Time
147         // used to implement OnNewLeader(), may lag slightly from the
148         // value observedRecord.HolderIdentity if the transition has
149         // not yet been reported.
150         reportedLeader string
151
152         // clock is wrapper around time to allow for less flaky testing
153         clock clock.Clock
154
155         // name is the name of the resource lock for debugging
156         name string
157 }
158
159 // Run starts the leader election loop
160 func (le *LeaderElector) Run(ctx context.Context) {
161         defer func() {
162                 runtime.HandleCrash()
163                 le.config.Callbacks.OnStoppedLeading()
164         }()
165         if !le.acquire(ctx) {
166                 return // ctx signalled done
167         }
168         ctx, cancel := context.WithCancel(ctx)
169         defer cancel()
170         go le.config.Callbacks.OnStartedLeading(ctx)
171         le.renew(ctx)
172 }
173
174 // RunOrDie starts a client with the provided config or panics if the config
175 // fails to validate.
176 func RunOrDie(ctx context.Context, lec LeaderElectionConfig) {
177         le, err := NewLeaderElector(lec)
178         if err != nil {
179                 panic(err)
180         }
181         if lec.WatchDog != nil {
182                 lec.WatchDog.SetLeaderElection(le)
183         }
184         le.Run(ctx)
185 }
186
187 // GetLeader returns the identity of the last observed leader or returns the empty string if
188 // no leader has yet been observed.
189 func (le *LeaderElector) GetLeader() string {
190         return le.observedRecord.HolderIdentity
191 }
192
193 // IsLeader returns true if the last observed leader was this client else returns false.
194 func (le *LeaderElector) IsLeader() bool {
195         return le.observedRecord.HolderIdentity == le.config.Lock.Identity()
196 }
197
198 // acquire loops calling tryAcquireOrRenew and returns true immediately when tryAcquireOrRenew succeeds.
199 // Returns false if ctx signals done.
200 func (le *LeaderElector) acquire(ctx context.Context) bool {
201         ctx, cancel := context.WithCancel(ctx)
202         defer cancel()
203         succeeded := false
204         desc := le.config.Lock.Describe()
205         klog.Infof("attempting to acquire leader lease  %v...", desc)
206         wait.JitterUntil(func() {
207                 succeeded = le.tryAcquireOrRenew()
208                 le.maybeReportTransition()
209                 if !succeeded {
210                         klog.V(4).Infof("failed to acquire lease %v", desc)
211                         return
212                 }
213                 le.config.Lock.RecordEvent("became leader")
214                 klog.Infof("successfully acquired lease %v", desc)
215                 cancel()
216         }, le.config.RetryPeriod, JitterFactor, true, ctx.Done())
217         return succeeded
218 }
219
220 // renew loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew fails or ctx signals done.
221 func (le *LeaderElector) renew(ctx context.Context) {
222         ctx, cancel := context.WithCancel(ctx)
223         defer cancel()
224         wait.Until(func() {
225                 timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline)
226                 defer timeoutCancel()
227                 err := wait.PollImmediateUntil(le.config.RetryPeriod, func() (bool, error) {
228                         done := make(chan bool, 1)
229                         go func() {
230                                 defer close(done)
231                                 done <- le.tryAcquireOrRenew()
232                         }()
233
234                         select {
235                         case <-timeoutCtx.Done():
236                                 return false, fmt.Errorf("failed to tryAcquireOrRenew %s", timeoutCtx.Err())
237                         case result := <-done:
238                                 return result, nil
239                         }
240                 }, timeoutCtx.Done())
241
242                 le.maybeReportTransition()
243                 desc := le.config.Lock.Describe()
244                 if err == nil {
245                         klog.V(5).Infof("successfully renewed lease %v", desc)
246                         return
247                 }
248                 le.config.Lock.RecordEvent("stopped leading")
249                 klog.Infof("failed to renew lease %v: %v", desc, err)
250                 cancel()
251         }, le.config.RetryPeriod, ctx.Done())
252 }
253
254 // tryAcquireOrRenew tries to acquire a leader lease if it is not already acquired,
255 // else it tries to renew the lease if it has already been acquired. Returns true
256 // on success else returns false.
257 func (le *LeaderElector) tryAcquireOrRenew() bool {
258         now := metav1.Now()
259         leaderElectionRecord := rl.LeaderElectionRecord{
260                 HolderIdentity:       le.config.Lock.Identity(),
261                 LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
262                 RenewTime:            now,
263                 AcquireTime:          now,
264         }
265
266         // 1. obtain or create the ElectionRecord
267         oldLeaderElectionRecord, err := le.config.Lock.Get()
268         if err != nil {
269                 if !errors.IsNotFound(err) {
270                         klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
271                         return false
272                 }
273                 if err = le.config.Lock.Create(leaderElectionRecord); err != nil {
274                         klog.Errorf("error initially creating leader election record: %v", err)
275                         return false
276                 }
277                 le.observedRecord = leaderElectionRecord
278                 le.observedTime = le.clock.Now()
279                 return true
280         }
281
282         // 2. Record obtained, check the Identity & Time
283         if !reflect.DeepEqual(le.observedRecord, *oldLeaderElectionRecord) {
284                 le.observedRecord = *oldLeaderElectionRecord
285                 le.observedTime = le.clock.Now()
286         }
287         if le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
288                 !le.IsLeader() {
289                 klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
290                 return false
291         }
292
293         // 3. We're going to try to update. The leaderElectionRecord is set to it's default
294         // here. Let's correct it before updating.
295         if le.IsLeader() {
296                 leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
297                 leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
298         } else {
299                 leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
300         }
301
302         // update the lock itself
303         if err = le.config.Lock.Update(leaderElectionRecord); err != nil {
304                 klog.Errorf("Failed to update lock: %v", err)
305                 return false
306         }
307         le.observedRecord = leaderElectionRecord
308         le.observedTime = le.clock.Now()
309         return true
310 }
311
312 func (le *LeaderElector) maybeReportTransition() {
313         if le.observedRecord.HolderIdentity == le.reportedLeader {
314                 return
315         }
316         le.reportedLeader = le.observedRecord.HolderIdentity
317         if le.config.Callbacks.OnNewLeader != nil {
318                 go le.config.Callbacks.OnNewLeader(le.reportedLeader)
319         }
320 }
321
322 // Check will determine if the current lease is expired by more than timeout.
323 func (le *LeaderElector) Check(maxTolerableExpiredLease time.Duration) error {
324         if !le.IsLeader() {
325                 // Currently not concerned with the case that we are hot standby
326                 return nil
327         }
328         // If we are more than timeout seconds after the lease duration that is past the timeout
329         // on the lease renew. Time to start reporting ourselves as unhealthy. We should have
330         // died but conditions like deadlock can prevent this. (See #70819)
331         if le.clock.Since(le.observedTime) > le.config.LeaseDuration+maxTolerableExpiredLease {
332                 return fmt.Errorf("failed election to renew leadership on lease %s", le.config.Name)
333         }
334
335         return nil
336 }