Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion _site
Submodule _site updated 161 files
48 changes: 2 additions & 46 deletions projects/RabbitMQ.Client/client/impl/Command.cs
Original file line number Diff line number Diff line change
Expand Up @@ -98,64 +98,20 @@ public static void CheckEmptyFrameSize()
}

internal void Transmit(int channelNumber, Connection connection)
{
if (Method.HasContent)
{
TransmitAsFrameSet(channelNumber, connection);
}
else
{
TransmitAsSingleFrame(channelNumber, connection);
}
}

internal void TransmitAsSingleFrame(int channelNumber, Connection connection)
{
connection.WriteFrame(new MethodOutboundFrame(channelNumber, Method));
}

internal void TransmitAsFrameSet(int channelNumber, Connection connection)
{
var frames = new List<OutboundFrame> { new MethodOutboundFrame(channelNumber, Method) };
if (Method.HasContent)
{
frames.Add(new HeaderOutboundFrame(channelNumber, Header, Body.Length));
connection.WriteFrame(new HeaderOutboundFrame(channelNumber, Header, Body.Length));
int frameMax = (int)Math.Min(int.MaxValue, connection.FrameMax);
int bodyPayloadMax = (frameMax == 0) ? Body.Length : frameMax - EmptyFrameSize;
for (int offset = 0; offset < Body.Length; offset += bodyPayloadMax)
{
int remaining = Body.Length - offset;
int count = (remaining < bodyPayloadMax) ? remaining : bodyPayloadMax;
frames.Add(new BodySegmentOutboundFrame(channelNumber, Body.Slice(offset, count)));
connection.WriteFrame(new BodySegmentOutboundFrame(channelNumber, Body.Slice(offset, count)));
}
}

connection.WriteFrameSet(frames);
}


internal static List<OutboundFrame> CalculateFrames(int channelNumber, Connection connection, IList<Command> commands)
{
var frames = new List<OutboundFrame>();

foreach (Command cmd in commands)
{
frames.Add(new MethodOutboundFrame(channelNumber, cmd.Method));
if (cmd.Method.HasContent)
{
frames.Add(new HeaderOutboundFrame(channelNumber, cmd.Header, cmd.Body.Length));
int frameMax = (int)Math.Min(int.MaxValue, connection.FrameMax);
int bodyPayloadMax = (frameMax == 0) ? cmd.Body.Length : frameMax - EmptyFrameSize;
for (int offset = 0; offset < cmd.Body.Length; offset += bodyPayloadMax)
{
int remaining = cmd.Body.Length - offset;
int count = (remaining < bodyPayloadMax) ? remaining : bodyPayloadMax;
frames.Add(new BodySegmentOutboundFrame(channelNumber, cmd.Body.Slice(offset, count)));
}
}
}

return frames;
}

public void Dispose()
Expand Down
5 changes: 0 additions & 5 deletions projects/RabbitMQ.Client/client/impl/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -948,11 +948,6 @@ public void WriteFrame(OutboundFrame f)
_frameHandler.WriteFrame(f);
}

public void WriteFrameSet(IList<OutboundFrame> f)
{
_frameHandler.WriteFrameSet(f);
}

public void UpdateSecret(string newSecret, string reason)
{
_model0.UpdateSecret(newSecret, reason);
Expand Down
4 changes: 1 addition & 3 deletions projects/RabbitMQ.Client/client/impl/IFrameHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,6 @@ interface IFrameHandler

void SendHeader();

void WriteFrame(OutboundFrame frame, bool flush = true);

void WriteFrameSet(IList<OutboundFrame> frames);
void WriteFrame(OutboundFrame frame);
}
}
7 changes: 6 additions & 1 deletion projects/RabbitMQ.Client/client/impl/SessionBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,14 @@ public virtual void Transmit(Command cmd)
// of frames within a channel. But that is fixed in socket frame handler instead, so no need to lock.
cmd.Transmit(ChannelNumber, Connection);
}

public virtual void Transmit(IList<Command> commands)
{
Connection.WriteFrameSet(Command.CalculateFrames(ChannelNumber, Connection, commands));
for (int i = 0; i < commands.Count; i++)
{
Command command = commands[i];
command.Transmit(ChannelNumber, Connection);
}
}
}
}
61 changes: 29 additions & 32 deletions projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
using System.Net.Sockets;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks;

using RabbitMQ.Client.Exceptions;
Expand Down Expand Up @@ -79,7 +80,8 @@ class SocketFrameHandler : IFrameHandler
private readonly ITcpClient _socket;
private readonly Stream _writer;
private readonly object _semaphore = new object();
private readonly object _streamLock = new object();
private readonly Channel<OutboundFrame> _frameChannel = Channel.CreateUnbounded<OutboundFrame>(new UnboundedChannelOptions { AllowSynchronousContinuations = false, SingleReader = true, SingleWriter = false });
private Task _frameWriter;
private bool _closed;
public SocketFrameHandler(AmqpTcpEndpoint endpoint,
Func<AddressFamily, ITcpClient> socketFactory,
Expand Down Expand Up @@ -124,6 +126,7 @@ public SocketFrameHandler(AmqpTcpEndpoint endpoint,
_writer = new BufferedStream(netstream, _socket.Client.SendBufferSize);

WriteTimeout = writeTimeout;
_frameWriter = Task.Run(WriteFrameImpl);
}
public AmqpTcpEndpoint Endpoint { get; set; }

Expand Down Expand Up @@ -181,6 +184,15 @@ public void Close()
{
if (!_closed)
{
try
{
_frameChannel.Writer.Complete();
_frameWriter.Wait();
}
catch(Exception)
{
}

try
{
_socket.Close();
Expand Down Expand Up @@ -222,46 +234,31 @@ public void SendHeader()
headerBytes[7] = (byte)Endpoint.Protocol.MinorVersion;
}

Write(new ArraySegment<byte>(headerBytes), true);
_writer.Write(headerBytes, 0, 8);
_writer.Flush();
}

public void WriteFrame(OutboundFrame frame, bool flush = true)
public void WriteFrame(OutboundFrame frame)
{
int bufferSize = frame.GetMinimumBufferSize();
byte[] memoryArray = ArrayPool<byte>.Shared.Rent(bufferSize);
Memory<byte> slice = new Memory<byte>(memoryArray, 0, bufferSize);
frame.WriteTo(slice);
_socket.Client.Poll(_writeableStateTimeoutMicroSeconds, SelectMode.SelectWrite);
Write(slice.Slice(0, frame.ByteCount), flush);
ArrayPool<byte>.Shared.Return(memoryArray);
}

public void WriteFrameSet(IList<OutboundFrame> frames)
{
for (int i = 0; i < frames.Count; i++)
{
WriteFrame(frames[i], false);
}

lock (_streamLock)
{
_writer.Flush();
}
_frameChannel.Writer.TryWrite(frame);
}

private void Write(ReadOnlyMemory<byte> buffer, bool flush)
public async Task WriteFrameImpl()
{
lock (_streamLock)
while (await _frameChannel.Reader.WaitToReadAsync().ConfigureAwait(false))
{
if (MemoryMarshal.TryGetArray(buffer, out ArraySegment<byte> segment))
_socket.Client.Poll(_writeableStateTimeoutMicroSeconds, SelectMode.SelectWrite);
while (_frameChannel.Reader.TryRead(out OutboundFrame frame))
{
_writer.Write(segment.Array, segment.Offset, segment.Count);

if (flush)
{
_writer.Flush();
}
int bufferSize = frame.GetMinimumBufferSize();
byte[] memoryArray = ArrayPool<byte>.Shared.Rent(bufferSize);
Memory<byte> slice = new Memory<byte>(memoryArray, 0, bufferSize);
frame.WriteTo(slice);
_writer.Write(memoryArray, 0, bufferSize);
ArrayPool<byte>.Shared.Return(memoryArray);
}

_writer.Flush();
}
}

Expand Down
16 changes: 9 additions & 7 deletions projects/Unit/TestRecoverAfterCancel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,21 +82,23 @@ public void TestRecoverAfterCancel_()
UTF8Encoding enc = new UTF8Encoding();
Channel.BasicPublish("", Queue, null, enc.GetBytes("message"));
EventingBasicConsumer Consumer = new EventingBasicConsumer(Channel);
SharedQueue<BasicDeliverEventArgs> EventQueue = new SharedQueue<BasicDeliverEventArgs>();
Consumer.Received += (_, e) => EventQueue.Enqueue(e);
SharedQueue<(bool Redelivered, byte[] Body)> EventQueue = new SharedQueue<(bool Redelivered, byte[] Body)>();
// Making sure we copy the delivery body since it could be disposed at any time.
Consumer.Received += (_, e) => EventQueue.Enqueue((e.Redelivered, e.Body.ToArray()));

string CTag = Channel.BasicConsume(Queue, false, Consumer);
BasicDeliverEventArgs Event = EventQueue.Dequeue();
(bool Redelivered, byte[] Body) Event = EventQueue.Dequeue();
Channel.BasicCancel(CTag);
Channel.BasicRecover(true);

EventingBasicConsumer Consumer2 = new EventingBasicConsumer(Channel);
SharedQueue<BasicDeliverEventArgs> EventQueue2 = new SharedQueue<BasicDeliverEventArgs>();
Consumer2.Received += (_, e) => EventQueue2.Enqueue(e);
SharedQueue<(bool Redelivered, byte[] Body)> EventQueue2 = new SharedQueue<(bool Redelivered, byte[] Body)>();
// Making sure we copy the delivery body since it could be disposed at any time.
Consumer2.Received += (_, e) => EventQueue2.Enqueue((e.Redelivered, e.Body.ToArray()));
Channel.BasicConsume(Queue, false, Consumer2);
BasicDeliverEventArgs Event2 = EventQueue2.Dequeue();
(bool Redelivered, byte[] Body) Event2 = EventQueue2.Dequeue();

CollectionAssert.AreEqual(Event.Body.ToArray(), Event2.Body.ToArray());
CollectionAssert.AreEqual(Event.Body, Event2.Body);
Assert.IsFalse(Event.Redelivered);
Assert.IsTrue(Event2.Redelivered);
}
Expand Down