diff --git a/projects/RabbitMQ.Client/client/impl/CommandAssembler.cs b/projects/RabbitMQ.Client/client/impl/CommandAssembler.cs
index 5ae65d8193..a38e1b1181 100644
--- a/projects/RabbitMQ.Client/client/impl/CommandAssembler.cs
+++ b/projects/RabbitMQ.Client/client/impl/CommandAssembler.cs
@@ -34,6 +34,7 @@
using RabbitMQ.Client.Exceptions;
using RabbitMQ.Client.Framing.Impl;
+using RabbitMQ.Client.Logging;
using RabbitMQ.Util;
namespace RabbitMQ.Client.Impl
@@ -91,6 +92,7 @@ public bool HandleFrame(in InboundFrame frame, out IncomingCommand command)
return true;
}
+ RabbitMqClientEventSource.Log.CommandReceived();
command = new IncomingCommand(_method, _header, _body, _bodyBytes);
Reset();
return shallReturn;
diff --git a/projects/RabbitMQ.Client/client/impl/Connection.Commands.cs b/projects/RabbitMQ.Client/client/impl/Connection.Commands.cs
index e84546d503..30b379c131 100644
--- a/projects/RabbitMQ.Client/client/impl/Connection.Commands.cs
+++ b/projects/RabbitMQ.Client/client/impl/Connection.Commands.cs
@@ -35,6 +35,7 @@
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
using RabbitMQ.Client.Impl;
+using RabbitMQ.Client.Logging;
using RabbitMQ.Util;
namespace RabbitMQ.Client.Framing.Impl
@@ -71,6 +72,7 @@ internal void HandleConnectionUnblocked()
private void Open()
{
+ RabbitMqClientEventSource.Log.ConnectionOpened();
StartAndTune();
_model0.ConnectionOpen(_factory.VirtualHost);
}
diff --git a/projects/RabbitMQ.Client/client/impl/Connection.cs b/projects/RabbitMQ.Client/client/impl/Connection.cs
index 23ca452f44..1f786f88ef 100644
--- a/projects/RabbitMQ.Client/client/impl/Connection.cs
+++ b/projects/RabbitMQ.Client/client/impl/Connection.cs
@@ -339,6 +339,7 @@ private void FinishClose()
_frameHandler.Close();
_model0.SetCloseReason(CloseReason);
_model0.FinishClose();
+ RabbitMqClientEventSource.Log.ConnectionClosed();
}
///Broadcasts notification of the final shutdown of the connection.
diff --git a/projects/RabbitMQ.Client/client/impl/Frame.cs b/projects/RabbitMQ.Client/client/impl/Frame.cs
index a41a62f2ca..1a209ecba0 100644
--- a/projects/RabbitMQ.Client/client/impl/Frame.cs
+++ b/projects/RabbitMQ.Client/client/impl/Frame.cs
@@ -37,6 +37,7 @@
using System.Runtime.ExceptionServices;
using RabbitMQ.Client.Exceptions;
+using RabbitMQ.Client.Logging;
using RabbitMQ.Util;
namespace RabbitMQ.Client.Impl
@@ -49,9 +50,6 @@ internal static class Framing
* | 1 byte | 2 bytes | 4 bytes | x bytes | 1 byte |
* +------------+---------+----------------+---------+------------------+ */
internal const int BaseFrameSize = 1 + 2 + 4 + 1;
- internal const int StartFrameType = 0;
- internal const int StartChannel = 1;
- internal const int StartPayloadSize = 3;
private const int StartPayload = 7;
internal static class Method
@@ -199,10 +197,7 @@ internal static InboundFrame ReadFrom(Stream reader, byte[] frameHeaderBuffer)
{
try
{
- if (reader.Read(frameHeaderBuffer, 0, frameHeaderBuffer.Length) == 0)
- {
- throw new EndOfStreamException("Reached the end of the stream. Possible authentication failure.");
- }
+ ReadFromStream(reader, frameHeaderBuffer, frameHeaderBuffer.Length);
}
catch (IOException ioe)
{
@@ -234,19 +229,15 @@ internal static InboundFrame ReadFrom(Stream reader, byte[] frameHeaderBuffer)
// Is returned by InboundFrame.ReturnPayload in Connection.MainLoopIteration
int readSize = payloadSize + EndMarkerLength;
byte[] payloadBytes = ArrayPool.Shared.Rent(readSize);
- int bytesRead = 0;
try
{
- while (bytesRead < readSize)
- {
- bytesRead += reader.Read(payloadBytes, bytesRead, readSize - bytesRead);
- }
+ ReadFromStream(reader, payloadBytes, readSize);
}
catch (Exception)
{
// Early EOF.
ArrayPool.Shared.Return(payloadBytes);
- throw new MalformedFrameException($"Short frame - expected to read {readSize} bytes, only got {bytesRead} bytes");
+ throw new MalformedFrameException($"Short frame - expected to read {readSize} bytes");
}
if (payloadBytes[payloadSize] != Constants.FrameEnd)
@@ -255,9 +246,31 @@ internal static InboundFrame ReadFrom(Stream reader, byte[] frameHeaderBuffer)
throw new MalformedFrameException($"Bad frame end marker: {payloadBytes[payloadSize]}");
}
+ RabbitMqClientEventSource.Log.DataReceived(payloadSize + Framing.BaseFrameSize);
return new InboundFrame(type, channel, new Memory(payloadBytes, 0, payloadSize), payloadBytes);
}
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ private static void ReadFromStream(Stream reader, byte[] buffer, int toRead)
+ {
+ int bytesRead = 0;
+ do
+ {
+ int read = reader.Read(buffer, bytesRead, toRead - bytesRead);
+ if (read == 0)
+ {
+ ThrowEndOfStream();
+ }
+
+ bytesRead += read;
+ } while (bytesRead != toRead);
+
+ static void ThrowEndOfStream()
+ {
+ throw new EndOfStreamException("Reached the end of the stream. Possible authentication failure.");
+ }
+ }
+
public byte[] TakeoverPayload()
{
return _rentedArray;
diff --git a/projects/RabbitMQ.Client/client/impl/SessionBase.cs b/projects/RabbitMQ.Client/client/impl/SessionBase.cs
index e98cd8b954..2c7be0e83b 100644
--- a/projects/RabbitMQ.Client/client/impl/SessionBase.cs
+++ b/projects/RabbitMQ.Client/client/impl/SessionBase.cs
@@ -31,9 +31,10 @@
using System;
using System.Threading;
-
+using RabbitMQ.Client.client.framing;
using RabbitMQ.Client.Exceptions;
using RabbitMQ.Client.Framing.Impl;
+using RabbitMQ.Client.Logging;
namespace RabbitMQ.Client.Impl
{
@@ -50,6 +51,7 @@ protected SessionBase(Connection connection, ushort channelNumber)
{
connection.ConnectionShutdown += OnConnectionShutdown;
}
+ RabbitMqClientEventSource.Log.ChannelOpened();
}
public event EventHandler SessionShutdown
@@ -102,7 +104,10 @@ public void Close(ShutdownEventArgs reason)
public void Close(ShutdownEventArgs reason, bool notify)
{
- Interlocked.CompareExchange(ref _closeReason, reason, null);
+ if (Interlocked.CompareExchange(ref _closeReason, reason, null) is null)
+ {
+ RabbitMqClientEventSource.Log.ChannelClosed();
+ }
if (notify)
{
OnSessionShutdown(CloseReason);
@@ -126,7 +131,7 @@ public void Notify()
public virtual void Transmit(in T cmd) where T : struct, IOutgoingCommand
{
- if (!IsOpen && cmd.Method.ProtocolCommandId != client.framing.ProtocolCommandId.ChannelCloseOk)
+ if (!IsOpen && cmd.Method.ProtocolCommandId != ProtocolCommandId.ChannelCloseOk)
{
throw new AlreadyClosedException(CloseReason);
}
diff --git a/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs b/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs
index e9dea6bb14..b4f00be8b9 100644
--- a/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs
+++ b/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs
@@ -40,6 +40,7 @@
using System.Threading.Tasks;
using RabbitMQ.Client.Exceptions;
+using RabbitMQ.Client.Logging;
namespace RabbitMQ.Client.Impl
{
@@ -59,7 +60,7 @@ public static async Task TimeoutAfter(this Task task, TimeSpan timeout)
}
}
- internal class SocketFrameHandler : IFrameHandler
+ internal sealed class SocketFrameHandler : IFrameHandler
{
private readonly ITcpClient _socket;
private readonly Stream _reader;
@@ -282,6 +283,7 @@ private async Task WriteLoop()
#else
await _writer.WriteAsync(memory).ConfigureAwait(false);
#endif
+ RabbitMqClientEventSource.Log.CommandSent(segment.Count);
ArrayPool.Shared.Return(segment.Array);
}
diff --git a/projects/RabbitMQ.Client/client/logging/RabbitMqClientEventSource.Counters.cs b/projects/RabbitMQ.Client/client/logging/RabbitMqClientEventSource.Counters.cs
new file mode 100644
index 0000000000..2f3dc07f4f
--- /dev/null
+++ b/projects/RabbitMQ.Client/client/logging/RabbitMqClientEventSource.Counters.cs
@@ -0,0 +1,121 @@
+// This source code is dual-licensed under the Apache License, version
+// 2.0, and the Mozilla Public License, version 2.0.
+//
+// The APL v2.0:
+//
+//---------------------------------------------------------------------------
+// Copyright (c) 2007-2020 VMware, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//---------------------------------------------------------------------------
+//
+// The MPL v2.0:
+//
+//---------------------------------------------------------------------------
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at https://mozilla.org/MPL/2.0/.
+//
+// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
+//---------------------------------------------------------------------------
+
+using System;
+using System.Diagnostics.Tracing;
+using System.Threading;
+
+namespace RabbitMQ.Client.Logging
+{
+ #nullable enable
+ internal sealed partial class RabbitMqClientEventSource
+ {
+ private static int ConnectionsOpened;
+ private static int ConnectionsClosed;
+ private static int ChannelsOpened;
+ private static int ChannelsClosed;
+ private static long BytesSent;
+ private static long BytesReceived;
+ private static long CommandsSent;
+ private static long CommandsReceived;
+
+#if !NETSTANDARD
+ private PollingCounter? _connectionOpenedCounter;
+ private PollingCounter? _openConnectionCounter;
+ private PollingCounter? _channelOpenedCounter;
+ private PollingCounter? _openChannelCounter;
+ private IncrementingPollingCounter? _bytesSentCounter;
+ private IncrementingPollingCounter? _bytesReceivedCounter;
+ private IncrementingPollingCounter? _commandSentCounter;
+ private IncrementingPollingCounter? _commandReceivedCounter;
+
+ protected override void OnEventCommand(EventCommandEventArgs command)
+ {
+ if (command.Command == EventCommand.Enable)
+ {
+ _connectionOpenedCounter ??= new PollingCounter("total-connections-opened", this, () => ConnectionsOpened) { DisplayName = "Total connections opened" };
+ _openConnectionCounter ??= new PollingCounter("current-open-connections", this, () => ConnectionsOpened - ConnectionsClosed) { DisplayName = "Current open connections count" };
+
+ _channelOpenedCounter ??= new PollingCounter("total-channels-opened", this, () => ChannelsOpened) { DisplayName = "Total channels opened" };
+ _openChannelCounter ??= new PollingCounter("current-open-channels", this, () => ChannelsOpened - ChannelsClosed) { DisplayName = "Current open channels count" };
+
+ _bytesSentCounter ??= new IncrementingPollingCounter("bytes-sent-rate", this, () => Interlocked.Read(ref BytesSent)) { DisplayName = "Byte sending rate", DisplayUnits = "B", DisplayRateTimeScale = new TimeSpan(0, 0, 1) };
+ _bytesReceivedCounter ??= new IncrementingPollingCounter("bytes-received-rate", this, () => Interlocked.Read(ref BytesReceived)) { DisplayName = "Byte receiving rate", DisplayUnits = "B", DisplayRateTimeScale = new TimeSpan(0, 0, 1) };
+
+ _commandSentCounter ??= new IncrementingPollingCounter("AMQP-method-sent-rate", this, () => Interlocked.Read(ref CommandsSent)) { DisplayName = "AMQP method sending rate", DisplayUnits = "B", DisplayRateTimeScale = new TimeSpan(0, 0, 1) };
+ _commandReceivedCounter ??= new IncrementingPollingCounter("AMQP-method-received-rate", this, () => Interlocked.Read(ref CommandsReceived)) { DisplayName = "AMQP method receiving rate", DisplayUnits = "B", DisplayRateTimeScale = new TimeSpan(0, 0, 1) };
+ }
+ }
+#endif
+ [NonEvent]
+ public void ConnectionOpened()
+ {
+ Interlocked.Increment(ref ConnectionsOpened);
+ }
+
+ [NonEvent]
+ public void ConnectionClosed()
+ {
+ Interlocked.Increment(ref ConnectionsClosed);
+ }
+
+ [NonEvent]
+ public void ChannelOpened()
+ {
+ Interlocked.Increment(ref ChannelsOpened);
+ }
+
+ [NonEvent]
+ public void ChannelClosed()
+ {
+ Interlocked.Increment(ref ChannelsClosed);
+ }
+
+ [NonEvent]
+ public void DataReceived(int byteCount)
+ {
+ Interlocked.Add(ref BytesReceived, byteCount);
+ }
+
+ [NonEvent]
+ public void CommandSent(int byteCount)
+ {
+ Interlocked.Increment(ref CommandsSent);
+ Interlocked.Add(ref BytesSent, byteCount);
+ }
+
+ [NonEvent]
+ public void CommandReceived()
+ {
+ Interlocked.Increment(ref CommandsReceived);
+ }
+ }
+}
diff --git a/projects/RabbitMQ.Client/client/logging/RabbitMqClientEventSource.cs b/projects/RabbitMQ.Client/client/logging/RabbitMqClientEventSource.cs
index 8cc5f759e9..222eec743e 100644
--- a/projects/RabbitMQ.Client/client/logging/RabbitMqClientEventSource.cs
+++ b/projects/RabbitMQ.Client/client/logging/RabbitMqClientEventSource.cs
@@ -34,25 +34,20 @@
namespace RabbitMQ.Client.Logging
{
- [EventSource(Name="rabbitmq-dotnet-client")]
- public sealed class RabbitMqClientEventSource : EventSource
+ #nullable enable
+ internal sealed partial class RabbitMqClientEventSource : EventSource
{
- public class Keywords
+ public static readonly RabbitMqClientEventSource Log = new RabbitMqClientEventSource();
+
+ public RabbitMqClientEventSource()
+ : base("rabbitmq-client")
{
- public const EventKeywords Log = (EventKeywords)1;
}
-#if NET452
- public RabbitMqClientEventSource() : base()
- {
- }
-#else
- public RabbitMqClientEventSource() : base(EventSourceSettings.EtwSelfDescribingEventFormat)
+ public class Keywords
{
+ public const EventKeywords Log = (EventKeywords)1;
}
-#endif
-
- public static RabbitMqClientEventSource Log = new RabbitMqClientEventSource ();
[Event(1, Message = "INFO", Keywords = Keywords.Log, Level = EventLevel.Informational)]
public void Info(string message)
diff --git a/projects/Unit/APIApproval.Approve.verified.txt b/projects/Unit/APIApproval.Approve.verified.txt
index d9be7bb5b7..5a9c774ec5 100644
--- a/projects/Unit/APIApproval.Approve.verified.txt
+++ b/projects/Unit/APIApproval.Approve.verified.txt
@@ -808,25 +808,6 @@ namespace RabbitMQ.Client.Exceptions
}
namespace RabbitMQ.Client.Logging
{
- [System.Diagnostics.Tracing.EventSource(Name="rabbitmq-dotnet-client")]
- public sealed class RabbitMqClientEventSource : System.Diagnostics.Tracing.EventSource
- {
- public static RabbitMQ.Client.Logging.RabbitMqClientEventSource Log;
- public RabbitMqClientEventSource() { }
- [System.Diagnostics.Tracing.Event(3, Keywords=System.Diagnostics.Tracing.EventKeywords.None | System.Diagnostics.Tracing.EventKeywords.All, Level=System.Diagnostics.Tracing.EventLevel.Error, Message="ERROR")]
- public void Error(string message, RabbitMQ.Client.Logging.RabbitMqExceptionDetail ex) { }
- [System.Diagnostics.Tracing.NonEvent]
- public void Error(string message, System.Exception ex) { }
- [System.Diagnostics.Tracing.Event(1, Keywords=System.Diagnostics.Tracing.EventKeywords.None | System.Diagnostics.Tracing.EventKeywords.All, Level=System.Diagnostics.Tracing.EventLevel.Informational, Message="INFO")]
- public void Info(string message) { }
- [System.Diagnostics.Tracing.Event(2, Keywords=System.Diagnostics.Tracing.EventKeywords.None | System.Diagnostics.Tracing.EventKeywords.All, Level=System.Diagnostics.Tracing.EventLevel.Warning, Message="WARN")]
- public void Warn(string message) { }
- public class Keywords
- {
- public const System.Diagnostics.Tracing.EventKeywords Log = 1;
- public Keywords() { }
- }
- }
[System.Diagnostics.Tracing.EventData]
public class RabbitMqExceptionDetail
{