Skip to content

Commit c74d625

Browse files
Merge pull request #35 from tg123/watch
introduce watch api
2 parents fe6de92 + 72ccaa6 commit c74d625

File tree

8 files changed

+501
-14
lines changed

8 files changed

+501
-14
lines changed

.travis.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
language: csharp
2-
sudo: required
2+
sudo: false
33
matrix:
44
include:
55
- dotnet: 2.0.0

src/Kubernetes.Auth.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public Kubernetes(KubernetesClientConfiguration config)
4545

4646
// set credentails for the kubernernet client
4747
this.SetCredentials(config, handler);
48-
this.InitializeHttpClient(handler);
48+
this.InitializeHttpClient(handler, new DelegatingHandler[]{new WatcherDelegatingHandler()});
4949
}
5050

5151
private X509Certificate2 CaCert { get; set; }
@@ -78,6 +78,7 @@ private void SetCredentials(KubernetesClientConfiguration config, HttpClientHand
7878
!string.IsNullOrWhiteSpace(config.ClientKeyFilePath)))
7979
{
8080
var cert = Utils.GeneratePfx(config);
81+
8182
handler.ClientCertificates.Add(cert);
8283
}
8384
}

src/KubernetesClient.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
</ItemGroup>
99
<ItemGroup>
1010
<PackageReference Include="BouncyCastle.NetCore" Version="1.8.1.3" />
11+
<PackageReference Include="Microsoft.AspNetCore.WebUtilities" Version="1.1.2" />
1112
<PackageReference Include="Microsoft.Rest.ClientRuntime" Version="3.0.3" />
1213
<PackageReference Include="Newtonsoft.Json" Version="10.0.2" />
1314
<PackageReference Include="YamlDotNet.NetCore" Version="1.0.0" />

src/Watcher.cs

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
using System;
2+
using System.IO;
3+
using System.Runtime.Serialization;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
using k8s.Exceptions;
7+
using Microsoft.Rest;
8+
using Microsoft.Rest.Serialization;
9+
10+
namespace k8s
11+
{
12+
public enum WatchEventType
13+
{
14+
[EnumMember(Value = "ADDED")] Added,
15+
16+
[EnumMember(Value = "MODIFIED")] Modified,
17+
18+
[EnumMember(Value = "DELETED")] Deleted,
19+
20+
[EnumMember(Value = "ERROR")] Error
21+
}
22+
23+
public class Watcher<T> : IDisposable
24+
{
25+
/// <summary>
26+
/// indicate if the watch object is alive
27+
/// </summary>
28+
public bool Watching { get; private set; }
29+
30+
private readonly CancellationTokenSource _cts;
31+
private readonly StreamReader _streamReader;
32+
33+
internal Watcher(StreamReader streamReader, Action<WatchEventType, T> onEvent, Action<Exception> onError)
34+
{
35+
_streamReader = streamReader;
36+
OnEvent += onEvent;
37+
OnError += onError;
38+
39+
_cts = new CancellationTokenSource();
40+
41+
var token = _cts.Token;
42+
43+
Task.Run(async () =>
44+
{
45+
try
46+
{
47+
Watching = true;
48+
49+
while (!streamReader.EndOfStream)
50+
{
51+
if (token.IsCancellationRequested)
52+
{
53+
return;
54+
}
55+
56+
var line = await streamReader.ReadLineAsync();
57+
58+
try
59+
{
60+
var @event = SafeJsonConvert.DeserializeObject<WatchEvent>(line);
61+
OnEvent?.Invoke(@event.Type, @event.Object);
62+
}
63+
catch (Exception e)
64+
{
65+
// error if deserialized failed or onevent throws
66+
OnError?.Invoke(e);
67+
}
68+
}
69+
}
70+
catch (Exception e)
71+
{
72+
// error when transport error, IOException ect
73+
OnError?.Invoke(e);
74+
}
75+
finally
76+
{
77+
Watching = false;
78+
}
79+
}, token);
80+
}
81+
82+
public void Dispose()
83+
{
84+
_cts.Cancel();
85+
_streamReader.Dispose();
86+
}
87+
88+
/// <summary>
89+
/// add/remove callbacks when any event raised from api server
90+
/// </summary>
91+
public event Action<WatchEventType, T> OnEvent;
92+
93+
/// <summary>
94+
/// add/remove callbacks when any exception was caught during watching
95+
/// </summary>
96+
public event Action<Exception> OnError;
97+
98+
public class WatchEvent
99+
{
100+
public WatchEventType Type { get; set; }
101+
102+
public T Object { get; set; }
103+
}
104+
}
105+
106+
public static class WatcherExt
107+
{
108+
/// <summary>
109+
/// create a watch object from a call to api server with watch=true
110+
/// </summary>
111+
/// <typeparam name="T">type of the event object</typeparam>
112+
/// <param name="response">the api response</param>
113+
/// <param name="onEvent">a callback when any event raised from api server</param>
114+
/// <param name="onError">a callbak when any exception was caught during watching</param>
115+
/// <returns>a watch object</returns>
116+
public static Watcher<T> Watch<T>(this HttpOperationResponse response,
117+
Action<WatchEventType, T> onEvent,
118+
Action<Exception> onError = null)
119+
{
120+
if (!(response.Response.Content is WatcherDelegatingHandler.LineSeparatedHttpContent content))
121+
{
122+
throw new KubernetesClientException("not a watchable request or failed response");
123+
}
124+
125+
return new Watcher<T>(content.StreamReader, onEvent, onError);
126+
}
127+
128+
/// <summary>
129+
/// create a watch object from a call to api server with watch=true
130+
/// </summary>
131+
/// <typeparam name="T">type of the event object</typeparam>
132+
/// <param name="response">the api response</param>
133+
/// <param name="onEvent">a callback when any event raised from api server</param>
134+
/// <param name="onError">a callbak when any exception was caught during watching</param>
135+
/// <returns>a watch object</returns>
136+
public static Watcher<T> Watch<T>(this HttpOperationResponse<T> response,
137+
Action<WatchEventType, T> onEvent,
138+
Action<Exception> onError = null)
139+
{
140+
return Watch((HttpOperationResponse) response, onEvent, onError);
141+
}
142+
}
143+
}

src/WatcherDelegatingHandler.cs

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
using System.IO;
2+
using System.Linq;
3+
using System.Net;
4+
using System.Net.Http;
5+
using System.Threading;
6+
using System.Threading.Tasks;
7+
using Microsoft.AspNetCore.WebUtilities;
8+
9+
namespace k8s
10+
{
11+
/// <summary>
12+
/// This HttpDelegatingHandler is to rewrite the response and return first line to autorest client
13+
/// then use WatchExt to create a watch object which interact with the replaced http response to get watch works.
14+
/// </summary>
15+
internal class WatcherDelegatingHandler : DelegatingHandler
16+
{
17+
protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request,
18+
CancellationToken cancellationToken)
19+
{
20+
var originResponse = await base.SendAsync(request, cancellationToken);
21+
22+
if (originResponse.IsSuccessStatusCode)
23+
{
24+
var query = QueryHelpers.ParseQuery(request.RequestUri.Query);
25+
26+
if (query.TryGetValue("watch", out var values) && values.Any(v => v == "true"))
27+
{
28+
originResponse.Content = new LineSeparatedHttpContent(originResponse.Content);
29+
}
30+
}
31+
return originResponse;
32+
}
33+
34+
internal class LineSeparatedHttpContent : HttpContent
35+
{
36+
private readonly HttpContent _originContent;
37+
private Stream _originStream;
38+
39+
public LineSeparatedHttpContent(HttpContent originContent)
40+
{
41+
_originContent = originContent;
42+
}
43+
44+
internal StreamReader StreamReader { get; private set; }
45+
46+
protected override async Task SerializeToStreamAsync(Stream stream, TransportContext context)
47+
{
48+
_originStream = await _originContent.ReadAsStreamAsync();
49+
50+
StreamReader = new StreamReader(_originStream);
51+
52+
var firstLine = await StreamReader.ReadLineAsync();
53+
var writer = new StreamWriter(stream);
54+
55+
// using (writer) // leave open
56+
{
57+
await writer.WriteAsync(firstLine);
58+
await writer.FlushAsync();
59+
}
60+
}
61+
62+
protected override bool TryComputeLength(out long length)
63+
{
64+
length = 0;
65+
return false;
66+
}
67+
}
68+
}
69+
}

tests/AuthTests.cs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using System.Net;
55
using System.Net.Http.Headers;
66
using System.Security.Cryptography.X509Certificates;
7+
using System.Threading.Tasks;
78
using k8s.Models;
89
using k8s.Tests.Mock;
910
using Microsoft.AspNetCore.Hosting;
@@ -39,7 +40,7 @@ public void TestAnonymous()
3940
using (var server = new MockKubeApiServer(cxt =>
4041
{
4142
cxt.Response.StatusCode = (int) HttpStatusCode.Unauthorized;
42-
return false;
43+
return Task.FromResult(false);
4344
}))
4445
{
4546
var client = new Kubernetes(new KubernetesClientConfiguration
@@ -69,10 +70,10 @@ public void TestBasicAuth()
6970
if (header != expect)
7071
{
7172
cxt.Response.StatusCode = (int) HttpStatusCode.Unauthorized;
72-
return false;
73+
return Task.FromResult(false);
7374
}
7475

75-
return true;
76+
return Task.FromResult(true);
7677
}))
7778
{
7879
{
@@ -256,10 +257,10 @@ public void TestToken()
256257
if (header != expect)
257258
{
258259
cxt.Response.StatusCode = (int) HttpStatusCode.Unauthorized;
259-
return false;
260+
return Task.FromResult(false);
260261
}
261262

262-
return true;
263+
return Task.FromResult(true);
263264
}))
264265
{
265266
{

tests/Mock/MockKubeApiServer.cs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,21 +20,19 @@ public class MockKubeApiServer : IDisposable
2020

2121
private readonly IWebHost _webHost;
2222

23-
public MockKubeApiServer(Func<HttpContext, bool> shouldNext = null, Action<ListenOptions> listenConfigure = null,
23+
public MockKubeApiServer(Func<HttpContext, Task<bool>> shouldNext = null, Action<ListenOptions> listenConfigure = null,
2424
string resp = MockPodResponse)
2525
{
26-
shouldNext = shouldNext ?? (_ => true);
26+
shouldNext = shouldNext ?? (_ => Task.FromResult(true));
2727
listenConfigure = listenConfigure ?? (_ => { });
2828

2929
_webHost = WebHost.CreateDefaultBuilder()
30-
.Configure(app => app.Run(httpContext =>
30+
.Configure(app => app.Run(async httpContext =>
3131
{
32-
if (shouldNext(httpContext))
32+
if (await shouldNext(httpContext))
3333
{
34-
httpContext.Response.WriteAsync(resp);
34+
await httpContext.Response.WriteAsync(resp);
3535
}
36-
37-
return Task.Delay(0);
3836
}))
3937
.UseKestrel(options => { options.Listen(IPAddress.Loopback, 0, listenConfigure); })
4038
.Build();

0 commit comments

Comments
 (0)