Skip to content

Commit 13c5e74

Browse files
committed
Added ServiceDisruptionScheme(s) and testAckedIndexing
This commit adds the notion of ServiceDisruptionScheme allowing for introducing disruptions in our test cluster. This abstraction as used in a couple of wrappers around the functionality offered by MockTransportService to simulate various network partions. There is also one implementation for causing a node to be slow in processing cluster state updates. This new mechnaism is integrated into existing tests DiscoveryWithNetworkFailuresTests. A new test called testAckedIndexing is added to verify retrieval of documents whose indexing was acked during various disruptions. Closes #6505
1 parent 5c3b0f6 commit 13c5e74

17 files changed

+1145
-156
lines changed

src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -340,7 +340,7 @@ public ClusterState execute(ClusterState currentState) {
340340

341341
@Override
342342
public void onFailure(String source, Throwable t) {
343-
logger.error("unexpected failure during [{}]", t, source);
343+
logger.error("unexpected failure during [{}]", t, source);
344344
}
345345

346346
@Override
@@ -406,8 +406,7 @@ public ClusterState execute(ClusterState currentState) {
406406
public void onFailure(String source, Throwable t) {
407407
if (t instanceof ClusterService.NoLongerMasterException) {
408408
logger.debug("not processing {} leave request as we are no longer master", node);
409-
}
410-
else {
409+
} else {
411410
logger.error("unexpected failure during [{}]", t, source);
412411
}
413412
}
@@ -446,8 +445,7 @@ public ClusterState execute(ClusterState currentState) {
446445
public void onFailure(String source, Throwable t) {
447446
if (t instanceof ClusterService.NoLongerMasterException) {
448447
logger.debug("not processing [{}] as we are no longer master", source);
449-
}
450-
else {
448+
} else {
451449
logger.error("unexpected failure during [{}]", t, source);
452450
}
453451
}
@@ -484,8 +482,7 @@ public ClusterState execute(ClusterState currentState) {
484482
public void onFailure(String source, Throwable t) {
485483
if (t instanceof ClusterService.NoLongerMasterException) {
486484
logger.debug("not processing [{}] as we are no longer master", source);
487-
}
488-
else {
485+
} else {
489486
logger.error("unexpected failure during [{}]", t, source);
490487
}
491488
}
@@ -594,7 +591,7 @@ void handleNewClusterStateFromMaster(ClusterState newClusterState, final Publish
594591
return;
595592
}
596593
if (master) {
597-
logger.debug("received cluster state from [{}] which is also master but with cluster name [{}]", newClusterState.nodes().masterNode(), incomingClusterName);
594+
logger.debug("received cluster state from [{}] which is also master but with cluster name [{}]", newClusterState.nodes().masterNode(), incomingClusterName);
598595
final ClusterState newState = newClusterState;
599596
clusterService.submitStateUpdateTask("zen-disco-master_receive_cluster_state_from_another_master [" + newState.nodes().masterNode() + "]", Priority.URGENT, new ProcessedClusterStateUpdateTask() {
600597
@Override
@@ -638,7 +635,6 @@ public void onFailure(String source, Throwable t) {
638635
final ProcessClusterState processClusterState = new ProcessClusterState(newClusterState, newStateProcessed);
639636
processNewClusterStates.add(processClusterState);
640637

641-
642638
assert newClusterState.nodes().masterNode() != null : "received a cluster state without a master";
643639
assert !newClusterState.blocks().hasGlobalBlock(discoveryService.getNoMasterBlock()) : "received a cluster state with a master block";
644640

@@ -1014,8 +1010,7 @@ public ClusterState execute(ClusterState currentState) {
10141010
public void onFailure(String source, Throwable t) {
10151011
if (t instanceof ClusterService.NoLongerMasterException) {
10161012
logger.debug("not processing [{}] as we are no longer master", source);
1017-
}
1018-
else {
1013+
} else {
10191014
logger.error("unexpected failure during [{}]", t, source);
10201015
}
10211016
}

src/main/java/org/elasticsearch/transport/TransportService.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,10 @@ public void removeHandler(String action) {
257257
}
258258
}
259259

260+
protected TransportRequestHandler getHandler(String action) {
261+
return serverHandlers.get(action);
262+
}
263+
260264
class Adapter implements TransportServiceAdapter {
261265

262266
final MeanMetric rxMetric = new MeanMetric();

src/test/java/org/elasticsearch/discovery/DiscoveryWithNetworkFailuresTests.java

Lines changed: 257 additions & 140 deletions
Large diffs are not rendered by default.

src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadTests.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
4444
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
4545
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*;
46-
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoTimeout;
4746
import static org.hamcrest.Matchers.equalTo;
4847

4948
public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {

src/test/java/org/elasticsearch/test/BackgroundIndexer.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.elasticsearch.client.Client;
2626
import org.elasticsearch.common.logging.ESLogger;
2727
import org.elasticsearch.common.logging.Loggers;
28-
import org.elasticsearch.recovery.RecoveryWhileUnderLoadTests;
2928
import org.junit.Assert;
3029

3130
import java.util.concurrent.CopyOnWriteArrayList;
@@ -40,7 +39,7 @@
4039

4140
public class BackgroundIndexer implements AutoCloseable {
4241

43-
private final ESLogger logger = Loggers.getLogger(RecoveryWhileUnderLoadTests.class);
42+
private final ESLogger logger = Loggers.getLogger(getClass());
4443

4544
final Thread[] writers;
4645
final CountDownLatch stopLatch;
@@ -218,7 +217,7 @@ public void continueIndexing(int numOfDocs) {
218217
setBudget(numOfDocs);
219218
}
220219

221-
/** Stop all background threads **/
220+
/** Stop all background threads * */
222221
public void stop() throws InterruptedException {
223222
if (stop.get()) {
224223
return;

src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@
9191
import org.elasticsearch.rest.RestStatus;
9292
import org.elasticsearch.search.SearchService;
9393
import org.elasticsearch.test.client.RandomizingClient;
94+
import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
9495
import org.junit.*;
9596

9697
import java.io.IOException;
@@ -518,6 +519,7 @@ protected final void afterInternal() throws IOException {
518519
boolean success = false;
519520
try {
520521
logger.info("[{}#{}]: cleaning up after test", getTestClass().getSimpleName(), getTestName());
522+
clearDisruptionScheme();
521523
final Scope currentClusterScope = getCurrentClusterScope();
522524
try {
523525
if (currentClusterScope != Scope.TEST) {
@@ -631,6 +633,15 @@ protected int numberOfReplicas() {
631633
return between(minimumNumberOfReplicas(), maximumNumberOfReplicas());
632634
}
633635

636+
637+
public void setDisruptionScheme(ServiceDisruptionScheme scheme) {
638+
internalCluster().setDisruptionScheme(scheme);
639+
}
640+
641+
public void clearDisruptionScheme() {
642+
internalCluster().clearDisruptionScheme();
643+
}
644+
634645
/**
635646
* Returns a settings object used in {@link #createIndex(String...)} and {@link #prepareCreate(String)} and friends.
636647
* This method can be overwritten by subclasses to set defaults for the indices that are created by the test.

src/test/java/org/elasticsearch/test/InternalTestCluster.java

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
import org.elasticsearch.search.SearchService;
7373
import org.elasticsearch.test.cache.recycler.MockBigArraysModule;
7474
import org.elasticsearch.test.cache.recycler.MockPageCacheRecyclerModule;
75+
import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
7576
import org.elasticsearch.test.engine.MockEngineModule;
7677
import org.elasticsearch.test.store.MockFSIndexStoreModule;
7778
import org.elasticsearch.test.transport.AssertingLocalTransportModule;
@@ -175,6 +176,8 @@ public final class InternalTestCluster extends TestCluster {
175176

176177
private final boolean hasFilterCache;
177178

179+
private ServiceDisruptionScheme activeDisruptionScheme;
180+
178181
public InternalTestCluster(long clusterSeed, String clusterName) {
179182
this(clusterSeed, DEFAULT_MIN_NUM_DATA_NODES, DEFAULT_MAX_NUM_DATA_NODES, clusterName, NodeSettingsSource.EMPTY, DEFAULT_NUM_CLIENT_NODES, DEFAULT_ENABLE_RANDOM_BENCH_NODES);
180183
}
@@ -277,6 +280,10 @@ public String getClusterName() {
277280
return clusterName;
278281
}
279282

283+
public String[] getNodeNames() {
284+
return nodes.keySet().toArray(Strings.EMPTY_ARRAY);
285+
}
286+
280287
private static boolean isLocalTransportConfigured() {
281288
if ("local".equals(System.getProperty("es.node.mode", "network"))) {
282289
return true;
@@ -476,6 +483,7 @@ public synchronized void ensureAtMostNumDataNodes(int n) throws IOException {
476483
while (limit.hasNext()) {
477484
NodeAndClient next = limit.next();
478485
nodesToRemove.add(next);
486+
removeDistruptionSchemeFromNode(next);
479487
next.close();
480488
}
481489
for (NodeAndClient toRemove : nodesToRemove) {
@@ -639,6 +647,10 @@ public boolean apply(NodeAndClient nodeAndClient) {
639647
@Override
640648
public void close() {
641649
if (this.open.compareAndSet(true, false)) {
650+
if (activeDisruptionScheme != null) {
651+
activeDisruptionScheme.testClusterClosed();
652+
activeDisruptionScheme = null;
653+
}
642654
IOUtils.closeWhileHandlingException(nodes.values());
643655
nodes.clear();
644656
executor.shutdownNow();
@@ -811,6 +823,7 @@ public synchronized void beforeTest(Random random, double transportClientRatio)
811823
}
812824

813825
private synchronized void reset(boolean wipeData) throws IOException {
826+
clearDisruptionScheme();
814827
resetClients(); /* reset all clients - each test gets its own client based on the Random instance created above. */
815828
if (wipeData) {
816829
wipeDataDirectories();
@@ -1007,6 +1020,7 @@ public synchronized void stopRandomDataNode() throws IOException {
10071020
NodeAndClient nodeAndClient = getRandomNodeAndClient(new DataNodePredicate());
10081021
if (nodeAndClient != null) {
10091022
logger.info("Closing random node [{}] ", nodeAndClient.name);
1023+
removeDistruptionSchemeFromNode(nodeAndClient);
10101024
nodes.remove(nodeAndClient.name);
10111025
nodeAndClient.close();
10121026
}
@@ -1026,6 +1040,7 @@ public boolean apply(NodeAndClient nodeAndClient) {
10261040
});
10271041
if (nodeAndClient != null) {
10281042
logger.info("Closing filtered random node [{}] ", nodeAndClient.name);
1043+
removeDistruptionSchemeFromNode(nodeAndClient);
10291044
nodes.remove(nodeAndClient.name);
10301045
nodeAndClient.close();
10311046
}
@@ -1040,6 +1055,7 @@ public synchronized void stopCurrentMasterNode() throws IOException {
10401055
String masterNodeName = getMasterName();
10411056
assert nodes.containsKey(masterNodeName);
10421057
logger.info("Closing master node [{}] ", masterNodeName);
1058+
removeDistruptionSchemeFromNode(nodes.get(masterNodeName));
10431059
NodeAndClient remove = nodes.remove(masterNodeName);
10441060
remove.close();
10451061
}
@@ -1051,6 +1067,7 @@ public void stopRandomNonMasterNode() throws IOException {
10511067
NodeAndClient nodeAndClient = getRandomNodeAndClient(Predicates.not(new MasterNodePredicate(getMasterName())));
10521068
if (nodeAndClient != null) {
10531069
logger.info("Closing random non master node [{}] current master [{}] ", nodeAndClient.name, getMasterName());
1070+
removeDistruptionSchemeFromNode(nodeAndClient);
10541071
nodes.remove(nodeAndClient.name);
10551072
nodeAndClient.close();
10561073
}
@@ -1104,6 +1121,9 @@ private void restartAllNodes(boolean rollingRestart, RestartCallback callback) t
11041121
if (!callback.doRestart(nodeAndClient.name)) {
11051122
logger.info("Closing node [{}] during restart", nodeAndClient.name);
11061123
toRemove.add(nodeAndClient);
1124+
if (activeDisruptionScheme != null) {
1125+
activeDisruptionScheme.removeFromNode(nodeAndClient.name, this);
1126+
}
11071127
nodeAndClient.close();
11081128
}
11091129
}
@@ -1118,18 +1138,33 @@ private void restartAllNodes(boolean rollingRestart, RestartCallback callback) t
11181138
for (NodeAndClient nodeAndClient : nodes.values()) {
11191139
callback.doAfterNodes(numNodesRestarted++, nodeAndClient.nodeClient());
11201140
logger.info("Restarting node [{}] ", nodeAndClient.name);
1141+
if (activeDisruptionScheme != null) {
1142+
activeDisruptionScheme.removeFromNode(nodeAndClient.name, this);
1143+
}
11211144
nodeAndClient.restart(callback);
1145+
if (activeDisruptionScheme != null) {
1146+
activeDisruptionScheme.applyToNode(nodeAndClient.name, this);
1147+
}
11221148
}
11231149
} else {
11241150
int numNodesRestarted = 0;
11251151
for (NodeAndClient nodeAndClient : nodes.values()) {
11261152
callback.doAfterNodes(numNodesRestarted++, nodeAndClient.nodeClient());
11271153
logger.info("Stopping node [{}] ", nodeAndClient.name);
1154+
if (activeDisruptionScheme != null) {
1155+
activeDisruptionScheme.removeFromNode(nodeAndClient.name, this);
1156+
}
11281157
nodeAndClient.node.close();
11291158
}
11301159
for (NodeAndClient nodeAndClient : nodes.values()) {
11311160
logger.info("Starting node [{}] ", nodeAndClient.name);
1161+
if (activeDisruptionScheme != null) {
1162+
activeDisruptionScheme.removeFromNode(nodeAndClient.name, this);
1163+
}
11321164
nodeAndClient.restart(callback);
1165+
if (activeDisruptionScheme != null) {
1166+
activeDisruptionScheme.applyToNode(nodeAndClient.name, this);
1167+
}
11331168
}
11341169
}
11351170
}
@@ -1337,6 +1372,7 @@ private synchronized void publishNode(NodeAndClient nodeAndClient) {
13371372
dataDirToClean.addAll(Arrays.asList(nodeEnv.nodeDataLocations()));
13381373
}
13391374
nodes.put(nodeAndClient.name, nodeAndClient);
1375+
applyDisruptionSchemeToNode(nodeAndClient);
13401376
}
13411377

13421378
public void closeNonSharedNodes(boolean wipeData) throws IOException {
@@ -1358,6 +1394,33 @@ public boolean hasFilterCache() {
13581394
return hasFilterCache;
13591395
}
13601396

1397+
public void setDisruptionScheme(ServiceDisruptionScheme scheme) {
1398+
clearDisruptionScheme();
1399+
scheme.applyToCluster(this);
1400+
activeDisruptionScheme = scheme;
1401+
}
1402+
1403+
public void clearDisruptionScheme() {
1404+
if (activeDisruptionScheme != null) {
1405+
activeDisruptionScheme.removeFromCluster(this);
1406+
}
1407+
activeDisruptionScheme = null;
1408+
}
1409+
1410+
private void applyDisruptionSchemeToNode(NodeAndClient nodeAndClient) {
1411+
if (activeDisruptionScheme != null) {
1412+
assert nodes.containsKey(nodeAndClient.name);
1413+
activeDisruptionScheme.applyToNode(nodeAndClient.name, this);
1414+
}
1415+
}
1416+
1417+
private void removeDistruptionSchemeFromNode(NodeAndClient nodeAndClient) {
1418+
if (activeDisruptionScheme != null) {
1419+
assert nodes.containsKey(nodeAndClient.name);
1420+
activeDisruptionScheme.removeFromNode(nodeAndClient.name, this);
1421+
}
1422+
}
1423+
13611424
private synchronized Collection<NodeAndClient> dataNodeAndClients() {
13621425
return Collections2.filter(nodes.values(), new DataNodePredicate());
13631426
}

src/test/java/org/elasticsearch/test/TestCluster.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
2727
import org.elasticsearch.client.Client;
2828
import org.elasticsearch.cluster.metadata.IndexMetaData;
29+
import org.elasticsearch.common.Strings;
2930
import org.elasticsearch.common.logging.ESLogger;
3031
import org.elasticsearch.common.logging.Loggers;
3132
import org.elasticsearch.indices.IndexMissingException;

0 commit comments

Comments
 (0)