From b09a839f5f09a61dd0e81a3c8015bb8481393bc0 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 11 Jun 2018 14:11:22 +0200 Subject: [PATCH 1/2] [CCR] Made shard follow task more resilient against node failure and added a test that verifies we can close a node while following an index. --- .../ccr/action/ShardFollowTasksExecutor.java | 126 +++++++++++---- .../xpack/ccr/ShardChangesIT.java | 146 +++++++++++++----- .../ccr/action/ChunksCoordinatorTests.java | 45 +++--- 3 files changed, 231 insertions(+), 86 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index f022ced4fc342..5626d3bb6d85d 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -10,11 +10,13 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; -import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.NoShardAvailableActionException; +import org.elasticsearch.action.UnavailableShardsException; +import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; import org.elasticsearch.action.admin.indices.stats.IndexStats; import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; import org.elasticsearch.action.admin.indices.stats.ShardStats; @@ -31,17 +33,19 @@ import org.elasticsearch.common.unit.TimeValue; 
import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.CountDown; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.Index; -import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.persistent.AllocatedPersistentTask; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.persistent.PersistentTasksExecutor; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.ActionTransportException; import org.elasticsearch.xpack.ccr.Ccr; import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction; import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest; @@ -58,6 +62,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; +import java.util.function.BooleanSupplier; import java.util.function.Consumer; import java.util.function.LongConsumer; import java.util.function.Supplier; @@ -141,7 +146,8 @@ void prepare(Client leaderClient, Client followerClient, ShardFollowNodeTask tas // TODO: check if both indices have the same history uuid if (leaderGlobalCheckPoint == followGlobalCheckPoint) { logger.debug("{} no write operations to fetch", followerShard); - retry(leaderClient, followerClient, task, params, followGlobalCheckPoint, imdVersionChecker); + retry(() -> prepare(leaderClient, followerClient, task, params, followGlobalCheckPoint, imdVersionChecker), + task::markAsFailed); } else { assert followGlobalCheckPoint < leaderGlobalCheckPoint : "followGlobalCheckPoint [" + followGlobalCheckPoint + "] 
is not below leaderGlobalCheckPoint [" + leaderGlobalCheckPoint + "]"; @@ -156,34 +162,47 @@ void prepare(Client leaderClient, Client followerClient, ShardFollowNodeTask tas task.markAsFailed(e); } }; - ChunksCoordinator coordinator = new ChunksCoordinator(followerClient, leaderClient, ccrExecutor, imdVersionChecker, - params.getMaxChunkSize(), params.getNumConcurrentChunks(), params.getProcessorMaxTranslogBytes(), leaderShard, - followerShard, handler); + Consumer scheduler = scheduleTask -> retry(scheduleTask, handler); + ChunksCoordinator coordinator = new ChunksCoordinator(followerClient, leaderClient, scheduler, ccrExecutor, + imdVersionChecker, params.getMaxChunkSize(), params.getNumConcurrentChunks(), + params.getProcessorMaxTranslogBytes(), leaderShard, followerShard, handler, task::isRunning); coordinator.createChucks(followGlobalCheckPoint, leaderGlobalCheckPoint); coordinator.start(); } }, task::markAsFailed); } - private void retry(Client leaderClient, Client followerClient, ShardFollowNodeTask task, ShardFollowTask params, - long followGlobalCheckPoint, - IndexMetadataVersionChecker imdVersionChecker) { + private void retry(Runnable task, Consumer errorHandler) { threadPool.schedule(RETRY_TIMEOUT, Ccr.CCR_THREAD_POOL_NAME, new AbstractRunnable() { @Override public void onFailure(Exception e) { - task.markAsFailed(e); + errorHandler.accept(e); } @Override protected void doRun() throws Exception { - prepare(leaderClient, followerClient, task, params, followGlobalCheckPoint, imdVersionChecker); + task.run(); } }); } private void fetchGlobalCheckpoint(Client client, ShardId shardId, LongConsumer handler, Consumer errorHandler) { + fetchGlobalCheckpoint(client, shardId, handler, errorHandler, 0); + } + + private void fetchGlobalCheckpoint(Client client, ShardId shardId, LongConsumer handler, Consumer errorHandler, + int attempt) { client.admin().indices().stats(new IndicesStatsRequest().indices(shardId.getIndexName()), ActionListener.wrap(r -> { IndexStats 
indexStats = r.getIndex(shardId.getIndexName()); + if (indexStats == null) { + if (attempt <= 5) { + retry(() -> fetchGlobalCheckpoint(client, shardId, handler, errorHandler, attempt + 1), errorHandler); + } else { + errorHandler.accept(new IllegalArgumentException("Cannot find shard stats for shard " + shardId)); + } + return; + } + Optional filteredShardStats = Arrays.stream(indexStats.getShards()) .filter(shardStats -> shardStats.getShardRouting().shardId().equals(shardId)) .filter(shardStats -> shardStats.getShardRouting().primary()) @@ -193,7 +212,11 @@ private void fetchGlobalCheckpoint(Client client, ShardId shardId, LongConsumer final long globalCheckPoint = filteredShardStats.get().getSeqNoStats().getGlobalCheckpoint(); handler.accept(globalCheckPoint); } else { - errorHandler.accept(new IllegalArgumentException("Cannot find shard stats for shard " + shardId)); + if (attempt <= 5) { + retry(() -> fetchGlobalCheckpoint(client, shardId, handler, errorHandler, attempt + 1), errorHandler); + } else { + errorHandler.accept(new IllegalArgumentException("Cannot find shard stats for shard " + shardId)); + } } }, errorHandler)); } @@ -213,16 +236,28 @@ static class ChunksCoordinator { private final ShardId leaderShard; private final ShardId followerShard; private final Consumer handler; + private final BooleanSupplier isRunning; + private final Consumer scheduler; private final CountDown countDown; private final Queue chunks = new ConcurrentLinkedQueue<>(); private final AtomicReference failureHolder = new AtomicReference<>(); - ChunksCoordinator(Client followerClient, Client leaderClient, Executor ccrExecutor, IndexMetadataVersionChecker imdVersionChecker, - long batchSize, int concurrentProcessors, long processorMaxTranslogBytes, ShardId leaderShard, - ShardId followerShard, Consumer handler) { + ChunksCoordinator(Client followerClient, + Client leaderClient, + Consumer scheduler, + Executor ccrExecutor, + IndexMetadataVersionChecker imdVersionChecker, + long 
batchSize, + int concurrentProcessors, + long processorMaxTranslogBytes, + ShardId leaderShard, + ShardId followerShard, + Consumer handler, + BooleanSupplier isRunning) { this.followerClient = followerClient; this.leaderClient = leaderClient; + this.scheduler = scheduler; this.ccrExecutor = ccrExecutor; this.imdVersionChecker = imdVersionChecker; this.batchSize = batchSize; @@ -231,6 +266,7 @@ static class ChunksCoordinator { this.leaderShard = leaderShard; this.followerShard = followerShard; this.handler = handler; + this.isRunning = isRunning; this.countDown = new CountDown(concurrentProcessors); } @@ -285,8 +321,8 @@ void processNextChunk() { postProcessChuck(e); } }; - ChunkProcessor processor = new ChunkProcessor(leaderClient, followerClient, chunks, ccrExecutor, imdVersionChecker, - leaderShard, followerShard, processorHandler); + ChunkProcessor processor = new ChunkProcessor(leaderClient, followerClient, scheduler, chunks, ccrExecutor, + imdVersionChecker, leaderShard, followerShard, processorHandler, isRunning); processor.start(chunk[0], chunk[1], processorMaxTranslogBytes); } @@ -313,23 +349,34 @@ static class ChunkProcessor { private final Queue chunks; private final Executor ccrExecutor; private final BiConsumer> indexVersionChecker; + private final BooleanSupplier isRunning; + private final Consumer scheduler; private final ShardId leaderShard; private final ShardId followerShard; private final Consumer handler; final AtomicInteger retryCounter = new AtomicInteger(0); - ChunkProcessor(Client leaderClient, Client followerClient, Queue chunks, Executor ccrExecutor, + ChunkProcessor(Client leaderClient, + Client followerClient, + Consumer scheduler, + Queue chunks, + Executor ccrExecutor, BiConsumer> indexVersionChecker, - ShardId leaderShard, ShardId followerShard, Consumer handler) { + ShardId leaderShard, + ShardId followerShard, + Consumer handler, + BooleanSupplier isRunning) { this.leaderClient = leaderClient; this.followerClient = followerClient; + 
this.scheduler = scheduler; this.chunks = chunks; this.ccrExecutor = ccrExecutor; this.indexVersionChecker = indexVersionChecker; this.leaderShard = leaderShard; this.followerShard = followerShard; this.handler = handler; + this.isRunning = isRunning; } void start(final long from, final long to, final long maxTranslogsBytes) { @@ -349,8 +396,8 @@ public void onResponse(ShardChangesAction.Response response) { public void onFailure(Exception e) { assert e != null; if (shouldRetry(e)) { - if (retryCounter.incrementAndGet() <= PROCESSOR_RETRY_LIMIT) { - start(from, to, maxTranslogsBytes); + if (canRetry()) { + scheduler.accept(() -> start(from, to, maxTranslogsBytes)); } else { handler.accept(new ElasticsearchException("retrying failed [" + retryCounter.get() + "] times, aborting...", e)); @@ -382,11 +429,15 @@ public void onFailure(Exception e) { protected void doRun() throws Exception { indexVersionChecker.accept(response.getIndexMetadataVersion(), e -> { if (e != null) { - if (shouldRetry(e) && retryCounter.incrementAndGet() <= PROCESSOR_RETRY_LIMIT) { - handleResponse(to, response); - } else { - handler.accept(new ElasticsearchException("retrying failed [" + retryCounter.get() + + if (shouldRetry(e)) { + if (canRetry()) { + scheduler.accept(() -> handleResponse(to, response)); + } else { + handler.accept(new ElasticsearchException("retrying failed [" + retryCounter.get() + "] times, aborting...", e)); + } + } else { + handler.accept(e); } return; } @@ -400,10 +451,17 @@ public void onResponse(final BulkShardOperationsResponse bulkShardOperationsResp @Override public void onFailure(final Exception e) { - // No retry mechanism here, because if a failure is being redirected to this place it is considered - // non recoverable. 
assert e != null; - handler.accept(e); + if (shouldRetry(e)) { + if (canRetry()) { + scheduler.accept(() -> handleResponse(to, response)); + } else { + handler.accept(new ElasticsearchException("retrying failed [" + retryCounter.get() + + "] times, aborting...", e)); + } + } else { + handler.accept(e); + } } } ); @@ -415,7 +473,15 @@ public void onFailure(final Exception e) { boolean shouldRetry(Exception e) { // TODO: What other exceptions should be retried? return NetworkExceptionHelper.isConnectException(e) || - NetworkExceptionHelper.isCloseConnectionException(e); + NetworkExceptionHelper.isCloseConnectionException(e) || + e instanceof ActionTransportException || + e instanceof NodeClosedException || + e instanceof UnavailableShardsException || + e instanceof NoShardAvailableActionException; + } + + boolean canRetry() { + return isRunning.getAsBoolean() && retryCounter.incrementAndGet() <= PROCESSOR_RETRY_LIMIT; } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java index ba9855b58736e..675e6e4704c33 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java @@ -10,12 +10,15 @@ import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.analysis.common.CommonAnalysisPlugin; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; +import 
org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.support.XContentMapValues; @@ -24,6 +27,7 @@ import org.elasticsearch.index.translog.Translog; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.MockHttpTransport; @@ -37,6 +41,7 @@ import org.elasticsearch.xpack.core.XPackSettings; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -44,8 +49,11 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.Objects; +import static java.util.Collections.singletonMap; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; @@ -147,9 +155,9 @@ public void testGetOperationsBasedOnGlobalSequenceId() throws Exception { public void testFollowIndex() throws Exception { final int numberOfPrimaryShards = randomIntBetween(1, 3); final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, - Collections.singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); - ensureGreen("index1"); + ensureYellow("index1"); final FollowIndexAction.Request followRequest = new FollowIndexAction.Request(); 
followRequest.setLeaderIndex("index1"); @@ -209,10 +217,9 @@ public void testFollowIndex() throws Exception { public void testSyncMappings() throws Exception { final String leaderIndexSettings = getIndexSettings(2, - Collections.singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); - ensureGreen("index1"); - + ensureYellow("index1"); final FollowIndexAction.Request followRequest = new FollowIndexAction.Request(); followRequest.setLeaderIndex("index1"); followRequest.setFollowIndex("index2"); @@ -245,39 +252,95 @@ public void testSyncMappings() throws Exception { .get("index2").get("doc"); assertThat(XContentMapValues.extractValue("properties.f.type", mappingMetaData.sourceAsMap()), equalTo("integer")); assertThat(XContentMapValues.extractValue("properties.k.type", mappingMetaData.sourceAsMap()), equalTo("long")); + unfollowIndex("index2"); + } - final UnfollowIndexAction.Request unfollowRequest = new UnfollowIndexAction.Request(); - unfollowRequest.setFollowIndex("index2"); - client().execute(UnfollowIndexAction.INSTANCE, unfollowRequest).get(); + public void testFollowIndexAndCloseNode() throws Exception { + internalCluster().ensureAtLeastNumDataNodes(3); + String leaderIndexSettings = getIndexSettings(3, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); - assertBusy(() -> { - final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); - final PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - assertThat(tasks.tasks().size(), equalTo(0)); + String followerIndexSettings = getIndexSettings(3, 
singletonMap(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), "true")); + assertAcked(client().admin().indices().prepareCreate("index2").setSource(followerIndexSettings, XContentType.JSON)); + ensureGreen("index1", "index2"); - ListTasksRequest listTasksRequest = new ListTasksRequest(); - listTasksRequest.setDetailed(true); - ListTasksResponse listTasksResponse = client().admin().cluster().listTasks(listTasksRequest).get(); - int numNodeTasks = 0; - for (TaskInfo taskInfo : listTasksResponse.getTasks()) { - if (taskInfo.getAction().startsWith(ListTasksAction.NAME) == false) { - numNodeTasks++; + AtomicBoolean run = new AtomicBoolean(true); + Thread thread = new Thread(() -> { + int counter = 0; + while (run.get()) { + final String source = String.format(Locale.ROOT, "{\"f\":%d}", counter++); + try { + client().prepareIndex("index1", "doc") + .setSource(source, XContentType.JSON) + .setTimeout(TimeValue.timeValueSeconds(1)) + .get(); + } catch (Exception e) { + logger.error("Error while indexing into leader index", e); + } + } + }); + thread.start(); + + final FollowIndexAction.Request followRequest = new FollowIndexAction.Request(); + followRequest.setLeaderIndex("index1"); + followRequest.setFollowIndex("index2"); + followRequest.setBatchSize(randomIntBetween(32, 2048)); + followRequest.setConcurrentProcessors(randomIntBetween(2, 10)); + client().execute(FollowIndexAction.INSTANCE, followRequest).get(); + + long maxNumDocsReplicated = Math.min(3000, randomLongBetween(followRequest.getBatchSize(), followRequest.getBatchSize() * 10)); + long minNumDocsReplicated = maxNumDocsReplicated / 3L; + logger.info("waiting for at least [{}] documents to be indexed and then stop a random data node", minNumDocsReplicated); + awaitBusy(() -> { + SearchRequest request = new SearchRequest("index2"); + request.source(new SearchSourceBuilder().size(0)); + SearchResponse response = client().search(request).actionGet(); + if (response.getHits().getTotalHits() >= minNumDocsReplicated) 
{ + try { + internalCluster().stopRandomNonMasterNode(); + } catch (IOException e) { + throw new UncheckedIOException(e); } + return true; + } else { + return false; } - assertThat(numNodeTasks, equalTo(0)); + }, 30, TimeUnit.SECONDS); + + logger.info("waiting for at least [{}] documents to be indexed", maxNumDocsReplicated); + awaitBusy(() -> { + SearchRequest request = new SearchRequest("index2"); + request.source(new SearchSourceBuilder().size(0)); + SearchResponse response = client().search(request).actionGet(); + return response.getHits().getTotalHits() >= maxNumDocsReplicated; + }, 30, TimeUnit.SECONDS); + run.set(false); + thread.join(); + + refresh("index1"); + SearchRequest request1 = new SearchRequest("index1"); + request1.source(new SearchSourceBuilder().size(0)); + SearchResponse response1 = client().search(request1).actionGet(); + assertBusy(() -> { + refresh("index2"); + SearchRequest request2 = new SearchRequest("index2"); + request2.source(new SearchSourceBuilder().size(0)); + SearchResponse response2 = client().search(request2).actionGet(); + assertThat(response2.getHits().getTotalHits(), equalTo(response1.getHits().getTotalHits())); }); + unfollowIndex("index2"); } public void testFollowIndexWithNestedField() throws Exception { final String leaderIndexSettings = - getIndexSettingsWithNestedMapping(1, Collections.singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + getIndexSettingsWithNestedMapping(1, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); final String followerIndexSettings = - getIndexSettingsWithNestedMapping(1, Collections.singletonMap(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), "true")); + getIndexSettingsWithNestedMapping(1, singletonMap(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), "true")); 
assertAcked(client().admin().indices().prepareCreate("index2").setSource(followerIndexSettings, XContentType.JSON)); - ensureGreen("index1", "index2"); + ensureYellow("index1", "index2"); final FollowIndexAction.Request followRequest = new FollowIndexAction.Request(); followRequest.setLeaderIndex("index1"); @@ -387,22 +450,28 @@ private void unfollowIndex(String index) throws Exception { final UnfollowIndexAction.Request unfollowRequest = new UnfollowIndexAction.Request(); unfollowRequest.setFollowIndex(index); client().execute(UnfollowIndexAction.INSTANCE, unfollowRequest).get(); - assertBusy(() -> { - final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); - final PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - assertThat(tasks.tasks().size(), equalTo(0)); - - ListTasksRequest listTasksRequest = new ListTasksRequest(); - listTasksRequest.setDetailed(true); - ListTasksResponse listTasksResponse = client().admin().cluster().listTasks(listTasksRequest).get(); - int numNodeTasks = 0; - for (TaskInfo taskInfo : listTasksResponse.getTasks()) { - if (taskInfo.getAction().startsWith(ListTasksAction.NAME) == false) { - numNodeTasks++; + ListTasksResponse[] holder = new ListTasksResponse[1]; + try { + assertBusy(() -> { + final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); + final PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + assertThat(tasks.tasks().size(), equalTo(0)); + + ListTasksRequest listTasksRequest = new ListTasksRequest(); + listTasksRequest.setDetailed(true); + ListTasksResponse listTasksResponse = holder[0] = client().admin().cluster().listTasks(listTasksRequest).get(); + int numNodeTasks = 0; + for (TaskInfo taskInfo : listTasksResponse.getTasks()) { + if (taskInfo.getAction().startsWith(ListTasksAction.NAME) == false) { + numNodeTasks++; + } } - } - 
assertThat(numNodeTasks, equalTo(0)); - }); + assertThat(numNodeTasks, equalTo(0)); + }); + } catch (AssertionError ae) { + logger.error("List tasks response contains unexpected tasks: {}", holder[0]); + throw ae; + } } private CheckedRunnable assertExpectedDocumentRunnable(final int value) { @@ -422,6 +491,7 @@ private String getIndexSettings(final int numberOfPrimaryShards, final Map additionalSetting : additionalIndexSettings.entrySet()) { builder.field(additionalSetting.getKey(), additionalSetting.getValue()); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ChunksCoordinatorTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ChunksCoordinatorTests.java index 9af0d93e9e2bc..0c69a0b92fd2a 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ChunksCoordinatorTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ChunksCoordinatorTests.java @@ -56,8 +56,8 @@ public void testCreateChunks() { IndexMetadataVersionChecker checker = new IndexMetadataVersionChecker(leaderShardId.getIndex(), followShardId.getIndex(), client, client); - ChunksCoordinator coordinator = new ChunksCoordinator(client, client, ccrExecutor, checker, 1024, 1, - Long.MAX_VALUE, leaderShardId, followShardId, e -> {}); + ChunksCoordinator coordinator = new ChunksCoordinator(client, client, Runnable::run, ccrExecutor, checker, 1024, 1, + Long.MAX_VALUE, leaderShardId, followShardId, e -> {}, () -> true); coordinator.createChucks(0, 1023); List result = new ArrayList<>(coordinator.getChunks()); assertThat(result.size(), equalTo(1)); @@ -116,8 +116,8 @@ public void testCoordinator() throws Exception { int batchSize = randomIntBetween(1, 1000); IndexMetadataVersionChecker checker = new IndexMetadataVersionChecker(leaderShardId.getIndex(), followShardId.getIndex(), client, client); - ChunksCoordinator coordinator = new ChunksCoordinator(client, client, ccrExecutor, checker, batchSize, - 
concurrentProcessors, Long.MAX_VALUE, leaderShardId, followShardId, handler); + ChunksCoordinator coordinator = new ChunksCoordinator(client, client, Runnable::run, ccrExecutor, checker, batchSize, + concurrentProcessors, Long.MAX_VALUE, leaderShardId, followShardId, handler, () -> true); int numberOfOps = randomIntBetween(batchSize, batchSize * 20); long from = randomInt(1000); @@ -161,8 +161,8 @@ public void testCoordinator_failure() throws Exception { }; IndexMetadataVersionChecker checker = new IndexMetadataVersionChecker(leaderShardId.getIndex(), followShardId.getIndex(), client, client); - ChunksCoordinator coordinator = new ChunksCoordinator(client, client, ccrExecutor, checker, 10, 1, Long.MAX_VALUE, - leaderShardId, followShardId, handler); + ChunksCoordinator coordinator = new ChunksCoordinator(client, client, Runnable::run, ccrExecutor, checker, 10, 1, + Long.MAX_VALUE, leaderShardId, followShardId, handler, () -> true); coordinator.createChucks(0, 19); assertThat(coordinator.getChunks().size(), equalTo(2)); @@ -194,8 +194,8 @@ public void testCoordinator_concurrent() throws Exception { }; IndexMetadataVersionChecker checker = new IndexMetadataVersionChecker(leaderShardId.getIndex(), followShardId.getIndex(), client, client); - ChunksCoordinator coordinator = new ChunksCoordinator(client, client, ccrExecutor, checker, 1000, 4, Long.MAX_VALUE, - leaderShardId, followShardId, handler); + ChunksCoordinator coordinator = new ChunksCoordinator(client, client, Runnable::run, ccrExecutor, checker, 1000, + 4, Long.MAX_VALUE, leaderShardId, followShardId, handler, () -> true); coordinator.createChucks(0, 999999); assertThat(coordinator.getChunks().size(), equalTo(1000)); @@ -221,8 +221,8 @@ public void testChunkProcessor() { boolean[] invoked = new boolean[1]; Exception[] exception = new Exception[1]; Consumer handler = e -> {invoked[0] = true;exception[0] = e;}; - ChunkProcessor chunkProcessor = new ChunkProcessor(client, client, chunks, ccrExecutor, checker, 
leaderShardId, - followShardId, handler); + ChunkProcessor chunkProcessor = new ChunkProcessor(client, client, Runnable::run, chunks, ccrExecutor, checker, + leaderShardId, followShardId, handler, () -> true); chunkProcessor.start(0, 10, Long.MAX_VALUE); assertThat(invoked[0], is(true)); assertThat(exception[0], nullValue()); @@ -244,8 +244,8 @@ public void testChunkProcessorRetry() { boolean[] invoked = new boolean[1]; Exception[] exception = new Exception[1]; Consumer handler = e -> {invoked[0] = true;exception[0] = e;}; - ChunkProcessor chunkProcessor = new ChunkProcessor(client, client, chunks, ccrExecutor, checker, leaderShardId, - followShardId, handler); + ChunkProcessor chunkProcessor = new ChunkProcessor(client, client, Runnable::run, chunks, ccrExecutor, checker, + leaderShardId, followShardId, handler, () -> true); chunkProcessor.start(0, 10, Long.MAX_VALUE); assertThat(invoked[0], is(true)); assertThat(exception[0], nullValue()); @@ -268,14 +268,23 @@ public void testChunkProcessorRetryTooManyTimes() { boolean[] invoked = new boolean[1]; Exception[] exception = new Exception[1]; Consumer handler = e -> {invoked[0] = true;exception[0] = e;}; - ChunkProcessor chunkProcessor = new ChunkProcessor(client, client, chunks, ccrExecutor, checker, leaderShardId, - followShardId, handler); + ChunkProcessor chunkProcessor = new ChunkProcessor(client, client, Runnable::run, chunks, ccrExecutor, checker, + leaderShardId, followShardId, handler, () -> true); chunkProcessor.start(0, 10, Long.MAX_VALUE); assertThat(invoked[0], is(true)); assertThat(exception[0], notNullValue()); assertThat(exception[0].getMessage(), equalTo("retrying failed [17] times, aborting...")); assertThat(exception[0].getCause().getMessage(), equalTo("connection exception")); assertThat(chunkProcessor.retryCounter.get(), equalTo(testRetryLimit)); + + chunkProcessor = new ChunkProcessor(client, client, Runnable::run, chunks, ccrExecutor, checker, + leaderShardId, followShardId, handler, () -> 
false); + chunkProcessor.start(0, 10, Long.MAX_VALUE); + assertThat(invoked[0], is(true)); + assertThat(exception[0], notNullValue()); + assertThat(exception[0].getMessage(), equalTo("retrying failed [0] times, aborting...")); + assertThat(exception[0].getCause().getMessage(), equalTo("connection exception")); + assertThat(chunkProcessor.retryCounter.get(), equalTo(0)); } public void testChunkProcessorNoneRetryableError() { @@ -293,8 +302,8 @@ public void testChunkProcessorNoneRetryableError() { boolean[] invoked = new boolean[1]; Exception[] exception = new Exception[1]; Consumer handler = e -> {invoked[0] = true;exception[0] = e;}; - ChunkProcessor chunkProcessor = new ChunkProcessor(client, client, chunks, ccrExecutor, checker, leaderShardId, - followShardId, handler); + ChunkProcessor chunkProcessor = new ChunkProcessor(client, client, Runnable::run, chunks, ccrExecutor, checker, + leaderShardId, followShardId, handler, () -> true); chunkProcessor.start(0, 10, Long.MAX_VALUE); assertThat(invoked[0], is(true)); assertThat(exception[0], notNullValue()); @@ -333,8 +342,8 @@ public void testChunkProcessorExceedMaxTranslogsBytes() { Exception[] exception = new Exception[1]; Consumer handler = e -> {invoked[0] = true;exception[0] = e;}; BiConsumer> versionChecker = (indexVersiuon, consumer) -> consumer.accept(null); - ChunkProcessor chunkProcessor = - new ChunkProcessor(client, client, chunks, ccrExecutor, versionChecker, leaderShardId, followShardId, handler); + ChunkProcessor chunkProcessor = new ChunkProcessor(client, client, Runnable::run, chunks, ccrExecutor, + versionChecker, leaderShardId, followShardId, handler, () -> true); chunkProcessor.start(from, to, Long.MAX_VALUE); assertThat(invoked[0], is(true)); assertThat(exception[0], nullValue()); From 48e737136e512f924def4fe7e0b03863ab141bf8 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 12 Jun 2018 11:12:31 +0200 Subject: [PATCH 2/2] [CCR] Extract retryOrFail helper, add retry debug logging and reuse PROCESSOR_RETRY_LIMIT --- .../ccr/action/ShardFollowTasksExecutor.java | 63 
++++++++----------- 1 file changed, 25 insertions(+), 38 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index 5626d3bb6d85d..66ff845870043 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -212,7 +212,7 @@ private void fetchGlobalCheckpoint(Client client, ShardId shardId, LongConsumer final long globalCheckPoint = filteredShardStats.get().getSeqNoStats().getGlobalCheckpoint(); handler.accept(globalCheckPoint); } else { - if (attempt <= 5) { + if (attempt <= PROCESSOR_RETRY_LIMIT) { retry(() -> fetchGlobalCheckpoint(client, shardId, handler, errorHandler, attempt + 1), errorHandler); } else { errorHandler.accept(new IllegalArgumentException("Cannot find shard stats for shard " + shardId)); @@ -344,6 +344,8 @@ Queue getChunks() { static class ChunkProcessor { + private static final Logger LOGGER = Loggers.getLogger(ChunkProcessor.class); + private final Client leaderClient; private final Client followerClient; private final Queue chunks; @@ -394,17 +396,7 @@ public void onResponse(ShardChangesAction.Response response) { @Override public void onFailure(Exception e) { - assert e != null; - if (shouldRetry(e)) { - if (canRetry()) { - scheduler.accept(() -> start(from, to, maxTranslogsBytes)); - } else { - handler.accept(new ElasticsearchException("retrying failed [" + retryCounter.get() + - "] times, aborting...", e)); - } - } else { - handler.accept(e); - } + retryOrFail(e, () -> start(from, to, maxTranslogsBytes)); } }); } @@ -429,39 +421,20 @@ public void onFailure(Exception e) { protected void doRun() throws Exception { indexVersionChecker.accept(response.getIndexMetadataVersion(), e -> { if (e != null) { - if (shouldRetry(e)) { - 
if (canRetry()) { - scheduler.accept(() -> handleResponse(to, response)); - } else { - handler.accept(new ElasticsearchException("retrying failed [" + retryCounter.get() + - "] times, aborting...", e)); - } - } else { - handler.accept(e); - } + retryOrFail(e, () -> handleResponse(to, response)); return; } final BulkShardOperationsRequest request = new BulkShardOperationsRequest(followerShard, response.getOperations()); followerClient.execute(BulkShardOperationsAction.INSTANCE, request, new ActionListener() { - @Override - public void onResponse(final BulkShardOperationsResponse bulkShardOperationsResponse) { - handler.accept(null); - } + @Override + public void onResponse(final BulkShardOperationsResponse bulkShardOperationsResponse) { + handler.accept(null); + } @Override public void onFailure(final Exception e) { - assert e != null; - if (shouldRetry(e)) { - if (canRetry()) { - scheduler.accept(() -> handleResponse(to, response)); - } else { - handler.accept(new ElasticsearchException("retrying failed [" + retryCounter.get() + - "] times, aborting...", e)); - } - } else { - handler.accept(e); - } + retryOrFail(e, () -> handleResponse(to, response)); } } ); @@ -470,6 +443,20 @@ public void onFailure(final Exception e) { }); } + void retryOrFail(Exception e, Runnable retryAction) { + assert e != null; + if (shouldRetry(e)) { + if (canRetry()) { + LOGGER.debug(() -> new ParameterizedMessage("{} Retrying [{}]...", leaderShard, retryCounter.get()), e); + scheduler.accept(retryAction); + } else { + handler.accept(new ElasticsearchException("retrying failed [" + retryCounter.get() + "] times, aborting...", e)); + } + } else { + handler.accept(e); + } + } + boolean shouldRetry(Exception e) { // TODO: What other exceptions should be retried? 
return NetworkExceptionHelper.isConnectException(e) || @@ -537,7 +524,7 @@ static final class IndexMetadataVersionChecker implements BiConsumer handler) { if (currentIndexMetadataVersion.get() >= minimumRequiredIndexMetadataVersion) { - LOGGER.debug("Current index metadata version [{}] is higher or equal than minimum required index metadata version [{}]", + LOGGER.trace("Current index metadata version [{}] is higher or equal than minimum required index metadata version [{}]", currentIndexMetadataVersion.get(), minimumRequiredIndexMetadataVersion); handler.accept(null); } else {