Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions bus/EasyCaching.Bus.CSRedis/DefaultCSRedisBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,21 @@ public override void BaseSubscribe(string topic, Action<EasyCachingMessage> acti
);
}

/// <summary>
/// Subscribe the specified topic and action async.
/// </summary>
/// <param name="topic">Topic.</param>
/// <param name="action">Action.</param>
/// <param name="cancellationToken">Cancellation token.</param>
public override Task BaseSubscribeAsync(string topic, Action<EasyCachingMessage> action, CancellationToken cancellationToken = default(CancellationToken))
{
_client.Subscribe(
(topic, msg => OnMessage(msg.Body))
);
return Task.CompletedTask;
}


/// <summary>
/// Ons the message.
/// </summary>
Expand Down
26 changes: 23 additions & 3 deletions bus/EasyCaching.Bus.ConfluentKafka/DefaultConfluentKafkaBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
using Microsoft.Extensions.Options;

public class DefaultConfluentKafkaBus : EasyCachingAbstractBus
{
{


/// <summary>
Expand Down Expand Up @@ -77,7 +77,7 @@ public override void BasePublish(string topic, EasyCachingMessage message)
/// <param name="cancellationToken">Cancellation token.</param>
public override async Task BasePublishAsync(string topic, EasyCachingMessage message, CancellationToken cancellationToken = default(CancellationToken))
{
var msg = _serializer.Serialize(message);
var msg = _serializer.Serialize(message);

await _producer.ProduceAsync(topic, new Message<Null, byte[]> { Value = msg });
}
Expand All @@ -89,7 +89,27 @@ public override void BasePublish(string topic, EasyCachingMessage message)
/// <param name="action">Action.</param>
public override void BaseSubscribe(string topic, Action<EasyCachingMessage> action)
{
Task.Factory.StartNew(() =>
_ = StartConsumer(topic);
}

/// <summary>
/// Subscribe the specified topic and action async.
/// </summary>
/// <param name="topic">Topic.</param>
/// <param name="action">Action.</param>
/// <param name="cancellationToken">Cancellation token.</param>
public override async Task BaseSubscribeAsync(string topic, Action<EasyCachingMessage> action, CancellationToken cancellationToken = default(CancellationToken))
{
await StartConsumer(topic);
}

/// <summary>
/// Ons the consumer.
/// </summary>
/// <param name="topic">Topic</param>
private Task StartConsumer(string topic)
{
return Task.Factory.StartNew(() =>
{
for (int i = 0; i < this._kafkaBusOptions.ConsumerCount; i++)
{
Expand Down
24 changes: 23 additions & 1 deletion bus/EasyCaching.Bus.RabbitMQ/DefaultRabbitMQBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ IPooledObjectPolicy<IModel> _objectPolicy
RequestedConnectionTimeout = System.TimeSpan.FromMilliseconds(_options.RequestedConnectionTimeout),
SocketReadTimeout = System.TimeSpan.FromMilliseconds(_options.SocketReadTimeout),
SocketWriteTimeout = System.TimeSpan.FromMilliseconds(_options.SocketWriteTimeout),
ClientProvidedName = _options.ClientProvidedName
ClientProvidedName = _options.ClientProvidedName,
};

_subConnection = factory.CreateConnection();
Expand Down Expand Up @@ -156,6 +156,28 @@ public override void BaseSubscribe(string topic, Action<EasyCachingMessage> acti
}


/// <summary>
/// Subscribe the specified topic and action async.
/// </summary>
/// <param name="topic">Topic.</param>
/// <param name="action">Action.</param>
/// <param name="cancellationToken">Cancellation token.</param>
public override Task BaseSubscribeAsync(string topic, Action<EasyCachingMessage> action, CancellationToken cancellationToken = default(CancellationToken))
{
var queueName = string.Empty;
if (string.IsNullOrWhiteSpace(_options.QueueName))
{
queueName = $"rmq.queue.undurable.easycaching.subscriber.{_busId}";
}
else
{
queueName = _options.QueueName;
}

StartConsumer(queueName, topic);
return Task.CompletedTask;
}

private void StartConsumer(string queueName, string topic)
{
var model = _subConnection.CreateModel();
Expand Down
11 changes: 11 additions & 0 deletions bus/EasyCaching.Bus.Redis/DefaultRedisBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,5 +104,16 @@ public override void BaseSubscribe(string topic, Action<EasyCachingMessage> acti
{
_subscriber.Subscribe(topic, OnMessage);
}

/// <summary>
/// Subscribe the specified topic and action async.
/// </summary>
/// <param name="topic">Topic.</param>
/// <param name="action">Action.</param>
/// <param name="cancellationToken">Cancellation token.</param>
public override async Task BaseSubscribeAsync(string topic, Action<EasyCachingMessage> action, CancellationToken cancellationToken = default(CancellationToken))
{
await _subscriber.SubscribeAsync(topic, OnMessage);
}
}
}
12 changes: 12 additions & 0 deletions bus/EasyCaching.Bus.Zookeeper/DefaultZookeeperBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,18 @@ public override void BaseSubscribe(string topic, Action<EasyCachingMessage> acti
}, TaskCreationOptions.LongRunning);
}

/// <summary>
/// Subscribe the specified topic and action async.
/// </summary>
/// <param name="topic">Topic.</param>
/// <param name="action">Action.</param>
/// <param name="cancellationToken">Cancellation token.</param>
public override async Task BaseSubscribeAsync(string topic, Action<EasyCachingMessage> action, CancellationToken cancellationToken = default(CancellationToken))
{
var path = $"{topic}";
await SubscribeDataChangeAsync(path, SubscribeDataChange);
}

/// <summary>
/// Ons the message.
/// </summary>
Expand Down
11 changes: 11 additions & 0 deletions src/EasyCaching.Core/Bus/IEasyCachingSubscriber.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
namespace EasyCaching.Core.Bus
{
using System;
using System.Threading;
using System.Threading.Tasks;

/// <summary>
/// EasyCaching subscriber.
Expand All @@ -14,5 +16,14 @@ public interface IEasyCachingSubscriber
/// <param name="action">Action.</param>
/// <param name="reconnectAction"> Reconnect Action.</param>
void Subscribe(string topic, Action<EasyCachingMessage> action, Action reconnectAction = null);

/// <summary>
/// Subscribe the specified topic and action async.
/// </summary>
/// <param name="topic">Topic.</param>
/// <param name="action">Action.</param>
/// <param name="reconnectAction"> Reconnect Action.</param>
/// <param name="cancellationToken">Cancellation token.</param>
Task SubscribeAsync(string topic, Action<EasyCachingMessage> action, Action reconnectAction = null, CancellationToken cancellationToken = default(CancellationToken));
}
}
12 changes: 12 additions & 0 deletions src/EasyCaching.Core/Bus/NullEasyCachingBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,17 @@ public void Subscribe(string topic, Action<EasyCachingMessage> action, Action re
{

}

/// <summary>
/// Subscribe the specified topic and action async.
/// </summary>
/// <param name="topic">Topic.</param>
/// <param name="action">Action.</param>
/// <param name="reconnectAction">Reconnect Action.</param>
/// <param name="cancellationToken">Cancellation token.</param>
public Task SubscribeAsync(string topic, Action<EasyCachingMessage> action, Action reconnectAction = null, CancellationToken cancellationToken = default(CancellationToken))
{
return Task.CompletedTask;
}
}
}
8 changes: 8 additions & 0 deletions src/EasyCaching.Core/EasyCachingAbstractBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public abstract class EasyCachingAbstractBus : IEasyCachingBus
public abstract void BasePublish(string topic, EasyCachingMessage message);
public abstract Task BasePublishAsync(string topic, EasyCachingMessage message, CancellationToken cancellationToken = default(CancellationToken));
public abstract void BaseSubscribe(string topic, Action<EasyCachingMessage> action);
public abstract Task BaseSubscribeAsync(string topic, Action<EasyCachingMessage> action, CancellationToken cancellationToken = default(CancellationToken));

protected Action<EasyCachingMessage> _handler;

Expand Down Expand Up @@ -83,6 +84,13 @@ public void Subscribe(string topic, Action<EasyCachingMessage> action, Action re
BaseSubscribe(topic, action);
}

public async Task SubscribeAsync(string topic, Action<EasyCachingMessage> action, Action reconnectAction, CancellationToken cancellationToken = default(CancellationToken))
{
_handler = action;
_reconnectHandler = reconnectAction;
await BaseSubscribeAsync(topic, action);
}

public virtual void BaseOnMessage(EasyCachingMessage message)
{
var operationId = s_diagnosticListener.WriteSubscribeMessageBefore(new BeforeSubscribeMessageRequestEventData(message));
Expand Down
11 changes: 10 additions & 1 deletion src/EasyCaching.HybridCache/HybridCachingProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ string name
else this._distributedCache = distributed;

this._bus = bus ?? NullEasyCachingBus.Instance;
this._bus.Subscribe(_options.TopicName, OnMessage, OnReconnect);
_ = SubscribeAsync();

this._cacheId = Guid.NewGuid().ToString("N");

Expand All @@ -117,6 +117,15 @@ string name
_busAsyncWrap = Policy.WrapAsync(fallbackAsyncPolicy, retryAsyncPolicy);
}

/// <summary>
/// Subscribe the topic
/// </summary>
/// <returns></returns>
private async Task SubscribeAsync()
{
await _bus.SubscribeAsync(_options.TopicName, OnMessage, OnReconnect);
}

/// <summary>
/// Ons the message.
/// </summary>
Expand Down
5 changes: 5 additions & 0 deletions test/EasyCaching.UnitTests/Fake/FakeBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,10 @@ public void Subscribe(string topic, Action<EasyCachingMessage> action, Action re
{

}

public Task SubscribeAsync(string topic, Action<EasyCachingMessage> action, Action reconnectAction = null, CancellationToken cancellationToken = default(CancellationToken))
{
return Task.CompletedTask;
}
}
}