Commit 79c13b9

bleskes authored and martijnvg committed
Added ServiceDisruptionScheme(s) and testAckedIndexing
This commit adds the notion of a ServiceDisruptionScheme, allowing disruptions to be introduced into our test cluster. This abstraction is used in a couple of wrappers around the functionality offered by MockTransportService to simulate various network partitions. There is also one implementation for causing a node to be slow in processing cluster state updates. This new mechanism is integrated into the existing DiscoveryWithNetworkFailuresTests. A new test called testAckedIndexing is added to verify retrieval of documents whose indexing was acked during various disruptions. Closes #6505
1 parent ab668b9 commit 79c13b9

17 files changed: +1156 −167 lines changed
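
The ServiceDisruptionScheme interface itself lives in the new org.elasticsearch.test.disruption package, whose files are not shown below. A minimal sketch of its shape, inferred from the call sites visible in the InternalTestCluster diff further down — the real interface may well declare additional methods (for example to start and stop a disruption):

    public interface ServiceDisruptionScheme {

        // called once when the scheme is installed on / removed from the whole test cluster
        void applyToCluster(InternalTestCluster cluster);
        void removeFromCluster(InternalTestCluster cluster);

        // called per node as nodes are published, restarted or stopped
        void applyToNode(String node, InternalTestCluster cluster);
        void removeFromNode(String node, InternalTestCluster cluster);

        // called when the test cluster itself is closed
        void testClusterClosed();
    }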

src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java

Lines changed: 6 additions & 11 deletions
@@ -340,7 +340,7 @@ public ClusterState execute(ClusterState currentState) {
 
     @Override
     public void onFailure(String source, Throwable t) {
-        logger.error("unexpected failure during [{}]", t, source);
+        logger.error("unexpected failure during [{}]", t, source);
     }
 
     @Override
@@ -406,8 +406,7 @@ public ClusterState execute(ClusterState currentState) {
     public void onFailure(String source, Throwable t) {
         if (t instanceof ClusterService.NoLongerMasterException) {
             logger.debug("not processing {} leave request as we are no longer master", node);
-        }
-        else {
+        } else {
             logger.error("unexpected failure during [{}]", t, source);
         }
     }
@@ -446,8 +445,7 @@ public ClusterState execute(ClusterState currentState) {
     public void onFailure(String source, Throwable t) {
         if (t instanceof ClusterService.NoLongerMasterException) {
             logger.debug("not processing [{}] as we are no longer master", source);
-        }
-        else {
+        } else {
             logger.error("unexpected failure during [{}]", t, source);
         }
     }
@@ -484,8 +482,7 @@ public ClusterState execute(ClusterState currentState) {
     public void onFailure(String source, Throwable t) {
         if (t instanceof ClusterService.NoLongerMasterException) {
             logger.debug("not processing [{}] as we are no longer master", source);
-        }
-        else {
+        } else {
             logger.error("unexpected failure during [{}]", t, source);
         }
     }
@@ -594,7 +591,7 @@ void handleNewClusterStateFromMaster(ClusterState newClusterState, final Publish
             return;
         }
         if (master) {
-            logger.debug("received cluster state from [{}] which is also master but with cluster name [{}]", newClusterState.nodes().masterNode(), incomingClusterName);
+            logger.debug("received cluster state from [{}] which is also master but with cluster name [{}]", newClusterState.nodes().masterNode(), incomingClusterName);
             final ClusterState newState = newClusterState;
             clusterService.submitStateUpdateTask("zen-disco-master_receive_cluster_state_from_another_master [" + newState.nodes().masterNode() + "]", Priority.URGENT, new ProcessedClusterStateUpdateTask() {
                 @Override
@@ -638,7 +635,6 @@ public void onFailure(String source, Throwable t) {
             final ProcessClusterState processClusterState = new ProcessClusterState(newClusterState, newStateProcessed);
             processNewClusterStates.add(processClusterState);
 
-
             assert newClusterState.nodes().masterNode() != null : "received a cluster state without a master";
             assert !newClusterState.blocks().hasGlobalBlock(discoveryService.getNoMasterBlock()) : "received a cluster state with a master block";
 
@@ -1014,8 +1010,7 @@ public ClusterState execute(ClusterState currentState) {
     public void onFailure(String source, Throwable t) {
         if (t instanceof ClusterService.NoLongerMasterException) {
             logger.debug("not processing [{}] as we are no longer master", source);
-        }
-        else {
+        } else {
             logger.error("unexpected failure during [{}]", t, source);
         }
     }

src/main/java/org/elasticsearch/transport/TransportService.java

Lines changed: 4 additions & 0 deletions
@@ -257,6 +257,10 @@ public void removeHandler(String action) {
         }
     }
 
+    protected TransportRequestHandler getHandler(String action) {
+        return serverHandlers.get(action);
+    }
+
     class Adapter implements TransportServiceAdapter {
 
         final MeanMetric rxMetric = new MeanMetric();
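
Exposing getHandler(String action) to subclasses is what allows test-only transport wrappers to intercept request delivery. As a purely hypothetical illustration (this class is not part of the commit; the real wrappers build on MockTransportService), a disrupting transport service could consult the handler registry and drop requests to simulate an unresponsive network:

    package org.elasticsearch.test.transport;

    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.threadpool.ThreadPool;
    import org.elasticsearch.transport.*;

    // Hypothetical sketch only: simulate lost requests by consulting the
    // handler registry that getHandler(action) now exposes to subclasses.
    public class DroppingTransportService extends TransportService {

        private volatile boolean dropRequests = false;

        public DroppingTransportService(Settings settings, Transport transport, ThreadPool threadPool) {
            super(settings, transport, threadPool);
        }

        public void dropRequests(boolean drop) {
            this.dropRequests = drop;
        }

        // invented hook for this sketch, not an override of a real dispatch method
        void deliverLocally(String action, TransportRequest request, TransportChannel channel) throws Exception {
            TransportRequestHandler handler = getHandler(action);
            if (handler == null || dropRequests) {
                return; // behave as if the request vanished on the wire
            }
            handler.messageReceived(request, channel);
        }
    }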

src/test/java/org/elasticsearch/discovery/DiscoveryWithNetworkFailuresTests.java

Lines changed: 257 additions & 140 deletions
Large diffs are not rendered by default.
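
The new testAckedIndexing test is part of this file. Since the diff is not rendered, here is a condensed sketch (inside a test method) of the pattern described in the commit message, written against the setDisruptionScheme/clearDisruptionScheme helpers added below; the randomDisruptionScheme() helper and the startDisrupting()/stopDisrupting() calls are assumptions, not code visible in this view:

    // Sketch only - simplified from the commit message, not the actual test body.
    ServiceDisruptionScheme disruption = randomDisruptionScheme(); // hypothetical helper picking one of the new schemes
    setDisruptionScheme(disruption);
    disruption.startDisrupting();                                  // assumed API, not shown in this diff

    List<String> ackedIds = new ArrayList<>();
    for (int i = 0; i < 100; i++) {
        String id = Integer.toString(i);
        try {
            // if the index call returns normally, the write was acked to the client
            client().prepareIndex("test", "type", id).setSource("field", i).get();
            ackedIds.add(id);
        } catch (ElasticsearchException e) {
            // not acked while disrupted; the test makes no promise about this doc
        }
    }

    disruption.stopDisrupting();                                   // assumed API
    ensureGreen("test");
    // every acked document must be retrievable once the cluster has healed
    for (String id : ackedIds) {
        assertTrue("doc [" + id + "] was acked but not found",
                client().prepareGet("test", "type", id).get().isExists());
    }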

src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadTests.java

Lines changed: 0 additions & 1 deletion
@@ -43,7 +43,6 @@
 import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
 import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*;
-import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoTimeout;
 import static org.hamcrest.Matchers.equalTo;
 
 public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {

src/test/java/org/elasticsearch/test/BackgroundIndexer.java

Lines changed: 2 additions & 3 deletions
@@ -25,7 +25,6 @@
 import org.elasticsearch.client.Client;
 import org.elasticsearch.common.logging.ESLogger;
 import org.elasticsearch.common.logging.Loggers;
-import org.elasticsearch.recovery.RecoveryWhileUnderLoadTests;
 import org.junit.Assert;
 
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -40,7 +39,7 @@
 
 public class BackgroundIndexer implements AutoCloseable {
 
-    private final ESLogger logger = Loggers.getLogger(RecoveryWhileUnderLoadTests.class);
+    private final ESLogger logger = Loggers.getLogger(getClass());
 
     final Thread[] writers;
     final CountDownLatch stopLatch;
@@ -218,7 +217,7 @@ public void continueIndexing(int numOfDocs) {
         setBudget(numOfDocs);
     }
 
-    /** Stop all background threads **/
+    /** Stop all background threads * */
     public void stop() throws InterruptedException {
         if (stop.get()) {
             return;

src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java

Lines changed: 21 additions & 11 deletions
@@ -84,6 +84,7 @@
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.search.SearchService;
 import org.elasticsearch.test.client.RandomizingClient;
+import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
 import org.junit.*;
 
 import java.io.IOException;
@@ -332,7 +333,7 @@ private void randomIndexTemplate() throws IOException {
         if (randomBoolean()) {
             mappings.startObject(IdFieldMapper.NAME)
                     .field("index", randomFrom("not_analyzed", "no"))
-                    .endObject();
+                    .endObject();
         }
         mappings.startObject(FieldNamesFieldMapper.NAME)
                 .startObject("fielddata")
@@ -456,7 +457,7 @@ private static ImmutableSettings.Builder setRandomMerge(Random random, Immutable
             case 3:
                 builder.put(MergeSchedulerModule.MERGE_SCHEDULER_TYPE_KEY, ConcurrentMergeSchedulerProvider.class);
                 final int maxThreadCount = RandomInts.randomIntBetween(random, 1, 4);
-                final int maxMergeCount = RandomInts.randomIntBetween(random, maxThreadCount, maxThreadCount+4);
+                final int maxMergeCount = RandomInts.randomIntBetween(random, maxThreadCount, maxThreadCount + 4);
                 builder.put(ConcurrentMergeSchedulerProvider.MAX_MERGE_COUNT, maxMergeCount);
                 builder.put(ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT, maxThreadCount);
                 break;
@@ -514,6 +515,7 @@ protected final void afterInternal() throws IOException {
         boolean success = false;
         try {
             logger.info("[{}#{}]: cleaning up after test", getTestClass().getSimpleName(), getTestName());
+            clearDisruptionScheme();
             final Scope currentClusterScope = getCurrentClusterScope();
             try {
                 if (currentClusterScope != Scope.TEST) {
@@ -621,6 +623,15 @@ protected int numberOfReplicas() {
         return between(minimumNumberOfReplicas(), maximumNumberOfReplicas());
     }
 
+
+    public void setDisruptionScheme(ServiceDisruptionScheme scheme) {
+        internalCluster().setDisruptionScheme(scheme);
+    }
+
+    public void clearDisruptionScheme() {
+        internalCluster().clearDisruptionScheme();
+    }
+
     /**
      * Returns a settings object used in {@link #createIndex(String...)} and {@link #prepareCreate(String)} and friends.
      * This method can be overwritten by subclasses to set defaults for the indices that are created by the test.
@@ -1076,8 +1087,7 @@ public void indexRandom(boolean forceRefresh, boolean dummyDocuments, IndexReque
     * @param forceRefresh if <tt>true</tt> all involved indices are refreshed once the documents are indexed. Additionally if <tt>true</tt>
     * some empty dummy documents are may be randomly inserted into the document list and deleted once all documents are indexed.
     * This is useful to produce deleted documents on the server side.
-    * @param builders the documents to index.
-    *
+    * @param builders the documents to index.
     * @see #indexRandom(boolean, boolean, java.util.List)
     */
    public void indexRandom(boolean forceRefresh, List<IndexRequestBuilder> builders) throws InterruptedException, ExecutionException {
@@ -1091,10 +1101,10 @@ public void indexRandom(boolean forceRefresh, List<IndexRequestBuilder> builders
     * segment or if only one document is in a segment etc. This method prevents issues like this by randomizing the index
     * layout.
     *
-    * @param forceRefresh if <tt>true</tt> all involved indices are refreshed once the documents are indexed.
+    * @param forceRefresh if <tt>true</tt> all involved indices are refreshed once the documents are indexed.
     * @param dummyDocuments if <tt>true</tt> some empty dummy documents are may be randomly inserted into the document list and deleted once
     * all documents are indexed. This is useful to produce deleted documents on the server side.
-    * @param builders the documents to index.
+    * @param builders the documents to index.
     */
    public void indexRandom(boolean forceRefresh, boolean dummyDocuments, List<IndexRequestBuilder> builders) throws InterruptedException, ExecutionException {
        Random random = getRandom();
@@ -1107,7 +1117,7 @@ public void indexRandom(boolean forceRefresh, boolean dummyDocuments, List<Index
         builders = new ArrayList<>(builders);
         final String[] indices = indicesSet.toArray(new String[0]);
         // inject some bogus docs
-        final int numBogusDocs = scaledRandomIntBetween(1, builders.size()*2);
+        final int numBogusDocs = scaledRandomIntBetween(1, builders.size() * 2);
         final int unicodeLen = between(1, 10);
         for (int i = 0; i < numBogusDocs; i++) {
             String id = randomRealisticUnicodeOfLength(unicodeLen);
@@ -1159,10 +1169,10 @@ public void indexRandom(boolean forceRefresh, boolean dummyDocuments, List<Index
         }
         assertThat(actualErrors, emptyIterable());
         if (!bogusIds.isEmpty()) {
-            // delete the bogus types again - it might trigger merges or at least holes in the segments and enforces deleted docs!
-            for (Tuple<String, String> doc : bogusIds) {
-                assertTrue("failed to delete a dummy doc", client().prepareDelete(doc.v1(), RANDOM_BOGUS_TYPE, doc.v2()).get().isFound());
-            }
+            // delete the bogus types again - it might trigger merges or at least holes in the segments and enforces deleted docs!
+            for (Tuple<String, String> doc : bogusIds) {
+                assertTrue("failed to delete a dummy doc", client().prepareDelete(doc.v1(), RANDOM_BOGUS_TYPE, doc.v2()).get().isFound());
+            }
         }
         if (forceRefresh) {
             assertNoFailures(client().admin().indices().prepareRefresh(indices).setIndicesOptions(IndicesOptions.lenientExpandOpen()).execute().get());

src/test/java/org/elasticsearch/test/InternalTestCluster.java

Lines changed: 64 additions & 0 deletions
@@ -43,6 +43,7 @@
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.FileSystemUtils;
 import org.elasticsearch.common.lease.Releasables;
 import org.elasticsearch.common.logging.ESLogger;
@@ -69,6 +70,7 @@
 import org.elasticsearch.search.SearchService;
 import org.elasticsearch.test.cache.recycler.MockBigArraysModule;
 import org.elasticsearch.test.cache.recycler.MockPageCacheRecyclerModule;
+import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
 import org.elasticsearch.test.engine.MockEngineModule;
 import org.elasticsearch.test.store.MockFSIndexStoreModule;
 import org.elasticsearch.test.transport.AssertingLocalTransportModule;
@@ -169,6 +171,8 @@ public final class InternalTestCluster extends TestCluster {
 
     private final boolean hasFilterCache;
 
+    private ServiceDisruptionScheme activeDisruptionScheme;
+
     public InternalTestCluster(long clusterSeed, String clusterName) {
         this(clusterSeed, DEFAULT_MIN_NUM_DATA_NODES, DEFAULT_MAX_NUM_DATA_NODES, clusterName, NodeSettingsSource.EMPTY, DEFAULT_NUM_CLIENT_NODES, DEFAULT_ENABLE_RANDOM_BENCH_NODES);
     }
@@ -244,6 +248,10 @@ public String getClusterName() {
         return clusterName;
     }
 
+    public String[] getNodeNames() {
+        return nodes.keySet().toArray(Strings.EMPTY_ARRAY);
+    }
+
     private static boolean isLocalTransportConfigured() {
         if ("local".equals(System.getProperty("es.node.mode", "network"))) {
             return true;
@@ -427,6 +435,7 @@ public synchronized void ensureAtMostNumDataNodes(int n) throws IOException {
         while (limit.hasNext()) {
             NodeAndClient next = limit.next();
             nodesToRemove.add(next);
+            removeDistruptionSchemeFromNode(next);
             next.close();
         }
         for (NodeAndClient toRemove : nodesToRemove) {
@@ -590,6 +599,10 @@ public boolean apply(NodeAndClient nodeAndClient) {
     @Override
     public void close() {
         if (this.open.compareAndSet(true, false)) {
+            if (activeDisruptionScheme != null) {
+                activeDisruptionScheme.testClusterClosed();
+                activeDisruptionScheme = null;
+            }
             IOUtils.closeWhileHandlingException(nodes.values());
             nodes.clear();
             executor.shutdownNow();
@@ -754,6 +767,7 @@ public synchronized void beforeTest(Random random, double transportClientRatio)
     }
 
     private synchronized void reset(boolean wipeData) throws IOException {
+        clearDisruptionScheme();
         resetClients(); /* reset all clients - each test gets its own client based on the Random instance created above. */
         if (wipeData) {
             wipeDataDirectories();
@@ -950,6 +964,7 @@ public synchronized void stopRandomDataNode() throws IOException {
         NodeAndClient nodeAndClient = getRandomNodeAndClient(new DataNodePredicate());
         if (nodeAndClient != null) {
             logger.info("Closing random node [{}] ", nodeAndClient.name);
+            removeDistruptionSchemeFromNode(nodeAndClient);
             nodes.remove(nodeAndClient.name);
             nodeAndClient.close();
         }
@@ -969,6 +984,7 @@ public boolean apply(NodeAndClient nodeAndClient) {
         });
         if (nodeAndClient != null) {
             logger.info("Closing filtered random node [{}] ", nodeAndClient.name);
+            removeDistruptionSchemeFromNode(nodeAndClient);
             nodes.remove(nodeAndClient.name);
             nodeAndClient.close();
         }
@@ -983,6 +999,7 @@ public synchronized void stopCurrentMasterNode() throws IOException {
         String masterNodeName = getMasterName();
         assert nodes.containsKey(masterNodeName);
         logger.info("Closing master node [{}] ", masterNodeName);
+        removeDistruptionSchemeFromNode(nodes.get(masterNodeName));
         NodeAndClient remove = nodes.remove(masterNodeName);
         remove.close();
     }
@@ -994,6 +1011,7 @@ public void stopRandomNonMasterNode() throws IOException {
         NodeAndClient nodeAndClient = getRandomNodeAndClient(Predicates.not(new MasterNodePredicate(getMasterName())));
         if (nodeAndClient != null) {
             logger.info("Closing random non master node [{}] current master [{}] ", nodeAndClient.name, getMasterName());
+            removeDistruptionSchemeFromNode(nodeAndClient);
             nodes.remove(nodeAndClient.name);
             nodeAndClient.close();
         }
@@ -1047,6 +1065,9 @@ private void restartAllNodes(boolean rollingRestart, RestartCallback callback) t
             if (!callback.doRestart(nodeAndClient.name)) {
                 logger.info("Closing node [{}] during restart", nodeAndClient.name);
                 toRemove.add(nodeAndClient);
+                if (activeDisruptionScheme != null) {
+                    activeDisruptionScheme.removeFromNode(nodeAndClient.name, this);
+                }
                 nodeAndClient.close();
             }
         }
@@ -1061,18 +1082,33 @@ private void restartAllNodes(boolean rollingRestart, RestartCallback callback) t
             for (NodeAndClient nodeAndClient : nodes.values()) {
                 callback.doAfterNodes(numNodesRestarted++, nodeAndClient.nodeClient());
                 logger.info("Restarting node [{}] ", nodeAndClient.name);
+                if (activeDisruptionScheme != null) {
+                    activeDisruptionScheme.removeFromNode(nodeAndClient.name, this);
+                }
                 nodeAndClient.restart(callback);
+                if (activeDisruptionScheme != null) {
+                    activeDisruptionScheme.applyToNode(nodeAndClient.name, this);
+                }
             }
         } else {
             int numNodesRestarted = 0;
             for (NodeAndClient nodeAndClient : nodes.values()) {
                 callback.doAfterNodes(numNodesRestarted++, nodeAndClient.nodeClient());
                 logger.info("Stopping node [{}] ", nodeAndClient.name);
+                if (activeDisruptionScheme != null) {
+                    activeDisruptionScheme.removeFromNode(nodeAndClient.name, this);
+                }
                 nodeAndClient.node.close();
             }
             for (NodeAndClient nodeAndClient : nodes.values()) {
                 logger.info("Starting node [{}] ", nodeAndClient.name);
+                if (activeDisruptionScheme != null) {
+                    activeDisruptionScheme.removeFromNode(nodeAndClient.name, this);
+                }
                 nodeAndClient.restart(callback);
+                if (activeDisruptionScheme != null) {
+                    activeDisruptionScheme.applyToNode(nodeAndClient.name, this);
+                }
             }
         }
     }
@@ -1280,6 +1316,7 @@ private synchronized void publishNode(NodeAndClient nodeAndClient) {
             dataDirToClean.addAll(Arrays.asList(nodeEnv.nodeDataLocations()));
         }
         nodes.put(nodeAndClient.name, nodeAndClient);
+        applyDisruptionSchemeToNode(nodeAndClient);
     }
 
     public void closeNonSharedNodes(boolean wipeData) throws IOException {
@@ -1301,6 +1338,33 @@ public boolean hasFilterCache() {
         return hasFilterCache;
     }
 
+    public void setDisruptionScheme(ServiceDisruptionScheme scheme) {
+        clearDisruptionScheme();
+        scheme.applyToCluster(this);
+        activeDisruptionScheme = scheme;
+    }
+
+    public void clearDisruptionScheme() {
+        if (activeDisruptionScheme != null) {
+            activeDisruptionScheme.removeFromCluster(this);
+        }
+        activeDisruptionScheme = null;
+    }
+
+    private void applyDisruptionSchemeToNode(NodeAndClient nodeAndClient) {
+        if (activeDisruptionScheme != null) {
+            assert nodes.containsKey(nodeAndClient.name);
+            activeDisruptionScheme.applyToNode(nodeAndClient.name, this);
+        }
+    }
+
+    private void removeDistruptionSchemeFromNode(NodeAndClient nodeAndClient) {
+        if (activeDisruptionScheme != null) {
+            assert nodes.containsKey(nodeAndClient.name);
+            activeDisruptionScheme.removeFromNode(nodeAndClient.name, this);
+        }
+    }
+
     private synchronized Collection<NodeAndClient> dataNodeAndClients() {
         return Collections2.filter(nodes.values(), new DataNodePredicate());
     }
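
Taken together, InternalTestCluster now owns the whole lifecycle of the active scheme: setDisruptionScheme applies it to the cluster, every published or restarted node gets applyToNode, nodes are detached via removeFromNode before they are stopped, clearDisruptionScheme (also called from reset and from the cleanup in ElasticsearchIntegrationTest) removes it from the cluster, and testClusterClosed fires when the cluster shuts down. A skeletal no-op implementation against the interface sketched earlier only needs to cover those five hooks (class name and package are assumptions):

    package org.elasticsearch.test.disruption;

    import org.elasticsearch.test.InternalTestCluster;

    // Skeletal, do-nothing scheme illustrating when each hook is invoked by InternalTestCluster.
    public class NoOpDisruptionScheme implements ServiceDisruptionScheme {

        @Override
        public void applyToCluster(InternalTestCluster cluster) {
            // setDisruptionScheme(...) installs the scheme on the whole cluster
        }

        @Override
        public void removeFromCluster(InternalTestCluster cluster) {
            // clearDisruptionScheme() / reset() tear it down again
        }

        @Override
        public void applyToNode(String node, InternalTestCluster cluster) {
            // invoked for every node that is published or comes back from a restart
        }

        @Override
        public void removeFromNode(String node, InternalTestCluster cluster) {
            // invoked before a node is stopped, closed or restarted
        }

        @Override
        public void testClusterClosed() {
            // final callback when the test cluster itself is closed
        }
    }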
