Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
using Newtonsoft.Json.Linq;

namespace GraphQL.Server.Transports.Subscriptions.Abstractions
{
/// <summary>
Expand All @@ -20,7 +18,7 @@ public class OperationMessage
/// <summary>
/// Nullable payload
/// </summary>
public JObject Payload { get; set; }
public object Payload { get; set; }


/// <inheritdoc />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,13 @@ private Task HandleUnknownAsync(MessageHandlingContext context)
{
Type = MessageType.GQL_CONNECTION_ERROR,
Id = message.Id,
Payload = JObject.FromObject(new
Payload = new ExecutionResult
{
message.Id,
Errors = new ExecutionErrors
{
new ExecutionError($"Unexpected message type {message.Type}")
}
})
}
});
}

Expand All @@ -76,7 +75,7 @@ private Task HandleStartAsync(MessageHandlingContext context)
{
var message = context.Message;
_logger.LogDebug("Handle start: {id}", message.Id);
var payload = message.Payload.ToObject<OperationMessagePayload>();
var payload = ((JObject)message.Payload).ToObject<OperationMessagePayload>();
if (payload == null)
throw new InvalidOperationException($"Could not get OperationMessagePayload from message.Payload");

Expand Down
2 changes: 1 addition & 1 deletion src/Transports.Subscriptions.Abstractions/Subscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public void OnNext(ExecutionResult value)
{
Type = MessageType.GQL_DATA,
Id = Id,
Payload = JObject.FromObject(value)
Payload = value
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ await writer.SendAsync(new OperationMessage
{
Type = MessageType.GQL_ERROR,
Id = id,
Payload = JObject.FromObject(result)
Payload = result
});

return null;
Expand All @@ -110,7 +110,7 @@ await writer.SendAsync(new OperationMessage
{
Type = MessageType.GQL_ERROR,
Id = id,
Payload = JObject.FromObject(result)
Payload = result
});

return null;
Expand All @@ -131,7 +131,7 @@ await writer.SendAsync(new OperationMessage
{
Type = MessageType.GQL_DATA,
Id = id,
Payload = JObject.FromObject(result)
Payload = result
});

await writer.SendAsync(new OperationMessage
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Collections.Generic;
using System.Net.WebSockets;
using GraphQL.Http;
using GraphQL.Server.Internal;
using GraphQL.Server.Transports.Subscriptions.Abstractions;
using GraphQL.Types;
Expand All @@ -14,23 +15,26 @@ public class WebSocketConnectionFactory<TSchema> : IWebSocketConnectionFactory<T
private readonly ILoggerFactory _loggerFactory;
private readonly IGraphQLExecuter<TSchema> _executer;
private readonly IEnumerable<IOperationMessageListener> _messageListeners;
private readonly IDocumentWriter _documentWriter;

public WebSocketConnectionFactory(ILogger<WebSocketConnectionFactory<TSchema>> logger,
ILoggerFactory loggerFactory,
IGraphQLExecuter<TSchema> executer,
IEnumerable<IOperationMessageListener> messageListeners)
IEnumerable<IOperationMessageListener> messageListeners,
IDocumentWriter documentWriter)
{
_logger = logger;
_loggerFactory = loggerFactory;
_executer = executer;
_messageListeners = messageListeners;
_documentWriter = documentWriter;
}

public WebSocketConnection CreateConnection(WebSocket socket, string connectionId)
{
_logger.LogDebug("Creating server for connection {connectionId}", connectionId);

var transport = new WebSocketTransport(socket);
var transport = new WebSocketTransport(socket, _documentWriter);
var manager = new SubscriptionManager(_executer, _loggerFactory);
var server = new SubscriptionServer(
transport,
Expand Down
5 changes: 3 additions & 2 deletions src/Transports.Subscriptions.WebSockets/WebSocketTransport.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;
using GraphQL.Http;
using GraphQL.Server.Transports.Subscriptions.Abstractions;
using Newtonsoft.Json;
using Newtonsoft.Json.Serialization;
Expand All @@ -11,7 +12,7 @@ public class WebSocketTransport : IMessageTransport
{
private readonly WebSocket _socket;

public WebSocketTransport(WebSocket socket)
public WebSocketTransport(WebSocket socket, IDocumentWriter documentWriter)
{
_socket = socket;
var serializerSettings = new JsonSerializerSettings
Expand All @@ -22,7 +23,7 @@ public WebSocketTransport(WebSocket socket)
};

Reader = new WebSocketReaderPipeline(_socket, serializerSettings);
Writer = new WebSocketWriterPipeline(_socket, serializerSettings);
Writer = new WebSocketWriterPipeline(_socket, documentWriter);
}


Expand Down
56 changes: 21 additions & 35 deletions src/Transports.Subscriptions.WebSockets/WebSocketWriterPipeline.cs
Original file line number Diff line number Diff line change
@@ -1,33 +1,23 @@
using System;
using System.Net.WebSockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using GraphQL.Http;
using GraphQL.Server.Transports.Subscriptions.Abstractions;
using Newtonsoft.Json;

namespace GraphQL.Server.Transports.WebSockets
{
public class WebSocketWriterPipeline : IWriterPipeline
{
private readonly ITargetBlock<string> _endBlock;
private readonly JsonSerializerSettings _serializerSettings;
private readonly WebSocket _socket;
private readonly IPropagatorBlock<OperationMessage, string> _startBlock;
private readonly IDocumentWriter _documentWriter;
private readonly ITargetBlock<OperationMessage> _startBlock;

public WebSocketWriterPipeline(WebSocket socket, JsonSerializerSettings serializerSettings)
public WebSocketWriterPipeline(WebSocket socket, IDocumentWriter documentWriter)
{
_socket = socket;
_serializerSettings = serializerSettings;
_documentWriter = documentWriter;

_endBlock = CreateMessageWriter();
_startBlock = CreateWriterJsonTransformer();

_startBlock.LinkTo(_endBlock, new DataflowLinkOptions
{
PropagateCompletion = true
});
_startBlock = CreateMessageWriter();
}

public bool Post(OperationMessage message)
Expand All @@ -40,29 +30,17 @@ public Task SendAsync(OperationMessage message)
return _startBlock.SendAsync(message);
}

public Task Completion => _endBlock.Completion;
public Task Completion => _startBlock.Completion;

public Task Complete()
{
_startBlock.Complete();
return Task.CompletedTask;
}

protected IPropagatorBlock<OperationMessage, string> CreateWriterJsonTransformer()
private ITargetBlock<OperationMessage> CreateMessageWriter()
{
var transformer = new TransformBlock<OperationMessage, string>(
input => JsonConvert.SerializeObject(input, _serializerSettings),
new ExecutionDataflowBlockOptions
{
EnsureOrdered = true
});

return transformer;
}

private ITargetBlock<string> CreateMessageWriter()
{
var target = new ActionBlock<string>(
var target = new ActionBlock<OperationMessage>(
WriteMessageAsync, new ExecutionDataflowBlockOptions
{
BoundedCapacity = 1,
Expand All @@ -73,12 +51,20 @@ private ITargetBlock<string> CreateMessageWriter()
return target;
}

private Task WriteMessageAsync(string message)
private async Task WriteMessageAsync(OperationMessage message)
{
if (_socket.CloseStatus.HasValue) return Task.CompletedTask;
if (_socket.CloseStatus.HasValue) return;

var messageSegment = new ArraySegment<byte>(Encoding.UTF8.GetBytes(message));
return _socket.SendAsync(messageSegment, WebSocketMessageType.Text, true, CancellationToken.None);
var stream = new WebsocketWriterStream(_socket);
try
{
await _documentWriter.WriteAsync(stream, message);
}
finally
{
await stream.FlushAsync();
stream.Dispose();
}
}
}
}
66 changes: 66 additions & 0 deletions src/Transports.Subscriptions.WebSockets/WebsocketWriterStream.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
using System;
using System.IO;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;

namespace GraphQL.Server.Transports.WebSockets
{
public class WebsocketWriterStream : Stream
{
private readonly WebSocket _webSocket;

public WebsocketWriterStream(WebSocket webSocket)
{
_webSocket = webSocket;
}

public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
return _webSocket.SendAsync(new ArraySegment<byte>(buffer, offset, count), WebSocketMessageType.Text, false,
cancellationToken);
}

public override void Write(byte[] buffer, int offset, int count)
{
WriteAsync(buffer, offset, count).GetAwaiter().GetResult();
}

public override Task FlushAsync(CancellationToken cancellationToken)
{
return _webSocket.SendAsync(new ArraySegment<byte>(Array.Empty<byte>()), WebSocketMessageType.Text, true, cancellationToken);
}

public override void Flush()
{
FlushAsync().GetAwaiter().GetResult();
}

public override int Read(byte[] buffer, int offset, int count)
{
throw new System.NotSupportedException();
}

public override long Seek(long offset, SeekOrigin origin)
{
throw new System.NotSupportedException();
}

public override void SetLength(long value)
{
throw new System.NotSupportedException();
}

public override bool CanRead => false;
public override bool CanSeek => false;
public override bool CanWrite => true;

public override long Length => throw new NotSupportedException();

public override long Position
{
get => throw new NotSupportedException();
set => throw new NotSupportedException();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public ChatSpec()
private void AssertReceivedData(List<OperationMessage> writtenMessages, Predicate<JObject> predicate)
{
var dataMessages = writtenMessages.Where(m => m.Type == MessageType.GQL_DATA);
var results = dataMessages.Select(m => m.Payload["data"] as JObject)
var results = dataMessages.Select(m => JObject.FromObject(((ExecutionResult)m.Payload).Data))
.ToList();

Assert.Contains(results, predicate);
Expand Down
11 changes: 11 additions & 0 deletions tests/Transports.Subscriptions.WebSockets.Tests/TestMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using System;

namespace GraphQL.Server.Transports.WebSockets.Tests
{
public class TestMessage
{
public string Content { get; set; }

public DateTimeOffset SentAt { get; set; }
}
}
Loading