Skip to content

Commit b091d89

Browse files
committed
introduce watch api
1 parent c108799 commit b091d89

File tree

3 files changed

+163
-1
lines changed

3 files changed

+163
-1
lines changed

src/Kubernetes.Auth.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public Kubernetes(KubernetesClientConfiguration config)
4242

4343
// set credentails for the kubernernet client
4444
this.SetCredentials(config, handler);
45-
this.InitializeHttpClient(handler);
45+
this.InitializeHttpClient(handler, new DelegatingHandler[]{new WatcherDelegatingHandler()});
4646
}
4747

4848
private X509Certificate2 CaCert { get; set; }
@@ -71,6 +71,7 @@ private void SetCredentials(KubernetesClientConfiguration config, HttpClientHand
7171
!string.IsNullOrWhiteSpace(config.ClientKey)))
7272
{
7373
var cert = Utils.GeneratePfx(config);
74+
7475
handler.ClientCertificates.Add(cert);
7576
}
7677
else

src/Watcher.cs

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
using System;
2+
using System.IO;
3+
using System.Runtime.Serialization;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
using k8s.Exceptions;
7+
using Microsoft.Rest;
8+
using Microsoft.Rest.Serialization;
9+
10+
namespace k8s
11+
{
12+
public enum WatchEventType
13+
{
14+
[EnumMember(Value = "ADDED")] Added,
15+
16+
[EnumMember(Value = "MODIFIED")] Modified,
17+
18+
[EnumMember(Value = "DELETED")] Deleted,
19+
20+
[EnumMember(Value = "ERROR")] Error
21+
}
22+
23+
public class Watcher<T> : IDisposable
24+
{
25+
private readonly StreamReader _streamReader;
26+
private readonly CancellationTokenSource _cts;
27+
28+
public Watcher(StreamReader streamReader, Action<WatchEventType, T> onEvent, Action<Exception> onError)
29+
{
30+
_streamReader = streamReader;
31+
OnEvent += onEvent;
32+
OnError += onError;
33+
34+
_cts = new CancellationTokenSource();
35+
36+
var token = _cts.Token;
37+
38+
Task.Run(async () =>
39+
{
40+
while (!streamReader.EndOfStream)
41+
{
42+
if (token.IsCancellationRequested)
43+
{
44+
return;
45+
}
46+
47+
try
48+
{
49+
var line = await streamReader.ReadLineAsync();
50+
var @event = SafeJsonConvert.DeserializeObject<WatchEvent>(line);
51+
52+
OnEvent?.Invoke(@event.Type, @event.Object);
53+
}
54+
catch (Exception e)
55+
{
56+
OnError?.Invoke(e);
57+
}
58+
}
59+
}, token);
60+
}
61+
62+
public void Dispose()
63+
{
64+
_cts.Cancel();
65+
_streamReader.Dispose();
66+
}
67+
68+
public event Action<WatchEventType, T> OnEvent;
69+
public event Action<Exception> OnError;
70+
71+
public class WatchEvent
72+
{
73+
public WatchEventType Type { get; set; }
74+
75+
public T Object { get; set; }
76+
}
77+
}
78+
79+
public static class WatcherExt
80+
{
81+
public static Watcher<T> Watch<T>(this HttpOperationResponse response,
82+
Action<WatchEventType, T> onEvent,
83+
Action<Exception> onError = null)
84+
{
85+
if (!(response.Response.Content is WatcherDelegatingHandler.LineSeparatedHttpContent content))
86+
{
87+
throw new KubernetesClientException("not a watchable request or failed response");
88+
}
89+
90+
return new Watcher<T>(content.StreamReader, onEvent, onError);
91+
}
92+
93+
public static Watcher<T> Watch<T>(this HttpOperationResponse<T> response,
94+
Action<WatchEventType, T> onEvent,
95+
Action<Exception> onError = null)
96+
{
97+
return Watch((HttpOperationResponse) response, onEvent, onError);
98+
}
99+
}
100+
}

src/WatcherDelegatingHandler.cs

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
using System.IO;
2+
using System.Net;
3+
using System.Net.Http;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
7+
namespace k8s
8+
{
9+
internal class WatcherDelegatingHandler : DelegatingHandler
10+
{
11+
protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request,
12+
CancellationToken cancellationToken)
13+
{
14+
var originResponse = await base.SendAsync(request, cancellationToken);
15+
16+
if (originResponse.IsSuccessStatusCode)
17+
{
18+
if ($"{request.RequestUri.Query}".Contains("watch=true"))
19+
{
20+
originResponse.Content = new LineSeparatedHttpContent(originResponse.Content);
21+
}
22+
}
23+
return originResponse;
24+
}
25+
26+
internal class LineSeparatedHttpContent : HttpContent
27+
{
28+
private readonly HttpContent _originContent;
29+
private Stream _originStream;
30+
31+
public LineSeparatedHttpContent(HttpContent originContent)
32+
{
33+
_originContent = originContent;
34+
}
35+
36+
internal StreamReader StreamReader { get; private set; }
37+
38+
protected override async Task SerializeToStreamAsync(Stream stream, TransportContext context)
39+
{
40+
_originStream = await _originContent.ReadAsStreamAsync();
41+
42+
StreamReader = new StreamReader(_originStream);
43+
44+
var firstLine = await StreamReader.ReadLineAsync();
45+
var writer = new StreamWriter(stream);
46+
47+
// using (writer) // leave open
48+
{
49+
await writer.WriteAsync(firstLine);
50+
await writer.FlushAsync();
51+
}
52+
}
53+
54+
protected override bool TryComputeLength(out long length)
55+
{
56+
length = 0;
57+
return false;
58+
}
59+
}
60+
}
61+
}

0 commit comments

Comments
 (0)