Skip to content

Commit 6b8c63f

Browse files
authored
coordinated request observables when aborted could bubble out an UnexpectedElasticsearchClientException or a OperationCancelledException, this normalizes what gets passed to OnError (OperationCancelledException) (#4027)
1 parent eae2bef commit 6b8c63f

File tree

5 files changed

+22
-5
lines changed

5 files changed

+22
-5
lines changed

src/Elasticsearch.Net/Transport/Pipeline/RequestPipeline.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,8 @@ public async Task FirstPoolUsageAsync(SemaphoreSlim semaphore, CancellationToken
287287
{
288288
if (!FirstPoolUsageNeedsSniffing) return;
289289

290+
// TODO cancellationToken could throw here and will bubble out as OperationCancelledException
291+
// everywhere else it would bubble out wrapped in a `UnexpectedElasticsearchClientException`
290292
var success = await semaphore.WaitAsync(_settings.RequestTimeout, cancellationToken).ConfigureAwait(false);
291293
if (!success)
292294
{

src/Elasticsearch.Net/Transport/Transport.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,9 @@ public async Task<TResponse> RequestAsync<TResponse>(HttpMethod method, string p
143143
}
144144
catch (Exception killerException)
145145
{
146+
if (killerException is OperationCanceledException && cancellationToken.IsCancellationRequested)
147+
pipeline.AuditCancellationRequested();
148+
146149
throw new UnexpectedElasticsearchClientException(killerException, seenExceptions)
147150
{
148151
Request = requestData,

src/Nest/CommonAbstractions/Reactive/CoordinatedRequestObserverBase.cs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using Elasticsearch.Net;
23

34
namespace Nest
45
{
@@ -27,7 +28,15 @@ protected CoordinatedRequestObserverBase(Action<T> onNext = null, Action<Excepti
2728

2829
public void OnCompleted() => _completed?.Invoke();
2930

30-
public void OnError(Exception error) => _onError?.Invoke(error);
31+
public void OnError(Exception error)
32+
{
33+
// This normalizes task cancellation exceptions for observables
34+
// If a task cancellation happens in the client it bubbles out as a UnexpectedElasticsearchClientException
35+
// where as inside our IObservable implementation we .ThrowIfCancellationRequested() directly.
36+
if (error is UnexpectedElasticsearchClientException es && es.InnerException != null && es.InnerException is OperationCanceledException c)
37+
_onError?.Invoke(c);
38+
else _onError?.Invoke(error);
39+
}
3140

3241
public void OnNext(T value) => _onNext?.Invoke(value);
3342
}

src/Tests/Tests/Document/Multiple/BulkAll/BulkAllCancellationTokenApiTests.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,13 @@ public void CancelBulkAll()
4545
//when we subscribe the observable becomes hot
4646
observableBulk.Subscribe(bulkObserver);
4747

48-
//we wait Nseconds to see some bulks
48+
//we wait N seconds to see some bulks
4949
handle.WaitOne(TimeSpan.FromSeconds(3));
5050
tokenSource.Cancel();
51-
//we wait Nseconds to give in flight request a chance to cancel
51+
//we wait N seconds to give in flight request a chance to cancel
5252
handle.WaitOne(TimeSpan.FromSeconds(3));
53-
if (ex != null && !(ex is TaskCanceledException) && !(ex is OperationCanceledException)) throw ex;
53+
54+
if (ex != null && !(ex is OperationCanceledException)) throw ex;
5455

5556
seenPages.Should().BeLessThan(pages).And.BeGreaterThan(0);
5657
var count = Client.Count<SmallObject>(f => f.Index(index));

src/Tests/Tests/Document/Multiple/BulkAll/BulkAllDisposeApiTests.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using System.Threading;
33
using System.Threading.Tasks;
44
using Elastic.Xunit.XunitPlumbing;
5+
using Elasticsearch.Net;
56
using FluentAssertions;
67
using Nest;
78
using Tests.Core.ManagedElasticsearch.Clusters;
@@ -50,7 +51,8 @@ public void DisposingObservableCancelsBulkAll()
5051
observableBulk.Dispose();
5152
//we wait N seconds to give in flight request a chance to cancel
5253
handle.WaitOne(TimeSpan.FromSeconds(3));
53-
if (ex != null && !(ex is TaskCanceledException) && !(ex is OperationCanceledException)) throw ex;
54+
55+
if (ex != null && !(ex is OperationCanceledException)) throw ex;
5456

5557
seenPages.Should().BeLessThan(pages).And.BeGreaterThan(0);
5658
var count = Client.Count<SmallObject>(f => f.Index(index));

0 commit comments

Comments
 (0)