Skip to content

Commit b32abcb

Browse files
authored
Zen2: Add Cluster State Applier (#34257)
Adds the cluster state applier to Coordinator, and adds tests for cluster state acking.
1 parent c6b0f08 commit b32abcb

File tree

6 files changed

+695
-196
lines changed

6 files changed

+695
-196
lines changed

server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java

Lines changed: 227 additions & 126 deletions
Large diffs are not rendered by default.

server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,10 @@ public void onFaultyNode(DiscoveryNode faultyNode) {
9191
onPossibleCompletion();
9292
}
9393

94+
public boolean isCommitted() {
95+
return applyCommitRequest.isPresent();
96+
}
97+
9498
private void onPossibleCompletion() {
9599
if (isCompleted) {
96100
return;

server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020

2121
import org.elasticsearch.action.ActionListener;
2222
import org.elasticsearch.cluster.node.DiscoveryNode;
23+
import org.elasticsearch.common.component.AbstractComponent;
2324
import org.elasticsearch.common.io.stream.StreamInput;
25+
import org.elasticsearch.common.settings.Settings;
2426
import org.elasticsearch.threadpool.ThreadPool;
2527
import org.elasticsearch.transport.TransportException;
2628
import org.elasticsearch.transport.TransportRequestOptions;
@@ -29,19 +31,20 @@
2931
import org.elasticsearch.transport.TransportService;
3032

3133
import java.io.IOException;
32-
import java.util.function.Consumer;
34+
import java.util.function.BiConsumer;
3335
import java.util.function.Function;
3436

35-
public class PublicationTransportHandler {
37+
public class PublicationTransportHandler extends AbstractComponent {
3638

3739
public static final String PUBLISH_STATE_ACTION_NAME = "internal:cluster/coordination/publish_state";
3840
public static final String COMMIT_STATE_ACTION_NAME = "internal:cluster/coordination/commit_state";
3941

4042
private final TransportService transportService;
4143

42-
public PublicationTransportHandler(TransportService transportService,
44+
public PublicationTransportHandler(Settings settings, TransportService transportService,
4345
Function<PublishRequest, PublishWithJoinResponse> handlePublishRequest,
44-
Consumer<ApplyCommitRequest> handleApplyCommit) {
46+
BiConsumer<ApplyCommitRequest, ActionListener<Void>> handleApplyCommit) {
47+
super(settings);
4548
this.transportService = transportService;
4649

4750
transportService.registerRequestHandler(PUBLISH_STATE_ACTION_NAME, ThreadPool.Names.GENERIC, false, false,
@@ -50,10 +53,27 @@ public PublicationTransportHandler(TransportService transportService,
5053

5154
transportService.registerRequestHandler(COMMIT_STATE_ACTION_NAME, ThreadPool.Names.GENERIC, false, false,
5255
ApplyCommitRequest::new,
53-
(request, channel, task) -> {
54-
handleApplyCommit.accept(request);
55-
channel.sendResponse(TransportResponse.Empty.INSTANCE);
56-
});
56+
(request, channel, task) -> handleApplyCommit.accept(request, new ActionListener<Void>() {
57+
58+
@Override
59+
public void onResponse(Void aVoid) {
60+
try {
61+
channel.sendResponse(TransportResponse.Empty.INSTANCE);
62+
} catch (IOException e) {
63+
logger.debug("failed to send response on commit", e);
64+
}
65+
}
66+
67+
@Override
68+
public void onFailure(Exception e) {
69+
try {
70+
channel.sendResponse(e);
71+
} catch (IOException ie) {
72+
e.addSuppressed(ie);
73+
logger.debug("failed to send response on commit", e);
74+
}
75+
}
76+
}));
5777
}
5878

5979
public void sendPublishRequest(DiscoveryNode destination, PublishRequest publishRequest,

0 commit comments

Comments
 (0)