Skip to content

Commit 9a90881

Browse files
authored
leader election (#537)
* init leader leaderelection * fix format * remove unused import * add config map * fix space * add multi lock * lease lock * cp case LeaderElection * more test cases * port all testcases from java * fix space * fix default timeout * try to fix flasky gh action by reducing renew deadline * try debug failed gh action
1 parent c0e96f5 commit 9a90881

File tree

12 files changed

+1283
-0
lines changed

12 files changed

+1283
-0
lines changed
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
using System.Threading;
2+
using System.Threading.Tasks;
3+
4+
namespace k8s.LeaderElection
5+
{
6+
/// <summary>
7+
/// ILock offers a common interface for locking on arbitrary resources used in leader election. The Interface is used to hide the details on specific implementations in order to allow them to change over time.
8+
/// </summary>
9+
public interface ILock
10+
{
11+
/// <summary>
12+
/// Get returns the LeaderElectionRecord
13+
/// </summary>
14+
/// <param name="cancellationToken">token to cancel the task</param>
15+
/// <returns>the record</returns>
16+
Task<LeaderElectionRecord> GetAsync(CancellationToken cancellationToken = default);
17+
18+
/// <summary>
19+
/// Create attempts to create a LeaderElectionRecord
20+
/// </summary>
21+
/// <param name="record">record to create</param>
22+
/// <param name="cancellationToken">token to cancel the task</param>
23+
/// <returns>true if created</returns>
24+
Task<bool> CreateAsync(LeaderElectionRecord record, CancellationToken cancellationToken = default);
25+
26+
/// <summary>
27+
/// Update will update and existing LeaderElectionRecord
28+
/// </summary>
29+
/// <param name="record">record to create</param>
30+
/// <param name="cancellationToken">token to cancel the task</param>
31+
/// <returns>true if updated</returns>
32+
Task<bool> UpdateAsync(LeaderElectionRecord record, CancellationToken cancellationToken = default);
33+
34+
35+
/// <summary>
36+
/// the locks Identity
37+
/// </summary>
38+
string Identity { get; }
39+
40+
/// <summary>
41+
/// Describe is used to convert details on current resource lock into a string
42+
/// </summary>
43+
/// <returns>resource lock description</returns>
44+
string Describe();
45+
}
46+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
using System;
2+
3+
namespace k8s.LeaderElection
4+
{
5+
public class LeaderElectionConfig
6+
{
7+
public ILock Lock { get; set; }
8+
9+
public TimeSpan LeaseDuration { get; set; } = TimeSpan.FromSeconds(15);
10+
11+
public TimeSpan RenewDeadline { get; set; } = TimeSpan.FromSeconds(10);
12+
13+
public TimeSpan RetryPeriod { get; set; } = TimeSpan.FromSeconds(2);
14+
15+
public LeaderElectionConfig(ILock @lock)
16+
{
17+
Lock = @lock;
18+
}
19+
}
20+
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
using System;
2+
3+
namespace k8s.LeaderElection
4+
{
5+
/// <summary>
6+
/// LeaderElectionRecord is the record that is stored in the leader election annotation.
7+
/// This information should be used for observational purposes only and could be replaced with a random string (e.g. UUID) with only slight modification of this code.
8+
/// </summary>
9+
public class LeaderElectionRecord
10+
{
11+
/// <summary>
12+
/// the ID that owns the lease. If empty, no one owns this lease and all callers may acquire.
13+
/// </summary>
14+
public string HolderIdentity { get; set; }
15+
16+
/// <summary>
17+
/// LeaseDuration in seconds
18+
/// </summary>
19+
public int LeaseDurationSeconds { get; set; }
20+
21+
/// <summary>
22+
/// acquire time
23+
/// </summary>
24+
// public DateTimeOffset? AcquireTime { get; set; }
25+
public DateTime? AcquireTime { get; set; }
26+
27+
/// <summary>
28+
/// renew time
29+
/// </summary>
30+
// public DateTimeOffset? RenewTime { get; set; }
31+
public DateTime? RenewTime { get; set; }
32+
33+
/// <summary>
34+
/// leader transitions
35+
/// </summary>
36+
public int LeaderTransitions { get; set; }
37+
38+
protected bool Equals(LeaderElectionRecord other)
39+
{
40+
return HolderIdentity == other?.HolderIdentity && Nullable.Equals(AcquireTime, other.AcquireTime) && Nullable.Equals(RenewTime, other.RenewTime);
41+
}
42+
43+
public override bool Equals(object obj)
44+
{
45+
if (ReferenceEquals(null, obj))
46+
{
47+
return false;
48+
}
49+
50+
if (ReferenceEquals(this, obj))
51+
{
52+
return true;
53+
}
54+
55+
if (obj.GetType() != this.GetType())
56+
{
57+
return false;
58+
}
59+
60+
return Equals((LeaderElectionRecord)obj);
61+
}
62+
63+
public override int GetHashCode()
64+
{
65+
unchecked
66+
{
67+
var hashCode = (HolderIdentity != null ? HolderIdentity.GetHashCode() : 0);
68+
hashCode = (hashCode * 397) ^ AcquireTime.GetHashCode();
69+
hashCode = (hashCode * 397) ^ RenewTime.GetHashCode();
70+
return hashCode;
71+
}
72+
}
73+
}
74+
}
Lines changed: 248 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,248 @@
1+
using System;
2+
using System.Net;
3+
using System.Threading;
4+
using System.Threading.Tasks;
5+
using Microsoft.Rest;
6+
7+
namespace k8s.LeaderElection
8+
{
9+
public class LeaderElector : IDisposable
10+
{
11+
private const double JitterFactor = 1.2;
12+
13+
private readonly LeaderElectionConfig config;
14+
15+
/// <summary>
16+
/// OnStartedLeading is called when a LeaderElector client starts leading
17+
/// </summary>
18+
public event Action OnStartedLeading;
19+
20+
/// <summary>
21+
/// OnStoppedLeading is called when a LeaderElector client stops leading
22+
/// </summary>
23+
public event Action OnStoppedLeading;
24+
25+
/// <summary>
26+
/// OnNewLeader is called when the client observes a leader that is
27+
/// not the previously observed leader. This includes the first observed
28+
/// leader when the client starts.
29+
/// </summary>
30+
public event Action<string> OnNewLeader;
31+
32+
private volatile LeaderElectionRecord observedRecord;
33+
private DateTimeOffset observedTime = DateTimeOffset.MinValue;
34+
private string reportedLeader;
35+
36+
public LeaderElector(LeaderElectionConfig config)
37+
{
38+
this.config = config;
39+
}
40+
41+
public bool IsLeader()
42+
{
43+
return observedRecord?.HolderIdentity != null && observedRecord?.HolderIdentity == config.Lock.Identity;
44+
}
45+
46+
public string GetLeader()
47+
{
48+
return observedRecord?.HolderIdentity;
49+
}
50+
51+
public async Task RunAsync(CancellationToken cancellationToken = default)
52+
{
53+
await AcquireAsync(cancellationToken).ConfigureAwait(false);
54+
55+
try
56+
{
57+
OnStartedLeading?.Invoke();
58+
59+
// renew loop
60+
for (; ; )
61+
{
62+
cancellationToken.ThrowIfCancellationRequested();
63+
var acq = Task.Run(async () =>
64+
{
65+
try
66+
{
67+
while (!await TryAcquireOrRenew(cancellationToken).ConfigureAwait(false))
68+
{
69+
await Task.Delay(config.RetryPeriod, cancellationToken).ConfigureAwait(false);
70+
MaybeReportTransition();
71+
}
72+
}
73+
catch
74+
{
75+
// ignore
76+
return false;
77+
}
78+
79+
return true;
80+
});
81+
82+
83+
if (await Task.WhenAny(acq, Task.Delay(config.RenewDeadline, cancellationToken))
84+
.ConfigureAwait(false) == acq)
85+
{
86+
var succ = await acq.ConfigureAwait(false);
87+
88+
if (succ)
89+
{
90+
await Task.Delay(config.RetryPeriod, cancellationToken).ConfigureAwait(false);
91+
// retry
92+
continue;
93+
}
94+
95+
// renew failed
96+
}
97+
98+
// timeout
99+
break;
100+
}
101+
}
102+
finally
103+
{
104+
OnStoppedLeading?.Invoke();
105+
}
106+
}
107+
108+
private async Task<bool> TryAcquireOrRenew(CancellationToken cancellationToken)
109+
{
110+
var l = config.Lock;
111+
var leaderElectionRecord = new LeaderElectionRecord()
112+
{
113+
HolderIdentity = l.Identity,
114+
LeaseDurationSeconds = config.LeaseDuration.Seconds,
115+
AcquireTime = DateTime.UtcNow,
116+
RenewTime = DateTime.UtcNow,
117+
LeaderTransitions = 0,
118+
};
119+
120+
// 1. obtain or create the ElectionRecord
121+
122+
LeaderElectionRecord oldLeaderElectionRecord = null;
123+
try
124+
{
125+
oldLeaderElectionRecord = await l.GetAsync(cancellationToken).ConfigureAwait(false);
126+
}
127+
catch (HttpOperationException e)
128+
{
129+
if (e.Response.StatusCode != HttpStatusCode.NotFound)
130+
{
131+
return false;
132+
}
133+
}
134+
135+
if (oldLeaderElectionRecord?.AcquireTime == null ||
136+
oldLeaderElectionRecord?.RenewTime == null ||
137+
oldLeaderElectionRecord?.HolderIdentity == null)
138+
{
139+
var created = await l.CreateAsync(leaderElectionRecord, cancellationToken).ConfigureAwait(false);
140+
if (created)
141+
{
142+
observedRecord = leaderElectionRecord;
143+
observedTime = DateTimeOffset.Now;
144+
return true;
145+
}
146+
147+
return false;
148+
}
149+
150+
151+
// 2. Record obtained, check the Identity & Time
152+
if (!Equals(observedRecord, oldLeaderElectionRecord))
153+
{
154+
observedRecord = oldLeaderElectionRecord;
155+
observedTime = DateTimeOffset.Now;
156+
}
157+
158+
if (!string.IsNullOrEmpty(oldLeaderElectionRecord.HolderIdentity)
159+
&& observedTime + config.LeaseDuration > DateTimeOffset.Now
160+
&& !IsLeader())
161+
{
162+
// lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity
163+
return false;
164+
}
165+
166+
// 3. We're going to try to update. The leaderElectionRecord is set to it's default
167+
// here. Let's correct it before updating.
168+
if (IsLeader())
169+
{
170+
leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime;
171+
leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions;
172+
}
173+
else
174+
{
175+
leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1;
176+
}
177+
178+
var updated = await l.UpdateAsync(leaderElectionRecord, cancellationToken).ConfigureAwait(false);
179+
if (!updated)
180+
{
181+
return false;
182+
}
183+
184+
observedRecord = leaderElectionRecord;
185+
observedTime = DateTimeOffset.Now;
186+
187+
return true;
188+
}
189+
190+
private async Task AcquireAsync(CancellationToken cancellationToken)
191+
{
192+
for (; ; )
193+
{
194+
try
195+
{
196+
var delay = config.RetryPeriod.Milliseconds;
197+
var acq = TryAcquireOrRenew(cancellationToken);
198+
199+
if (await Task.WhenAny(acq, Task.Delay(delay, cancellationToken))
200+
.ConfigureAwait(false) == acq)
201+
{
202+
if (await acq.ConfigureAwait(false))
203+
{
204+
return;
205+
}
206+
}
207+
208+
delay = (int)(delay * JitterFactor);
209+
}
210+
finally
211+
{
212+
MaybeReportTransition();
213+
}
214+
}
215+
}
216+
217+
private void MaybeReportTransition()
218+
{
219+
if (observedRecord == null)
220+
{
221+
return;
222+
}
223+
224+
if (observedRecord.HolderIdentity == reportedLeader)
225+
{
226+
return;
227+
}
228+
229+
reportedLeader = observedRecord.HolderIdentity;
230+
231+
OnNewLeader?.Invoke(reportedLeader);
232+
}
233+
234+
protected virtual void Dispose(bool disposing)
235+
{
236+
if (disposing)
237+
{
238+
}
239+
}
240+
241+
/// <inheritdoc/>
242+
public void Dispose()
243+
{
244+
Dispose(true);
245+
GC.SuppressFinalize(this);
246+
}
247+
}
248+
}

0 commit comments

Comments
 (0)