Skip to content

Commit a5ed86d

Browse files
committed
review comments
1 parent d995da5 commit a5ed86d

File tree

5 files changed

+61
-61
lines changed

5 files changed

+61
-61
lines changed

server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,12 @@ private void handlePrimaryResult(final PrimaryResultT primaryResult) {
137137

138138
@Override
139139
public void onResponse(Void aVoid) {
140-
updateCheckPoints(primary.routingEntry(), primary::localCheckpoint, primary::globalCheckpoint);
140+
successfulShards.incrementAndGet();
141+
try {
142+
updateCheckPoints(primary.routingEntry(), primary::localCheckpoint, primary::globalCheckpoint);
143+
} finally {
144+
decPendingAndFinishIfNeeded();
145+
}
141146
}
142147

143148
@Override
@@ -186,7 +191,12 @@ private void performOnReplica(final ShardRouting shard, final ReplicaRequest rep
186191
new ActionListener<>() {
187192
@Override
188193
public void onResponse(ReplicaResponse response) {
189-
updateCheckPoints(shard, response::localCheckpoint, response::globalCheckpoint);
194+
successfulShards.incrementAndGet();
195+
try {
196+
updateCheckPoints(shard, response::localCheckpoint, response::globalCheckpoint);
197+
} finally {
198+
decPendingAndFinishIfNeeded();
199+
}
190200
}
191201

192202
@Override
@@ -213,7 +223,6 @@ public String toString() {
213223
}
214224

215225
private void updateCheckPoints(ShardRouting shard, LongSupplier localCheckpointSupplier, LongSupplier globalCheckpointSupplier) {
216-
successfulShards.incrementAndGet();
217226
try {
218227
primary.updateLocalCheckpointForShard(shard.allocationId().getId(), localCheckpointSupplier.getAsLong());
219228
primary.updateGlobalCheckpointForShard(shard.allocationId().getId(), globalCheckpointSupplier.getAsLong());
@@ -223,8 +232,6 @@ private void updateCheckPoints(ShardRouting shard, LongSupplier localCheckpointS
223232
// fail the primary but fall through and let the rest of operation processing complete
224233
final String message = String.format(Locale.ROOT, "primary failed updating local checkpoint for replica %s", shard);
225234
primary.failShard(message, e);
226-
} finally {
227-
decPendingAndFinishIfNeeded();
228235
}
229236
}
230237

server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

Lines changed: 26 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -344,15 +344,30 @@ public void handleException(TransportException exp) {
344344
} else {
345345
setPhase(replicationTask, "primary");
346346

347-
final ActionListener<Response> referenceClosingListener = ActionListener.wrap(response -> {
347+
final ActionListener<Response> responseListener = ActionListener.wrap(response -> {
348+
adaptResponse(response, primaryShardReference.indexShard);
349+
350+
if (syncGlobalCheckpointAfterOperation) {
351+
try {
352+
primaryShardReference.indexShard.maybeSyncGlobalCheckpoint("post-operation");
353+
} catch (final Exception e) {
354+
// only log non-closed exceptions
355+
if (ExceptionsHelper.unwrap(
356+
e, AlreadyClosedException.class, IndexShardClosedException.class) == null) {
357+
// intentionally swallow, a missed global checkpoint sync should not fail this operation
358+
logger.info(
359+
new ParameterizedMessage(
360+
"{} failed to execute post-operation global checkpoint sync",
361+
primaryShardReference.indexShard.shardId()), e);
362+
}
363+
}
364+
}
365+
348366
primaryShardReference.close(); // release shard operation lock before responding to caller
349367
setPhase(replicationTask, "finished");
350368
onCompletionListener.onResponse(response);
351369
}, e -> handleException(primaryShardReference, e));
352370

353-
final ActionListener<Response> responseListener = getResponseActionListener(referenceClosingListener,
354-
primaryShardReference.indexShard);
355-
356371
new ReplicationOperation<>(primaryRequest.getRequest(), primaryShardReference,
357372
ActionListener.map(responseListener, result -> result.finalResponseIfSuccessful),
358373
newReplicasProxy(), logger, actionName, primaryRequest.getPrimaryTerm()).execute();
@@ -375,25 +390,13 @@ public void onFailure(Exception e) {
375390

376391
}
377392

378-
protected ActionListener<Response> getResponseActionListener(ActionListener<Response> referenceClosingListener, IndexShard shard) {
379-
380-
return ActionListener.wrap(response -> {
381-
if (syncGlobalCheckpointAfterOperation) {
382-
try {
383-
shard.maybeSyncGlobalCheckpoint("post-operation");
384-
} catch (final Exception e) {
385-
// only log non-closed exceptions
386-
if (ExceptionsHelper.unwrap(
387-
e, AlreadyClosedException.class, IndexShardClosedException.class) == null) {
388-
// intentionally swallow, a missed global checkpoint sync should not fail this operation
389-
logger.info(
390-
new ParameterizedMessage(
391-
"{} failed to execute post-operation global checkpoint sync", shard.shardId()), e);
392-
}
393-
}
394-
}
395-
referenceClosingListener.onResponse(response);
396-
}, referenceClosingListener::onFailure);
393+
// allows subclasses to adapt the response
394+
protected void adaptResponse(Response response, IndexShard indexShard) {
395+
396+
}
397+
398+
protected ActionListener<Response> wrapResponseActionListener(ActionListener<Response> listener, IndexShard shard) {
399+
return listener;
397400
}
398401

399402
public static class PrimaryResult<ReplicaRequest extends ReplicationRequest<ReplicaRequest>,

test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -601,16 +601,20 @@ protected ReplicationAction(Request request, ActionListener<Response> listener,
601601
public void execute() {
602602
try {
603603
new ReplicationOperation<>(request, new PrimaryRef(),
604-
ActionListener.map(wrapListener(listener, getPrimaryShard()), result -> result.finalResponse),
604+
ActionListener.map(listener, result -> {
605+
adaptResponse(result.finalResponse, getPrimaryShard());
606+
return result.finalResponse;
607+
}),
605608
new ReplicasRef(), logger, opType, primaryTerm)
606609
.execute();
607610
} catch (Exception e) {
608611
listener.onFailure(e);
609612
}
610613
}
611614

612-
protected ActionListener<Response> wrapListener(ActionListener<Response> listener, IndexShard indexShard) {
613-
return listener;
615+
// to be overridden by subclasses
616+
protected void adaptResponse(Response response, IndexShard indexShard) {
617+
614618
}
615619

616620
protected IndexShard getPrimaryShard() {

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -191,22 +191,15 @@ protected BulkShardOperationsResponse newResponseInstance(StreamInput in) throws
191191
}
192192

193193
@Override
194-
protected ActionListener<BulkShardOperationsResponse> getResponseActionListener(
195-
ActionListener<BulkShardOperationsResponse> referenceClosingListener, IndexShard shard) {
196-
ActionListener<BulkShardOperationsResponse> listener = super.getResponseActionListener(referenceClosingListener, shard);
197-
return wrapListener(listener, shard);
194+
protected void adaptResponse(BulkShardOperationsResponse response, IndexShard indexShard) {
195+
adaptBulkShardOperationsResponse(response, indexShard);
198196
}
199197

200-
public static ActionListener<BulkShardOperationsResponse> wrapListener(ActionListener<BulkShardOperationsResponse> listener,
201-
IndexShard shard) {
202-
203-
return ActionListener.wrap(response -> {
204-
final SeqNoStats seqNoStats = shard.seqNoStats();
205-
// return a fresh global checkpoint after the operations have been replicated for the shard follow task
206-
response.setGlobalCheckpoint(seqNoStats.getGlobalCheckpoint());
207-
response.setMaxSeqNo(seqNoStats.getMaxSeqNo());
208-
listener.onResponse(response);
209-
}, listener::onFailure);
198+
public static void adaptBulkShardOperationsResponse(BulkShardOperationsResponse response, IndexShard indexShard) {
199+
final SeqNoStats seqNoStats = indexShard.seqNoStats();
200+
// return a fresh global checkpoint after the operations have been replicated for the shard follow task
201+
response.setGlobalCheckpoint(seqNoStats.getGlobalCheckpoint());
202+
response.setMaxSeqNo(seqNoStats.getMaxSeqNo());
210203
}
211204

212205
}

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
1616
import org.elasticsearch.action.bulk.BulkItemResponse;
1717
import org.elasticsearch.action.delete.DeleteRequest;
18-
import org.elasticsearch.action.support.ActionTestUtils;
1918
import org.elasticsearch.action.support.PlainActionFuture;
2019
import org.elasticsearch.action.support.replication.ReplicationResponse;
2120
import org.elasticsearch.action.support.replication.TransportWriteAction;
@@ -677,32 +676,26 @@ protected void performOnPrimary(IndexShard primary, BulkShardOperationsRequest r
677676
try (Releasable ignored = permitFuture.get()) {
678677
ccrResult = TransportBulkShardOperationsAction.shardOperationOnPrimary(primary.shardId(), request.getHistoryUUID(),
679678
request.getOperations(), request.getMaxSeqNoOfUpdatesOrDeletes(), primary, logger);
680-
ActionTestUtils.assertNoFailureListener(result -> {
681-
TransportWriteActionTestHelper.performPostWriteActions(primary, request, ccrResult.location, logger);
682-
listener.onResponse(new PrimaryResult(ccrResult.replicaRequest(), ccrResult.finalResponseIfSuccessful));
683-
}).onResponse(ccrResult);
679+
TransportWriteActionTestHelper.performPostWriteActions(primary, request, ccrResult.location, logger);
684680
} catch (InterruptedException | ExecutionException | IOException e) {
685681
throw new AssertionError(e);
686682
}
683+
listener.onResponse(new PrimaryResult(ccrResult.replicaRequest(), ccrResult.finalResponseIfSuccessful));
687684
}
688685

689686
@Override
690-
protected ActionListener<BulkShardOperationsResponse> wrapListener(ActionListener<BulkShardOperationsResponse> listener,
691-
IndexShard indexShard) {
692-
return TransportBulkShardOperationsAction.wrapListener(listener, indexShard);
687+
protected void adaptResponse(BulkShardOperationsResponse response, IndexShard indexShard) {
688+
TransportBulkShardOperationsAction.adaptBulkShardOperationsResponse(response, indexShard);
693689
}
694690

695691
@Override
696692
protected void performOnReplica(BulkShardOperationsRequest request, IndexShard replica) throws Exception {
697-
final PlainActionFuture<Releasable> permitAcquiredFuture = new PlainActionFuture<>();
698-
replica.acquireReplicaOperationPermit(getPrimaryShard().getPendingPrimaryTerm(),
699-
getPrimaryShard().getLastKnownGlobalCheckpoint(), getPrimaryShard().getMaxSeqNoOfUpdatesOrDeletes(), permitAcquiredFuture,
700-
ThreadPool.Names.SAME, request);
701-
final Translog.Location location;
702-
try (Releasable ignored = permitAcquiredFuture.actionGet()) {
703-
location = TransportBulkShardOperationsAction.shardOperationOnReplica(request, replica, logger).location;
693+
try (Releasable ignored = PlainActionFuture.get(f -> replica.acquireReplicaOperationPermit(
694+
getPrimaryShard().getPendingPrimaryTerm(), getPrimaryShard().getLastKnownGlobalCheckpoint(),
695+
getPrimaryShard().getMaxSeqNoOfUpdatesOrDeletes(), f, ThreadPool.Names.SAME, request))) {
696+
Translog.Location location = TransportBulkShardOperationsAction.shardOperationOnReplica(request, replica, logger).location;
697+
TransportWriteActionTestHelper.performPostWriteActions(replica, request, location, logger);
704698
}
705-
TransportWriteActionTestHelper.performPostWriteActions(replica, request, location, logger);
706699
}
707700
}
708701

0 commit comments

Comments
 (0)