Skip to content

Commit 25226cc

Browse files
authored
Expose BulkResponseItems on BulkAllResponse (#3598)
This commit exposes the bulk response items on the BulkAllResponse. Closes #3487
1 parent a0eea41 commit 25226cc

File tree

3 files changed

+51
-26
lines changed

3 files changed

+51
-26
lines changed

src/Nest/Document/Multiple/BulkAll/BulkAllObservable.cs

Lines changed: 32 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -112,18 +112,18 @@ private async Task<IBulkAllResponse> BulkAsync(IList<T> buffer, long page, int b
112112
{
113113
_compositeCancelToken.ThrowIfCancellationRequested();
114114

115-
var r = _partitionedBulkRequest;
115+
var request = _partitionedBulkRequest;
116116
var response = await _client.BulkAsync(s =>
117117
{
118-
s.Index(r.Index).Type(r.Type);
119-
if (r.BufferToBulk != null) r.BufferToBulk(s, buffer);
118+
s.Index(request.Index).Type(request.Type);
119+
if (request.BufferToBulk != null) request.BufferToBulk(s, buffer);
120120
else s.IndexMany(buffer);
121-
if (!string.IsNullOrEmpty(r.Pipeline)) s.Pipeline(r.Pipeline);
121+
if (!string.IsNullOrEmpty(request.Pipeline)) s.Pipeline(request.Pipeline);
122122
#pragma warning disable 618
123-
if (r.Refresh.HasValue) s.Refresh(r.Refresh.Value);
123+
if (request.Refresh.HasValue) s.Refresh(request.Refresh.Value);
124124
#pragma warning restore 618
125-
if (r.Routing != null) s.Routing(r.Routing);
126-
if (r.WaitForActiveShards.HasValue) s.WaitForActiveShards(r.WaitForActiveShards.ToString());
125+
if (request.Routing != null) s.Routing(request.Routing);
126+
if (request.WaitForActiveShards.HasValue) s.WaitForActiveShards(request.WaitForActiveShards.ToString());
127127

128128
return s;
129129
}, _compositeCancelToken)
@@ -134,29 +134,38 @@ private async Task<IBulkAllResponse> BulkAsync(IList<T> buffer, long page, int b
134134
if (!response.ApiCall.Success)
135135
return await HandleBulkRequest(buffer, page, backOffRetries, response);
136136

137-
var documentsWithResponse = response.Items.Zip(buffer, Tuple.Create).ToList();
137+
var successfulDocuments = new List<Tuple<IBulkResponseItem, T>>();
138+
var retryableDocuments = new List<T>();
139+
var droppedDocuments = new List<Tuple<IBulkResponseItem, T>>();
138140

139-
HandleDroppedDocuments(documentsWithResponse, response);
141+
foreach (var documentWithResponse in response.Items.Zip(buffer, Tuple.Create))
142+
{
143+
if (documentWithResponse.Item1.IsValid)
144+
successfulDocuments.Add(documentWithResponse);
145+
else
146+
{
147+
if (_retryPredicate(documentWithResponse.Item1, documentWithResponse.Item2))
148+
retryableDocuments.Add(documentWithResponse.Item2);
149+
else
150+
droppedDocuments.Add(documentWithResponse);
151+
}
152+
}
140153

141-
var retryDocuments = documentsWithResponse
142-
.Where(x => !x.Item1.IsValid && _retryPredicate(x.Item1, x.Item2))
143-
.Select(x => x.Item2)
144-
.ToList();
154+
HandleDroppedDocuments(droppedDocuments, response);
145155

146-
if (retryDocuments.Count > 0 && backOffRetries < _backOffRetries)
147-
return await RetryDocuments(page, ++backOffRetries, retryDocuments);
148-
else if (retryDocuments.Count > 0)
156+
if (retryableDocuments.Count > 0 && backOffRetries < _backOffRetries)
157+
return await RetryDocuments(page, ++backOffRetries, retryableDocuments).ConfigureAwait(false);
158+
159+
if (retryableDocuments.Count > 0)
149160
throw ThrowOnBadBulk(response, $"Bulk indexing failed and after retrying {backOffRetries} times");
150161

151-
_partitionedBulkRequest.BackPressure?.Release();
152-
return new BulkAllResponse { Retries = backOffRetries, Page = page };
162+
request.BackPressure?.Release();
163+
164+
return new BulkAllResponse { Retries = backOffRetries, Page = page, Items = response.Items };
153165
}
154166

155-
private void HandleDroppedDocuments(List<Tuple<IBulkResponseItem, T>> documentsWithResponse, IBulkResponse response)
167+
private void HandleDroppedDocuments(List<Tuple<IBulkResponseItem, T>> droppedDocuments, IBulkResponse response)
156168
{
157-
var droppedDocuments = documentsWithResponse
158-
.Where(x => !x.Item1.IsValid && !_retryPredicate(x.Item1, x.Item2))
159-
.ToList();
160169
if (droppedDocuments.Count <= 0) return;
161170

162171
foreach (var dropped in droppedDocuments) _droppedDocumentCallBack(dropped.Item1, dropped.Item2);
@@ -185,7 +194,7 @@ private async Task<IBulkAllResponse> HandleBulkRequest(IList<T> buffer, long pag
185194
throw ThrowOnBadBulk(response,
186195
$"BulkAll halted after {nameof(PipelineFailure)}{failureReason.GetStringValue()} from _bulk");
187196
default:
188-
return await RetryDocuments(page, ++backOffRetries, buffer);
197+
return await RetryDocuments(page, ++backOffRetries, buffer).ConfigureAwait(false);
189198
}
190199
}
191200

src/Nest/Document/Multiple/BulkAll/BulkAllResponse.cs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
using Newtonsoft.Json;
1+
using System;
2+
using System.Collections.Generic;
3+
using Newtonsoft.Json;
24

35
namespace Nest
46
{
@@ -12,6 +14,9 @@ public interface IBulkAllResponse
1214

1315
/// <summary>The number of back off retries were needed to store this document.</summary>
1416
int Retries { get; }
17+
18+
/// <summary>The items returned from the bulk response</summary>
19+
IReadOnlyCollection<IBulkResponseItem> Items { get; }
1520
}
1621

1722
/// <inheritdoc />
@@ -26,5 +31,8 @@ public class BulkAllResponse : IBulkAllResponse
2631

2732
/// <inheritdoc />
2833
public int Retries { get; internal set; }
34+
35+
/// <inheritdoc />
36+
public IReadOnlyCollection<IBulkResponseItem> Items { get; internal set; }
2937
}
3038
}

src/Tests/Tests/Document/Multiple/BulkAll/BulkAndScrollApiTests.cs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ private void ScrollAll(string index, int size, int numberOfShards, int numberOfD
5050

5151
seenDocuments.Should().Be(numberOfDocuments);
5252
var groups = seenSlices.GroupBy(s => s).ToList();
53-
groups.Count().Should().Be(numberOfShards);
53+
groups.Count.Should().Be(numberOfShards);
5454
groups.Should().OnlyContain(g => g.Count() > 1);
5555
}
5656

@@ -70,7 +70,15 @@ private void BulkAll(string index, IEnumerable<SmallObject> documents, int size,
7070
.Index(index)
7171
);
7272
//we set up an observer
73-
var bulkObserver = observableBulk.Wait(TimeSpan.FromMinutes(5), b => Interlocked.Increment(ref seenPages));
73+
var bulkObserver = observableBulk.Wait(TimeSpan.FromMinutes(5), b =>
74+
{
75+
Interlocked.Increment(ref seenPages);
76+
foreach (var item in b.Items)
77+
{
78+
item.IsValid.Should().BeTrue();
79+
item.Id.Should().NotBeNullOrEmpty();
80+
}
81+
});
7482

7583
droppedDocuments.Take(10).Should().BeEmpty();
7684
bulkObserver.TotalNumberOfFailedBuffers.Should().Be(0, "All buffers are expected to be indexed");

0 commit comments

Comments
 (0)