Skip to content

Commit c79fbea

Browse files
authored
[Zen2] Implement basic cluster formation (#33668)
This PR integrates the following pieces of machinery in the Coordinator: - discovery - pre-voting - randomised election scheduling - joining (of a new master) - publication of cluster state updates Together, these things are everything needed to form a cluster. We therefore also add the start of a test suite that allows us to assert higher-level properties of the interactions between all these pieces of machinery, with as little fake behaviour as possible. We assert one such property: "a cluster successfully forms".
1 parent 01b3be9 commit c79fbea

File tree

17 files changed

+886
-88
lines changed

17 files changed

+886
-88
lines changed

server/src/main/java/org/elasticsearch/cluster/coordination/ApplyCommitRequest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ public String toString() {
4949
return "ApplyCommitRequest{" +
5050
"term=" + term +
5151
", version=" + version +
52+
", sourceNode=" + sourceNode +
5253
'}';
5354
}
5455
}

server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java

Lines changed: 237 additions & 32 deletions
Large diffs are not rendered by default.

server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java

Lines changed: 68 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.elasticsearch.cluster.coordination;
2020

21+
import org.apache.logging.log4j.message.ParameterizedMessage;
2122
import org.elasticsearch.cluster.ClusterState;
2223
import org.elasticsearch.cluster.ClusterStateTaskConfig;
2324
import org.elasticsearch.cluster.ClusterStateTaskListener;
@@ -27,29 +28,37 @@
2728
import org.elasticsearch.cluster.service.MasterService;
2829
import org.elasticsearch.common.Priority;
2930
import org.elasticsearch.common.component.AbstractComponent;
31+
import org.elasticsearch.common.io.stream.StreamInput;
3032
import org.elasticsearch.common.settings.Settings;
3133
import org.elasticsearch.threadpool.ThreadPool;
34+
import org.elasticsearch.threadpool.ThreadPool.Names;
35+
import org.elasticsearch.transport.TransportException;
3236
import org.elasticsearch.transport.TransportResponse;
37+
import org.elasticsearch.transport.TransportResponse.Empty;
38+
import org.elasticsearch.transport.TransportResponseHandler;
3339
import org.elasticsearch.transport.TransportService;
3440

3541
import java.io.IOException;
3642
import java.util.HashMap;
3743
import java.util.List;
3844
import java.util.Map;
45+
import java.util.Optional;
3946
import java.util.function.BiConsumer;
47+
import java.util.function.Function;
4048
import java.util.function.LongSupplier;
4149

4250
public class JoinHelper extends AbstractComponent {
4351

4452
public static final String JOIN_ACTION_NAME = "internal:cluster/coordination/join";
53+
public static final String START_JOIN_ACTION_NAME = "internal:cluster/coordination/start_join";
4554

4655
private final MasterService masterService;
4756
private final TransportService transportService;
4857
private final JoinTaskExecutor joinTaskExecutor;
4958

5059
public JoinHelper(Settings settings, AllocationService allocationService, MasterService masterService,
5160
TransportService transportService, LongSupplier currentTermSupplier,
52-
BiConsumer<JoinRequest, JoinCallback> joinHandler) {
61+
BiConsumer<JoinRequest, JoinCallback> joinHandler, Function<StartJoinRequest, Join> joinLeaderInTerm) {
5362
super(settings);
5463
this.masterService = masterService;
5564
this.transportService = transportService;
@@ -100,6 +109,62 @@ public String toString() {
100109
return "JoinCallback{request=" + request + "}";
101110
}
102111
}));
112+
113+
transportService.registerRequestHandler(START_JOIN_ACTION_NAME, Names.GENERIC, false, false,
114+
StartJoinRequest::new,
115+
(request, channel, task) -> {
116+
final DiscoveryNode destination = request.getSourceNode();
117+
final JoinRequest joinRequest
118+
= new JoinRequest(transportService.getLocalNode(), Optional.of(joinLeaderInTerm.apply(request)));
119+
logger.debug("attempting to join {} with {}", destination, joinRequest);
120+
this.transportService.sendRequest(destination, JOIN_ACTION_NAME, joinRequest, new TransportResponseHandler<Empty>() {
121+
@Override
122+
public Empty read(StreamInput in) {
123+
return Empty.INSTANCE;
124+
}
125+
126+
@Override
127+
public void handleResponse(Empty response) {
128+
logger.debug("successfully joined {} with {}", destination, joinRequest);
129+
}
130+
131+
@Override
132+
public void handleException(TransportException exp) {
133+
logger.debug(() -> new ParameterizedMessage("failed to join {} with {}", destination, joinRequest), exp);
134+
}
135+
136+
@Override
137+
public String executor() {
138+
return Names.SAME;
139+
}
140+
});
141+
channel.sendResponse(Empty.INSTANCE);
142+
});
143+
}
144+
145+
public void sendStartJoinRequest(final StartJoinRequest startJoinRequest, final DiscoveryNode destination) {
146+
transportService.sendRequest(destination, START_JOIN_ACTION_NAME,
147+
startJoinRequest, new TransportResponseHandler<Empty>() {
148+
@Override
149+
public Empty read(StreamInput in) {
150+
return Empty.INSTANCE;
151+
}
152+
153+
@Override
154+
public void handleResponse(Empty response) {
155+
logger.debug("successful response to {} from {}", startJoinRequest, destination);
156+
}
157+
158+
@Override
159+
public void handleException(TransportException exp) {
160+
logger.debug(new ParameterizedMessage("failure in response to {} from {}", startJoinRequest, destination), exp);
161+
}
162+
163+
@Override
164+
public String executor() {
165+
return ThreadPool.Names.SAME;
166+
}
167+
});
103168
}
104169

105170
public interface JoinCallback {
@@ -211,7 +276,8 @@ public void close(Mode newMode) {
211276

212277
@Override
213278
public String toString() {
214-
return "CandidateJoinAccumulator{" + joinRequestAccumulator.keySet() + '}';
279+
return "CandidateJoinAccumulator{" + joinRequestAccumulator.keySet() +
280+
", closed=" + closed + '}';
215281
}
216282
}
217283
}

server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteCollector.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.cluster.ClusterState;
2424
import org.elasticsearch.cluster.coordination.CoordinationState.VoteCollection;
2525
import org.elasticsearch.cluster.node.DiscoveryNode;
26+
import org.elasticsearch.common.Nullable;
2627
import org.elasticsearch.common.collect.Tuple;
2728
import org.elasticsearch.common.component.AbstractComponent;
2829
import org.elasticsearch.common.lease.Releasable;
@@ -34,6 +35,7 @@
3435

3536
import java.util.Set;
3637
import java.util.concurrent.atomic.AtomicBoolean;
38+
import java.util.function.LongConsumer;
3739

3840
import static org.elasticsearch.cluster.coordination.CoordinationState.isElectionQuorum;
3941
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentSet;
@@ -44,16 +46,17 @@ public class PreVoteCollector extends AbstractComponent {
4446

4547
private final TransportService transportService;
4648
private final Runnable startElection;
49+
private final LongConsumer updateMaxTermSeen;
4750

48-
// Tuple for simple atomic updates
49-
private volatile Tuple<DiscoveryNode, PreVoteResponse> state; // DiscoveryNode component is null if there is currently no known leader
51+
// Tuple for simple atomic updates. null until the first call to `update()`.
52+
private volatile Tuple<DiscoveryNode, PreVoteResponse> state; // DiscoveryNode component is null if there is currently no known leader.
5053

51-
PreVoteCollector(final Settings settings, final PreVoteResponse preVoteResponse,
52-
final TransportService transportService, final Runnable startElection) {
54+
PreVoteCollector(final Settings settings, final TransportService transportService, final Runnable startElection,
55+
final LongConsumer updateMaxTermSeen) {
5356
super(settings);
54-
state = new Tuple<>(null, preVoteResponse);
5557
this.transportService = transportService;
5658
this.startElection = startElection;
59+
this.updateMaxTermSeen = updateMaxTermSeen;
5760

5861
// TODO does this need to be on the generic threadpool or can it use SAME?
5962
transportService.registerRequestHandler(REQUEST_PRE_VOTE_ACTION_NAME, Names.GENERIC, false, false,
@@ -74,7 +77,7 @@ public Releasable start(final ClusterState clusterState, final Iterable<Discover
7477
return preVotingRound;
7578
}
7679

77-
public void update(final PreVoteResponse preVoteResponse, final DiscoveryNode leader) {
80+
public void update(final PreVoteResponse preVoteResponse, @Nullable final DiscoveryNode leader) {
7881
logger.trace("updating with preVoteResponse={}, leader={}", preVoteResponse, leader);
7982
state = new Tuple<>(leader, preVoteResponse);
8083
}
@@ -156,7 +159,7 @@ private void handlePreVoteResponse(final PreVoteResponse response, final Discove
156159
return;
157160
}
158161

159-
// TODO the response carries the sender's current term. If an election starts then it should be in a higher term.
162+
updateMaxTermSeen.accept(response.getCurrentTerm());
160163

161164
if (response.getLastAcceptedTerm() > clusterState.term()
162165
|| (response.getLastAcceptedTerm() == clusterState.term()

server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ private void onPossibleCommitFailure() {
156156

157157
protected abstract Optional<ApplyCommitRequest> handlePublishResponse(DiscoveryNode sourceNode, PublishResponse publishResponse);
158158

159-
protected abstract void onPossibleJoin(DiscoveryNode sourceNode, PublishWithJoinResponse response);
159+
protected abstract void onJoin(Join join);
160160

161161
protected abstract void sendPublishRequest(DiscoveryNode destination, PublishRequest publishRequest,
162162
ActionListener<PublishWithJoinResponse> responseActionListener);
@@ -287,8 +287,10 @@ public void onResponse(PublishWithJoinResponse response) {
287287
return;
288288
}
289289

290-
// TODO: check if we need to pass the full response here or if it's sufficient to just pass the optional join.
291-
onPossibleJoin(discoveryNode, response);
290+
response.getJoin().ifPresent(join -> {
291+
assert discoveryNode.equals(join.getSourceNode());
292+
onJoin(join);
293+
});
292294

293295
assert state == PublicationTargetState.SENT_PUBLISH_REQUEST : state + " -> " + PublicationTargetState.WAITING_FOR_QUORUM;
294296
state = PublicationTargetState.WAITING_FOR_QUORUM;

server/src/main/java/org/elasticsearch/discovery/PeerFinder.java

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,8 @@ public abstract class PeerFinder extends AbstractComponent {
7272
private final Map<TransportAddress, Peer> peersByAddress = newConcurrentMap();
7373
private Optional<DiscoveryNode> leader = Optional.empty();
7474

75-
PeerFinder(Settings settings, TransportService transportService,
76-
TransportAddressConnector transportAddressConnector, ConfiguredHostsResolver configuredHostsResolver) {
75+
public PeerFinder(Settings settings, TransportService transportService, TransportAddressConnector transportAddressConnector,
76+
ConfiguredHostsResolver configuredHostsResolver) {
7777
super(settings);
7878
findPeersDelay = DISCOVERY_FIND_PEERS_INTERVAL_SETTING.get(settings);
7979
this.transportService = transportService;
@@ -95,6 +95,8 @@ public void activate(final DiscoveryNodes lastAcceptedNodes) {
9595
leader = Optional.empty();
9696
handleWakeUp(); // return value discarded: there are no known peers, so none can be disconnected
9797
}
98+
99+
onFoundPeersUpdated(); // trigger a check for a quorum already
98100
}
99101

100102
public void deactivate(DiscoveryNode leader) {
@@ -116,7 +118,7 @@ protected final boolean holdsLock() {
116118
return Thread.holdsLock(mutex);
117119
}
118120

119-
boolean assertInactiveWithNoKnownPeers() {
121+
private boolean assertInactiveWithNoKnownPeers() {
120122
assert active == false;
121123
assert peersByAddress.isEmpty() : peersByAddress.keySet();
122124
return true;
@@ -125,13 +127,24 @@ boolean assertInactiveWithNoKnownPeers() {
125127
PeersResponse handlePeersRequest(PeersRequest peersRequest) {
126128
synchronized (mutex) {
127129
assert peersRequest.getSourceNode().equals(getLocalNode()) == false;
130+
final List<DiscoveryNode> knownPeers;
128131
if (active) {
132+
assert leader.isPresent() == false : leader;
129133
startProbe(peersRequest.getSourceNode().getAddress());
130134
peersRequest.getKnownPeers().stream().map(DiscoveryNode::getAddress).forEach(this::startProbe);
131-
return new PeersResponse(Optional.empty(), getFoundPeersUnderLock(), currentTerm);
135+
knownPeers = getFoundPeersUnderLock();
132136
} else {
133-
return new PeersResponse(leader, Collections.emptyList(), currentTerm);
137+
assert leader.isPresent();
138+
knownPeers = Collections.emptyList();
134139
}
140+
return new PeersResponse(leader, knownPeers, currentTerm);
141+
}
142+
}
143+
144+
// exposed for checking invariant in o.e.c.c.Coordinator (public since this is a different package)
145+
public Optional<DiscoveryNode> getLeader() {
146+
synchronized (mutex) {
147+
return leader;
135148
}
136149
}
137150

@@ -247,7 +260,7 @@ protected void doRun() {
247260

248261
@Override
249262
public String toString() {
250-
return "PeerFinder::handleWakeUp";
263+
return "PeerFinder handling wakeup";
251264
}
252265
});
253266

server/src/main/java/org/elasticsearch/transport/TransportService.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -733,6 +733,11 @@ public void onFailure(Exception e) {
733733
"failed to notify channel of error message for action [{}]", action), inner);
734734
}
735735
}
736+
737+
@Override
738+
public String toString() {
739+
return "processing of [" + action + "][" + requestId + "]: " + request;
740+
}
736741
});
737742
}
738743

@@ -1049,6 +1054,11 @@ public void cancel() {
10491054
"cancel must be called after the requestId [" + requestId + "] has been removed from clientHandlers";
10501055
FutureUtils.cancel(future);
10511056
}
1057+
1058+
@Override
1059+
public String toString() {
1060+
return "TimeoutHandler for [" + action + "][" + requestId + "]";
1061+
}
10521062
}
10531063

10541064
static class TimeoutInfoHolder {
@@ -1176,7 +1186,17 @@ public void sendResponse(final TransportResponse response, TransportResponseOpti
11761186
if (ThreadPool.Names.SAME.equals(executor)) {
11771187
processResponse(handler, response);
11781188
} else {
1179-
threadPool.executor(executor).execute(() -> processResponse(handler, response));
1189+
threadPool.executor(executor).execute(new Runnable() {
1190+
@Override
1191+
public String toString() {
1192+
return "delivery of response to [" + action + "][" + requestId + "]: " + response;
1193+
}
1194+
1195+
@Override
1196+
public void run() {
1197+
DirectResponseChannel.this.processResponse(handler, response);
1198+
}
1199+
});
11801200
}
11811201
}
11821202
}

server/src/test/java/org/elasticsearch/action/admin/indices/create/ShrinkIndexIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,8 @@ public void testShrinkIndexPrimaryTerm() throws Exception {
162162
final List<Integer> factors = Arrays.asList(2, 3, 5, 7);
163163
final List<Integer> numberOfShardsFactors = randomSubsetOf(scaledRandomIntBetween(1, factors.size() - 1), factors);
164164
final int numberOfShards = numberOfShardsFactors.stream().reduce(1, (x, y) -> x * y);
165-
final int numberOfTargetShards = randomSubsetOf(numberOfShardsFactors).stream().reduce(1, (x, y) -> x * y);
165+
final int numberOfTargetShards = randomSubsetOf(randomInt(numberOfShardsFactors.size() - 1), numberOfShardsFactors)
166+
.stream().reduce(1, (x, y) -> x * y);
166167
internalCluster().ensureAtLeastNumDataNodes(2);
167168
prepareCreate("source").setSettings(Settings.builder().put(indexSettings()).put("number_of_shards", numberOfShards)).get();
168169

0 commit comments

Comments
 (0)