Skip to content

Commit 3ddfa0f

Browse files
Fixed schedulers for replay subjects
1 parent 0dae323 commit 3ddfa0f

20 files changed

+401
-20
lines changed

src/Client/LanguageClientRegistrationManager.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using System.Collections.Concurrent;
33
using System.Collections.Generic;
44
using System.Linq;
5+
using System.Reactive.Concurrency;
56
using System.Reactive.Linq;
67
using System.Reactive.Subjects;
78
using System.Threading;
@@ -26,7 +27,7 @@ internal class LanguageClientRegistrationManager : IRegisterCapabilityHandler, I
2627
private readonly ILspHandlerTypeDescriptorProvider _handlerTypeDescriptorProvider;
2728
private readonly ILogger<LanguageClientRegistrationManager> _logger;
2829
private readonly ConcurrentDictionary<string, Registration> _registrations;
29-
private readonly ReplaySubject<IEnumerable<Registration>> _registrationSubject = new ReplaySubject<IEnumerable<Registration>>(1);
30+
private readonly ReplaySubject<IEnumerable<Registration>> _registrationSubject = new ReplaySubject<IEnumerable<Registration>>(1, Scheduler.Immediate);
3031

3132
public LanguageClientRegistrationManager(
3233
ISerializer serializer,

src/Client/LanguageClientWorkspaceFoldersManager.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using System.Collections.Concurrent;
33
using System.Collections.Generic;
44
using System.Linq;
5+
using System.Reactive.Concurrency;
56
using System.Reactive.Linq;
67
using System.Reactive.Subjects;
78
using System.Threading;
@@ -25,7 +26,7 @@ public LanguageClientWorkspaceFoldersManager(IWorkspaceLanguageClient client, IE
2526
{
2627
_client = client;
2728
_workspaceFolders = new ConcurrentDictionary<DocumentUri, WorkspaceFolder>(DocumentUri.Comparer);
28-
_workspaceFoldersSubject = new ReplaySubject<IEnumerable<WorkspaceFolder>>(1);
29+
_workspaceFoldersSubject = new ReplaySubject<IEnumerable<WorkspaceFolder>>(1, Scheduler.Immediate);
2930

3031
foreach (var folder in workspaceFolders)
3132
{

src/Dap.Client/ProgressObservable.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Reactive.Concurrency;
23
using System.Reactive.Disposables;
34
using System.Reactive.Subjects;
45
using OmniSharp.Extensions.DebugAdapter.Protocol;
@@ -14,7 +15,7 @@ internal class ProgressObservable : IProgressObservable, IObserver<ProgressEvent
1415

1516
public ProgressObservable(ProgressToken token)
1617
{
17-
_dataSubject = new ReplaySubject<ProgressEvent>(1);
18+
_dataSubject = new ReplaySubject<ProgressEvent>(1, Scheduler.Immediate);
1819
_disposable = new CompositeDisposable { Disposable.Create(_dataSubject.OnCompleted) };
1920

2021
ProgressToken = token;

src/JsonRpc/OutputHandler.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ ILogger<OutputHandler> logger
3939
_outputFilters = outputFilters.ToArray();
4040
_logger = logger;
4141
_queue = new Subject<object>();
42-
_delayedQueue = new ReplaySubject<object>();
42+
_delayedQueue = new ReplaySubject<object>(Scheduler.Immediate);
4343
_outputIsFinished = new TaskCompletionSource<object?>();
4444

4545
_disposable = new CompositeDisposable {

src/JsonRpc/ProcessScheduler.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ IScheduler scheduler
3434

3535
var observableQueue =
3636
new BehaviorSubject<(RequestProcessType type, ReplaySubject<IObservable<Unit>> observer, Subject<Unit>? contentModifiedSource)>(
37-
( RequestProcessType.Serial, new ReplaySubject<IObservable<Unit>>(int.MaxValue), supportContentModified ? new Subject<Unit>() : null )
37+
( RequestProcessType.Serial, new ReplaySubject<IObservable<Unit>>(int.MaxValue, Scheduler.Immediate), supportContentModified ? new Subject<Unit>() : null )
3838
);
3939

4040
cd.Add(
@@ -52,7 +52,7 @@ IScheduler scheduler
5252

5353
logger.LogDebug("Completing existing request process type {Type}", observableQueue.Value.type);
5454
observableQueue.Value.observer.OnCompleted();
55-
observableQueue.OnNext(( item.type, new ReplaySubject<IObservable<Unit>>(int.MaxValue), supportContentModified ? new Subject<Unit>() : null ));
55+
observableQueue.OnNext(( item.type, new ReplaySubject<IObservable<Unit>>(int.MaxValue, Scheduler.Immediate), supportContentModified ? new Subject<Unit>() : null ));
5656
}
5757

5858
logger.LogDebug("Queueing {Type}:{Name} request for processing", item.type, item.name);

src/Protocol/Progress/PartialItemsRequestProgressObservable.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ Action onCompleteAction
3636
)
3737
{
3838
_serializer = serializer;
39-
_dataSubject = new ReplaySubject<IEnumerable<TItem>>(int.MaxValue);
39+
_dataSubject = new ReplaySubject<IEnumerable<TItem>>(int.MaxValue, Scheduler.Immediate);
4040
_disposable = new CompositeDisposable() { _dataSubject };
4141

4242
_task = Observable.Create<TResult>(

src/Protocol/Progress/ProgressObservable.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Reactive.Concurrency;
23
using System.Reactive.Disposables;
34
using System.Reactive.Linq;
45
using System.Reactive.Subjects;
@@ -16,7 +17,7 @@ internal class ProgressObservable<T> : IProgressObservable<T>, IObserver<JToken>
1617
public ProgressObservable(ProgressToken token, Func<JToken, T> factory, Action disposal)
1718
{
1819
_factory = factory;
19-
_dataSubject = new ReplaySubject<JToken>(1);
20+
_dataSubject = new ReplaySubject<JToken>(1, Scheduler.Immediate);
2021
_disposable = new CompositeDisposable { Disposable.Create(_dataSubject.OnCompleted), Disposable.Create(disposal) };
2122

2223
ProgressToken = token;

src/Server/LanguageServerWorkspaceFolderManager.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using System.Collections.Concurrent;
33
using System.Collections.Generic;
44
using System.Linq;
5+
using System.Reactive.Concurrency;
56
using System.Reactive.Linq;
67
using System.Reactive.Subjects;
78
using System.Threading;
@@ -27,7 +28,7 @@ public LanguageServerWorkspaceFolderManager(IWorkspaceLanguageServer server)
2728
{
2829
_server = server;
2930
_workspaceFolders = new ConcurrentDictionary<DocumentUri, WorkspaceFolder>(DocumentUri.Comparer);
30-
_workspaceFoldersSubject = new ReplaySubject<IEnumerable<WorkspaceFolder>>(1);
31+
_workspaceFoldersSubject = new ReplaySubject<IEnumerable<WorkspaceFolder>>(1, Scheduler.Immediate);
3132
_workspaceFoldersChangedSubject = new Subject<WorkspaceFolderChange>();
3233
}
3334

test/Lsp.Tests/Integration/DynamicRegistrationTests.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,8 @@ await TestHelper.DelayUntil(
119119
registrations => registrations.Any(registration => SelectorMatches(registration, x => x.HasLanguage && x.Language == "vb")),
120120
CancellationToken
121121
);
122+
123+
await Task.Delay(200);
122124
disposable.Dispose();
123125

124126

@@ -127,6 +129,7 @@ await TestHelper.DelayUntil(
127129
registrations => !registrations.Any(registration => SelectorMatches(registration, x => x.HasLanguage && x.Language == "vb")),
128130
CancellationToken
129131
);
132+
await Task.Delay(200);
130133

131134
client.RegistrationManager.CurrentRegistrations.Should().NotContain(
132135
x =>

test/Lsp.Tests/Integration/LanguageServerConfigurationTests.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -254,15 +254,15 @@ public async Task Should_Support_Options_Monitor()
254254
// IOptionsMonitor<> is registered as a singleton, so this will update
255255
options.CurrentValue.Host.Should().Be("localhost");
256256
options.CurrentValue.Port.Should().Be(443);
257-
sub.Received(1).Invoke(Arg.Any<BinderSourceUrl>());
257+
sub.Received(Quantity.AtLeastOne()).Invoke(Arg.Any<BinderSourceUrl>());
258258

259259
configuration.Update("mysection", new Dictionary<string, string> { ["host"] = "127.0.0.1", ["port"] = "80" });
260260
await options.WaitForChange(CancellationToken);
261261
await SettleNext();
262262

263263
options.CurrentValue.Host.Should().Be("127.0.0.1");
264264
options.CurrentValue.Port.Should().Be(80);
265-
sub.Received(2).Invoke(Arg.Any<BinderSourceUrl>());
265+
sub.Received(Quantity.Within(2, int.MaxValue)).Invoke(Arg.Any<BinderSourceUrl>());
266266
}
267267

268268
class BinderSourceUrl

0 commit comments

Comments
 (0)