diff --git a/_site b/_site index b15c6d3532..6d593b826f 160000 --- a/_site +++ b/_site @@ -1 +1 @@ -Subproject commit b15c6d3532d726154b4371cd9b6ddcc12898f79d +Subproject commit 6d593b826f630a7ebb580dcddcf86f5edb878af7 diff --git a/projects/RabbitMQ.Client/client/impl/Command.cs b/projects/RabbitMQ.Client/client/impl/Command.cs index 01d866c2a2..06b908813d 100644 --- a/projects/RabbitMQ.Client/client/impl/Command.cs +++ b/projects/RabbitMQ.Client/client/impl/Command.cs @@ -98,64 +98,20 @@ public static void CheckEmptyFrameSize() } internal void Transmit(int channelNumber, Connection connection) - { - if (Method.HasContent) - { - TransmitAsFrameSet(channelNumber, connection); - } - else - { - TransmitAsSingleFrame(channelNumber, connection); - } - } - - internal void TransmitAsSingleFrame(int channelNumber, Connection connection) { connection.WriteFrame(new MethodOutboundFrame(channelNumber, Method)); - } - - internal void TransmitAsFrameSet(int channelNumber, Connection connection) - { - var frames = new List { new MethodOutboundFrame(channelNumber, Method) }; if (Method.HasContent) { - frames.Add(new HeaderOutboundFrame(channelNumber, Header, Body.Length)); + connection.WriteFrame(new HeaderOutboundFrame(channelNumber, Header, Body.Length)); int frameMax = (int)Math.Min(int.MaxValue, connection.FrameMax); int bodyPayloadMax = (frameMax == 0) ? Body.Length : frameMax - EmptyFrameSize; for (int offset = 0; offset < Body.Length; offset += bodyPayloadMax) { int remaining = Body.Length - offset; int count = (remaining < bodyPayloadMax) ? remaining : bodyPayloadMax; - frames.Add(new BodySegmentOutboundFrame(channelNumber, Body.Slice(offset, count))); + connection.WriteFrame(new BodySegmentOutboundFrame(channelNumber, Body.Slice(offset, count))); } } - - connection.WriteFrameSet(frames); - } - - - internal static List CalculateFrames(int channelNumber, Connection connection, IList commands) - { - var frames = new List(); - - foreach (Command cmd in commands) - { - frames.Add(new MethodOutboundFrame(channelNumber, cmd.Method)); - if (cmd.Method.HasContent) - { - frames.Add(new HeaderOutboundFrame(channelNumber, cmd.Header, cmd.Body.Length)); - int frameMax = (int)Math.Min(int.MaxValue, connection.FrameMax); - int bodyPayloadMax = (frameMax == 0) ? cmd.Body.Length : frameMax - EmptyFrameSize; - for (int offset = 0; offset < cmd.Body.Length; offset += bodyPayloadMax) - { - int remaining = cmd.Body.Length - offset; - int count = (remaining < bodyPayloadMax) ? remaining : bodyPayloadMax; - frames.Add(new BodySegmentOutboundFrame(channelNumber, cmd.Body.Slice(offset, count))); - } - } - } - - return frames; } public void Dispose() diff --git a/projects/RabbitMQ.Client/client/impl/Connection.cs b/projects/RabbitMQ.Client/client/impl/Connection.cs index 3e498d0947..aead6f6c92 100644 --- a/projects/RabbitMQ.Client/client/impl/Connection.cs +++ b/projects/RabbitMQ.Client/client/impl/Connection.cs @@ -948,11 +948,6 @@ public void WriteFrame(OutboundFrame f) _frameHandler.WriteFrame(f); } - public void WriteFrameSet(IList f) - { - _frameHandler.WriteFrameSet(f); - } - public void UpdateSecret(string newSecret, string reason) { _model0.UpdateSecret(newSecret, reason); diff --git a/projects/RabbitMQ.Client/client/impl/IFrameHandler.cs b/projects/RabbitMQ.Client/client/impl/IFrameHandler.cs index 53935edbe5..96eda7a6db 100644 --- a/projects/RabbitMQ.Client/client/impl/IFrameHandler.cs +++ b/projects/RabbitMQ.Client/client/impl/IFrameHandler.cs @@ -71,8 +71,6 @@ interface IFrameHandler void SendHeader(); - void WriteFrame(OutboundFrame frame, bool flush = true); - - void WriteFrameSet(IList frames); + void WriteFrame(OutboundFrame frame); } } diff --git a/projects/RabbitMQ.Client/client/impl/SessionBase.cs b/projects/RabbitMQ.Client/client/impl/SessionBase.cs index 9a6154f059..8f2bcef0d7 100644 --- a/projects/RabbitMQ.Client/client/impl/SessionBase.cs +++ b/projects/RabbitMQ.Client/client/impl/SessionBase.cs @@ -191,9 +191,14 @@ public virtual void Transmit(Command cmd) // of frames within a channel. But that is fixed in socket frame handler instead, so no need to lock. cmd.Transmit(ChannelNumber, Connection); } + public virtual void Transmit(IList commands) { - Connection.WriteFrameSet(Command.CalculateFrames(ChannelNumber, Connection, commands)); + for (int i = 0; i < commands.Count; i++) + { + Command command = commands[i]; + command.Transmit(ChannelNumber, Connection); + } } } } diff --git a/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs b/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs index b5ef66ad60..45f899d078 100644 --- a/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs +++ b/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs @@ -46,6 +46,7 @@ using System.Net.Sockets; using System.Runtime.InteropServices; using System.Text; +using System.Threading.Channels; using System.Threading.Tasks; using RabbitMQ.Client.Exceptions; @@ -79,7 +80,8 @@ class SocketFrameHandler : IFrameHandler private readonly ITcpClient _socket; private readonly Stream _writer; private readonly object _semaphore = new object(); - private readonly object _streamLock = new object(); + private readonly Channel _frameChannel = Channel.CreateUnbounded(new UnboundedChannelOptions { AllowSynchronousContinuations = false, SingleReader = true, SingleWriter = false }); + private Task _frameWriter; private bool _closed; public SocketFrameHandler(AmqpTcpEndpoint endpoint, Func socketFactory, @@ -124,6 +126,7 @@ public SocketFrameHandler(AmqpTcpEndpoint endpoint, _writer = new BufferedStream(netstream, _socket.Client.SendBufferSize); WriteTimeout = writeTimeout; + _frameWriter = Task.Run(WriteFrameImpl); } public AmqpTcpEndpoint Endpoint { get; set; } @@ -181,6 +184,15 @@ public void Close() { if (!_closed) { + try + { + _frameChannel.Writer.Complete(); + _frameWriter.Wait(); + } + catch(Exception) + { + } + try { _socket.Close(); @@ -222,46 +234,31 @@ public void SendHeader() headerBytes[7] = (byte)Endpoint.Protocol.MinorVersion; } - Write(new ArraySegment(headerBytes), true); + _writer.Write(headerBytes, 0, 8); + _writer.Flush(); } - public void WriteFrame(OutboundFrame frame, bool flush = true) + public void WriteFrame(OutboundFrame frame) { - int bufferSize = frame.GetMinimumBufferSize(); - byte[] memoryArray = ArrayPool.Shared.Rent(bufferSize); - Memory slice = new Memory(memoryArray, 0, bufferSize); - frame.WriteTo(slice); - _socket.Client.Poll(_writeableStateTimeoutMicroSeconds, SelectMode.SelectWrite); - Write(slice.Slice(0, frame.ByteCount), flush); - ArrayPool.Shared.Return(memoryArray); - } - - public void WriteFrameSet(IList frames) - { - for (int i = 0; i < frames.Count; i++) - { - WriteFrame(frames[i], false); - } - - lock (_streamLock) - { - _writer.Flush(); - } + _frameChannel.Writer.TryWrite(frame); } - private void Write(ReadOnlyMemory buffer, bool flush) + public async Task WriteFrameImpl() { - lock (_streamLock) + while (await _frameChannel.Reader.WaitToReadAsync().ConfigureAwait(false)) { - if (MemoryMarshal.TryGetArray(buffer, out ArraySegment segment)) + _socket.Client.Poll(_writeableStateTimeoutMicroSeconds, SelectMode.SelectWrite); + while (_frameChannel.Reader.TryRead(out OutboundFrame frame)) { - _writer.Write(segment.Array, segment.Offset, segment.Count); - - if (flush) - { - _writer.Flush(); - } + int bufferSize = frame.GetMinimumBufferSize(); + byte[] memoryArray = ArrayPool.Shared.Rent(bufferSize); + Memory slice = new Memory(memoryArray, 0, bufferSize); + frame.WriteTo(slice); + _writer.Write(memoryArray, 0, bufferSize); + ArrayPool.Shared.Return(memoryArray); } + + _writer.Flush(); } } diff --git a/projects/Unit/TestRecoverAfterCancel.cs b/projects/Unit/TestRecoverAfterCancel.cs index 09b5699d84..df5ff7877e 100644 --- a/projects/Unit/TestRecoverAfterCancel.cs +++ b/projects/Unit/TestRecoverAfterCancel.cs @@ -82,21 +82,23 @@ public void TestRecoverAfterCancel_() UTF8Encoding enc = new UTF8Encoding(); Channel.BasicPublish("", Queue, null, enc.GetBytes("message")); EventingBasicConsumer Consumer = new EventingBasicConsumer(Channel); - SharedQueue EventQueue = new SharedQueue(); - Consumer.Received += (_, e) => EventQueue.Enqueue(e); + SharedQueue<(bool Redelivered, byte[] Body)> EventQueue = new SharedQueue<(bool Redelivered, byte[] Body)>(); + // Making sure we copy the delivery body since it could be disposed at any time. + Consumer.Received += (_, e) => EventQueue.Enqueue((e.Redelivered, e.Body.ToArray())); string CTag = Channel.BasicConsume(Queue, false, Consumer); - BasicDeliverEventArgs Event = EventQueue.Dequeue(); + (bool Redelivered, byte[] Body) Event = EventQueue.Dequeue(); Channel.BasicCancel(CTag); Channel.BasicRecover(true); EventingBasicConsumer Consumer2 = new EventingBasicConsumer(Channel); - SharedQueue EventQueue2 = new SharedQueue(); - Consumer2.Received += (_, e) => EventQueue2.Enqueue(e); + SharedQueue<(bool Redelivered, byte[] Body)> EventQueue2 = new SharedQueue<(bool Redelivered, byte[] Body)>(); + // Making sure we copy the delivery body since it could be disposed at any time. + Consumer2.Received += (_, e) => EventQueue2.Enqueue((e.Redelivered, e.Body.ToArray())); Channel.BasicConsume(Queue, false, Consumer2); - BasicDeliverEventArgs Event2 = EventQueue2.Dequeue(); + (bool Redelivered, byte[] Body) Event2 = EventQueue2.Dequeue(); - CollectionAssert.AreEqual(Event.Body.ToArray(), Event2.Body.ToArray()); + CollectionAssert.AreEqual(Event.Body, Event2.Body); Assert.IsFalse(Event.Redelivered); Assert.IsTrue(Event2.Redelivered); }