2 Copyright 2015 The Kubernetes Authors.
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
8 http://www.apache.org/licenses/LICENSE-2.0
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
17 // Package leaderelection implements leader election of a set of endpoints.
18 // It uses an annotation in the endpoints object to store the election record.
21 // This implementation does not guarantee that only one client is acting as a
22 // leader (a.k.a. fencing). A client observes timestamps captured locally to
23 // infer the state of the leader election. Thus the implementation is tolerant
24 // to arbitrary clock skew, but is not tolerant to arbitrary clock skew rate.
26 // However, the level of tolerance to skew rate can be configured by setting
27 // RenewDeadline and LeaseDuration appropriately. The tolerance, expressed as a
28 // maximum tolerated ratio of time passed on the fastest node to time passed on
29 // the slowest node, can be approximately achieved with a configuration that sets
30 // the same ratio of LeaseDuration to RenewDeadline. For example, if a user wanted
31 // to tolerate some nodes progressing forward in time twice as fast as other nodes,
32 // the user could set LeaseDuration to 60 seconds and RenewDeadline to 30 seconds.
34 // While not required, some method of clock synchronization between nodes in the
35 // cluster is highly recommended. It's important to keep in mind when configuring
36 // this client that the tolerance to skew rate varies inversely to master availability.
39 // Larger clusters often have a more lenient SLA for API latency. This should be
40 // taken into account when configuring the client. The rate of leader transitions
41 // should be monitored and RetryPeriod and LeaseDuration should be increased
42 // until the rate is stable and acceptably low. It's important to keep in mind
43 // when configuring this client that the tolerance to API latency varies inversely
44 // to master availability.
46 // DISCLAIMER: this is an alpha API. This library will likely change significantly
47 // or even be removed entirely in subsequent releases. Depend on this API at your own risk.
49 package leaderelection
57 "k8s.io/apimachinery/pkg/api/errors"
58 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
59 "k8s.io/apimachinery/pkg/util/clock"
60 "k8s.io/apimachinery/pkg/util/runtime"
61 "k8s.io/apimachinery/pkg/util/wait"
62 rl "k8s.io/client-go/tools/leaderelection/resourcelock"
71 // NewLeaderElector validates lec and creates a LeaderElector from it.
//
// A nil elector and a non-nil error are returned when the timing settings are
// inconsistent or non-positive: LeaseDuration must be greater than
// RenewDeadline, RenewDeadline must be greater than RetryPeriod scaled by
// JitterFactor, and each of the three durations must be at least 1.
72 func NewLeaderElector(lec LeaderElectionConfig) (*LeaderElector, error) {
// The relative-ordering checks run before the positivity checks, so an
// all-zero config is reported as a leaseDuration/renewDeadline ordering error.
73 if lec.LeaseDuration <= lec.RenewDeadline {
74 return nil, fmt.Errorf("leaseDuration must be greater than renewDeadline")
// RetryPeriod is scaled by JitterFactor before the comparison; acquire passes
// the same JitterFactor to wait.JitterUntil.
76 if lec.RenewDeadline <= time.Duration(JitterFactor*float64(lec.RetryPeriod)) {
77 return nil, fmt.Errorf("renewDeadline must be greater than retryPeriod*JitterFactor")
79 if lec.LeaseDuration < 1 {
80 return nil, fmt.Errorf("leaseDuration must be greater than zero")
82 if lec.RenewDeadline < 1 {
83 return nil, fmt.Errorf("renewDeadline must be greater than zero")
85 if lec.RetryPeriod < 1 {
86 return nil, fmt.Errorf("retryPeriod must be greater than zero")
// NOTE(review): the condition guarding this return is not visible here; the
// message implies lec.Lock is compared against nil — confirm.
90 return nil, fmt.Errorf("Lock must not be nil.")
// The elector reads time through clock.RealClock; per the LeaderElector field
// comment the clock field exists so tests can be less flaky.
92 return &LeaderElector{
94 clock: clock.RealClock{},
// LeaderElectionConfig holds the settings that NewLeaderElector validates.
// A LeaderElector keeps its configuration in its config field.
98 type LeaderElectionConfig struct {
99 // Lock is the resource that will be used for locking.
// The elector calls Identity, Describe, Get, Create, Update and RecordEvent
// on it.
102 // LeaseDuration is the duration that non-leader candidates will
103 // wait to force acquire leadership. This is measured against time of
104 // last observed ack.
// NewLeaderElector requires it to be greater than RenewDeadline.
105 LeaseDuration time.Duration
106 // RenewDeadline is the duration that the acting master will retry
107 // refreshing leadership before giving up.
// NewLeaderElector requires it to be greater than RetryPeriod*JitterFactor.
108 RenewDeadline time.Duration
109 // RetryPeriod is the duration the LeaderElector clients should wait
110 // between tries of actions.
// NewLeaderElector requires it to be at least 1.
111 RetryPeriod time.Duration
113 // Callbacks are callbacks that are triggered during certain lifecycle
114 // events of the LeaderElector.
115 Callbacks LeaderCallbacks
117 // WatchDog is the associated health checker.
118 // WatchDog may be nil if it's not needed/configured; RunOrDie checks for nil before using it.
119 WatchDog *HealthzAdaptor
121 // Name is the name of the resource lock for debugging. Check includes it in its error message.
125 // LeaderCallbacks are callbacks that are triggered during certain
126 // lifecycle events of the LeaderElector. OnStartedLeading and OnNewLeader are
// started in their own goroutines; OnStoppedLeading is called directly from Run.
// OnNewLeader is nil-checked before use; no nil check is visible for the
// other two callbacks.
128 // possible future callbacks:
130 type LeaderCallbacks struct {
131 // OnStartedLeading is called when a LeaderElector client starts leading.
// Run passes it a context derived with context.WithCancel.
132 OnStartedLeading func(context.Context)
133 // OnStoppedLeading is called when a LeaderElector client stops leading.
134 OnStoppedLeading func()
135 // OnNewLeader is called when the client observes a leader that is
136 // not the previously observed leader. This includes the first observed
137 // leader when the client starts.
138 OnNewLeader func(identity string)
141 // LeaderElector is a leader election client.
// It is created by NewLeaderElector and driven by Run.
142 type LeaderElector struct {
// config is the configuration this elector reads its Lock, durations,
// callbacks and name from.
143 config LeaderElectionConfig
144 // internal bookkeeping
// observedRecord is the most recent election record this client has seen or
// written; observedTime is the local clock reading taken when observedRecord
// was last replaced (see tryAcquireOrRenew).
145 observedRecord rl.LeaderElectionRecord
146 observedTime time.Time
147 // used to implement OnNewLeader(), may lag slightly from the
148 // value observedRecord.HolderIdentity if the transition has
149 // not yet been reported.
150 reportedLeader string
152 // clock is a wrapper around time to allow for less flaky testing
155 // name is the name of the resource lock for debugging
159 // Run starts the leader election loop.
//
// It first waits in acquire; when acquire reports false (ctx is done) Run
// returns without starting to lead. Otherwise it derives a cancellable
// context and launches the OnStartedLeading callback with it in a new
// goroutine.
160 func (le *LeaderElector) Run(ctx context.Context) {
// NOTE(review): the statement enclosing the next two calls is not visible
// here; presumably they run when Run exits — confirm. OnStoppedLeading is
// called without a nil check.
162 runtime.HandleCrash()
163 le.config.Callbacks.OnStoppedLeading()
165 if !le.acquire(ctx) {
166 return // ctx signalled done
// From here on ctx is the derived, cancellable context.
168 ctx, cancel := context.WithCancel(ctx)
170 go le.config.Callbacks.OnStartedLeading(ctx)
174 // RunOrDie starts a client with the provided config or panics if the config
175 // fails to validate.
//
// The elector is built with NewLeaderElector. When lec.WatchDog is non-nil
// the new elector is handed to it through SetLeaderElection.
176 func RunOrDie(ctx context.Context, lec LeaderElectionConfig) {
// NOTE(review): the handling of err is not visible here — confirm it matches
// the panic described above.
177 le, err := NewLeaderElector(lec)
181 if lec.WatchDog != nil {
182 lec.WatchDog.SetLeaderElection(le)
187 // GetLeader returns the identity of the last observed leader or returns the empty string if
188 // no leader has yet been observed.
//
// The value is read from observedRecord, which tryAcquireOrRenew replaces
// whenever it sees or writes a different record.
// NOTE(review): no locking around observedRecord is visible here — confirm
// whether concurrent callers are expected.
189 func (le *LeaderElector) GetLeader() string {
190 return le.observedRecord.HolderIdentity
193 // IsLeader returns true if the last observed leader was this client else returns false.
//
// This is an identity comparison only: the holder in observedRecord is
// compared with the Lock's Identity(), and lease expiry is not consulted.
194 func (le *LeaderElector) IsLeader() bool {
195 return le.observedRecord.HolderIdentity == le.config.Lock.Identity()
198 // acquire loops calling tryAcquireOrRenew and returns true immediately when tryAcquireOrRenew succeeds.
199 // Returns false if ctx signals done.
//
// Attempts are scheduled by wait.JitterUntil using RetryPeriod and
// JitterFactor, and stop when the derived context is done.
200 func (le *LeaderElector) acquire(ctx context.Context) bool {
// Derive a cancellable context so the loop has its own stop signal.
201 ctx, cancel := context.WithCancel(ctx)
204 desc := le.config.Lock.Describe()
205 klog.Infof("attempting to acquire leader lease %v...", desc)
206 wait.JitterUntil(func() {
207 succeeded = le.tryAcquireOrRenew()
// Report the currently observed holder after every attempt, successful or not.
208 le.maybeReportTransition()
// NOTE(review): the branch conditions around the following log lines are not
// visible here; presumably the first is the failure path and the rest the
// success path — confirm.
210 klog.V(4).Infof("failed to acquire lease %v", desc)
213 le.config.Lock.RecordEvent("became leader")
214 klog.Infof("successfully acquired lease %v", desc)
216 }, le.config.RetryPeriod, JitterFactor, true, ctx.Done())
220 // renew loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew fails or ctx signals done.
//
// Each renewal round is bounded by RenewDeadline; within a round
// tryAcquireOrRenew is polled every RetryPeriod until it succeeds, errors, or
// the round's timeout context is done.
221 func (le *LeaderElector) renew(ctx context.Context) {
222 ctx, cancel := context.WithCancel(ctx)
// timeoutCtx limits a single renewal round to RenewDeadline.
225 timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline)
226 defer timeoutCancel()
227 err := wait.PollImmediateUntil(le.config.RetryPeriod, func() (bool, error) {
// done has capacity 1, so the send below does not block even when the
// timeout case of the select wins and nobody receives the result.
228 done := make(chan bool, 1)
// NOTE(review): the lines that start this send are not visible here;
// presumably it runs in its own goroutine, since the select waits on done —
// confirm.
231 done <- le.tryAcquireOrRenew()
235 case <-timeoutCtx.Done():
236 return false, fmt.Errorf("failed to tryAcquireOrRenew %s", timeoutCtx.Err())
237 case result := <-done:
240 }, timeoutCtx.Done())
// Report the currently observed holder after every round.
242 le.maybeReportTransition()
243 desc := le.config.Lock.Describe()
// NOTE(review): the branch condition separating the success log from the
// failure handling below is not visible here — presumably it tests err.
245 klog.V(5).Infof("successfully renewed lease %v", desc)
248 le.config.Lock.RecordEvent("stopped leading")
249 klog.Infof("failed to renew lease %v: %v", desc, err)
251 }, le.config.RetryPeriod, ctx.Done())
254 // tryAcquireOrRenew tries to acquire a leader lease if it is not already acquired,
255 // else it tries to renew the lease if it has already been acquired. Returns true
256 // on success else returns false.
//
// Errors from the Lock are logged with klog.Errorf rather than returned.
257 func (le *LeaderElector) tryAcquireOrRenew() bool {
// Candidate record carrying this client's identity.
259 leaderElectionRecord := rl.LeaderElectionRecord{
260 HolderIdentity: le.config.Lock.Identity(),
// Integer division: any sub-second part of LeaseDuration is truncated.
261 LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
266 // 1. obtain or create the ElectionRecord
267 oldLeaderElectionRecord, err := le.config.Lock.Get()
// A not-found error is not logged here; the Create call below follows it.
269 if !errors.IsNotFound(err) {
270 klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
273 if err = le.config.Lock.Create(leaderElectionRecord); err != nil {
274 klog.Errorf("error initially creating leader election record: %v", err)
// Remember the record just created and the local time of the observation.
277 le.observedRecord = leaderElectionRecord
278 le.observedTime = le.clock.Now()
282 // 2. Record obtained, check the Identity & Time
// observedTime is refreshed only when the stored record differs from the one
// last observed, so expiry below is measured on the local clock from the last
// observed change.
283 if !reflect.DeepEqual(le.observedRecord, *oldLeaderElectionRecord) {
284 le.observedRecord = *oldLeaderElectionRecord
285 le.observedTime = le.clock.Now()
// NOTE(review): the second operand of this condition is not visible here.
287 if le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
289 klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
293 // 3. We're going to try to update. The leaderElectionRecord is set to its default
294 // here. Let's correct it before updating.
// NOTE(review): the branch condition is not visible here; presumably the
// first two assignments (carry over AcquireTime and LeaderTransitions) apply
// when renewing, and the increment applies when taking over — confirm.
296 leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
297 leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
299 leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
302 // update the lock itself
303 if err = le.config.Lock.Update(leaderElectionRecord); err != nil {
304 klog.Errorf("Failed to update lock: %v", err)
// Remember the record just written and the local time of the write.
307 le.observedRecord = leaderElectionRecord
308 le.observedTime = le.clock.Now()
// maybeReportTransition compares the holder in observedRecord with the last
// reported leader, records the observed holder as reported, and starts the
// OnNewLeader callback (when it is set) in a new goroutine.
312 func (le *LeaderElector) maybeReportTransition() {
// NOTE(review): the body of this check is not visible here; presumably it
// returns early when nothing changed — confirm.
313 if le.observedRecord.HolderIdentity == le.reportedLeader {
316 le.reportedLeader = le.observedRecord.HolderIdentity
// OnNewLeader is optional; it is launched with go and receives the identity
// just recorded.
317 if le.config.Callbacks.OnNewLeader != nil {
318 go le.config.Callbacks.OnNewLeader(le.reportedLeader)
322 // Check will determine if the current lease is expired by more than timeout.
//
// It returns an error naming le.config.Name when the time elapsed since
// observedTime, as measured by le.clock, is greater than
// LeaseDuration+maxTolerableExpiredLease.
323 func (le *LeaderElector) Check(maxTolerableExpiredLease time.Duration) error {
325 // Currently not concerned with the case that we are hot standby
// NOTE(review): the condition this comment belongs to is not visible here.
328 // If the time since the last observation exceeds the lease duration plus the
329 // tolerated expiry, it is time to start reporting ourselves as unhealthy. We should have
330 // died but conditions like deadlock can prevent this. (See #70819)
331 if le.clock.Since(le.observedTime) > le.config.LeaseDuration+maxTolerableExpiredLease {
332 return fmt.Errorf("failed election to renew leadership on lease %s", le.config.Name)