Skip to content

Commit f49436d

Browse files
Introduce Delegating ActionListener Wrappers (#40129)
* Introduce Delegating ActionListener Wrappers * Dry up use cases of ActionListener that simply pass through the response or exception to another listener
1 parent 49fe782 commit f49436d

File tree

24 files changed

+206
-432
lines changed

24 files changed

+206
-432
lines changed

server/src/main/java/org/elasticsearch/action/ActionListener.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,53 @@ public void onFailure(Exception e) {
7272
};
7373
}
7474

75+
/**
76+
* Creates a listener that delegates all responses it receives to another listener.
77+
*
78+
* @param delegate ActionListener to wrap and delegate any exception to
79+
* @param bc BiConsumer invoked with delegate listener and exception
80+
* @param <T> Type of the listener
81+
* @return Delegating listener
82+
*/
83+
static <T> ActionListener<T> delegateResponse(ActionListener<T> delegate, BiConsumer<ActionListener<T>, Exception> bc) {
84+
return new ActionListener<T>() {
85+
86+
@Override
87+
public void onResponse(T r) {
88+
delegate.onResponse(r);
89+
}
90+
91+
@Override
92+
public void onFailure(Exception e) {
93+
bc.accept(delegate, e);
94+
}
95+
};
96+
}
97+
98+
/**
99+
* Creates a listener that delegates all exceptions it receives to another listener.
100+
*
101+
* @param delegate ActionListener to wrap and delegate any exception to
102+
* @param bc BiConsumer invoked with delegate listener and response
103+
* @param <T> Type of the delegating listener's response
104+
* @param <R> Type of the wrapped listeners
105+
* @return Delegating listener
106+
*/
107+
static <T, R> ActionListener<T> delegateFailure(ActionListener<R> delegate, BiConsumer<ActionListener<R>, T> bc) {
108+
return new ActionListener<T>() {
109+
110+
@Override
111+
public void onResponse(T r) {
112+
bc.accept(delegate, r);
113+
}
114+
115+
@Override
116+
public void onFailure(Exception e) {
117+
delegate.onFailure(e);
118+
}
119+
};
120+
}
121+
75122
/**
76123
* Creates a listener that listens for a response (or failure) and executes the
77124
* corresponding runnable when the response (or failure) is received.

server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ void getRunningTaskFromNode(Task thisTask, GetTaskRequest request, ActionListene
157157
// Shift to the generic thread pool and let it wait for the task to complete so we don't block any important threads.
158158
threadPool.generic().execute(new AbstractRunnable() {
159159
@Override
160-
protected void doRun() throws Exception {
160+
protected void doRun() {
161161
taskManager.waitForTaskCompletion(runningTask, waitForCompletionTimeout(request.getTimeout()));
162162
waitedForCompletion(thisTask, request, runningTask.taskInfo(clusterService.localNode().getId(), true), listener);
163163
}
@@ -180,26 +180,17 @@ public void onFailure(Exception e) {
180180
*/
181181
void waitedForCompletion(Task thisTask, GetTaskRequest request, TaskInfo snapshotOfRunningTask,
182182
ActionListener<GetTaskResponse> listener) {
183-
getFinishedTaskFromIndex(thisTask, request, new ActionListener<GetTaskResponse>() {
184-
@Override
185-
public void onResponse(GetTaskResponse response) {
186-
// We were able to load the task from the task index. Let's send that back.
187-
listener.onResponse(response);
188-
}
189-
190-
@Override
191-
public void onFailure(Exception e) {
183+
getFinishedTaskFromIndex(thisTask, request, ActionListener.delegateResponse(listener, (delegatedListener, e) -> {
192184
/*
193185
* We couldn't load the task from the task index. Instead of 404 we should use the snapshot we took after it finished. If
194186
* the error isn't a 404 then we'll just throw it back to the user.
195187
*/
196188
if (ExceptionsHelper.unwrap(e, ResourceNotFoundException.class) != null) {
197-
listener.onResponse(new GetTaskResponse(new TaskResult(true, snapshotOfRunningTask)));
189+
delegatedListener.onResponse(new GetTaskResponse(new TaskResult(true, snapshotOfRunningTask)));
198190
} else {
199-
listener.onFailure(e);
191+
delegatedListener.onFailure(e);
200192
}
201-
}
202-
});
193+
}));
203194
}
204195

205196
/**

server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/delete/TransportDeleteRepositoryAction.java

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.elasticsearch.action.support.master.AcknowledgedResponse;
2525
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
2626
import org.elasticsearch.cluster.ClusterState;
27-
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
2827
import org.elasticsearch.cluster.block.ClusterBlockException;
2928
import org.elasticsearch.cluster.block.ClusterBlockLevel;
3029
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@@ -69,17 +68,8 @@ protected ClusterBlockException checkBlock(DeleteRepositoryRequest request, Clus
6968
protected void masterOperation(final DeleteRepositoryRequest request, ClusterState state,
7069
final ActionListener<AcknowledgedResponse> listener) {
7170
repositoriesService.unregisterRepository(
72-
request,
73-
new ActionListener<ClusterStateUpdateResponse>() {
74-
@Override
75-
public void onResponse(ClusterStateUpdateResponse unregisterRepositoryResponse) {
76-
listener.onResponse(new AcknowledgedResponse(unregisterRepositoryResponse.isAcknowledged()));
77-
}
78-
79-
@Override
80-
public void onFailure(Exception e) {
81-
listener.onFailure(e);
82-
}
83-
});
71+
request, ActionListener.delegateFailure(listener,
72+
(delegatedListener, unregisterRepositoryResponse) ->
73+
delegatedListener.onResponse(new AcknowledgedResponse(unregisterRepositoryResponse.isAcknowledged()))));
8474
}
8575
}

server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/put/TransportPutRepositoryAction.java

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.elasticsearch.action.support.master.AcknowledgedResponse;
2525
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
2626
import org.elasticsearch.cluster.ClusterState;
27-
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
2827
import org.elasticsearch.cluster.block.ClusterBlockException;
2928
import org.elasticsearch.cluster.block.ClusterBlockLevel;
3029
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@@ -68,17 +67,7 @@ protected ClusterBlockException checkBlock(PutRepositoryRequest request, Cluster
6867
@Override
6968
protected void masterOperation(final PutRepositoryRequest request, ClusterState state,
7069
final ActionListener<AcknowledgedResponse> listener) {
71-
repositoriesService.registerRepository(request, new ActionListener<ClusterStateUpdateResponse>() {
72-
73-
@Override
74-
public void onResponse(ClusterStateUpdateResponse response) {
75-
listener.onResponse(new AcknowledgedResponse(response.isAcknowledged()));
76-
}
77-
78-
@Override
79-
public void onFailure(Exception e) {
80-
listener.onFailure(e);
81-
}
82-
});
70+
repositoriesService.registerRepository(request, ActionListener.delegateFailure(listener,
71+
(delegatedListener, response) -> delegatedListener.onResponse(new AcknowledgedResponse(response.isAcknowledged()))));
8372
}
8473
}

server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/verify/TransportVerifyRepositoryAction.java

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,6 @@
3333
import org.elasticsearch.threadpool.ThreadPool;
3434
import org.elasticsearch.transport.TransportService;
3535

36-
import java.util.List;
37-
3836
/**
3937
* Transport action for verifying repository operation
4038
*/
@@ -70,16 +68,8 @@ protected ClusterBlockException checkBlock(VerifyRepositoryRequest request, Clus
7068
@Override
7169
protected void masterOperation(final VerifyRepositoryRequest request, ClusterState state,
7270
final ActionListener<VerifyRepositoryResponse> listener) {
73-
repositoriesService.verifyRepository(request.name(), new ActionListener<List<DiscoveryNode>>() {
74-
@Override
75-
public void onResponse(List<DiscoveryNode> verifyResponse) {
76-
listener.onResponse(new VerifyRepositoryResponse(verifyResponse.toArray(new DiscoveryNode[0])));
77-
}
78-
79-
@Override
80-
public void onFailure(Exception e) {
81-
listener.onFailure(e);
82-
}
83-
});
71+
repositoriesService.verifyRepository(request.name(), ActionListener.delegateFailure(listener,
72+
(delegatedListener, verifyResponse) ->
73+
delegatedListener.onResponse(new VerifyRepositoryResponse(verifyResponse.toArray(new DiscoveryNode[0])))));
8474
}
8575
}

server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import org.elasticsearch.cluster.service.ClusterService;
3030
import org.elasticsearch.common.inject.Inject;
3131
import org.elasticsearch.snapshots.RestoreService;
32-
import org.elasticsearch.snapshots.RestoreService.RestoreCompletionResponse;
3332
import org.elasticsearch.threadpool.ThreadPool;
3433
import org.elasticsearch.transport.TransportService;
3534

@@ -75,20 +74,13 @@ protected ClusterBlockException checkBlock(RestoreSnapshotRequest request, Clust
7574
@Override
7675
protected void masterOperation(final RestoreSnapshotRequest request, final ClusterState state,
7776
final ActionListener<RestoreSnapshotResponse> listener) {
78-
restoreService.restoreSnapshot(request, new ActionListener<RestoreCompletionResponse>() {
79-
@Override
80-
public void onResponse(RestoreCompletionResponse restoreCompletionResponse) {
77+
restoreService.restoreSnapshot(request, ActionListener.delegateFailure(listener,
78+
(delegatedListener, restoreCompletionResponse) -> {
8179
if (restoreCompletionResponse.getRestoreInfo() == null && request.waitForCompletion()) {
82-
RestoreClusterStateListener.createAndRegisterListener(clusterService, restoreCompletionResponse, listener);
80+
RestoreClusterStateListener.createAndRegisterListener(clusterService, restoreCompletionResponse, delegatedListener);
8381
} else {
84-
listener.onResponse(new RestoreSnapshotResponse(restoreCompletionResponse.getRestoreInfo()));
82+
delegatedListener.onResponse(new RestoreSnapshotResponse(restoreCompletionResponse.getRestoreInfo()));
8583
}
86-
}
87-
88-
@Override
89-
public void onFailure(Exception t) {
90-
listener.onFailure(t);
91-
}
92-
});
84+
}));
9385
}
9486
}

server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -118,19 +118,9 @@ protected void masterOperation(final Task task,
118118
.masterNodeTimeout(request.masterNodeTimeout())
119119
.waitForActiveShards(request.waitForActiveShards())
120120
.indices(concreteIndices);
121-
122-
indexStateService.closeIndices(closeRequest, new ActionListener<CloseIndexResponse>() {
123-
124-
@Override
125-
public void onResponse(final CloseIndexResponse response) {
126-
listener.onResponse(response);
127-
}
128-
129-
@Override
130-
public void onFailure(final Exception t) {
131-
logger.debug(() -> new ParameterizedMessage("failed to close indices [{}]", (Object) concreteIndices), t);
132-
listener.onFailure(t);
133-
}
134-
});
121+
indexStateService.closeIndices(closeRequest, ActionListener.delegateResponse(listener, (delegatedListener, t) -> {
122+
logger.debug(() -> new ParameterizedMessage("failed to close indices [{}]", (Object) concreteIndices), t);
123+
delegatedListener.onFailure(t);
124+
}));
135125
}
136126
}

server/src/main/java/org/elasticsearch/action/admin/indices/shrink/TransportResizeAction.java

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
2525
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
2626
import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
27-
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
2827
import org.elasticsearch.action.support.ActionFilters;
2928
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
3029
import org.elasticsearch.client.Client;
@@ -97,25 +96,18 @@ protected void masterOperation(final ResizeRequest resizeRequest, final ClusterS
9796
// there is no need to fetch docs stats for split but we keep it simple and do it anyway for simplicity of the code
9897
final String sourceIndex = indexNameExpressionResolver.resolveDateMathExpression(resizeRequest.getSourceIndex());
9998
final String targetIndex = indexNameExpressionResolver.resolveDateMathExpression(resizeRequest.getTargetIndexRequest().index());
100-
client.admin().indices().prepareStats(sourceIndex).clear().setDocs(true).execute(new ActionListener<IndicesStatsResponse>() {
101-
@Override
102-
public void onResponse(IndicesStatsResponse indicesStatsResponse) {
99+
client.admin().indices().prepareStats(sourceIndex).clear().setDocs(true).execute(
100+
ActionListener.delegateFailure(listener, (delegatedListener, indicesStatsResponse) -> {
103101
CreateIndexClusterStateUpdateRequest updateRequest = prepareCreateIndexRequest(resizeRequest, state,
104-
(i) -> {
102+
i -> {
105103
IndexShardStats shard = indicesStatsResponse.getIndex(sourceIndex).getIndexShards().get(i);
106104
return shard == null ? null : shard.getPrimary().getDocs();
107105
}, sourceIndex, targetIndex);
108106
createIndexService.createIndex(
109-
updateRequest, ActionListener.map(listener,
107+
updateRequest, ActionListener.map(delegatedListener,
110108
response -> new ResizeResponse(response.isAcknowledged(), response.isShardsAcknowledged(), updateRequest.index()))
111109
);
112-
}
113-
114-
@Override
115-
public void onFailure(Exception e) {
116-
listener.onFailure(e);
117-
}
118-
});
110+
}));
119111

120112
}
121113

server/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/TransportUpgradeAction.java

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.elasticsearch.action.support.ActionFilters;
2626
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
2727
import org.elasticsearch.action.support.broadcast.node.TransportBroadcastByNodeAction;
28-
import org.elasticsearch.action.support.master.AcknowledgedResponse;
2928
import org.elasticsearch.client.node.NodeClient;
3029
import org.elasticsearch.cluster.ClusterState;
3130
import org.elasticsearch.cluster.block.ClusterBlockException;
@@ -104,7 +103,7 @@ protected UpgradeResponse newResponse(UpgradeRequest request, int totalShards, i
104103
versions.put(index, new Tuple<>(version, luceneVersion));
105104
}
106105
}
107-
Map<String, Tuple<org.elasticsearch.Version, String>> updatedVersions = new HashMap<>();
106+
Map<String, Tuple<Version, String>> updatedVersions = new HashMap<>();
108107
MetaData metaData = clusterState.metaData();
109108
for (Map.Entry<String, Tuple<Version, org.apache.lucene.util.Version>> versionEntry : versions.entrySet()) {
110109
String index = versionEntry.getKey();
@@ -209,16 +208,7 @@ public void onFailure(Exception e) {
209208

210209
private void updateSettings(final UpgradeResponse upgradeResponse, final ActionListener<UpgradeResponse> listener) {
211210
UpgradeSettingsRequest upgradeSettingsRequest = new UpgradeSettingsRequest(upgradeResponse.versions());
212-
client.executeLocally(UpgradeSettingsAction.INSTANCE, upgradeSettingsRequest, new ActionListener<AcknowledgedResponse>() {
213-
@Override
214-
public void onResponse(AcknowledgedResponse updateSettingsResponse) {
215-
listener.onResponse(upgradeResponse);
216-
}
217-
218-
@Override
219-
public void onFailure(Exception e) {
220-
listener.onFailure(e);
221-
}
222-
});
211+
client.executeLocally(UpgradeSettingsAction.INSTANCE, upgradeSettingsRequest, ActionListener.delegateFailure(
212+
listener, (delegatedListener, updateSettingsResponse) -> delegatedListener.onResponse(upgradeResponse)));
223213
}
224214
}

server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

Lines changed: 9 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -658,7 +658,15 @@ ActionListener<BulkResponse> wrapActionListenerIfNeeded(long ingestTookInMillis,
658658
return ActionListener.map(actionListener,
659659
response -> new BulkResponse(response.getItems(), response.getTook().getMillis(), ingestTookInMillis));
660660
} else {
661-
return new IngestBulkResponseListener(ingestTookInMillis, originalSlots, itemResponses, actionListener);
661+
return ActionListener.delegateFailure(actionListener, (delegatedListener, response) -> {
662+
BulkItemResponse[] items = response.getItems();
663+
for (int i = 0; i < items.length; i++) {
664+
itemResponses.add(originalSlots[i], response.getItems()[i]);
665+
}
666+
delegatedListener.onResponse(
667+
new BulkResponse(
668+
itemResponses.toArray(new BulkItemResponse[0]), response.getTook().getMillis(), ingestTookInMillis));
669+
});
662670
}
663671
}
664672

@@ -688,36 +696,4 @@ void markCurrentItemAsFailed(Exception e) {
688696
}
689697

690698
}
691-
692-
static final class IngestBulkResponseListener implements ActionListener<BulkResponse> {
693-
694-
private final long ingestTookInMillis;
695-
private final int[] originalSlots;
696-
private final List<BulkItemResponse> itemResponses;
697-
private final ActionListener<BulkResponse> actionListener;
698-
699-
IngestBulkResponseListener(long ingestTookInMillis, int[] originalSlots, List<BulkItemResponse> itemResponses,
700-
ActionListener<BulkResponse> actionListener) {
701-
this.ingestTookInMillis = ingestTookInMillis;
702-
this.itemResponses = itemResponses;
703-
this.actionListener = actionListener;
704-
this.originalSlots = originalSlots;
705-
}
706-
707-
@Override
708-
public void onResponse(BulkResponse response) {
709-
BulkItemResponse[] items = response.getItems();
710-
for (int i = 0; i < items.length; i++) {
711-
itemResponses.add(originalSlots[i], response.getItems()[i]);
712-
}
713-
actionListener.onResponse(new BulkResponse(
714-
itemResponses.toArray(new BulkItemResponse[itemResponses.size()]),
715-
response.getTook().getMillis(), ingestTookInMillis));
716-
}
717-
718-
@Override
719-
public void onFailure(Exception e) {
720-
actionListener.onFailure(e);
721-
}
722-
}
723699
}

0 commit comments

Comments
 (0)