Skip to content

Commit 93ead30

Browse files
Fixed issues with scheduling around partial items and results. This will fix flakey tests, as well as possible edge cases where partial items are consumed in rapid succession
1 parent 28f6068 commit 93ead30

36 files changed

+614
-156
lines changed

src/Client/LanguageClient.cs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using System.Collections.Generic;
33
using System.Diagnostics.CodeAnalysis;
44
using System.Linq;
5+
using System.Reactive.Concurrency;
56
using System.Reactive.Disposables;
67
using System.Reactive.Linq;
78
using System.Reactive.Subjects;
@@ -48,6 +49,7 @@ public class LanguageClient : JsonRpcServerBase, ILanguageClient
4849
private readonly IEnumerable<IOnLanguageClientInitialized> _initializedHandlers;
4950
private readonly LspSerializer _serializer;
5051
private readonly InstanceHasStarted _instanceHasStarted;
52+
private readonly IScheduler _scheduler;
5153
private readonly IResponseRouter _responseRouter;
5254
private readonly ISubject<InitializeResult> _initializeComplete = new AsyncSubject<InitializeResult>();
5355
private readonly CompositeDisposable _disposable = new CompositeDisposable();
@@ -150,7 +152,8 @@ internal LanguageClient(
150152
IEnumerable<OnLanguageClientInitializedDelegate> initializedDelegates,
151153
IEnumerable<IOnLanguageClientInitialized> initializedHandlers,
152154
LspSerializer serializer,
153-
InstanceHasStarted instanceHasStarted
155+
InstanceHasStarted instanceHasStarted,
156+
IScheduler scheduler
154157
) : base(handlerCollection, responseRouter)
155158
{
156159
_connection = connection;
@@ -179,6 +182,7 @@ InstanceHasStarted instanceHasStarted
179182
_initializedHandlers = initializedHandlers;
180183
_serializer = serializer;
181184
_instanceHasStarted = instanceHasStarted;
185+
_scheduler = scheduler;
182186
_concurrency = options.Value.Concurrency;
183187

184188
// We need to at least create Window here in case any handler does loggin in their constructor
@@ -262,6 +266,7 @@ await LanguageProtocolEventingHelper.Run(
262266
_initializeHandlers.Union(_collection.Select(z => z.Handler).OfType<IOnLanguageClientInitialize>()),
263267
(handler, ct) => handler.OnInitialize(this, @params, ct),
264268
_concurrency,
269+
_scheduler,
265270
token
266271
).ConfigureAwait(false);
267272

@@ -281,6 +286,7 @@ await LanguageProtocolEventingHelper.Run(
281286
_initializedHandlers.Union(_collection.Select(z => z.Handler).OfType<IOnLanguageClientInitialized>()),
282287
(handler, ct) => handler.OnInitialized(this, @params, serverParams, ct),
283288
_concurrency,
289+
_scheduler,
284290
token
285291
).ConfigureAwait(false);
286292

@@ -299,6 +305,7 @@ await LanguageProtocolEventingHelper.Run(
299305
_startedHandlers.Union(_collection.Select(z => z.Handler).OfType<IOnLanguageClientStarted>()),
300306
(handler, ct) => handler.OnStarted(this, ct),
301307
_concurrency,
308+
_scheduler,
302309
token
303310
).ConfigureAwait(false);
304311

@@ -395,7 +402,7 @@ private Supports<T> UseOrTryAndFindCapability<T>(Supports<T> supports) where T :
395402
bool IResponseRouter.TryGetRequest(long id, [NotNullWhen(true)] out string method, [NotNullWhen(true)] out TaskCompletionSource<JToken> pendingTask) =>
396403
_responseRouter.TryGetRequest(id, out method, out pendingTask);
397404

398-
public Task<InitializeResult> WasStarted => _initializeComplete.ToTask();
405+
public Task<InitializeResult> WasStarted => _initializeComplete.ToTask(_scheduler);
399406

400407
public void Dispose()
401408
{

src/Client/LanguageClientOptionsExtensions.cs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Reactive.Concurrency;
23
using Microsoft.Extensions.Configuration;
34
using Microsoft.Extensions.DependencyInjection;
45
using Microsoft.Extensions.Logging;
@@ -87,6 +88,54 @@ public static LanguageClientOptions WithClientCapabilities(this LanguageClientOp
8788
return options;
8889
}
8990

91+
/// <summary>
92+
/// Sets both input and output schedulers to the same scheduler
93+
/// </summary>
94+
/// <param name="options"></param>
95+
/// <param name="inputScheduler"></param>
96+
/// <returns></returns>
97+
public static LanguageClientOptions WithScheduler(this LanguageClientOptions options, IScheduler inputScheduler)
98+
{
99+
options.InputScheduler = options.OutputScheduler = options.DefaultScheduler = inputScheduler;
100+
return options;
101+
}
102+
103+
/// <summary>
104+
/// Sets the scheduler used during reading input
105+
/// </summary>
106+
/// <param name="options"></param>
107+
/// <param name="inputScheduler"></param>
108+
/// <returns></returns>
109+
public static LanguageClientOptions WithInputScheduler(this LanguageClientOptions options, IScheduler inputScheduler)
110+
{
111+
options.InputScheduler = inputScheduler;
112+
return options;
113+
}
114+
115+
/// <summary>
116+
/// Sets the default scheduler to be used when scheduling other tasks
117+
/// </summary>
118+
/// <param name="options"></param>
119+
/// <param name="defaultScheduler"></param>
120+
/// <returns></returns>
121+
public static LanguageClientOptions WithDefaultScheduler(this LanguageClientOptions options, IScheduler defaultScheduler)
122+
{
123+
options.DefaultScheduler = defaultScheduler;
124+
return options;
125+
}
126+
127+
/// <summary>
128+
/// Sets the scheduler use during writing output
129+
/// </summary>
130+
/// <param name="options"></param>
131+
/// <param name="outputScheduler"></param>
132+
/// <returns></returns>
133+
public static LanguageClientOptions WithOutputScheduler(this LanguageClientOptions options, IScheduler outputScheduler)
134+
{
135+
options.OutputScheduler = outputScheduler;
136+
return options;
137+
}
138+
90139
public static LanguageClientOptions OnInitialize(this LanguageClientOptions options, OnLanguageClientInitializeDelegate @delegate)
91140
{
92141
options.Services.AddSingleton(@delegate);

src/Dap.Client/DebugAdapterClient.cs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System;
22
using System.Collections.Generic;
33
using System.Linq;
4+
using System.Reactive.Concurrency;
45
using System.Reactive.Disposables;
56
using System.Reactive.Subjects;
67
using System.Reactive.Threading.Tasks;
@@ -30,6 +31,7 @@ public class DebugAdapterClient : JsonRpcServerBase, IDebugAdapterClient, IDebug
3031
private readonly IEnumerable<OnDebugAdapterClientStartedDelegate> _startedDelegates;
3132
private readonly IEnumerable<IOnDebugAdapterClientStarted> _startedHandlers;
3233
private readonly InstanceHasStarted _instanceHasStarted;
34+
private readonly IScheduler _scheduler;
3335
private readonly CompositeDisposable _disposable = new CompositeDisposable();
3436
private readonly Connection _connection;
3537
private readonly DapReceiver _receiver;
@@ -97,7 +99,8 @@ internal DebugAdapterClient(
9799
IEnumerable<OnDebugAdapterClientInitializedDelegate> initializedDelegates,
98100
IEnumerable<IOnDebugAdapterClientInitialized> initializedHandlers,
99101
IEnumerable<IOnDebugAdapterClientStarted> startedHandlers,
100-
InstanceHasStarted instanceHasStarted
102+
InstanceHasStarted instanceHasStarted,
103+
IScheduler scheduler
101104
) : base(collection, responseRouter)
102105
{
103106
_settingsBag = settingsBag;
@@ -114,6 +117,7 @@ InstanceHasStarted instanceHasStarted
114117
_initializedHandlers = initializedHandlers;
115118
_startedHandlers = startedHandlers;
116119
_instanceHasStarted = instanceHasStarted;
120+
_scheduler = scheduler;
117121
_concurrency = options.Value.Concurrency;
118122

119123
_disposable.Add(collection.Add(this));
@@ -127,6 +131,7 @@ await DebugAdapterEventingHelper.Run(
127131
_initializeHandlers.Union(_collection.Select(z => z.Handler).OfType<IOnDebugAdapterClientInitialize>()),
128132
(handler, ct) => handler.OnInitialize(this, ClientSettings, ct),
129133
_concurrency,
134+
_scheduler,
130135
token
131136
).ConfigureAwait(false);
132137

@@ -145,17 +150,19 @@ await DebugAdapterEventingHelper.Run(
145150
_initializedHandlers.Union(_collection.Select(z => z.Handler).OfType<IOnDebugAdapterClientInitialized>()),
146151
(handler, ct) => handler.OnInitialized(this, ClientSettings, ServerSettings, ct),
147152
_concurrency,
153+
_scheduler,
148154
token
149155
).ConfigureAwait(false);
150156

151-
await _initializedComplete.ToTask(token);
157+
await _initializedComplete.ToTask(token, _scheduler);
152158

153159
await DebugAdapterEventingHelper.Run(
154160
_startedDelegates,
155161
(handler, ct) => handler(this, ct),
156162
_startedHandlers.Union(_collection.Select(z => z.Handler).OfType<IOnDebugAdapterClientStarted>()),
157163
(handler, ct) => handler.OnStarted(this, ct),
158164
_concurrency,
165+
_scheduler,
159166
token
160167
).ConfigureAwait(false);
161168

src/Dap.Client/DebugAdapterClientOptionsExtensions.cs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Reactive.Concurrency;
23
using Microsoft.Extensions.Configuration;
34
using Microsoft.Extensions.DependencyInjection;
45
using Microsoft.Extensions.Logging;
@@ -21,6 +22,54 @@ public static DebugAdapterClientOptions WithRequestProcessIdentifier(this DebugA
2122
return options;
2223
}
2324

25+
/// <summary>
26+
/// Sets both input and output schedulers to the same scheduler
27+
/// </summary>
28+
/// <param name="options"></param>
29+
/// <param name="inputScheduler"></param>
30+
/// <returns></returns>
31+
public static DebugAdapterClientOptions WithScheduler(this DebugAdapterClientOptions options, IScheduler inputScheduler)
32+
{
33+
options.InputScheduler = options.OutputScheduler = options.DefaultScheduler = inputScheduler;
34+
return options;
35+
}
36+
37+
/// <summary>
38+
/// Sets the scheduler used during reading input
39+
/// </summary>
40+
/// <param name="options"></param>
41+
/// <param name="inputScheduler"></param>
42+
/// <returns></returns>
43+
public static DebugAdapterClientOptions WithInputScheduler(this DebugAdapterClientOptions options, IScheduler inputScheduler)
44+
{
45+
options.InputScheduler = inputScheduler;
46+
return options;
47+
}
48+
49+
/// <summary>
50+
/// Sets the default scheduler to be used when scheduling other tasks
51+
/// </summary>
52+
/// <param name="options"></param>
53+
/// <param name="defaultScheduler"></param>
54+
/// <returns></returns>
55+
public static DebugAdapterClientOptions WithDefaultScheduler(this DebugAdapterClientOptions options, IScheduler defaultScheduler)
56+
{
57+
options.DefaultScheduler = defaultScheduler;
58+
return options;
59+
}
60+
61+
/// <summary>
62+
/// Sets the scheduler use during writing output
63+
/// </summary>
64+
/// <param name="options"></param>
65+
/// <param name="outputScheduler"></param>
66+
/// <returns></returns>
67+
public static DebugAdapterClientOptions WithOutputScheduler(this DebugAdapterClientOptions options, IScheduler outputScheduler)
68+
{
69+
options.OutputScheduler = outputScheduler;
70+
return options;
71+
}
72+
2473
public static DebugAdapterClientOptions OnInitialize(this DebugAdapterClientOptions options, OnDebugAdapterClientInitializeDelegate @delegate)
2574
{
2675
options.Services.AddSingleton(@delegate);

src/Dap.Server/DebugAdapterServer.cs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System;
22
using System.Collections.Generic;
33
using System.Linq;
4+
using System.Reactive.Concurrency;
45
using System.Reactive.Disposables;
56
using System.Reactive.Subjects;
67
using System.Reactive.Threading.Tasks;
@@ -30,6 +31,7 @@ public class DebugAdapterServer : JsonRpcServerBase, IDebugAdapterServer, IDebug
3031
private readonly IEnumerable<OnDebugAdapterServerStartedDelegate> _startedDelegates;
3132
private readonly IEnumerable<IOnDebugAdapterServerStarted> _startedHandlers;
3233
private readonly InstanceHasStarted _instanceHasStarted;
34+
private readonly IScheduler _scheduler;
3335
private readonly IServiceProvider _serviceProvider;
3436
private readonly CompositeDisposable _disposable = new CompositeDisposable();
3537
private readonly Connection _connection;
@@ -100,7 +102,8 @@ internal DebugAdapterServer(
100102
IEnumerable<IOnDebugAdapterServerInitialize> initializeHandlers,
101103
IEnumerable<IOnDebugAdapterServerInitialized> initializedHandlers,
102104
IEnumerable<IOnDebugAdapterServerStarted> startedHandlers,
103-
InstanceHasStarted instanceHasStarted
105+
InstanceHasStarted instanceHasStarted,
106+
IScheduler scheduler
104107
) : base(collection, responseRouter)
105108
{
106109
_capabilities = capabilities;
@@ -117,6 +120,7 @@ InstanceHasStarted instanceHasStarted
117120
_initializedHandlers = initializedHandlers;
118121
_startedHandlers = startedHandlers;
119122
_instanceHasStarted = instanceHasStarted;
123+
_scheduler = scheduler;
120124
_concurrency = options.Value.Concurrency;
121125

122126
_disposable.Add(collection.Add(this));
@@ -142,14 +146,15 @@ public async Task Initialize(CancellationToken token)
142146
_connection.Open();
143147
try
144148
{
145-
_initializingTask = _initializeComplete.ToTask(token);
149+
_initializingTask = _initializeComplete.ToTask(token, _scheduler);
146150
await _initializingTask.ConfigureAwait(false);
147151
await DebugAdapterEventingHelper.Run(
148152
_startedDelegates,
149153
(handler, ct) => handler(this, ct),
150154
_startedHandlers.Union(_collection.Select(z => z.Handler).OfType<IOnDebugAdapterServerStarted>()),
151155
(handler, ct) => handler.OnStarted(this, ct),
152156
_concurrency,
157+
_scheduler,
153158
token
154159
).ConfigureAwait(false);
155160
_instanceHasStarted.Started = true;
@@ -181,6 +186,7 @@ await DebugAdapterEventingHelper.Run(
181186
_initializeHandlers.Union(_collection.Select(z => z.Handler).OfType<IOnDebugAdapterServerInitialize>()),
182187
(handler, ct) => handler.OnInitialize(this, request, ct),
183188
_concurrency,
189+
_scheduler,
184190
cancellationToken
185191
).ConfigureAwait(false);
186192

@@ -231,6 +237,7 @@ await DebugAdapterEventingHelper.Run(
231237
_initializedHandlers.Union(_collection.Select(z => z.Handler).OfType<IOnDebugAdapterServerInitialized>()),
232238
(handler, ct) => handler.OnInitialized(this, request, response, ct),
233239
_concurrency,
240+
_scheduler,
234241
cancellationToken
235242
).ConfigureAwait(false);
236243

src/Dap.Server/DebugAdapterServerOptionsExtensions.cs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Reactive.Concurrency;
23
using Microsoft.Extensions.Configuration;
34
using Microsoft.Extensions.DependencyInjection;
45
using Microsoft.Extensions.Logging;
@@ -21,6 +22,54 @@ public static DebugAdapterServerOptions WithRequestProcessIdentifier(this DebugA
2122
return options;
2223
}
2324

25+
/// <summary>
26+
/// Sets both input and output schedulers to the same scheduler
27+
/// </summary>
28+
/// <param name="options"></param>
29+
/// <param name="inputScheduler"></param>
30+
/// <returns></returns>
31+
public static DebugAdapterServerOptions WithScheduler(this DebugAdapterServerOptions options, IScheduler inputScheduler)
32+
{
33+
options.InputScheduler = options.OutputScheduler = options.DefaultScheduler = inputScheduler;
34+
return options;
35+
}
36+
37+
/// <summary>
38+
/// Sets the scheduler used during reading input
39+
/// </summary>
40+
/// <param name="options"></param>
41+
/// <param name="inputScheduler"></param>
42+
/// <returns></returns>
43+
public static DebugAdapterServerOptions WithInputScheduler(this DebugAdapterServerOptions options, IScheduler inputScheduler)
44+
{
45+
options.InputScheduler = inputScheduler;
46+
return options;
47+
}
48+
49+
/// <summary>
50+
/// Sets the default scheduler to be used when scheduling other tasks
51+
/// </summary>
52+
/// <param name="options"></param>
53+
/// <param name="defaultScheduler"></param>
54+
/// <returns></returns>
55+
public static DebugAdapterServerOptions WithDefaultScheduler(this DebugAdapterServerOptions options, IScheduler defaultScheduler)
56+
{
57+
options.DefaultScheduler = defaultScheduler;
58+
return options;
59+
}
60+
61+
/// <summary>
62+
/// Sets the scheduler use during writing output
63+
/// </summary>
64+
/// <param name="options"></param>
65+
/// <param name="outputScheduler"></param>
66+
/// <returns></returns>
67+
public static DebugAdapterServerOptions WithOutputScheduler(this DebugAdapterServerOptions options, IScheduler outputScheduler)
68+
{
69+
options.OutputScheduler = outputScheduler;
70+
return options;
71+
}
72+
2473
public static DebugAdapterServerOptions OnInitialize(this DebugAdapterServerOptions options, OnDebugAdapterServerInitializeDelegate @delegate)
2574
{
2675
options.Services.AddSingleton(@delegate);

0 commit comments

Comments
 (0)