From fb5657d03c81d5b88c796597b44081455ee1e881 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 24 Sep 2018 11:45:23 +0200 Subject: [PATCH 1/3] [CCR] Adjust list retryable errors The following changes were made: * Added ElasticsearchSecurityException. For in the case the current user has insufficient privileges while an index is being followed. Prior to following ccr checks whether the current user has sufficient privileges and if not the follow api fails with an error. * Added Index block exception. If the leader index gets closed, this exception is returned. * Added ClusterBlockException service unavailable. In case for example the leader cluster is without elected master. * Removed IndexNotFoundException. If the leader / follower index has been deleted, ccr will need to stop the shard follow tasks with an error. Closes #33954 Depends on #33983 --- .../xpack/ccr/action/ShardFollowNodeTask.java | 31 ++++- .../xpack/ccr/ShardChangesIT.java | 123 +++++++++++++++++- 2 files changed, 147 insertions(+), 7 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index 777efdd654b4f..fd43e63db9f17 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -6,18 +6,25 @@ package org.elasticsearch.xpack.ccr.action; +import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchSecurityException; import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.action.support.TransportActions; +import org.elasticsearch.action.NoShardAvailableActionException; +import org.elasticsearch.action.UnavailableShardsException; +import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.collect.Tuple; -import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.transport.NetworkExceptionHelper; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.shard.IllegalIndexShardStateException; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.ShardNotFoundException; import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.indices.IndexClosedException; import org.elasticsearch.persistent.AllocatedPersistentTask; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse; @@ -47,7 +54,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { private static final int DELAY_MILLIS = 50; - private static final Logger LOGGER = Loggers.getLogger(ShardFollowNodeTask.class); + private static final Logger LOGGER = LogManager.getLogger(ShardFollowNodeTask.class); private final String leaderIndex; private final ShardFollowTask params; @@ -375,9 +382,21 @@ static long computeDelay(int currentRetry, long maxRetryDelayInMillis) { } private static boolean shouldRetry(Exception e) { - return NetworkExceptionHelper.isConnectException(e) || - NetworkExceptionHelper.isCloseConnectionException(e) || - TransportActions.isShardNotAvailableException(e); + if (NetworkExceptionHelper.isConnectException(e)) { + return true; + } else if (NetworkExceptionHelper.isCloseConnectionException(e)) { + return true; + } + + final Throwable actual = ExceptionsHelper.unwrapCause(e); + return actual instanceof ShardNotFoundException || + actual instanceof IllegalIndexShardStateException || + actual instanceof NoShardAvailableActionException || + actual instanceof UnavailableShardsException || + actual instanceof AlreadyClosedException || + actual instanceof ElasticsearchSecurityException || // If user does not have sufficient privileges + actual instanceof ClusterBlockException || // If leader index is closed or no elected master + actual instanceof IndexClosedException; // If follow index is closed } // These methods are protected for testing purposes: diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java index 78715654a05e3..20f23eefaaaea 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java @@ -6,10 +6,13 @@ package org.elasticsearch.xpack.ccr; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; +import org.elasticsearch.action.admin.indices.open.OpenIndexRequest; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.bulk.BulkProcessor; @@ -46,6 +49,9 @@ import org.elasticsearch.xpack.ccr.action.ShardFollowTask; import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus; +import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction; +import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction.StatsRequest; +import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction.StatsResponses; import org.elasticsearch.xpack.core.ccr.action.CreateAndFollowIndexAction; import org.elasticsearch.xpack.core.ccr.action.FollowIndexAction; import org.elasticsearch.xpack.core.ccr.action.UnfollowIndexAction; @@ -66,7 +72,10 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; @@ -529,6 +538,114 @@ public void testAttemptToChangeCcrFollowingIndexSetting() throws Exception { "this setting is managed via a dedicated API")); } + public void testCloseLeaderIndex() throws Exception { + assertAcked(client().admin().indices().prepareCreate("index1") + .setSettings(Settings.builder() + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .build())); + ensureGreen("index1"); + + final FollowIndexAction.Request followRequest = createFollowRequest("index1", "index2"); + final CreateAndFollowIndexAction.Request createAndFollowRequest = new CreateAndFollowIndexAction.Request(followRequest); + client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get(); + + client().prepareIndex("index1", "doc", "1").setSource("{}", XContentType.JSON).get(); + assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, equalTo(1L))); + + client().admin().indices().close(new CloseIndexRequest("index1")).actionGet(); + assertBusy(() -> { + StatsResponses response = client().execute(CcrStatsAction.INSTANCE, new StatsRequest()).actionGet(); + assertThat(response.getNodeFailures(), empty()); + assertThat(response.getTaskFailures(), empty()); + assertThat(response.getStatsResponses(), hasSize(1)); + assertThat(response.getStatsResponses().get(0).status().numberOfFailedFetches(), greaterThanOrEqualTo(1L)); + assertThat(response.getStatsResponses().get(0).status().fetchExceptions().size(), equalTo(1)); + ElasticsearchException exception = response.getStatsResponses().get(0).status() + .fetchExceptions().entrySet().iterator().next().getValue().v2(); + assertThat(exception.getMessage(), equalTo("blocked by: [FORBIDDEN/4/index closed];")); + }); + + client().admin().indices().open(new OpenIndexRequest("index1")).actionGet(); + client().prepareIndex("index1", "doc", "2").setSource("{}", XContentType.JSON).get(); + assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, equalTo(2L))); + + unfollowIndex("index2"); + } + + public void testCloseFollowIndex() throws Exception { + assertAcked(client().admin().indices().prepareCreate("index1") + .setSettings(Settings.builder() + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .build())); + ensureGreen("index1"); + + final FollowIndexAction.Request followRequest = createFollowRequest("index1", "index2"); + final CreateAndFollowIndexAction.Request createAndFollowRequest = new CreateAndFollowIndexAction.Request(followRequest); + client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get(); + + client().prepareIndex("index1", "doc", "1").setSource("{}", XContentType.JSON).get(); + assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, equalTo(1L))); + + client().admin().indices().close(new CloseIndexRequest("index2")).actionGet(); + client().prepareIndex("index1", "doc", "2").setSource("{}", XContentType.JSON).get(); + assertBusy(() -> { + StatsResponses response = client().execute(CcrStatsAction.INSTANCE, new StatsRequest()).actionGet(); + assertThat(response.getNodeFailures(), empty()); + assertThat(response.getTaskFailures(), empty()); + assertThat(response.getStatsResponses(), hasSize(1)); + assertThat(response.getStatsResponses().get(0).status().numberOfFailedBulkOperations(), greaterThanOrEqualTo(1L)); + }); + client().admin().indices().open(new OpenIndexRequest("index2")).actionGet(); + assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, equalTo(2L))); + + unfollowIndex("index2"); + } + + public void testDeleteLeaderIndex() throws Exception { + assertAcked(client().admin().indices().prepareCreate("index1") + .setSettings(Settings.builder() + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .build())); + ensureGreen("index1"); + + final FollowIndexAction.Request followRequest = createFollowRequest("index1", "index2"); + final CreateAndFollowIndexAction.Request createAndFollowRequest = new CreateAndFollowIndexAction.Request(followRequest); + client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get(); + + client().prepareIndex("index1", "doc", "1").setSource("{}", XContentType.JSON).get(); + assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, equalTo(1L))); + + client().admin().indices().delete(new DeleteIndexRequest("index1")).actionGet(); + ensureNoCcrTasks(); + } + + public void testDeleteFollowerIndex() throws Exception { + assertAcked(client().admin().indices().prepareCreate("index1") + .setSettings(Settings.builder() + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .build())); + ensureGreen("index1"); + + final FollowIndexAction.Request followRequest = createFollowRequest("index1", "index2"); + final CreateAndFollowIndexAction.Request createAndFollowRequest = new CreateAndFollowIndexAction.Request(followRequest); + client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get(); + + client().prepareIndex("index1", "doc", "1").setSource("{}", XContentType.JSON).get(); + assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, equalTo(1L))); + + client().admin().indices().delete(new DeleteIndexRequest("index2")).actionGet(); + client().prepareIndex("index1", "doc", "2").setSource("{}", XContentType.JSON).get(); + ensureNoCcrTasks(); + } + private CheckedRunnable assertTask(final int numberOfPrimaryShards, final Map numDocsPerShard) { return () -> { final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); @@ -571,10 +688,14 @@ private void unfollowIndex(String... indices) throws Exception { unfollowRequest.setFollowIndex(index); client().execute(UnfollowIndexAction.INSTANCE, unfollowRequest).get(); } + ensureNoCcrTasks(); + } + + private void ensureNoCcrTasks() throws Exception { assertBusy(() -> { final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); final PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - assertThat(tasks.tasks().size(), equalTo(0)); + assertThat(tasks.tasks(), empty()); ListTasksRequest listTasksRequest = new ListTasksRequest(); listTasksRequest.setDetailed(true); From a1673830a3a7bf9a3d1f7e20033ad14241170def Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Wed, 26 Sep 2018 16:38:46 +0200 Subject: [PATCH 2/3] iter --- .../elasticsearch/xpack/ccr/ShardChangesIT.java | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java index b25445512a13c..b6f92988cd2e5 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java @@ -558,14 +558,13 @@ public void testCloseLeaderIndex() throws Exception { .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) .build())); - ensureGreen("index1"); final FollowIndexAction.Request followRequest = createFollowRequest("index1", "index2"); final CreateAndFollowIndexAction.Request createAndFollowRequest = new CreateAndFollowIndexAction.Request(followRequest); client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get(); client().prepareIndex("index1", "doc", "1").setSource("{}", XContentType.JSON).get(); - assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, equalTo(1L))); + atLeastDocsIndexed("index2", 1L); client().admin().indices().close(new CloseIndexRequest("index1")).actionGet(); assertBusy(() -> { @@ -582,7 +581,7 @@ public void testCloseLeaderIndex() throws Exception { client().admin().indices().open(new OpenIndexRequest("index1")).actionGet(); client().prepareIndex("index1", "doc", "2").setSource("{}", XContentType.JSON).get(); - assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, equalTo(2L))); + atLeastDocsIndexed("index2", 2L); unfollowIndex("index2"); } @@ -594,14 +593,13 @@ public void testCloseFollowIndex() throws Exception { .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) .build())); - ensureGreen("index1"); final FollowIndexAction.Request followRequest = createFollowRequest("index1", "index2"); final CreateAndFollowIndexAction.Request createAndFollowRequest = new CreateAndFollowIndexAction.Request(followRequest); client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get(); client().prepareIndex("index1", "doc", "1").setSource("{}", XContentType.JSON).get(); - assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, equalTo(1L))); + atLeastDocsIndexed("index2", 1L); client().admin().indices().close(new CloseIndexRequest("index2")).actionGet(); client().prepareIndex("index1", "doc", "2").setSource("{}", XContentType.JSON).get(); @@ -613,7 +611,7 @@ public void testCloseFollowIndex() throws Exception { assertThat(response.getStatsResponses().get(0).status().numberOfFailedBulkOperations(), greaterThanOrEqualTo(1L)); }); client().admin().indices().open(new OpenIndexRequest("index2")).actionGet(); - assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, equalTo(2L))); + atLeastDocsIndexed("index2", 2L); unfollowIndex("index2"); } @@ -625,14 +623,13 @@ public void testDeleteLeaderIndex() throws Exception { .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) .build())); - ensureGreen("index1"); final FollowIndexAction.Request followRequest = createFollowRequest("index1", "index2"); final CreateAndFollowIndexAction.Request createAndFollowRequest = new CreateAndFollowIndexAction.Request(followRequest); client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get(); client().prepareIndex("index1", "doc", "1").setSource("{}", XContentType.JSON).get(); - assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, equalTo(1L))); + atLeastDocsIndexed("index2", 1L); client().admin().indices().delete(new DeleteIndexRequest("index1")).actionGet(); ensureNoCcrTasks(); @@ -645,14 +642,13 @@ public void testDeleteFollowerIndex() throws Exception { .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) .build())); - ensureGreen("index1"); final FollowIndexAction.Request followRequest = createFollowRequest("index1", "index2"); final CreateAndFollowIndexAction.Request createAndFollowRequest = new CreateAndFollowIndexAction.Request(followRequest); client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get(); client().prepareIndex("index1", "doc", "1").setSource("{}", XContentType.JSON).get(); - assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, equalTo(1L))); + atLeastDocsIndexed("index2", 1L); client().admin().indices().delete(new DeleteIndexRequest("index2")).actionGet(); client().prepareIndex("index1", "doc", "2").setSource("{}", XContentType.JSON).get(); From 327ef7282bf78413be77945a5b53f620a11e8bdc Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Thu, 27 Sep 2018 09:52:30 +0200 Subject: [PATCH 3/3] reverted atLeastDocsIndexed(...) change --- .../org/elasticsearch/xpack/ccr/ShardChangesIT.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java index b6f92988cd2e5..32dba7467e38a 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java @@ -564,7 +564,7 @@ public void testCloseLeaderIndex() throws Exception { client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get(); client().prepareIndex("index1", "doc", "1").setSource("{}", XContentType.JSON).get(); - atLeastDocsIndexed("index2", 1L); + assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, equalTo(1L))); client().admin().indices().close(new CloseIndexRequest("index1")).actionGet(); assertBusy(() -> { @@ -581,7 +581,7 @@ public void testCloseLeaderIndex() throws Exception { client().admin().indices().open(new OpenIndexRequest("index1")).actionGet(); client().prepareIndex("index1", "doc", "2").setSource("{}", XContentType.JSON).get(); - atLeastDocsIndexed("index2", 2L); + assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, equalTo(2L))); unfollowIndex("index2"); } @@ -599,7 +599,7 @@ public void testCloseFollowIndex() throws Exception { client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get(); client().prepareIndex("index1", "doc", "1").setSource("{}", XContentType.JSON).get(); - atLeastDocsIndexed("index2", 1L); + assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, equalTo(1L))); client().admin().indices().close(new CloseIndexRequest("index2")).actionGet(); client().prepareIndex("index1", "doc", "2").setSource("{}", XContentType.JSON).get(); @@ -611,7 +611,7 @@ public void testCloseFollowIndex() throws Exception { assertThat(response.getStatsResponses().get(0).status().numberOfFailedBulkOperations(), greaterThanOrEqualTo(1L)); }); client().admin().indices().open(new OpenIndexRequest("index2")).actionGet(); - atLeastDocsIndexed("index2", 2L); + assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, equalTo(2L))); unfollowIndex("index2"); } @@ -629,7 +629,7 @@ public void testDeleteLeaderIndex() throws Exception { client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get(); client().prepareIndex("index1", "doc", "1").setSource("{}", XContentType.JSON).get(); - atLeastDocsIndexed("index2", 1L); + assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, equalTo(1L))); client().admin().indices().delete(new DeleteIndexRequest("index1")).actionGet(); ensureNoCcrTasks(); @@ -648,7 +648,7 @@ public void testDeleteFollowerIndex() throws Exception { client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get(); client().prepareIndex("index1", "doc", "1").setSource("{}", XContentType.JSON).get(); - atLeastDocsIndexed("index2", 1L); + assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, equalTo(1L))); client().admin().indices().delete(new DeleteIndexRequest("index2")).actionGet(); client().prepareIndex("index1", "doc", "2").setSource("{}", XContentType.JSON).get();