4141import org .elasticsearch .indices .cluster .FakeThreadPoolMasterService ;
4242import org .elasticsearch .test .ESTestCase ;
4343import org .elasticsearch .test .disruption .DisruptableMockTransport ;
44+ import org .elasticsearch .test .disruption .DisruptableMockTransport .ConnectionStatus ;
4445import org .elasticsearch .test .junit .annotations .TestLogging ;
4546import org .elasticsearch .transport .TransportService ;
4647import org .hamcrest .Matcher ;
5960import static org .elasticsearch .cluster .coordination .CoordinationStateTests .clusterState ;
6061import static org .elasticsearch .cluster .coordination .CoordinationStateTests .setValue ;
6162import static org .elasticsearch .cluster .coordination .CoordinationStateTests .value ;
63+ import static org .elasticsearch .cluster .coordination .Coordinator .Mode .CANDIDATE ;
6264import static org .elasticsearch .cluster .coordination .Coordinator .Mode .FOLLOWER ;
65+ import static org .elasticsearch .cluster .coordination .LeaderChecker .LEADER_CHECK_INTERVAL_SETTING ;
66+ import static org .elasticsearch .cluster .coordination .LeaderChecker .LEADER_CHECK_RETRY_COUNT_SETTING ;
67+ import static org .elasticsearch .cluster .coordination .LeaderChecker .LEADER_CHECK_TIMEOUT_SETTING ;
6368import static org .elasticsearch .node .Node .NODE_NAME_SETTING ;
6469import static org .elasticsearch .transport .TransportService .HANDSHAKE_ACTION_NAME ;
6570import static org .elasticsearch .transport .TransportService .NOOP_TRANSPORT_INTERCEPTOR ;
6873import static org .hamcrest .Matchers .is ;
6974import static org .hamcrest .Matchers .not ;
7075
71- @ TestLogging ("org.elasticsearch.cluster.coordination:TRACE,org.elasticsearch.cluster. discovery:TRACE" )
76+ @ TestLogging ("org.elasticsearch.cluster.coordination:TRACE,org.elasticsearch.discovery:TRACE" )
7277public class CoordinatorTests extends ESTestCase {
7378
7479 public void testCanUpdateClusterStateAfterStabilisation () {
@@ -101,6 +106,40 @@ public void testNodesJoinAfterStableCluster() {
101106 assertEquals (currentTerm , newTerm );
102107 }
103108
109+ public void testLeaderDisconnectionDetectedQuickly () {
110+ final Cluster cluster = new Cluster (randomIntBetween (3 , 5 ));
111+ cluster .stabilise ();
112+
113+ final ClusterNode originalLeader = cluster .getAnyLeader ();
114+ logger .info ("--> disconnecting leader {}" , originalLeader );
115+ originalLeader .disconnect ();
116+
117+ synchronized (originalLeader .coordinator .mutex ) {
118+ originalLeader .coordinator .becomeCandidate ("simulated failure detection" ); // TODO remove once follower checker is integrated
119+ }
120+
121+ cluster .stabilise ();
122+ assertThat (cluster .getAnyLeader ().getId (), not (equalTo (originalLeader .getId ())));
123+ }
124+
125+ public void testUnresponsiveLeaderDetectedEventually () {
126+ final Cluster cluster = new Cluster (randomIntBetween (3 , 5 ));
127+ cluster .stabilise ();
128+
129+ final ClusterNode originalLeader = cluster .getAnyLeader ();
130+ logger .info ("--> partitioning leader {}" , originalLeader );
131+ originalLeader .partition ();
132+
133+ synchronized (originalLeader .coordinator .mutex ) {
134+ originalLeader .coordinator .becomeCandidate ("simulated failure detection" ); // TODO remove once follower checker is integrated
135+ }
136+
137+ cluster .stabilise (Cluster .DEFAULT_STABILISATION_TIME
138+ + (LEADER_CHECK_INTERVAL_SETTING .get (Settings .EMPTY ).millis () + LEADER_CHECK_TIMEOUT_SETTING .get (Settings .EMPTY ).millis ())
139+ * LEADER_CHECK_RETRY_COUNT_SETTING .get (Settings .EMPTY ));
140+ assertThat (cluster .getAnyLeader ().getId (), not (equalTo (originalLeader .getId ())));
141+ }
142+
104143 private static String nodeIdFromIndex (int nodeIndex ) {
105144 return "node" + nodeIndex ;
106145 }
@@ -115,6 +154,9 @@ class Cluster {
115154 Settings .builder ().put (NODE_NAME_SETTING .getKey (), "deterministic-task-queue" ).build ());
116155 private final VotingConfiguration initialConfiguration ;
117156
157+ private final Set <String > disconnectedNodes = new HashSet <>();
158+ private final Set <String > blackholedNodes = new HashSet <>();
159+
118160 Cluster (int initialNodeCount ) {
119161 logger .info ("--> creating cluster of {} nodes" , initialNodeCount );
120162
@@ -142,8 +184,12 @@ void addNodes(int newNodesCount) {
142184 }
143185
144186 void stabilise () {
187+ stabilise (DEFAULT_STABILISATION_TIME );
188+ }
189+
190+ void stabilise (long stabilisationTime ) {
145191 final long stabilisationStartTime = deterministicTaskQueue .getCurrentTimeMillis ();
146- while (deterministicTaskQueue .getCurrentTimeMillis () < stabilisationStartTime + DEFAULT_STABILISATION_TIME ) {
192+ while (deterministicTaskQueue .getCurrentTimeMillis () < stabilisationStartTime + stabilisationTime ) {
147193
148194 while (deterministicTaskQueue .hasRunnableTasks ()) {
149195 try {
@@ -182,16 +228,21 @@ private void assertUniqueLeaderAndExpectedModes() {
182228 }
183229
184230 final String nodeId = clusterNode .getId ();
185- assertThat (nodeId + " has the same term as the leader" , clusterNode .coordinator .getCurrentTerm (), is (leaderTerm ));
186- // TODO assert that all nodes have actually voted for the leader in this term
187-
188- assertThat (nodeId + " is a follower" , clusterNode .coordinator .getMode (), is (FOLLOWER ));
189- assertThat (nodeId + " is at the same accepted version as the leader" ,
190- Optional .of (clusterNode .coordinator .getLastAcceptedState ().getVersion ()), isPresentAndEqualToLeaderVersion );
191- assertThat (nodeId + " is at the same committed version as the leader" ,
192- clusterNode .coordinator .getLastCommittedState ().map (ClusterState ::getVersion ), isPresentAndEqualToLeaderVersion );
193- assertThat (clusterNode .coordinator .getLastCommittedState ().map (ClusterState ::getNodes ).map (dn -> dn .nodeExists (nodeId )),
194- equalTo (Optional .of (true )));
231+
232+ if (disconnectedNodes .contains (nodeId ) || blackholedNodes .contains (nodeId )) {
233+ assertThat (nodeId + " is a candidate" , clusterNode .coordinator .getMode (), is (CANDIDATE ));
234+ } else {
235+ assertThat (nodeId + " has the same term as the leader" , clusterNode .coordinator .getCurrentTerm (), is (leaderTerm ));
236+ // TODO assert that all nodes have actually voted for the leader in this term
237+
238+ assertThat (nodeId + " is a follower" , clusterNode .coordinator .getMode (), is (FOLLOWER ));
239+ assertThat (nodeId + " is at the same accepted version as the leader" ,
240+ Optional .of (clusterNode .coordinator .getLastAcceptedState ().getVersion ()), isPresentAndEqualToLeaderVersion );
241+ assertThat (nodeId + " is at the same committed version as the leader" ,
242+ clusterNode .coordinator .getLastCommittedState ().map (ClusterState ::getVersion ), isPresentAndEqualToLeaderVersion );
243+ assertThat (clusterNode .coordinator .getLastCommittedState ().map (ClusterState ::getNodes ).map (dn -> dn .nodeExists (nodeId )),
244+ equalTo (Optional .of (true )));
245+ }
195246 }
196247
197248 assertThat (leader .coordinator .getLastCommittedState ().map (ClusterState ::getNodes ).map (DiscoveryNodes ::getSize ),
@@ -204,6 +255,18 @@ ClusterNode getAnyLeader() {
204255 return randomFrom (allLeaders );
205256 }
206257
258+ private ConnectionStatus getConnectionStatus (DiscoveryNode sender , DiscoveryNode destination ) {
259+ ConnectionStatus connectionStatus ;
260+ if (blackholedNodes .contains (sender .getId ()) || blackholedNodes .contains (destination .getId ())) {
261+ connectionStatus = ConnectionStatus .BLACK_HOLE ;
262+ } else if (disconnectedNodes .contains (sender .getId ()) || disconnectedNodes .contains (destination .getId ())) {
263+ connectionStatus = ConnectionStatus .DISCONNECTED ;
264+ } else {
265+ connectionStatus = ConnectionStatus .CONNECTED ;
266+ }
267+ return connectionStatus ;
268+ }
269+
207270 class ClusterNode extends AbstractComponent {
208271 private final int nodeIndex ;
209272 private Coordinator coordinator ;
@@ -241,7 +304,7 @@ protected DiscoveryNode getLocalNode() {
241304
242305 @ Override
243306 protected ConnectionStatus getConnectionStatus (DiscoveryNode sender , DiscoveryNode destination ) {
244- return ConnectionStatus . CONNECTED ;
307+ return Cluster . this . getConnectionStatus ( sender , destination ) ;
245308 }
246309
247310 @ Override
@@ -264,6 +327,17 @@ protected void handle(DiscoveryNode sender, DiscoveryNode destination, String ac
264327 deterministicTaskQueue .scheduleNow (onNode (destination , doDelivery ));
265328 }
266329 }
330+
331+ @ Override
332+ protected void onBlackholedDuringSend (long requestId , String action , DiscoveryNode destination ) {
333+ if (action .equals (HANDSHAKE_ACTION_NAME )) {
334+ logger .trace ("ignoring blackhole and delivering {}" , getRequestDescription (requestId , action , destination ));
335+ // handshakes always have a timeout, and are sent in a blocking fashion, so we must respond with an exception.
336+ sendFromTo (destination , getLocalNode (), action , getDisconnectException (requestId , action , destination ));
337+ } else {
338+ super .onBlackholedDuringSend (requestId , action , destination );
339+ }
340+ }
267341 };
268342
269343 masterService = new FakeThreadPoolMasterService ("test" ,
@@ -290,7 +364,7 @@ String getId() {
290364 return localNode .getId ();
291365 }
292366
293- public DiscoveryNode getLocalNode () {
367+ DiscoveryNode getLocalNode () {
294368 return localNode ;
295369 }
296370
@@ -316,6 +390,14 @@ public void onFailure(String source, Exception e) {
316390 public String toString () {
317391 return localNode .toString ();
318392 }
393+
394+ void disconnect () {
395+ disconnectedNodes .add (localNode .getId ());
396+ }
397+
398+ void partition () {
399+ blackholedNodes .add (localNode .getId ());
400+ }
319401 }
320402
321403 private List <TransportAddress > provideUnicastHosts (HostsResolver ignored ) {
0 commit comments