Skip to content

introduce watch api #35

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Oct 26, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
language: csharp
sudo: required
sudo: false
matrix:
include:
- dotnet: 2.0.0
Expand Down
3 changes: 2 additions & 1 deletion src/Kubernetes.Auth.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public Kubernetes(KubernetesClientConfiguration config)

// set credentails for the kubernernet client
this.SetCredentials(config, handler);
this.InitializeHttpClient(handler);
this.InitializeHttpClient(handler, new DelegatingHandler[]{new WatcherDelegatingHandler()});
}

private X509Certificate2 CaCert { get; set; }
Expand Down Expand Up @@ -78,6 +78,7 @@ private void SetCredentials(KubernetesClientConfiguration config, HttpClientHand
!string.IsNullOrWhiteSpace(config.ClientKey)))
{
var cert = Utils.GeneratePfx(config);

handler.ClientCertificates.Add(cert);
}
}
Expand Down
1 change: 1 addition & 0 deletions src/KubernetesClient.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="BouncyCastle.NetCore" Version="1.8.1.3" />
<PackageReference Include="Microsoft.AspNetCore.WebUtilities" Version="1.1.2" />
<PackageReference Include="Microsoft.Rest.ClientRuntime" Version="3.0.3" />
<PackageReference Include="Newtonsoft.Json" Version="10.0.2" />
<PackageReference Include="YamlDotNet.NetCore" Version="1.0.0" />
Expand Down
143 changes: 143 additions & 0 deletions src/Watcher.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
using System;
using System.IO;
using System.Runtime.Serialization;
using System.Threading;
using System.Threading.Tasks;
using k8s.Exceptions;
using Microsoft.Rest;
using Microsoft.Rest.Serialization;

namespace k8s
{
public enum WatchEventType
{
[EnumMember(Value = "ADDED")] Added,

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm definitely not a csharp style expert, but it feels weird to have newlines between these values.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really love go fmt

we might introduce something like https://github.com/dotnet/codeformatter and contribution guide. I will create another issue to track this.

[EnumMember(Value = "MODIFIED")] Modified,

[EnumMember(Value = "DELETED")] Deleted,

[EnumMember(Value = "ERROR")] Error
}

public class Watcher<T> : IDisposable
{
/// <summary>
/// indicate if the watch object is alive
/// </summary>
public bool Watching { get; private set; }

private readonly CancellationTokenSource _cts;
private readonly StreamReader _streamReader;

internal Watcher(StreamReader streamReader, Action<WatchEventType, T> onEvent, Action<Exception> onError)
{
_streamReader = streamReader;
OnEvent += onEvent;
OnError += onError;

_cts = new CancellationTokenSource();

var token = _cts.Token;

Task.Run(async () =>
{
try
{
Watching = true;

while (!streamReader.EndOfStream)
{
if (token.IsCancellationRequested)
{
return;
}

var line = await streamReader.ReadLineAsync();

try
{
var @event = SafeJsonConvert.DeserializeObject<WatchEvent>(line);
OnEvent?.Invoke(@event.Type, @event.Object);
}
catch (Exception e)
{
// error if deserialized failed or onevent throws
OnError?.Invoke(e);
}
}
}
catch (Exception e)
{
// error when transport error, IOException ect
OnError?.Invoke(e);
}
finally
{
Watching = false;
}
}, token);
}

public void Dispose()
{
_cts.Cancel();
_streamReader.Dispose();
}

/// <summary>
/// add/remove callbacks when any event raised from api server
/// </summary>
public event Action<WatchEventType, T> OnEvent;

/// <summary>
/// add/remove callbacks when any exception was caught during watching
/// </summary>
public event Action<Exception> OnError;

public class WatchEvent
{
public WatchEventType Type { get; set; }

public T Object { get; set; }
}
}

public static class WatcherExt
{
/// <summary>
/// create a watch object from a call to api server with watch=true
/// </summary>
/// <typeparam name="T">type of the event object</typeparam>
/// <param name="response">the api response</param>
/// <param name="onEvent">a callback when any event raised from api server</param>
/// <param name="onError">a callbak when any exception was caught during watching</param>
/// <returns>a watch object</returns>
public static Watcher<T> Watch<T>(this HttpOperationResponse response,
Action<WatchEventType, T> onEvent,
Action<Exception> onError = null)
{
if (!(response.Response.Content is WatcherDelegatingHandler.LineSeparatedHttpContent content))
{
throw new KubernetesClientException("not a watchable request or failed response");
}

return new Watcher<T>(content.StreamReader, onEvent, onError);
}

/// <summary>
/// create a watch object from a call to api server with watch=true
/// </summary>
/// <typeparam name="T">type of the event object</typeparam>
/// <param name="response">the api response</param>
/// <param name="onEvent">a callback when any event raised from api server</param>
/// <param name="onError">a callbak when any exception was caught during watching</param>
/// <returns>a watch object</returns>
public static Watcher<T> Watch<T>(this HttpOperationResponse<T> response,
Action<WatchEventType, T> onEvent,
Action<Exception> onError = null)
{
return Watch((HttpOperationResponse) response, onEvent, onError);
}
}
}
69 changes: 69 additions & 0 deletions src/WatcherDelegatingHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.WebUtilities;

namespace k8s
{
/// <summary>
/// This HttpDelegatingHandler is to rewrite the response and return first line to autorest client
/// then use WatchExt to create a watch object which interact with the replaced http response to get watch works.
/// </summary>
internal class WatcherDelegatingHandler : DelegatingHandler
{
protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request,
CancellationToken cancellationToken)
{
var originResponse = await base.SendAsync(request, cancellationToken);

if (originResponse.IsSuccessStatusCode)
{
var query = QueryHelpers.ParseQuery(request.RequestUri.Query);

if (query.TryGetValue("watch", out var values) && values.Any(v => v == "true"))
{
originResponse.Content = new LineSeparatedHttpContent(originResponse.Content);
}
}
return originResponse;
}

internal class LineSeparatedHttpContent : HttpContent
{
private readonly HttpContent _originContent;
private Stream _originStream;

public LineSeparatedHttpContent(HttpContent originContent)
{
_originContent = originContent;
}

internal StreamReader StreamReader { get; private set; }

protected override async Task SerializeToStreamAsync(Stream stream, TransportContext context)
{
_originStream = await _originContent.ReadAsStreamAsync();

StreamReader = new StreamReader(_originStream);

var firstLine = await StreamReader.ReadLineAsync();
var writer = new StreamWriter(stream);

// using (writer) // leave open
{
await writer.WriteAsync(firstLine);
await writer.FlushAsync();
}
}

protected override bool TryComputeLength(out long length)
{
length = 0;
return false;
}
}
}
}
11 changes: 6 additions & 5 deletions tests/AuthTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Net;
using System.Net.Http.Headers;
using System.Security.Cryptography.X509Certificates;
using System.Threading.Tasks;
using k8s.Models;
using k8s.Tests.Mock;
using Microsoft.AspNetCore.Hosting;
Expand Down Expand Up @@ -39,7 +40,7 @@ public void TestAnonymous()
using (var server = new MockKubeApiServer(cxt =>
{
cxt.Response.StatusCode = (int) HttpStatusCode.Unauthorized;
return false;
return Task.FromResult(false);
}))
{
var client = new Kubernetes(new KubernetesClientConfiguration
Expand Down Expand Up @@ -69,10 +70,10 @@ public void TestBasicAuth()
if (header != expect)
{
cxt.Response.StatusCode = (int) HttpStatusCode.Unauthorized;
return false;
return Task.FromResult(false);
}

return true;
return Task.FromResult(true);
}))
{
{
Expand Down Expand Up @@ -256,10 +257,10 @@ public void TestToken()
if (header != expect)
{
cxt.Response.StatusCode = (int) HttpStatusCode.Unauthorized;
return false;
return Task.FromResult(false);
}

return true;
return Task.FromResult(true);
}))
{
{
Expand Down
12 changes: 5 additions & 7 deletions tests/Mock/MockKubeApiServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,19 @@ public class MockKubeApiServer : IDisposable

private readonly IWebHost _webHost;

public MockKubeApiServer(Func<HttpContext, bool> shouldNext = null, Action<ListenOptions> listenConfigure = null,
public MockKubeApiServer(Func<HttpContext, Task<bool>> shouldNext = null, Action<ListenOptions> listenConfigure = null,
string resp = MockPodResponse)
{
shouldNext = shouldNext ?? (_ => true);
shouldNext = shouldNext ?? (_ => Task.FromResult(true));
listenConfigure = listenConfigure ?? (_ => { });

_webHost = WebHost.CreateDefaultBuilder()
.Configure(app => app.Run(httpContext =>
.Configure(app => app.Run(async httpContext =>
{
if (shouldNext(httpContext))
if (await shouldNext(httpContext))
{
httpContext.Response.WriteAsync(resp);
await httpContext.Response.WriteAsync(resp);
}

return Task.Delay(0);
}))
.UseKestrel(options => { options.Listen(IPAddress.Loopback, 0, listenConfigure); })
.Build();
Expand Down
Loading