diff --git a/.travis.yml b/.travis.yml index bfaec0c38..6051ec73a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,5 +1,5 @@ language: csharp -sudo: required +sudo: false matrix: include: - dotnet: 2.0.0 diff --git a/src/Kubernetes.Auth.cs b/src/Kubernetes.Auth.cs index d3fc3b5c8..f845b8d1d 100644 --- a/src/Kubernetes.Auth.cs +++ b/src/Kubernetes.Auth.cs @@ -45,7 +45,7 @@ public Kubernetes(KubernetesClientConfiguration config) // set credentails for the kubernernet client this.SetCredentials(config, handler); - this.InitializeHttpClient(handler); + this.InitializeHttpClient(handler, new DelegatingHandler[]{new WatcherDelegatingHandler()}); } private X509Certificate2 CaCert { get; set; } @@ -78,6 +78,7 @@ private void SetCredentials(KubernetesClientConfiguration config, HttpClientHand !string.IsNullOrWhiteSpace(config.ClientKey))) { var cert = Utils.GeneratePfx(config); + handler.ClientCertificates.Add(cert); } } diff --git a/src/KubernetesClient.csproj b/src/KubernetesClient.csproj index 38559e0f1..1c2e1c4a2 100644 --- a/src/KubernetesClient.csproj +++ b/src/KubernetesClient.csproj @@ -8,6 +8,7 @@ + diff --git a/src/Watcher.cs b/src/Watcher.cs new file mode 100644 index 000000000..8a84487b1 --- /dev/null +++ b/src/Watcher.cs @@ -0,0 +1,143 @@ +using System; +using System.IO; +using System.Runtime.Serialization; +using System.Threading; +using System.Threading.Tasks; +using k8s.Exceptions; +using Microsoft.Rest; +using Microsoft.Rest.Serialization; + +namespace k8s +{ + public enum WatchEventType + { + [EnumMember(Value = "ADDED")] Added, + + [EnumMember(Value = "MODIFIED")] Modified, + + [EnumMember(Value = "DELETED")] Deleted, + + [EnumMember(Value = "ERROR")] Error + } + + public class Watcher : IDisposable + { + /// + /// indicate if the watch object is alive + /// + public bool Watching { get; private set; } + + private readonly CancellationTokenSource _cts; + private readonly StreamReader _streamReader; + + internal Watcher(StreamReader streamReader, Action onEvent, Action onError) + { + _streamReader = streamReader; + OnEvent += onEvent; + OnError += onError; + + _cts = new CancellationTokenSource(); + + var token = _cts.Token; + + Task.Run(async () => + { + try + { + Watching = true; + + while (!streamReader.EndOfStream) + { + if (token.IsCancellationRequested) + { + return; + } + + var line = await streamReader.ReadLineAsync(); + + try + { + var @event = SafeJsonConvert.DeserializeObject(line); + OnEvent?.Invoke(@event.Type, @event.Object); + } + catch (Exception e) + { + // error if deserialized failed or onevent throws + OnError?.Invoke(e); + } + } + } + catch (Exception e) + { + // error when transport error, IOException ect + OnError?.Invoke(e); + } + finally + { + Watching = false; + } + }, token); + } + + public void Dispose() + { + _cts.Cancel(); + _streamReader.Dispose(); + } + + /// + /// add/remove callbacks when any event raised from api server + /// + public event Action OnEvent; + + /// + /// add/remove callbacks when any exception was caught during watching + /// + public event Action OnError; + + public class WatchEvent + { + public WatchEventType Type { get; set; } + + public T Object { get; set; } + } + } + + public static class WatcherExt + { + /// + /// create a watch object from a call to api server with watch=true + /// + /// type of the event object + /// the api response + /// a callback when any event raised from api server + /// a callbak when any exception was caught during watching + /// a watch object + public static Watcher Watch(this HttpOperationResponse response, + Action onEvent, + Action onError = null) + { + if (!(response.Response.Content is WatcherDelegatingHandler.LineSeparatedHttpContent content)) + { + throw new KubernetesClientException("not a watchable request or failed response"); + } + + return new Watcher(content.StreamReader, onEvent, onError); + } + + /// + /// create a watch object from a call to api server with watch=true + /// + /// type of the event object + /// the api response + /// a callback when any event raised from api server + /// a callbak when any exception was caught during watching + /// a watch object + public static Watcher Watch(this HttpOperationResponse response, + Action onEvent, + Action onError = null) + { + return Watch((HttpOperationResponse) response, onEvent, onError); + } + } +} \ No newline at end of file diff --git a/src/WatcherDelegatingHandler.cs b/src/WatcherDelegatingHandler.cs new file mode 100644 index 000000000..29a80db66 --- /dev/null +++ b/src/WatcherDelegatingHandler.cs @@ -0,0 +1,69 @@ +using System.IO; +using System.Linq; +using System.Net; +using System.Net.Http; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.AspNetCore.WebUtilities; + +namespace k8s +{ + /// + /// This HttpDelegatingHandler is to rewrite the response and return first line to autorest client + /// then use WatchExt to create a watch object which interact with the replaced http response to get watch works. + /// + internal class WatcherDelegatingHandler : DelegatingHandler + { + protected override async Task SendAsync(HttpRequestMessage request, + CancellationToken cancellationToken) + { + var originResponse = await base.SendAsync(request, cancellationToken); + + if (originResponse.IsSuccessStatusCode) + { + var query = QueryHelpers.ParseQuery(request.RequestUri.Query); + + if (query.TryGetValue("watch", out var values) && values.Any(v => v == "true")) + { + originResponse.Content = new LineSeparatedHttpContent(originResponse.Content); + } + } + return originResponse; + } + + internal class LineSeparatedHttpContent : HttpContent + { + private readonly HttpContent _originContent; + private Stream _originStream; + + public LineSeparatedHttpContent(HttpContent originContent) + { + _originContent = originContent; + } + + internal StreamReader StreamReader { get; private set; } + + protected override async Task SerializeToStreamAsync(Stream stream, TransportContext context) + { + _originStream = await _originContent.ReadAsStreamAsync(); + + StreamReader = new StreamReader(_originStream); + + var firstLine = await StreamReader.ReadLineAsync(); + var writer = new StreamWriter(stream); + +// using (writer) // leave open + { + await writer.WriteAsync(firstLine); + await writer.FlushAsync(); + } + } + + protected override bool TryComputeLength(out long length) + { + length = 0; + return false; + } + } + } +} \ No newline at end of file diff --git a/tests/AuthTests.cs b/tests/AuthTests.cs index 62115675d..40674e089 100644 --- a/tests/AuthTests.cs +++ b/tests/AuthTests.cs @@ -4,6 +4,7 @@ using System.Net; using System.Net.Http.Headers; using System.Security.Cryptography.X509Certificates; +using System.Threading.Tasks; using k8s.Models; using k8s.Tests.Mock; using Microsoft.AspNetCore.Hosting; @@ -39,7 +40,7 @@ public void TestAnonymous() using (var server = new MockKubeApiServer(cxt => { cxt.Response.StatusCode = (int) HttpStatusCode.Unauthorized; - return false; + return Task.FromResult(false); })) { var client = new Kubernetes(new KubernetesClientConfiguration @@ -69,10 +70,10 @@ public void TestBasicAuth() if (header != expect) { cxt.Response.StatusCode = (int) HttpStatusCode.Unauthorized; - return false; + return Task.FromResult(false); } - return true; + return Task.FromResult(true); })) { { @@ -256,10 +257,10 @@ public void TestToken() if (header != expect) { cxt.Response.StatusCode = (int) HttpStatusCode.Unauthorized; - return false; + return Task.FromResult(false); } - return true; + return Task.FromResult(true); })) { { diff --git a/tests/Mock/MockKubeApiServer.cs b/tests/Mock/MockKubeApiServer.cs index aa0831037..d6da5cbad 100644 --- a/tests/Mock/MockKubeApiServer.cs +++ b/tests/Mock/MockKubeApiServer.cs @@ -20,21 +20,19 @@ public class MockKubeApiServer : IDisposable private readonly IWebHost _webHost; - public MockKubeApiServer(Func shouldNext = null, Action listenConfigure = null, + public MockKubeApiServer(Func> shouldNext = null, Action listenConfigure = null, string resp = MockPodResponse) { - shouldNext = shouldNext ?? (_ => true); + shouldNext = shouldNext ?? (_ => Task.FromResult(true)); listenConfigure = listenConfigure ?? (_ => { }); _webHost = WebHost.CreateDefaultBuilder() - .Configure(app => app.Run(httpContext => + .Configure(app => app.Run(async httpContext => { - if (shouldNext(httpContext)) + if (await shouldNext(httpContext)) { - httpContext.Response.WriteAsync(resp); + await httpContext.Response.WriteAsync(resp); } - - return Task.Delay(0); })) .UseKestrel(options => { options.Listen(IPAddress.Loopback, 0, listenConfigure); }) .Build(); diff --git a/tests/WatchTests.cs b/tests/WatchTests.cs new file mode 100644 index 000000000..31a3dacbc --- /dev/null +++ b/tests/WatchTests.cs @@ -0,0 +1,274 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Net; +using System.Threading; +using System.Threading.Tasks; +using k8s.Exceptions; +using k8s.Models; +using k8s.Tests.Mock; +using Microsoft.AspNetCore.Http; +using Newtonsoft.Json; +using Newtonsoft.Json.Converters; +using Xunit; + +namespace k8s.Tests +{ + public class WatchTests + { + private static readonly string MockAddedEventStreamLine = BuildWatchEventStreamLine(WatchEventType.Added); + private static readonly string MockDeletedStreamLine = BuildWatchEventStreamLine(WatchEventType.Deleted); + private static readonly string MockModifiedStreamLine = BuildWatchEventStreamLine(WatchEventType.Modified); + private static readonly string MockErrorStreamLine = BuildWatchEventStreamLine(WatchEventType.Error); + private static readonly string MockBadStreamLine = "bad json"; + + private static string BuildWatchEventStreamLine(WatchEventType eventType) + { + var corev1PodList = JsonConvert.DeserializeObject(MockKubeApiServer.MockPodResponse); + return JsonConvert.SerializeObject(new Watcher.WatchEvent + { + Type = eventType, + Object = corev1PodList.Items.First() + }, new StringEnumConverter()); + } + + private static async Task WriteStreamLine(HttpContext httpContext, string reponseLine) + { + const string crlf = "\r\n"; + await httpContext.Response.WriteAsync(reponseLine.Replace(crlf, "")); + await httpContext.Response.WriteAsync(crlf); + await httpContext.Response.Body.FlushAsync(); + } + + [Fact] + public void TestCannotWatch() + { + using (var server = new MockKubeApiServer()) + { + var client = new Kubernetes(new KubernetesClientConfiguration + { + Host = server.Uri.ToString() + }); + + // did not pass watch param + { + var listTask = client.ListNamespacedPodWithHttpMessagesAsync("default").Result; + Assert.ThrowsAny(() => + { + listTask.Watch((type, item) => { }); + }); + } + + // server did not response line by line + { + Assert.ThrowsAny(() => + { + var listTask = client.ListNamespacedPodWithHttpMessagesAsync("default", watch: true).Result; + + // this line did not throw + // listTask.Watch((type, item) => { }); + }); + } + } + } + + [Fact] + public void TestSuriveBadLine() + { + using (var server = new MockKubeApiServer(async httpContext => + { + httpContext.Response.StatusCode = (int) HttpStatusCode.OK; + httpContext.Response.ContentLength = null; + + await WriteStreamLine(httpContext, MockKubeApiServer.MockPodResponse); + await Task.Delay(TimeSpan.FromMilliseconds(100)); + + await WriteStreamLine(httpContext, MockBadStreamLine); + await Task.Delay(TimeSpan.FromMilliseconds(100)); + + await WriteStreamLine(httpContext, MockAddedEventStreamLine); + await Task.Delay(TimeSpan.FromMilliseconds(100)); + + await WriteStreamLine(httpContext, MockBadStreamLine); + await Task.Delay(TimeSpan.FromMilliseconds(100)); + + await WriteStreamLine(httpContext, MockModifiedStreamLine); + await Task.Delay(TimeSpan.FromMilliseconds(100)); + + // make server alive, cannot set to int.max as of it would block response + await Task.Delay(TimeSpan.FromDays(1)); + return false; + })) + { + var client = new Kubernetes(new KubernetesClientConfiguration + { + Host = server.Uri.ToString() + }); + + var listTask = client.ListNamespacedPodWithHttpMessagesAsync("default", watch: true).Result; + + + var events = new HashSet(); + var errors = 0; + + var watcher = listTask.Watch( + (type, item) => { events.Add(type); }, + e => { errors += 1; } + ); + + // wait server yields all events + Thread.Sleep(TimeSpan.FromMilliseconds(1000)); + + Assert.Contains(WatchEventType.Added, events); + Assert.Contains(WatchEventType.Modified, events); + + Assert.Equal(2, errors); + + Assert.True(watcher.Watching); + + // prevent from server down exception trigger + Thread.Sleep(TimeSpan.FromMilliseconds(1000)); + } + } + + [Fact] + public void TestDisposeWatch() + { + using (var server = new MockKubeApiServer(async httpContext => + { + await WriteStreamLine(httpContext, MockKubeApiServer.MockPodResponse); + await Task.Delay(TimeSpan.FromMilliseconds(100)); + + for (;;) + { + await WriteStreamLine(httpContext, MockAddedEventStreamLine); + await Task.Delay(TimeSpan.FromMilliseconds(100)); + + } + })) + { + var client = new Kubernetes(new KubernetesClientConfiguration + { + Host = server.Uri.ToString() + }); + + var listTask = client.ListNamespacedPodWithHttpMessagesAsync("default", watch: true).Result; + + + var events = new HashSet(); + + var watcher = listTask.Watch( + (type, item) => { events.Add(type); } + ); + + // wait at least an event + Thread.Sleep(TimeSpan.FromMilliseconds(300)); + + Assert.NotEmpty(events); + Assert.True(watcher.Watching); + + watcher.Dispose(); + + events.Clear(); + + // make sure wait event called + Thread.Sleep(TimeSpan.FromMilliseconds(300)); + Assert.Empty(events); + Assert.False(watcher.Watching); + + } + } + + [Fact] + public void TestWatchAllEvents() + { + using (var server = new MockKubeApiServer(async httpContext => + { + await WriteStreamLine(httpContext, MockKubeApiServer.MockPodResponse); + await Task.Delay(TimeSpan.FromMilliseconds(100)); + + await WriteStreamLine(httpContext, MockAddedEventStreamLine); + await Task.Delay(TimeSpan.FromMilliseconds(100)); + + await WriteStreamLine(httpContext, MockDeletedStreamLine); + await Task.Delay(TimeSpan.FromMilliseconds(100)); + + await WriteStreamLine(httpContext, MockModifiedStreamLine); + await Task.Delay(TimeSpan.FromMilliseconds(100)); + + await WriteStreamLine(httpContext, MockErrorStreamLine); + await Task.Delay(TimeSpan.FromMilliseconds(100)); + + // make server alive, cannot set to int.max as of it would block response + await Task.Delay(TimeSpan.FromDays(1)); + return false; + })) + { + var client = new Kubernetes(new KubernetesClientConfiguration + { + Host = server.Uri.ToString() + }); + + var listTask = client.ListNamespacedPodWithHttpMessagesAsync("default", watch: true).Result; + + + var events = new HashSet(); + var errors = 0; + + var watcher = listTask.Watch( + (type, item) => { events.Add(type); }, + e => { errors += 1; } + ); + + // wait server yields all events + Thread.Sleep(TimeSpan.FromMilliseconds(750)); + + Assert.Contains(WatchEventType.Added, events); + Assert.Contains(WatchEventType.Deleted, events); + Assert.Contains(WatchEventType.Modified, events); + Assert.Contains(WatchEventType.Error, events); + + + Assert.Equal(0, errors); + + Assert.True(watcher.Watching); + } + } + + [Fact] + public void TestWatchServerDisconnect() + { + Watcher watcher; + Exception exceptionCatched = null; + + using (var server = new MockKubeApiServer(async httpContext => + { + await WriteStreamLine(httpContext, MockKubeApiServer.MockPodResponse); + + // make sure watch success + await Task.Delay(TimeSpan.FromMilliseconds(200)); + + throw new IOException("server down"); + })) + { + var client = new Kubernetes(new KubernetesClientConfiguration + { + Host = server.Uri.ToString() + }); + + var listTask = client.ListNamespacedPodWithHttpMessagesAsync("default", watch: true).Result; + + watcher = listTask.Watch( + (type, item) => { }, + e => { exceptionCatched = e; }); + } + + // wait server down + Thread.Sleep(TimeSpan.FromMilliseconds(500)); + + Assert.False(watcher.Watching); + Assert.IsType(exceptionCatched); + } + } +} \ No newline at end of file