Skip to content

Commit c0293a0

Browse files
Mpdreamzcodebrain
authored andcommitted
Fix #3954 make sure BulkAll() is aborted properly if the whole bulk request keeps returning a bad statuscode (#4014)
Ensure BulkAll() is aborted properly if the whole bulk request keeps returning a bad statuscode.
1 parent 3134259 commit c0293a0

File tree

5 files changed

+107
-4
lines changed

5 files changed

+107
-4
lines changed

src/Nest/CommonAbstractions/Extensions/Extensions.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@
44
using System.Collections.ObjectModel;
55
using System.Linq;
66
using System.Reflection;
7+
using System.Runtime.ExceptionServices;
78
using System.Runtime.Serialization;
89
using System.Text;
910
using System.Threading;
1011
using System.Threading.Tasks;
12+
using Elasticsearch.Net;
1113
using Elasticsearch.Net.Utf8Json.Internal;
1214

1315
namespace Nest
@@ -218,6 +220,12 @@ internal static async Task ForEachAsync<TSource, TResult>(
218220
continue;
219221

220222
var task = await Task.WhenAny(tasks).ConfigureAwait(false);
223+
if (task.Exception != null
224+
&& (task.IsFaulted && task.Exception.Flatten().InnerExceptions.First() is Exception e))
225+
{
226+
ExceptionDispatchInfo.Capture(e).Throw();
227+
return;
228+
}
221229
tasks.Remove(task);
222230
}
223231

src/Nest/Document/Multiple/BulkAll/BulkAllObservable.cs

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ public class BulkAllObservable<T> : IDisposable, IObservable<BulkAllResponse> wh
2222
private readonly Func<BulkResponseItemBase, T, bool> _retryPredicate;
2323
private Action _incrementFailed = () => { };
2424
private Action _incrementRetries = () => { };
25+
private Action<BulkResponse> _bulkResponseCallback;
2526

2627
public BulkAllObservable(
2728
IElasticClient client,
@@ -36,6 +37,8 @@ public BulkAllObservable(
3637
_bulkSize = _partitionedBulkRequest.Size ?? CoordinatedRequestDefaults.BulkAllSizeDefault;
3738
_retryPredicate = _partitionedBulkRequest.RetryDocumentPredicate ?? RetryBulkActionPredicate;
3839
_droppedDocumentCallBack = _partitionedBulkRequest.DroppedDocumentCallback ?? DroppedDocumentCallbackDefault;
40+
_bulkResponseCallback = _partitionedBulkRequest.BulkResponseCallback;
41+
3942
_maxDegreeOfParallelism =
4043
_partitionedBulkRequest.MaxDegreeOfParallelism ?? CoordinatedRequestDefaults.BulkAllMaxDegreeOfParallelismDefault;
4144
_compositeCancelTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
@@ -125,6 +128,8 @@ private async Task<BulkAllResponse> BulkAsync(IList<T> buffer, long page, int ba
125128
.ConfigureAwait(false);
126129

127130
_compositeCancelToken.ThrowIfCancellationRequested();
131+
132+
_bulkResponseCallback?.Invoke(response);
128133

129134
if (!response.ApiCall.Success)
130135
return await HandleBulkRequest(buffer, page, backOffRetries, response).ConfigureAwait(false);
@@ -167,28 +172,39 @@ private void HandleDroppedDocuments(List<Tuple<BulkResponseItemBase, T>> dropped
167172
private async Task<BulkAllResponse> HandleBulkRequest(IList<T> buffer, long page, int backOffRetries, BulkResponse response)
168173
{
169174
var clientException = response.ApiCall.OriginalException as ElasticsearchClientException;
170-
var failureReason = clientException?.FailureReason.GetValueOrDefault(PipelineFailure.Unexpected);
175+
var failureReason = clientException?.FailureReason;
176+
var reason = failureReason?.GetStringValue() ?? nameof(PipelineFailure.BadRequest);
171177
switch (failureReason)
172178
{
173179
case PipelineFailure.MaxRetriesReached:
174180
if (response.ApiCall.AuditTrail.Last().Event == AuditEvent.FailedOverAllNodes)
175181
throw ThrowOnBadBulk(response, $"BulkAll halted after attempted bulk failed over all the active nodes");
176182

183+
ThrowOnExhaustedRetries();
177184
return await RetryDocuments(page, ++backOffRetries, buffer).ConfigureAwait(false);
178185
case PipelineFailure.CouldNotStartSniffOnStartup:
179186
case PipelineFailure.BadAuthentication:
180187
case PipelineFailure.NoNodesAttempted:
181188
case PipelineFailure.SniffFailure:
182189
case PipelineFailure.Unexpected:
183-
throw ThrowOnBadBulk(response,
184-
$"BulkAll halted after {nameof(PipelineFailure)}{failureReason.GetStringValue()} from _bulk");
190+
throw ThrowOnBadBulk(response, $"BulkAll halted after {nameof(PipelineFailure)}.{reason} from _bulk");
185191
case PipelineFailure.BadResponse:
186192
case PipelineFailure.PingFailure:
187193
case PipelineFailure.MaxTimeoutReached:
188194
case PipelineFailure.BadRequest:
189195
default:
196+
ThrowOnExhaustedRetries();
190197
return await RetryDocuments(page, ++backOffRetries, buffer).ConfigureAwait(false);
191198
}
199+
200+
void ThrowOnExhaustedRetries()
201+
{
202+
if (_partitionedBulkRequest.ContinueAfterDroppedDocuments || backOffRetries < _backOffRetries) return;
203+
204+
throw ThrowOnBadBulk(response,
205+
$"BulkAll halted after {nameof(PipelineFailure)}.{reason} from _bulk and exhausting retries ({backOffRetries})"
206+
);
207+
}
192208
}
193209

194210
private async Task<BulkAllResponse> RetryDocuments(long page, int backOffRetries, IList<T> retryDocuments)

src/Nest/Document/Multiple/BulkAll/BulkAllRequest.cs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,12 @@ public interface IBulkAllRequest<T> where T : class
8585
/// non-negative value less than or equal to the total number of copies for the shard (number of replicas + 1)
8686
/// </summary>
8787
int? WaitForActiveShards { get; set; }
88+
89+
/// <summary>
90+
/// Be notified every time a bulk response returns, this includes retries.
91+
/// <see cref="IObserver{T}.OnNext"/> is only called for successful batches.
92+
/// </summary>
93+
Action<BulkResponse> BulkResponseCallback { get; set; }
8894
}
8995

9096
public class BulkAllRequest<T> : IBulkAllRequest<T>
@@ -146,6 +152,9 @@ public BulkAllRequest(IEnumerable<T> documents)
146152

147153
/// <inheritdoc />
148154
public int? WaitForActiveShards { get; set; }
155+
156+
/// <inheritdoc />
157+
public Action<BulkResponse> BulkResponseCallback { get; set; }
149158
}
150159

151160
public class BulkAllDescriptor<T> : DescriptorBase<BulkAllDescriptor<T>, IBulkAllRequest<T>>, IBulkAllRequest<T>
@@ -177,6 +186,7 @@ public BulkAllDescriptor(IEnumerable<T> documents)
177186
int? IBulkAllRequest<T>.Size { get; set; }
178187
Time IBulkAllRequest<T>.Timeout { get; set; }
179188
int? IBulkAllRequest<T>.WaitForActiveShards { get; set; }
189+
Action<BulkResponse> IBulkAllRequest<T>.BulkResponseCallback { get; set; }
180190

181191
/// <inheritdoc cref="IBulkAllRequest{T}.MaxDegreeOfParallelism" />
182192
public BulkAllDescriptor<T> MaxDegreeOfParallelism(int? parallelism) =>
@@ -238,5 +248,9 @@ public BulkAllDescriptor<T> BackPressure(int maxConcurrency, int? backPressureFa
238248
/// <inheritdoc cref="IBulkAllRequest{T}.DroppedDocumentCallback" />
239249
public BulkAllDescriptor<T> DroppedDocumentCallback(Action<BulkResponseItemBase, T> callback) =>
240250
Assign(callback, (a, v) => a.DroppedDocumentCallback = v);
251+
252+
/// <inheritdoc cref="IBulkAllRequest{T}.BulkResponseCallback" />
253+
public BulkAllDescriptor<T> BulkResponseCallback(Action<BulkResponse> callback) =>
254+
Assign(callback, (a, v) => a.BulkResponseCallback = v);
241255
}
242256
}

src/Tests/Tests/Document/Multiple/BulkAll/BulkAllExceptionApiTests.cs

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
11
using System;
22
using System.Threading;
33
using Elastic.Xunit.XunitPlumbing;
4+
using Elasticsearch.Net;
45
using FluentAssertions;
56
using Nest;
7+
using Tests.Core.Client.Settings;
68
using Tests.Core.ManagedElasticsearch.Clusters;
9+
using Tests.Domain.Extensions;
10+
using Tests.Framework.VirtualClustering;
711

812
namespace Tests.Document.Multiple.BulkAll
913
{
@@ -48,4 +52,65 @@ [I] public void WaitBulkAllThrowsAndIsCaught()
4852
}
4953
}
5054
}
55+
56+
57+
public class BulkAllBadRetriesApiTests : BulkAllApiTestsBase
58+
{
59+
public BulkAllBadRetriesApiTests(IntrusiveOperationCluster cluster) : base(cluster) { }
60+
61+
[U] public void Completes()
62+
{
63+
var client = VirtualClusterWith.Nodes(2)
64+
.ClientCalls(c => c.FailAlways())
65+
.StaticConnectionPool()
66+
.AllDefaults()
67+
.Client;
68+
69+
70+
var index = CreateIndexName();
71+
72+
var size = 1000;
73+
var pages = 10;
74+
var seenPages = 0;
75+
var numberOfDocuments = size * pages;
76+
var documents = CreateLazyStreamOfDocuments(numberOfDocuments);
77+
var requests = 0;
78+
79+
Exception ex = null;
80+
var tokenSource = new CancellationTokenSource();
81+
var observableBulk = client.BulkAll(documents, f => f
82+
.MaxDegreeOfParallelism(1)
83+
.BulkResponseCallback(r => Interlocked.Increment(ref requests))
84+
.BackOffTime(TimeSpan.FromMilliseconds(1))
85+
.BackOffRetries(2)
86+
.Size(size)
87+
.RefreshOnCompleted()
88+
.Index(index)
89+
.BufferToBulk((r, buffer) => r.IndexMany(buffer))
90+
, tokenSource.Token);
91+
92+
try
93+
{
94+
observableBulk.Wait(TimeSpan.FromSeconds(30), b =>
95+
{
96+
Interlocked.Increment(ref seenPages);
97+
});
98+
}
99+
catch (Exception e)
100+
{
101+
ex = e;
102+
}
103+
ex.Should().NotBeNull();
104+
105+
var clientException = ex.Should().BeOfType<ElasticsearchClientException>().Subject;
106+
107+
clientException.Message.Should()
108+
.StartWith("BulkAll halted after")
109+
.And.EndWith("from _bulk and exhausting retries (2)");
110+
111+
requests.Should().Be(3);
112+
// OnNext only called for successful batches.
113+
seenPages.Should().Be(0);
114+
}
115+
}
51116
}

src/Tests/Tests/Framework/VirtualClustering/VirtualizedCluster.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public VirtualizedCluster(TestableDateTimeProvider dateTimeProvider, ConnectionS
3030
}
3131

3232
public IConnectionPool ConnectionPool => Client.ConnectionSettings.ConnectionPool;
33-
private ElasticClient Client => _fixedRequestPipeline?.Client;
33+
public ElasticClient Client => _fixedRequestPipeline?.Client;
3434

3535
public VirtualizedCluster ClientProxiesTo(
3636
Func<IElasticClient, Func<RequestConfigurationDescriptor, IRequestConfiguration>, IResponse> sync,

0 commit comments

Comments
 (0)