diff --git a/core/src/test/java/org/elasticsearch/action/bulk/BulkIntegrationIT.java b/core/src/test/java/org/elasticsearch/action/bulk/BulkIntegrationIT.java index 22377ea1769c8..8fcc76e018a6c 100644 --- a/core/src/test/java/org/elasticsearch/action/bulk/BulkIntegrationIT.java +++ b/core/src/test/java/org/elasticsearch/action/bulk/BulkIntegrationIT.java @@ -34,13 +34,10 @@ public void testBulkIndexCreatesMapping() throws Exception { BulkRequestBuilder bulkBuilder = client().prepareBulk(); bulkBuilder.add(bulkAction.getBytes(StandardCharsets.UTF_8), 0, bulkAction.length(), null, null, XContentType.JSON); bulkBuilder.get(); - assertBusy(new Runnable() { - @Override - public void run() { - GetMappingsResponse mappingsResponse = client().admin().indices().prepareGetMappings().get(); - assertTrue(mappingsResponse.getMappings().containsKey("logstash-2014.03.30")); - assertTrue(mappingsResponse.getMappings().get("logstash-2014.03.30").containsKey("logs")); - } + assertBusy(() -> { + GetMappingsResponse mappingsResponse = client().admin().indices().prepareGetMappings().get(); + assertTrue(mappingsResponse.getMappings().containsKey("logstash-2014.03.30")); + assertTrue(mappingsResponse.getMappings().get("logstash-2014.03.30").containsKey("logs")); }); } } diff --git a/core/src/test/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationActionTests.java index ba488cecb38f8..29235329d6669 100644 --- a/core/src/test/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationActionTests.java @@ -275,12 +275,7 @@ public void testRetryOfAnAlreadyTimedOutRequest() throws Exception { transport.handleLocalError(requestId, new ConnectTransportException(node, "test exception")); // wait until the timeout was triggered and we actually tried to send for the second time - assertBusy(new Runnable() { - @Override - public void run() { - assertThat(transport.capturedRequests().length, equalTo(1)); - } - }); + assertBusy(() -> assertThat(transport.capturedRequests().length, equalTo(1))); // let it fail the second time too requestId = transport.capturedRequests()[0].requestId; diff --git a/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java b/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java index 3fc67f3eb0ed0..31ffb026e3a7f 100644 --- a/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java @@ -158,12 +158,9 @@ public void testSimpleMinimumMasterNodes() throws Exception { } internalCluster().stopRandomNonMasterNode(); - assertBusy(new Runnable() { - @Override - public void run() { - ClusterState state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState(); - assertThat(state.blocks().hasGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID), equalTo(true)); - } + assertBusy(() -> { + ClusterState state1 = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState(); + assertThat(state1.blocks().hasGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID), equalTo(true)); }); logger.info("--> starting the previous master node again..."); @@ -405,12 +402,7 @@ public void onFailure(String source, Exception e) { latch.await(); assertThat(failure.get(), instanceOf(Discovery.FailedToCommitClusterStateException.class)); - assertBusy(new Runnable() { - @Override - public void run() { - assertThat(masterClusterService.state().nodes().getMasterNode(), nullValue()); - } - }); + assertBusy(() -> assertThat(masterClusterService.state().nodes().getMasterNode(), nullValue())); partition.stopDisrupting(); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/DelayedAllocationIT.java b/core/src/test/java/org/elasticsearch/cluster/routing/DelayedAllocationIT.java index 853f0f6561226..e82dbf4d0e94c 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/DelayedAllocationIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/DelayedAllocationIT.java @@ -68,12 +68,7 @@ public void testDelayedAllocationNodeLeavesAndComesBack() throws Exception { ensureGreen("test"); indexRandomData(); internalCluster().stopRandomNode(InternalTestCluster.nameFilter(findNodeWithShard())); - assertBusy(new Runnable() { - @Override - public void run() { - assertThat(client().admin().cluster().prepareState().all().get().getState().getRoutingNodes().unassigned().size() > 0, equalTo(true)); - } - }); + assertBusy(() -> assertThat(client().admin().cluster().prepareState().all().get().getState().getRoutingNodes().unassigned().size() > 0, equalTo(true))); assertThat(client().admin().cluster().prepareHealth().get().getDelayedUnassignedShards(), equalTo(1)); internalCluster().startNode(); // this will use the same data location as the stopped node ensureGreen("test"); @@ -114,12 +109,7 @@ public void testDelayedAllocationChangeWithSettingTo100ms() throws Exception { ensureGreen("test"); indexRandomData(); internalCluster().stopRandomNode(InternalTestCluster.nameFilter(findNodeWithShard())); - assertBusy(new Runnable() { - @Override - public void run() { - assertThat(client().admin().cluster().prepareState().all().get().getState().getRoutingNodes().unassigned().size() > 0, equalTo(true)); - } - }); + assertBusy(() -> assertThat(client().admin().cluster().prepareState().all().get().getState().getRoutingNodes().unassigned().size() > 0, equalTo(true))); assertThat(client().admin().cluster().prepareHealth().get().getDelayedUnassignedShards(), equalTo(1)); assertAcked(client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.timeValueMillis(100))).get()); ensureGreen("test"); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java index 51ddc0f3fd9b3..93ac2878abc0f 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java @@ -57,12 +57,9 @@ public void testRerouteOccursOnDiskPassingHighWatermark() throws Exception { List nodes = internalCluster().startNodes(3); // Wait for all 3 nodes to be up - assertBusy(new Runnable() { - @Override - public void run() { - NodesStatsResponse resp = client().admin().cluster().prepareNodesStats().get(); - assertThat(resp.getNodes().size(), equalTo(3)); - } + assertBusy(() -> { + NodesStatsResponse resp = client().admin().cluster().prepareNodesStats().get(); + assertThat(resp.getNodes().size(), equalTo(3)); }); // Start with all nodes at 50% usage @@ -86,13 +83,10 @@ public void run() { ensureGreen("test"); // Block until the "fake" cluster info is retrieved at least once - assertBusy(new Runnable() { - @Override - public void run() { - ClusterInfo info = cis.getClusterInfo(); - logger.info("--> got: {} nodes", info.getNodeLeastAvailableDiskUsages().size()); - assertThat(info.getNodeLeastAvailableDiskUsages().size(), greaterThan(0)); - } + assertBusy(() -> { + ClusterInfo info = cis.getClusterInfo(); + logger.info("--> got: {} nodes", info.getNodeLeastAvailableDiskUsages().size()); + assertThat(info.getNodeLeastAvailableDiskUsages().size(), greaterThan(0)); }); final List realNodeNames = new ArrayList<>(); @@ -113,21 +107,18 @@ public void run() { // Retrieve the count of shards on each node final Map nodesToShardCount = new HashMap<>(); - assertBusy(new Runnable() { - @Override - public void run() { - ClusterStateResponse resp = client().admin().cluster().prepareState().get(); - Iterator iter = resp.getState().getRoutingNodes().iterator(); - while (iter.hasNext()) { - RoutingNode node = iter.next(); - logger.info("--> node {} has {} shards", - node.nodeId(), resp.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards()); - nodesToShardCount.put(node.nodeId(), resp.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards()); - } - assertThat("node1 has 5 shards", nodesToShardCount.get(realNodeNames.get(0)), equalTo(5)); - assertThat("node2 has 5 shards", nodesToShardCount.get(realNodeNames.get(1)), equalTo(5)); - assertThat("node3 has 0 shards", nodesToShardCount.get(realNodeNames.get(2)), equalTo(0)); + assertBusy(() -> { + ClusterStateResponse resp12 = client().admin().cluster().prepareState().get(); + Iterator iter12 = resp12.getState().getRoutingNodes().iterator(); + while (iter12.hasNext()) { + RoutingNode node = iter12.next(); + logger.info("--> node {} has {} shards", + node.nodeId(), resp12.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards()); + nodesToShardCount.put(node.nodeId(), resp12.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards()); } + assertThat("node1 has 5 shards", nodesToShardCount.get(realNodeNames.get(0)), equalTo(5)); + assertThat("node2 has 5 shards", nodesToShardCount.get(realNodeNames.get(1)), equalTo(5)); + assertThat("node3 has 0 shards", nodesToShardCount.get(realNodeNames.get(2)), equalTo(0)); }); // Update the disk usages so one node is now back under the high watermark @@ -138,21 +129,18 @@ public void run() { // Retrieve the count of shards on each node nodesToShardCount.clear(); - assertBusy(new Runnable() { - @Override - public void run() { - ClusterStateResponse resp = client().admin().cluster().prepareState().get(); - Iterator iter = resp.getState().getRoutingNodes().iterator(); - while (iter.hasNext()) { - RoutingNode node = iter.next(); - logger.info("--> node {} has {} shards", - node.nodeId(), resp.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards()); - nodesToShardCount.put(node.nodeId(), resp.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards()); - } - assertThat("node1 has at least 3 shards", nodesToShardCount.get(realNodeNames.get(0)), greaterThanOrEqualTo(3)); - assertThat("node2 has at least 3 shards", nodesToShardCount.get(realNodeNames.get(1)), greaterThanOrEqualTo(3)); - assertThat("node3 has at least 3 shards", nodesToShardCount.get(realNodeNames.get(2)), greaterThanOrEqualTo(3)); + assertBusy(() -> { + ClusterStateResponse resp1 = client().admin().cluster().prepareState().get(); + Iterator iter1 = resp1.getState().getRoutingNodes().iterator(); + while (iter1.hasNext()) { + RoutingNode node = iter1.next(); + logger.info("--> node {} has {} shards", + node.nodeId(), resp1.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards()); + nodesToShardCount.put(node.nodeId(), resp1.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards()); } + assertThat("node1 has at least 3 shards", nodesToShardCount.get(realNodeNames.get(0)), greaterThanOrEqualTo(3)); + assertThat("node2 has at least 3 shards", nodesToShardCount.get(realNodeNames.get(1)), greaterThanOrEqualTo(3)); + assertThat("node3 has at least 3 shards", nodesToShardCount.get(realNodeNames.get(2)), greaterThanOrEqualTo(3)); }); } } diff --git a/core/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java b/core/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java index 72db2911fc023..142123bb48349 100644 --- a/core/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java +++ b/core/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java @@ -229,12 +229,9 @@ public void run() { assertThat("wrong pool size", pool.getPoolSize(), equalTo(max)); assertThat("wrong active size", pool.getActiveCount(), equalTo(max)); barrier.await(); - assertBusy(new Runnable() { - @Override - public void run() { - assertThat("wrong active count", pool.getActiveCount(), equalTo(0)); - assertThat("idle threads didn't shrink below max. (" + pool.getPoolSize() + ")", pool.getPoolSize(), lessThan(max)); - } + assertBusy(() -> { + assertThat("wrong active count", pool.getActiveCount(), equalTo(0)); + assertThat("idle threads didn't shrink below max. (" + pool.getPoolSize() + ")", pool.getPoolSize(), lessThan(max)); }); terminate(pool); } diff --git a/core/src/test/java/org/elasticsearch/common/util/concurrent/PrioritizedExecutorsTests.java b/core/src/test/java/org/elasticsearch/common/util/concurrent/PrioritizedExecutorsTests.java index 3ed105080b30b..17b43a079dc53 100644 --- a/core/src/test/java/org/elasticsearch/common/util/concurrent/PrioritizedExecutorsTests.java +++ b/core/src/test/java/org/elasticsearch/common/util/concurrent/PrioritizedExecutorsTests.java @@ -264,12 +264,7 @@ public void run() { // the timeout handler is added post execution (and quickly cancelled). We have allow for this // and use assert busy - assertBusy(new Runnable() { - @Override - public void run() { - assertThat(timer.getQueue().size(), equalTo(0)); - } - }, 5, TimeUnit.SECONDS); + assertBusy(() -> assertThat(timer.getQueue().size(), equalTo(0)), 5, TimeUnit.SECONDS); assertThat(timeoutCalled.get(), equalTo(false)); assertTrue(terminate(executor)); assertTrue(terminate(threadPool)); diff --git a/core/src/test/java/org/elasticsearch/discovery/AbstractDisruptionTestCase.java b/core/src/test/java/org/elasticsearch/discovery/AbstractDisruptionTestCase.java index f1b7415c67953..50dfd92d82e15 100644 --- a/core/src/test/java/org/elasticsearch/discovery/AbstractDisruptionTestCase.java +++ b/core/src/test/java/org/elasticsearch/discovery/AbstractDisruptionTestCase.java @@ -197,35 +197,29 @@ void assertNoMaster(final String node, TimeValue maxWaitTime) throws Exception { } void assertNoMaster(final String node, @Nullable final ClusterBlock expectedBlocks, TimeValue maxWaitTime) throws Exception { - assertBusy(new Runnable() { - @Override - public void run() { - ClusterState state = getNodeClusterState(node); - final DiscoveryNodes nodes = state.nodes(); - assertNull("node [" + node + "] still has [" + nodes.getMasterNode() + "] as master", nodes.getMasterNode()); - if (expectedBlocks != null) { - for (ClusterBlockLevel level : expectedBlocks.levels()) { - assertTrue("node [" + node + "] does have level [" + level + "] in it's blocks", state.getBlocks().hasGlobalBlock - (level)); - } + assertBusy(() -> { + ClusterState state = getNodeClusterState(node); + final DiscoveryNodes nodes = state.nodes(); + assertNull("node [" + node + "] still has [" + nodes.getMasterNode() + "] as master", nodes.getMasterNode()); + if (expectedBlocks != null) { + for (ClusterBlockLevel level : expectedBlocks.levels()) { + assertTrue("node [" + node + "] does have level [" + level + "] in it's blocks", state.getBlocks().hasGlobalBlock + (level)); } } }, maxWaitTime.getMillis(), TimeUnit.MILLISECONDS); } void assertDifferentMaster(final String node, final String oldMasterNode) throws Exception { - assertBusy(new Runnable() { - @Override - public void run() { - ClusterState state = getNodeClusterState(node); - String masterNode = null; - if (state.nodes().getMasterNode() != null) { - masterNode = state.nodes().getMasterNode().getName(); - } - logger.trace("[{}] master is [{}]", node, state.nodes().getMasterNode()); - assertThat("node [" + node + "] still has [" + masterNode + "] as master", - oldMasterNode, not(equalTo(masterNode))); + assertBusy(() -> { + ClusterState state = getNodeClusterState(node); + String masterNode = null; + if (state.nodes().getMasterNode() != null) { + masterNode = state.nodes().getMasterNode().getName(); } + logger.trace("[{}] master is [{}]", node, state.nodes().getMasterNode()); + assertThat("node [" + node + "] still has [" + masterNode + "] as master", + oldMasterNode, not(equalTo(masterNode))); }, 10, TimeUnit.SECONDS); } diff --git a/core/src/test/java/org/elasticsearch/document/ShardInfoIT.java b/core/src/test/java/org/elasticsearch/document/ShardInfoIT.java index 84166bb3f962e..5a5f279985f4c 100644 --- a/core/src/test/java/org/elasticsearch/document/ShardInfoIT.java +++ b/core/src/test/java/org/elasticsearch/document/ShardInfoIT.java @@ -128,24 +128,21 @@ private void assertShardInfo(ReplicationResponse response, int expectedTotal, in } private void ensureActiveShardCopies(final int shardId, final int copyCount) throws Exception { - assertBusy(new Runnable() { - @Override - public void run() { - ClusterState state = client().admin().cluster().prepareState().get().getState(); - assertThat(state.routingTable().index("idx"), not(nullValue())); - assertThat(state.routingTable().index("idx").shard(shardId), not(nullValue())); - assertThat(state.routingTable().index("idx").shard(shardId).activeShards().size(), equalTo(copyCount)); - - ClusterHealthResponse healthResponse = client().admin().cluster().prepareHealth("idx") - .setWaitForNoRelocatingShards(true) - .get(); - assertThat(healthResponse.isTimedOut(), equalTo(false)); - - RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries("idx") - .setActiveOnly(true) - .get(); - assertThat(recoveryResponse.shardRecoveryStates().get("idx").size(), equalTo(0)); - } + assertBusy(() -> { + ClusterState state = client().admin().cluster().prepareState().get().getState(); + assertThat(state.routingTable().index("idx"), not(nullValue())); + assertThat(state.routingTable().index("idx").shard(shardId), not(nullValue())); + assertThat(state.routingTable().index("idx").shard(shardId).activeShards().size(), equalTo(copyCount)); + + ClusterHealthResponse healthResponse = client().admin().cluster().prepareHealth("idx") + .setWaitForNoRelocatingShards(true) + .get(); + assertThat(healthResponse.isTimedOut(), equalTo(false)); + + RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries("idx") + .setActiveOnly(true) + .get(); + assertThat(recoveryResponse.shardRecoveryStates().get("idx").size(), equalTo(0)); }); } } diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index a5e5ecd8aa6e3..174f68da4b75f 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -39,6 +39,7 @@ import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; @@ -83,7 +84,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; -import java.util.function.Supplier; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; @@ -458,7 +458,7 @@ public void testStressMaybeFlushOrRollTranslogGeneration() throws Exception { threads[i].start(); } barrier.await(); - final Runnable check; + final CheckedRunnable check; if (flush) { final FlushStats flushStats = shard.flushStats(); final long total = flushStats.getTotal(); diff --git a/core/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java b/core/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java index fb524f27591e6..0c34bb54ebe58 100644 --- a/core/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java +++ b/core/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java @@ -405,14 +405,11 @@ protected Cancellable scheduleTask(ThreadPool threadPool) { imc.forceCheck(); // We must assertBusy because the writeIndexingBufferAsync is done in background (REFRESH) thread pool: - assertBusy(new Runnable() { - @Override - public void run() { - try (Engine.Searcher s2 = shard.acquireSearcher("index")) { - // 100 buffered deletes will easily exceed our 1 KB indexing buffer so it should trigger a write: - final long indexingBufferBytes2 = shard.getIndexBufferRAMBytesUsed(); - assertTrue(indexingBufferBytes2 < indexingBufferBytes1); - } + assertBusy(() -> { + try (Engine.Searcher s2 = shard.acquireSearcher("index")) { + // 100 buffered deletes will easily exceed our 1 KB indexing buffer so it should trigger a write: + final long indexingBufferBytes2 = shard.getIndexBufferRAMBytesUsed(); + assertTrue(indexingBufferBytes2 < indexingBufferBytes1); } }); } diff --git a/core/src/test/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerServiceIT.java b/core/src/test/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerServiceIT.java index 484f6e5db76aa..80001ed16ae21 100644 --- a/core/src/test/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerServiceIT.java +++ b/core/src/test/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerServiceIT.java @@ -381,16 +381,13 @@ public void testBucketBreaker() throws Exception { /** Issues a cache clear and waits 30 seconds for the field data breaker to be cleared */ public void clearFieldData() throws Exception { client().admin().indices().prepareClearCache().setFieldDataCache(true).execute().actionGet(); - assertBusy(new Runnable() { - @Override - public void run() { - NodesStatsResponse resp = client().admin().cluster().prepareNodesStats() - .clear().setBreaker(true).get(new TimeValue(15, TimeUnit.SECONDS)); - for (NodeStats nStats : resp.getNodes()) { - assertThat("fielddata breaker never reset back to 0", - nStats.getBreaker().getStats(CircuitBreaker.FIELDDATA).getEstimated(), - equalTo(0L)); - } + assertBusy(() -> { + NodesStatsResponse resp = client().admin().cluster().prepareNodesStats() + .clear().setBreaker(true).get(new TimeValue(15, TimeUnit.SECONDS)); + for (NodeStats nStats : resp.getNodes()) { + assertThat("fielddata breaker never reset back to 0", + nStats.getBreaker().getStats(CircuitBreaker.FIELDDATA).getEstimated(), + equalTo(0L)); } }, 30, TimeUnit.SECONDS); } diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java index 7542545bc3ab8..cf1449fecd6a5 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java @@ -269,17 +269,13 @@ public void testRerouteRecovery() throws Exception { logger.info("--> waiting for recovery to start both on source and target"); final Index index = resolveIndex(INDEX_NAME); - assertBusy(new Runnable() { - @Override - public void run() { - - IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodeA); - assertThat(indicesService.indexServiceSafe(index).getShard(0).recoveryStats().currentAsSource(), - equalTo(1)); - indicesService = internalCluster().getInstance(IndicesService.class, nodeB); - assertThat(indicesService.indexServiceSafe(index).getShard(0).recoveryStats().currentAsTarget(), - equalTo(1)); - } + assertBusy(() -> { + IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodeA); + assertThat(indicesService.indexServiceSafe(index).getShard(0).recoveryStats().currentAsSource(), + equalTo(1)); + indicesService = internalCluster().getInstance(IndicesService.class, nodeB); + assertThat(indicesService.indexServiceSafe(index).getShard(0).recoveryStats().currentAsTarget(), + equalTo(1)); }); logger.info("--> request recoveries"); @@ -318,19 +314,16 @@ public void run() { logger.info("--> checking throttling increases"); final long finalNodeAThrottling = nodeAThrottling; final long finalNodeBThrottling = nodeBThrottling; - assertBusy(new Runnable() { - @Override - public void run() { - NodesStatsResponse statsResponse = client().admin().cluster().prepareNodesStats().clear().setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery)).get(); - assertThat(statsResponse.getNodes(), hasSize(2)); - for (NodeStats nodeStats : statsResponse.getNodes()) { - final RecoveryStats recoveryStats = nodeStats.getIndices().getRecoveryStats(); - if (nodeStats.getNode().getName().equals(nodeA)) { - assertThat("node A throttling should increase", recoveryStats.throttleTime().millis(), greaterThan(finalNodeAThrottling)); - } - if (nodeStats.getNode().getName().equals(nodeB)) { - assertThat("node B throttling should increase", recoveryStats.throttleTime().millis(), greaterThan(finalNodeBThrottling)); - } + assertBusy(() -> { + NodesStatsResponse statsResponse1 = client().admin().cluster().prepareNodesStats().clear().setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery)).get(); + assertThat(statsResponse1.getNodes(), hasSize(2)); + for (NodeStats nodeStats : statsResponse1.getNodes()) { + final RecoveryStats recoveryStats = nodeStats.getIndices().getRecoveryStats(); + if (nodeStats.getNode().getName().equals(nodeA)) { + assertThat("node A throttling should increase", recoveryStats.throttleTime().millis(), greaterThan(finalNodeAThrottling)); + } + if (nodeStats.getNode().getName().equals(nodeB)) { + assertThat("node B throttling should increase", recoveryStats.throttleTime().millis(), greaterThan(finalNodeBThrottling)); } } }); diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTargetTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTargetTests.java index 4f893c946ecb4..7a65541cb5eaf 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTargetTests.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTargetTests.java @@ -157,12 +157,7 @@ Timer createObj() { Timer lastRead = streamer.serializeDeserialize(); final long time = lastRead.time(); assertThat(time, lessThanOrEqualTo(timer.time())); - assertBusy(new Runnable() { - @Override - public void run() { - assertThat("timer timer should progress compared to captured one ", time, lessThan(timer.time())); - } - }); + assertBusy(() -> assertThat("timer timer should progress compared to captured one ", time, lessThan(timer.time()))); assertThat("captured time shouldn't change", lastRead.time(), equalTo(time)); if (randomBoolean()) { diff --git a/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java b/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java index c38c20e0c256a..f138afe35b0df 100644 --- a/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java +++ b/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java @@ -266,23 +266,20 @@ public void onFailure(Exception e) { } }); // ...and wait for mappings to be available on master - assertBusy(new Runnable() { - @Override - public void run() { - ImmutableOpenMap indexMappings = client().admin().indices().prepareGetMappings("index").get().getMappings().get("index"); - assertNotNull(indexMappings); - MappingMetaData typeMappings = indexMappings.get("type"); - assertNotNull(typeMappings); - Object properties; - try { - properties = typeMappings.getSourceAsMap().get("properties"); - } catch (IOException e) { - throw new AssertionError(e); - } - assertNotNull(properties); - Object fieldMapping = ((Map) properties).get("field"); - assertNotNull(fieldMapping); + assertBusy(() -> { + ImmutableOpenMap indexMappings = client().admin().indices().prepareGetMappings("index").get().getMappings().get("index"); + assertNotNull(indexMappings); + MappingMetaData typeMappings = indexMappings.get("type"); + assertNotNull(typeMappings); + Object properties; + try { + properties = typeMappings.getSourceAsMap().get("properties"); + } catch (IOException e) { + throw new AssertionError(e); } + assertNotNull(properties); + Object fieldMapping = ((Map) properties).get("field"); + assertNotNull(fieldMapping); }); final AtomicReference docIndexResponse = new AtomicReference<>(); @@ -307,17 +304,14 @@ public void onFailure(Exception e) { // Now make sure the indexing request finishes successfully disruption.stopDisrupting(); - assertBusy(new Runnable() { - @Override - public void run() { - assertThat(putMappingResponse.get(), instanceOf(PutMappingResponse.class)); - PutMappingResponse resp = (PutMappingResponse) putMappingResponse.get(); - assertTrue(resp.isAcknowledged()); - assertThat(docIndexResponse.get(), instanceOf(IndexResponse.class)); - IndexResponse docResp = (IndexResponse) docIndexResponse.get(); - assertEquals(Arrays.toString(docResp.getShardInfo().getFailures()), - 1, docResp.getShardInfo().getTotal()); - } + assertBusy(() -> { + assertThat(putMappingResponse.get(), instanceOf(PutMappingResponse.class)); + PutMappingResponse resp = (PutMappingResponse) putMappingResponse.get(); + assertTrue(resp.isAcknowledged()); + assertThat(docIndexResponse.get(), instanceOf(IndexResponse.class)); + IndexResponse docResp = (IndexResponse) docIndexResponse.get(); + assertEquals(Arrays.toString(docResp.getShardInfo().getFailures()), + 1, docResp.getShardInfo().getTotal()); }); } @@ -387,17 +381,14 @@ public void onFailure(Exception e) { }); final Index index = resolveIndex("index"); // Wait for mappings to be available on master - assertBusy(new Runnable() { - @Override - public void run() { - final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, master); - final IndexService indexService = indicesService.indexServiceSafe(index); - assertNotNull(indexService); - final MapperService mapperService = indexService.mapperService(); - DocumentMapper mapper = mapperService.documentMapper("type"); - assertNotNull(mapper); - assertNotNull(mapper.mappers().getMapper("field")); - } + assertBusy(() -> { + final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, master); + final IndexService indexService = indicesService.indexServiceSafe(index); + assertNotNull(indexService); + final MapperService mapperService = indexService.mapperService(); + DocumentMapper mapper = mapperService.documentMapper("type"); + assertNotNull(mapper); + assertNotNull(mapper.mappers().getMapper("field")); }); final AtomicReference docIndexResponse = new AtomicReference<>(); @@ -414,12 +405,7 @@ public void onFailure(Exception e) { }); // Wait for document to be indexed on primary - assertBusy(new Runnable() { - @Override - public void run() { - assertTrue(client().prepareGet("index", "type", "1").setPreference("_primary").get().isExists()); - } - }); + assertBusy(() -> assertTrue(client().prepareGet("index", "type", "1").setPreference("_primary").get().isExists())); // The mappings have not been propagated to the replica yet as a consequence the document count not be indexed // We wait on purpose to make sure that the document is not indexed because the shard operation is stalled @@ -430,17 +416,14 @@ public void run() { // Now make sure the indexing request finishes successfully disruption.stopDisrupting(); - assertBusy(new Runnable() { - @Override - public void run() { - assertThat(putMappingResponse.get(), instanceOf(PutMappingResponse.class)); - PutMappingResponse resp = (PutMappingResponse) putMappingResponse.get(); - assertTrue(resp.isAcknowledged()); - assertThat(docIndexResponse.get(), instanceOf(IndexResponse.class)); - IndexResponse docResp = (IndexResponse) docIndexResponse.get(); - assertEquals(Arrays.toString(docResp.getShardInfo().getFailures()), - 2, docResp.getShardInfo().getTotal()); // both shards should have succeeded - } + assertBusy(() -> { + assertThat(putMappingResponse.get(), instanceOf(PutMappingResponse.class)); + PutMappingResponse resp = (PutMappingResponse) putMappingResponse.get(); + assertTrue(resp.isAcknowledged()); + assertThat(docIndexResponse.get(), instanceOf(IndexResponse.class)); + IndexResponse docResp = (IndexResponse) docIndexResponse.get(); + assertEquals(Arrays.toString(docResp.getShardInfo().getFailures()), + 2, docResp.getShardInfo().getTotal()); // both shards should have succeeded }); } diff --git a/core/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java b/core/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java index f8cf9bd7a3c92..b0179a675dd93 100644 --- a/core/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java +++ b/core/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java @@ -269,12 +269,7 @@ public void testQueryCache() throws Exception { } indexRandom(true, builders); refresh(); - assertBusy(new Runnable() { - @Override - public void run() { - assertThat(client().admin().indices().prepareStats("idx").setRequestCache(true).get().getTotal().getRequestCache().getMemorySizeInBytes(), equalTo(0L)); - } - }); + assertBusy(() -> assertThat(client().admin().indices().prepareStats("idx").setRequestCache(true).get().getTotal().getRequestCache().getMemorySizeInBytes(), equalTo(0L))); for (int i = 0; i < 10; i++) { assertThat(client().prepareSearch("idx").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(0).get().getHits().getTotalHits(), equalTo((long) numDocs)); diff --git a/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java b/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java index 4e1be614fa5d6..7f6155979c916 100644 --- a/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java +++ b/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java @@ -21,12 +21,10 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateTaskListener; -import org.elasticsearch.cluster.LocalClusterUpdateTask; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.IndexRoutingTable; @@ -40,7 +38,6 @@ import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.elasticsearch.cluster.service.ClusterApplierService; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.env.Environment; @@ -378,12 +375,7 @@ public void testShardActiveElsewhereDoesNotDeleteAnother() throws Exception { // allocation filtering may not have immediate effect // TODO: we should add an easier to do this. It's too much of a song and dance.. Index index = resolveIndex("test"); - assertBusy(new Runnable() { - @Override - public void run() { - assertTrue(internalCluster().getInstance(IndicesService.class, node4).hasIndex(index)); - } - }); + assertBusy(() -> assertTrue(internalCluster().getInstance(IndicesService.class, node4).hasIndex(index))); // wait for 4 active shards - we should have lost one shard assertFalse(client().admin().cluster().prepareHealth().setWaitForActiveShards(4).get().isTimedOut()); diff --git a/core/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadIT.java b/core/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadIT.java index 57249e186db35..e1a7a07448f1b 100644 --- a/core/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadIT.java +++ b/core/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadIT.java @@ -341,12 +341,9 @@ private void logSearchResponse(int numberOfShards, long numberOfDocs, int iterat } private void refreshAndAssert() throws Exception { - assertBusy(new Runnable() { - @Override - public void run() { - RefreshResponse actionGet = client().admin().indices().prepareRefresh().get(); - assertAllSuccessful(actionGet); - } + assertBusy(() -> { + RefreshResponse actionGet = client().admin().indices().prepareRefresh().get(); + assertAllSuccessful(actionGet); }, 5, TimeUnit.MINUTES); } } diff --git a/core/src/test/java/org/elasticsearch/search/child/ParentFieldLoadingIT.java b/core/src/test/java/org/elasticsearch/search/child/ParentFieldLoadingIT.java index d3a8946571bfb..45956deefd36f 100644 --- a/core/src/test/java/org/elasticsearch/search/child/ParentFieldLoadingIT.java +++ b/core/src/test/java/org/elasticsearch/search/child/ParentFieldLoadingIT.java @@ -30,9 +30,9 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.MergePolicyConfig; import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.MapperService; -import org.elasticsearch.index.MergePolicyConfig; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; @@ -132,25 +132,22 @@ public void testChangingEagerParentFieldLoadingAtRuntime() throws Exception { .get(); assertAcked(putMappingResponse); Index test = resolveIndex("test"); - assertBusy(new Runnable() { - @Override - public void run() { - ClusterState clusterState = internalCluster().clusterService().state(); - ShardRouting shardRouting = clusterState.routingTable().index("test").shard(0).getShards().get(0); - String nodeName = clusterState.getNodes().get(shardRouting.currentNodeId()).getName(); - - boolean verified = false; - IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodeName); - IndexService indexService = indicesService.indexService(test); - if (indexService != null) { - MapperService mapperService = indexService.mapperService(); - DocumentMapper documentMapper = mapperService.documentMapper("child"); - if (documentMapper != null) { - verified = documentMapper.parentFieldMapper().fieldType().eagerGlobalOrdinals(); - } + assertBusy(() -> { + ClusterState clusterState = internalCluster().clusterService().state(); + ShardRouting shardRouting = clusterState.routingTable().index("test").shard(0).getShards().get(0); + String nodeName = clusterState.getNodes().get(shardRouting.currentNodeId()).getName(); + + boolean verified = false; + IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodeName); + IndexService indexService = indicesService.indexService(test); + if (indexService != null) { + MapperService mapperService = indexService.mapperService(); + DocumentMapper documentMapper = mapperService.documentMapper("child"); + if (documentMapper != null) { + verified = documentMapper.parentFieldMapper().fieldType().eagerGlobalOrdinals(); } - assertTrue(verified); } + assertTrue(verified); }); // Need to add a new doc otherwise the refresh doesn't trigger a new searcher diff --git a/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java index 76a7bcc1a8faa..98def27ff19c3 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java +++ b/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java @@ -208,19 +208,16 @@ public void testRestoreCustomMetadata() throws Exception { Client client = client(); createIndex("test-idx"); logger.info("--> add custom persistent metadata"); - updateClusterState(new ClusterStateUpdater() { - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - ClusterState.Builder builder = ClusterState.builder(currentState); - MetaData.Builder metadataBuilder = MetaData.builder(currentState.metaData()); - metadataBuilder.putCustom(SnapshottableMetadata.TYPE, new SnapshottableMetadata("before_snapshot_s")); - metadataBuilder.putCustom(NonSnapshottableMetadata.TYPE, new NonSnapshottableMetadata("before_snapshot_ns")); - metadataBuilder.putCustom(SnapshottableGatewayMetadata.TYPE, new SnapshottableGatewayMetadata("before_snapshot_s_gw")); - metadataBuilder.putCustom(NonSnapshottableGatewayMetadata.TYPE, new NonSnapshottableGatewayMetadata("before_snapshot_ns_gw")); - metadataBuilder.putCustom(SnapshotableGatewayNoApiMetadata.TYPE, new SnapshotableGatewayNoApiMetadata("before_snapshot_s_gw_noapi")); - builder.metaData(metadataBuilder); - return builder.build(); - } + updateClusterState(currentState -> { + ClusterState.Builder builder = ClusterState.builder(currentState); + MetaData.Builder metadataBuilder = MetaData.builder(currentState.metaData()); + metadataBuilder.putCustom(SnapshottableMetadata.TYPE, new SnapshottableMetadata("before_snapshot_s")); + metadataBuilder.putCustom(NonSnapshottableMetadata.TYPE, new NonSnapshottableMetadata("before_snapshot_ns")); + metadataBuilder.putCustom(SnapshottableGatewayMetadata.TYPE, new SnapshottableGatewayMetadata("before_snapshot_s_gw")); + metadataBuilder.putCustom(NonSnapshottableGatewayMetadata.TYPE, new NonSnapshottableGatewayMetadata("before_snapshot_ns_gw")); + metadataBuilder.putCustom(SnapshotableGatewayNoApiMetadata.TYPE, new SnapshotableGatewayNoApiMetadata("before_snapshot_s_gw_noapi")); + builder.metaData(metadataBuilder); + return builder.build(); }); logger.info("--> create repository"); @@ -235,27 +232,24 @@ public ClusterState execute(ClusterState currentState) throws Exception { assertThat(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").execute().actionGet().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS)); logger.info("--> change custom persistent metadata"); - updateClusterState(new ClusterStateUpdater() { - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - ClusterState.Builder builder = ClusterState.builder(currentState); - MetaData.Builder metadataBuilder = MetaData.builder(currentState.metaData()); - if (randomBoolean()) { - metadataBuilder.putCustom(SnapshottableMetadata.TYPE, new SnapshottableMetadata("after_snapshot_s")); - } else { - metadataBuilder.removeCustom(SnapshottableMetadata.TYPE); - } - metadataBuilder.putCustom(NonSnapshottableMetadata.TYPE, new NonSnapshottableMetadata("after_snapshot_ns")); - if (randomBoolean()) { - metadataBuilder.putCustom(SnapshottableGatewayMetadata.TYPE, new SnapshottableGatewayMetadata("after_snapshot_s_gw")); - } else { - metadataBuilder.removeCustom(SnapshottableGatewayMetadata.TYPE); - } - metadataBuilder.putCustom(NonSnapshottableGatewayMetadata.TYPE, new NonSnapshottableGatewayMetadata("after_snapshot_ns_gw")); - metadataBuilder.removeCustom(SnapshotableGatewayNoApiMetadata.TYPE); - builder.metaData(metadataBuilder); - return builder.build(); + updateClusterState(currentState -> { + ClusterState.Builder builder = ClusterState.builder(currentState); + MetaData.Builder metadataBuilder = MetaData.builder(currentState.metaData()); + if (randomBoolean()) { + metadataBuilder.putCustom(SnapshottableMetadata.TYPE, new SnapshottableMetadata("after_snapshot_s")); + } else { + metadataBuilder.removeCustom(SnapshottableMetadata.TYPE); + } + metadataBuilder.putCustom(NonSnapshottableMetadata.TYPE, new NonSnapshottableMetadata("after_snapshot_ns")); + if (randomBoolean()) { + metadataBuilder.putCustom(SnapshottableGatewayMetadata.TYPE, new SnapshottableGatewayMetadata("after_snapshot_s_gw")); + } else { + metadataBuilder.removeCustom(SnapshottableGatewayMetadata.TYPE); } + metadataBuilder.putCustom(NonSnapshottableGatewayMetadata.TYPE, new NonSnapshottableGatewayMetadata("after_snapshot_ns_gw")); + metadataBuilder.removeCustom(SnapshotableGatewayNoApiMetadata.TYPE); + builder.metaData(metadataBuilder); + return builder.build(); }); logger.info("--> delete repository"); @@ -510,15 +504,12 @@ public void testRestoreIndexWithMissingShards() throws Exception { client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-2") .setIndices("test-idx-all", "test-idx-none", "test-idx-some") .setWaitForCompletion(false).setPartial(true).execute().actionGet(); - assertBusy(new Runnable() { - @Override - public void run() { - SnapshotsStatusResponse snapshotsStatusResponse = client().admin().cluster().prepareSnapshotStatus("test-repo").setSnapshots("test-snap-2").get(); - List snapshotStatuses = snapshotsStatusResponse.getSnapshots(); - assertEquals(snapshotStatuses.size(), 1); - logger.trace("current snapshot status [{}]", snapshotStatuses.get(0)); - assertTrue(snapshotStatuses.get(0).getState().completed()); - } + assertBusy(() -> { + SnapshotsStatusResponse snapshotsStatusResponse = client().admin().cluster().prepareSnapshotStatus("test-repo").setSnapshots("test-snap-2").get(); + List snapshotStatuses = snapshotsStatusResponse.getSnapshots(); + assertEquals(snapshotStatuses.size(), 1); + logger.trace("current snapshot status [{}]", snapshotStatuses.get(0)); + assertTrue(snapshotStatuses.get(0).getState().completed()); }, 1, TimeUnit.MINUTES); SnapshotsStatusResponse snapshotsStatusResponse = client().admin().cluster().prepareSnapshotStatus("test-repo").setSnapshots("test-snap-2").get(); List snapshotStatuses = snapshotsStatusResponse.getSnapshots(); @@ -531,15 +522,12 @@ public void run() { // There is slight delay between snapshot being marked as completed in the cluster state and on the file system // After it was marked as completed in the cluster state - we need to check if it's completed on the file system as well - assertBusy(new Runnable() { - @Override - public void run() { - GetSnapshotsResponse response = client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap-2").get(); - assertThat(response.getSnapshots().size(), equalTo(1)); - SnapshotInfo snapshotInfo = response.getSnapshots().get(0); - assertTrue(snapshotInfo.state().completed()); - assertEquals(SnapshotState.PARTIAL, snapshotInfo.state()); - } + assertBusy(() -> { + GetSnapshotsResponse response = client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap-2").get(); + assertThat(response.getSnapshots().size(), equalTo(1)); + SnapshotInfo snapshotInfo = response.getSnapshots().get(0); + assertTrue(snapshotInfo.state().completed()); + assertEquals(SnapshotState.PARTIAL, snapshotInfo.state()); }, 1, TimeUnit.MINUTES); } else { logger.info("checking snapshot completion using wait_for_completion flag"); diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4ScheduledPingTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4ScheduledPingTests.java index 8bfdbb739d744..0cd567dd145e6 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4ScheduledPingTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4ScheduledPingTests.java @@ -82,12 +82,9 @@ public void testScheduledPing() throws Exception { serviceA.connectToNode(nodeB); serviceB.connectToNode(nodeA); - assertBusy(new Runnable() { - @Override - public void run() { - assertThat(nettyA.getPing().getSuccessfulPings(), greaterThan(100L)); - assertThat(nettyB.getPing().getSuccessfulPings(), greaterThan(100L)); - } + assertBusy(() -> { + assertThat(nettyA.getPing().getSuccessfulPings(), greaterThan(100L)); + assertThat(nettyB.getPing().getSuccessfulPings(), greaterThan(100L)); }); assertThat(nettyA.getPing().getFailedPings(), equalTo(0L)); assertThat(nettyB.getPing().getFailedPings(), equalTo(0L)); diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java index 0d3e8131ab25c..39dde6c14552e 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java @@ -29,7 +29,6 @@ import com.carrotsearch.randomizedtesting.generators.RandomPicks; import com.carrotsearch.randomizedtesting.generators.RandomStrings; import com.carrotsearch.randomizedtesting.rules.TestRuleAdapter; - import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -48,6 +47,7 @@ import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.ClusterModule; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.PathUtils; import org.elasticsearch.common.io.PathUtilsForTesting; @@ -692,14 +692,14 @@ public static T randomValueOtherThanMany(Predicate input, Supplier ran /** * Runs the code block for 10 seconds waiting for no assertion to trip. */ - public static void assertBusy(Runnable codeBlock) throws Exception { + public static void assertBusy(CheckedRunnable codeBlock) throws Exception { assertBusy(codeBlock, 10, TimeUnit.SECONDS); } /** * Runs the code block for the provided interval, waiting for no assertions to trip. */ - public static void assertBusy(Runnable codeBlock, long maxWaitTime, TimeUnit unit) throws Exception { + public static void assertBusy(CheckedRunnable codeBlock, long maxWaitTime, TimeUnit unit) throws Exception { long maxTimeInMillis = TimeUnit.MILLISECONDS.convert(maxWaitTime, unit); long iterations = Math.max(Math.round(Math.log10(maxTimeInMillis) / Math.log10(2)), 1); long timeInMillis = 1; diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index c4e191e75c590..b70638f96ed68 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -2016,12 +2016,9 @@ public void ensureEstimatedStats() { // in an assertBusy loop, so it will try for 10 seconds and // fail if it never reached 0 try { - assertBusy(new Runnable() { - @Override - public void run() { - CircuitBreaker reqBreaker = breakerService.getBreaker(CircuitBreaker.REQUEST); - assertThat("Request breaker not reset to 0 on node: " + name, reqBreaker.getUsed(), equalTo(0L)); - } + assertBusy(() -> { + CircuitBreaker reqBreaker = breakerService.getBreaker(CircuitBreaker.REQUEST); + assertThat("Request breaker not reset to 0 on node: " + name, reqBreaker.getUsed(), equalTo(0L)); }); } catch (Exception e) { fail("Exception during check for request breaker reset to 0: " + e);