From 50cc0e8e4de3e678efd01ad82864b0dd220821f4 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 18 Feb 2019 22:22:10 -0500 Subject: [PATCH 01/10] Remove retention leases when unfollowing This commit attempts to remove the retention leases on the leader shards when unfollowing an index. This is best effort, since the leader might not be available. --- .../xpack/ccr/CcrRetentionLeases.java | 70 ++++++++ .../ccr/action/TransportUnfollowAction.java | 158 ++++++++++++++++-- .../xpack/ccr/repository/CcrRepository.java | 62 +------ .../indexlifecycle/CCRIndexLifecycleIT.java | 3 - 4 files changed, 224 insertions(+), 69 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRetentionLeases.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRetentionLeases.java index 122fbdb969aa7..40ca030807867 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRetentionLeases.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRetentionLeases.java @@ -6,9 +6,20 @@ package org.elasticsearch.xpack.ccr; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; +import org.elasticsearch.index.seqno.RetentionLeaseActions; +import org.elasticsearch.index.seqno.RetentionLeaseAlreadyExistsException; +import org.elasticsearch.index.seqno.RetentionLeaseNotFoundException; +import org.elasticsearch.index.shard.ShardId; import java.util.Locale; +import java.util.Optional; + +import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL; public class CcrRetentionLeases { @@ -37,4 +48,63 @@ public static String retentionLeaseId( leaderIndex.getUUID()); } + public static Optional syncAddRetentionLease( + final ShardId leaderShardId, + final String retentionLeaseId, + final Client remoteClient, + final TimeValue timeout) { + try { + final PlainActionFuture response = new PlainActionFuture<>(); + asyncAddRetentionLease(leaderShardId, retentionLeaseId, remoteClient, response); + response.actionGet(timeout); + return Optional.empty(); + } catch (final RetentionLeaseAlreadyExistsException e) { + return Optional.of(e); + } + } + + public static void asyncAddRetentionLease( + final ShardId leaderShardId, + final String retentionLeaseId, + final Client remoteClient, + final ActionListener listener) { + final RetentionLeaseActions.AddRequest request = + new RetentionLeaseActions.AddRequest(leaderShardId, retentionLeaseId, RETAIN_ALL, "ccr"); + remoteClient.execute(RetentionLeaseActions.Add.INSTANCE, request, listener); + } + + public static Optional syncRenewRetentionLease( + final ShardId leaderShardId, + final String retentionLeaseId, + final Client remoteClient, + final TimeValue timeout) { + try { + final PlainActionFuture response = new PlainActionFuture<>(); + asyncRenewRetentionLease(leaderShardId, retentionLeaseId, remoteClient, response); + response.actionGet(timeout); + return Optional.empty(); + } catch (final RetentionLeaseNotFoundException e) { + return Optional.of(e); + } + } + + public static void asyncRenewRetentionLease( + final ShardId leaderShardId, + final String retentionLeaseId, + final Client remoteClient, + final ActionListener listener) { + final RetentionLeaseActions.RenewRequest request = + new RetentionLeaseActions.RenewRequest(leaderShardId, retentionLeaseId, RETAIN_ALL, "ccr"); + remoteClient.execute(RetentionLeaseActions.Renew.INSTANCE, request, listener); + } + + public static void asyncRemoveRetentionLease( + final ShardId leaderShardId, + final String retentionLeaseId, + final Client remoteClient, + final ActionListener listener) { + final RetentionLeaseActions.RemoveRequest request = new RetentionLeaseActions.RemoveRequest(leaderShardId, retentionLeaseId); + remoteClient.execute(RetentionLeaseActions.Remove.INSTANCE, request, listener); + } + } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java index 3a158aceddb2d..d066b3104c0ea 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java @@ -6,10 +6,15 @@ package org.elasticsearch.xpack.ccr.action; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.GroupedActionListener; +import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.block.ClusterBlockException; @@ -20,22 +25,46 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.seqno.RetentionLeaseActions; +import org.elasticsearch.index.seqno.RetentionLeaseNotFoundException; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ccr.Ccr; +import org.elasticsearch.xpack.ccr.CcrRetentionLeases; import org.elasticsearch.xpack.ccr.CcrSettings; import org.elasticsearch.xpack.core.ccr.action.UnfollowAction; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicInteger; + public class TransportUnfollowAction extends TransportMasterNodeAction { + private final Client client; + @Inject - public TransportUnfollowAction(TransportService transportService, ClusterService clusterService, - ThreadPool threadPool, ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver) { - super(UnfollowAction.NAME, transportService, clusterService, threadPool, actionFilters, - UnfollowAction.Request::new, indexNameExpressionResolver); + public TransportUnfollowAction( + final TransportService transportService, + final ClusterService clusterService, + final ThreadPool threadPool, + final ActionFilters actionFilters, + final IndexNameExpressionResolver indexNameExpressionResolver, + final Client client) { + super( + UnfollowAction.NAME, + transportService, + clusterService, + threadPool, + actionFilters, + UnfollowAction.Request::new, + indexNameExpressionResolver); + this.client = Objects.requireNonNull(client); } @Override @@ -49,26 +78,131 @@ protected AcknowledgedResponse newResponse() { } @Override - protected void masterOperation(UnfollowAction.Request request, - ClusterState state, - ActionListener listener) throws Exception { + protected void masterOperation( + final UnfollowAction.Request request, + final ClusterState state, + final ActionListener listener) throws Exception { clusterService.submitStateUpdateTask("unfollow_action", new ClusterStateUpdateTask() { @Override - public ClusterState execute(ClusterState current) throws Exception { + public ClusterState execute(final ClusterState current) throws Exception { String followerIndex = request.getFollowerIndex(); return unfollow(followerIndex, current); } @Override - public void onFailure(String source, Exception e) { + public void onFailure(final String source, final Exception e) { listener.onFailure(e); } @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - listener.onResponse(new AcknowledgedResponse(true)); + public void clusterStateProcessed(final String source, final ClusterState oldState, final ClusterState newState) { + final IndexMetaData indexMetaData = oldState.metaData().index(request.getFollowerIndex()); + final Map ccrCustomMetaData = indexMetaData.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY); + final String remoteClusterName = ccrCustomMetaData.get(Ccr.CCR_CUSTOM_METADATA_REMOTE_CLUSTER_NAME_KEY); + final Client remoteClient = client.getRemoteClusterClient(remoteClusterName); + final String leaderIndexName = ccrCustomMetaData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY); + final String leaderIndexUuid = ccrCustomMetaData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY); + final Index leaderIndex = new Index(leaderIndexName, leaderIndexUuid); + final String retentionLeaseId = CcrRetentionLeases.retentionLeaseId( + oldState.getClusterName().value(), + indexMetaData.getIndex(), + remoteClusterName, + leaderIndex); + final int numberOfShards = IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.get(indexMetaData.getSettings()); + final GroupedActionListener groupListener = new GroupedActionListener<>( + new ActionListener>() { + + @Override + public void onResponse(Collection responses) { + logger.trace("removed retention lease [{}] on all leader primary shards", retentionLeaseId); + listener.onResponse(new AcknowledgedResponse(true)); + } + + @Override + public void onFailure(final Exception e) { + logger.warn("failure while removing retention lease [{}] on leader primary shards", retentionLeaseId); + listener.onFailure(e); + } + + }, + numberOfShards, + Collections.emptyList()); + for (int i = 0; i < numberOfShards; i++) { + final ShardId followerShardId = new ShardId(indexMetaData.getIndex(), i); + final ShardId leaderShardId = new ShardId(leaderIndex, i); + final AtomicInteger tryCounter = new AtomicInteger(1); + removeRetentionLeaseForShard( + leaderShardId, + retentionLeaseId, + remoteClient, + ActionListener.wrap( + groupListener::onResponse, + e -> handleException( + followerShardId, + retentionLeaseId, + leaderShardId, + remoteClient, + tryCounter, + groupListener, + e))); + } } + + private void handleException( + final ShardId followerShardId, + final String retentionLeaseId, + final ShardId leaderShardId, + final Client remoteClient, + final AtomicInteger tryCounter, + final GroupedActionListener groupListener, + final Exception e) { + final Throwable cause = ExceptionsHelper.unwrapCause(e); + if (cause instanceof RetentionLeaseNotFoundException) { + // treat as success + groupListener.onResponse(new RetentionLeaseActions.Response()); + } else if (TransportActions.isShardNotAvailableException(e) && tryCounter.get() < 16) { + logger.trace(new ParameterizedMessage( + "{} leader primary shard {} not available on try [{}] while removing retention lease [{}]", + followerShardId, + leaderShardId, + tryCounter.get(), + retentionLeaseId), + e); + tryCounter.incrementAndGet(); + removeRetentionLeaseForShard( + leaderShardId, + retentionLeaseId, + remoteClient, + ActionListener.wrap( + groupListener::onResponse, + // TODO: should this should exponentially backoff, or should we simply never retry? + inner -> handleException( + followerShardId, + retentionLeaseId, + leaderShardId, + remoteClient, + tryCounter, + groupListener, + inner))); + } else { + logger.warn(new ParameterizedMessage( + "{} failed to remove retention lease [{}] on leader primary shard {}", + followerShardId, + leaderShardId, + retentionLeaseId), + e); + } + } + + private void removeRetentionLeaseForShard( + final ShardId leaderShardId, + final String retentionLeaseId, + final Client remoteClient, + final ActionListener listener) { + CcrRetentionLeases.asyncRemoveRetentionLease(leaderShardId, retentionLeaseId, remoteClient, listener); + } + }); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index 41cce3f5b0b06..cc88b687c6de8 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -44,7 +44,6 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.seqno.LocalCheckpointTracker; -import org.elasticsearch.index.seqno.RetentionLeaseActions; import org.elasticsearch.index.seqno.RetentionLeaseAlreadyExistsException; import org.elasticsearch.index.seqno.RetentionLeaseNotFoundException; import org.elasticsearch.index.shard.IndexShard; @@ -70,6 +69,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ccr.Ccr; import org.elasticsearch.xpack.ccr.CcrLicenseChecker; +import org.elasticsearch.xpack.ccr.CcrRetentionLeases; import org.elasticsearch.xpack.ccr.CcrSettings; import org.elasticsearch.xpack.ccr.action.CcrRequests; import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionAction; @@ -94,9 +94,10 @@ import java.util.function.LongConsumer; import java.util.function.Supplier; -import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL; import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; import static org.elasticsearch.xpack.ccr.CcrRetentionLeases.retentionLeaseId; +import static org.elasticsearch.xpack.ccr.CcrRetentionLeases.syncAddRetentionLease; +import static org.elasticsearch.xpack.ccr.CcrRetentionLeases.syncRenewRetentionLease; /** @@ -326,7 +327,7 @@ public void restoreShard(IndexShard indexShard, SnapshotId snapshotId, Version v try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { // we have to execute under the system context so that if security is enabled the renewal is authorized threadContext.markAsSystemContext(); - asyncRenewRetentionLease( + CcrRetentionLeases.asyncRenewRetentionLease( leaderShardId, retentionLeaseId, remoteClient, @@ -377,8 +378,9 @@ void acquireRetentionLeaseOnLeader( final Client remoteClient) { logger.trace( () -> new ParameterizedMessage("{} requesting leader to add retention lease [{}]", shardId, retentionLeaseId)); + final TimeValue timeout = ccrSettings.getRecoveryActionTimeout(); final Optional maybeAddAlready = - syncAddRetentionLease(leaderShardId, retentionLeaseId, remoteClient); + syncAddRetentionLease(leaderShardId, retentionLeaseId, remoteClient, timeout); maybeAddAlready.ifPresent(addAlready -> { logger.trace(() -> new ParameterizedMessage( "{} retention lease [{}] already exists, requesting a renewal", @@ -386,7 +388,7 @@ void acquireRetentionLeaseOnLeader( retentionLeaseId), addAlready); final Optional maybeRenewNotFound = - syncRenewRetentionLease(leaderShardId, retentionLeaseId, remoteClient); + syncRenewRetentionLease(leaderShardId, retentionLeaseId, remoteClient, timeout); maybeRenewNotFound.ifPresent(renewNotFound -> { logger.trace(() -> new ParameterizedMessage( "{} retention lease [{}] not found while attempting to renew, requesting a final add", @@ -394,7 +396,7 @@ void acquireRetentionLeaseOnLeader( retentionLeaseId), renewNotFound); final Optional maybeFallbackAddAlready = - syncAddRetentionLease(leaderShardId, retentionLeaseId, remoteClient); + syncAddRetentionLease(leaderShardId, retentionLeaseId, remoteClient, timeout); maybeFallbackAddAlready.ifPresent(fallbackAddAlready -> { /* * At this point we tried to add the lease and the retention lease already existed. By the time we tried to renew the @@ -407,54 +409,6 @@ void acquireRetentionLeaseOnLeader( }); } - private Optional syncAddRetentionLease( - final ShardId leaderShardId, - final String retentionLeaseId, - final Client remoteClient) { - try { - final PlainActionFuture response = new PlainActionFuture<>(); - asyncAddRetentionLease(leaderShardId, retentionLeaseId, remoteClient, response); - response.actionGet(ccrSettings.getRecoveryActionTimeout()); - return Optional.empty(); - } catch (final RetentionLeaseAlreadyExistsException e) { - return Optional.of(e); - } - } - - private void asyncAddRetentionLease( - final ShardId leaderShardId, - final String retentionLeaseId, - final Client remoteClient, - final ActionListener listener) { - final RetentionLeaseActions.AddRequest request = - new RetentionLeaseActions.AddRequest(leaderShardId, retentionLeaseId, RETAIN_ALL, "ccr"); - remoteClient.execute(RetentionLeaseActions.Add.INSTANCE, request, listener); - } - - private Optional syncRenewRetentionLease( - final ShardId leaderShardId, - final String retentionLeaseId, - final Client remoteClient) { - try { - final PlainActionFuture response = new PlainActionFuture<>(); - asyncRenewRetentionLease(leaderShardId, retentionLeaseId, remoteClient, response); - response.actionGet(ccrSettings.getRecoveryActionTimeout()); - return Optional.empty(); - } catch (final RetentionLeaseNotFoundException e) { - return Optional.of(e); - } - } - - private void asyncRenewRetentionLease( - final ShardId leaderShardId, - final String retentionLeaseId, - final Client remoteClient, - final ActionListener listener) { - final RetentionLeaseActions.RenewRequest request = - new RetentionLeaseActions.RenewRequest(leaderShardId, retentionLeaseId, RETAIN_ALL, "ccr"); - remoteClient.execute(RetentionLeaseActions.Renew.INSTANCE, request, listener); - } - // this setting is intentionally not registered, it is only used in tests public static final Setting RETENTION_LEASE_RENEW_INTERVAL_SETTING = Setting.timeSetting( diff --git a/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/indexlifecycle/CCRIndexLifecycleIT.java b/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/indexlifecycle/CCRIndexLifecycleIT.java index b3c93acb97b99..eaabab6d033c0 100644 --- a/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/indexlifecycle/CCRIndexLifecycleIT.java +++ b/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/indexlifecycle/CCRIndexLifecycleIT.java @@ -326,9 +326,6 @@ public void testUnfollowInjectedBeforeShrink() throws Exception { } } - // Specifically, this is waiting for this bullet to be complete: - // - integrate shard history retention leases with cross-cluster replication - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/37165") public void testCannotShrinkLeaderIndex() throws Exception { String indexName = "shrink-leader-test"; String shrunkenIndexName = "shrink-" + indexName; From 380d6770f0fcf898d0512c713645b01252b65543 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 18 Feb 2019 23:42:54 -0500 Subject: [PATCH 02/10] Add necessary permissions --- .../xpack/ccr/action/TransportUnfollowAction.java | 11 +++++++++-- .../security/authz/privilege/SystemPrivilege.java | 1 + .../core/security/authz/privilege/PrivilegeTests.java | 2 ++ .../security/authz/AuthorizationServiceTests.java | 1 + 4 files changed, 13 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java index d066b3104c0ea..74301b68315f2 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ccr.action; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.ElasticsearchSecurityException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; @@ -25,6 +26,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.seqno.RetentionLeaseActions; @@ -158,6 +160,7 @@ private void handleException( final GroupedActionListener groupListener, final Exception e) { final Throwable cause = ExceptionsHelper.unwrapCause(e); + assert cause instanceof ElasticsearchSecurityException == false : e; if (cause instanceof RetentionLeaseNotFoundException) { // treat as success groupListener.onResponse(new RetentionLeaseActions.Response()); @@ -200,9 +203,13 @@ private void removeRetentionLeaseForShard( final String retentionLeaseId, final Client remoteClient, final ActionListener listener) { - CcrRetentionLeases.asyncRemoveRetentionLease(leaderShardId, retentionLeaseId, remoteClient, listener); + final ThreadContext threadContext = threadPool.getThreadContext(); + try (ThreadContext.StoredContext ignore = threadPool.getThreadContext().stashContext()) { + // we have to execute under the system context so that if security is enabled the removal is authorized + threadContext.markAsSystemContext(); + CcrRetentionLeases.asyncRemoveRetentionLease(leaderShardId, retentionLeaseId, remoteClient, listener); + } } - }); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/SystemPrivilege.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/SystemPrivilege.java index 523a810174912..dda81e6b86197 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/SystemPrivilege.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/SystemPrivilege.java @@ -31,6 +31,7 @@ public final class SystemPrivilege extends Privilege { RetentionLeaseSyncAction.ACTION_NAME + "*", // needed for retention lease syncs RetentionLeaseBackgroundSyncAction.ACTION_NAME + "*", // needed for background retention lease syncs RetentionLeaseActions.Add.ACTION_NAME + "*", // needed for CCR to add retention leases + RetentionLeaseActions.Remove.ACTION_NAME + "*", // needed for CCR to remove retention leases RetentionLeaseActions.Renew.ACTION_NAME + "*", // needed for CCR to renew retention leases "indices:admin/settings/update" // needed for DiskThresholdMonitor.markIndicesReadOnly ); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/privilege/PrivilegeTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/privilege/PrivilegeTests.java index 46db9e83f7740..4af7dd2e57d62 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/privilege/PrivilegeTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/privilege/PrivilegeTests.java @@ -135,6 +135,8 @@ public void testSystem() throws Exception { assertThat(predicate.test("indices:admin/seq_no/retention_lease_background_sync[r]"), is(true)); assertThat(predicate.test("indices:admin/seq_no/add_retention_lease"), is(true)); assertThat(predicate.test("indices:admin/seq_no/add_retention_lease[s]"), is(true)); + assertThat(predicate.test("indices:admin/seq_no/remove_retention_lease"), is(true)); + assertThat(predicate.test("indices:admin/seq_no/remove_retention_lease[s]"), is(true)); assertThat(predicate.test("indices:admin/seq_no/renew_retention_lease"), is(true)); assertThat(predicate.test("indices:admin/seq_no/renew_retention_lease[s]"), is(true)); assertThat(predicate.test("indices:admin/settings/update"), is(true)); diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java index bde5949d378b3..b608a5694976b 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java @@ -262,6 +262,7 @@ public void testActionsForSystemUserIsAuthorized() throws IOException { "indices:admin/seq_no/retention_lease_sync", "indices:admin/seq_no/retention_lease_background_sync", "indices:admin/seq_no/add_retention_lease", + "indices:admin/seq_no/remove_retention_lease", "indices:admin/seq_no/renew_retention_lease", "indices:admin/settings/update" }; for (String action : actions) { From 1d5be64b73fc5ff43db8b7d5910711f41944e49d Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 18 Feb 2019 23:50:23 -0500 Subject: [PATCH 03/10] Add log message --- .../xpack/ccr/action/TransportUnfollowAction.java | 4 ++++ .../org/elasticsearch/xpack/ccr/repository/CcrRepository.java | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java index 74301b68315f2..8556d183b5a5e 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java @@ -135,6 +135,7 @@ public void onFailure(final Exception e) { final ShardId leaderShardId = new ShardId(leaderIndex, i); final AtomicInteger tryCounter = new AtomicInteger(1); removeRetentionLeaseForShard( + followerShardId, leaderShardId, retentionLeaseId, remoteClient, @@ -174,6 +175,7 @@ private void handleException( e); tryCounter.incrementAndGet(); removeRetentionLeaseForShard( + followerShardId, leaderShardId, retentionLeaseId, remoteClient, @@ -199,10 +201,12 @@ private void handleException( } private void removeRetentionLeaseForShard( + final ShardId followerShardId, final ShardId leaderShardId, final String retentionLeaseId, final Client remoteClient, final ActionListener listener) { + logger.trace("{} removing retention lease [{}] while unfollowing leader index", followerShardId, retentionLeaseId); final ThreadContext threadContext = threadPool.getThreadContext(); try (ThreadContext.StoredContext ignore = threadPool.getThreadContext().stashContext()) { // we have to execute under the system context so that if security is enabled the removal is authorized diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index cc88b687c6de8..8d07d05cdb9eb 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -322,7 +322,7 @@ public void restoreShard(IndexShard indexShard, SnapshotId snapshotId, Version v // schedule renewals to run during the restore final Scheduler.Cancellable renewable = threadPool.scheduleWithFixedDelay( () -> { - logger.trace("{} background renewal of retention lease [{}] during restore", shardId, retentionLeaseId); + logger.trace("{} background renewal of retention lease [{}] during restore", indexShard.shardId(), retentionLeaseId); final ThreadContext threadContext = threadPool.getThreadContext(); try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { // we have to execute under the system context so that if security is enabled the renewal is authorized From cb4e87648b0fdf57be31129ffe9edb9d984be808 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 18 Feb 2019 23:53:49 -0500 Subject: [PATCH 04/10] Fix logging --- .../xpack/ccr/action/TransportUnfollowAction.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java index 8556d183b5a5e..f5b85eb505513 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java @@ -117,13 +117,20 @@ public void clusterStateProcessed(final String source, final ClusterState oldSta @Override public void onResponse(Collection responses) { - logger.trace("removed retention lease [{}] on all leader primary shards", retentionLeaseId); + logger.trace( + "[{}] removed retention lease [{}] on all leader primary shards", + indexMetaData.getIndex(), + retentionLeaseId); listener.onResponse(new AcknowledgedResponse(true)); } @Override public void onFailure(final Exception e) { - logger.warn("failure while removing retention lease [{}] on leader primary shards", retentionLeaseId); + logger.warn(new ParameterizedMessage( + "[{}] failure while removing retention lease [{}] on leader primary shards", + indexMetaData.getIndex(), + retentionLeaseId), + e); listener.onFailure(e); } From 8c172f9021392dc9718cc50547043a23c1c11576 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 19 Feb 2019 12:50:05 -0500 Subject: [PATCH 05/10] Iteration --- .../client/ccr/UnfollowResponse.java | 5 + .../ccr/action/TransportUnfollowAction.java | 92 +++++++---------- .../xpack/core/ccr/action/UnfollowAction.java | 98 ++++++++++++++++++- .../xpack/core/ccr/client/CcrClient.java | 6 +- 4 files changed, 137 insertions(+), 64 deletions(-) create mode 100644 client/rest-high-level/src/main/java/org/elasticsearch/client/ccr/UnfollowResponse.java diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ccr/UnfollowResponse.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ccr/UnfollowResponse.java new file mode 100644 index 0000000000000..d1fca766fc094 --- /dev/null +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ccr/UnfollowResponse.java @@ -0,0 +1,5 @@ +package org.elasticsearch.client.ccr; + +public class UnfollowResponse { + +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java index f5b85eb505513..dc0a7f1139d04 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java @@ -12,8 +12,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.GroupedActionListener; -import org.elasticsearch.action.support.TransportActions; -import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; @@ -44,9 +42,8 @@ import java.util.Collections; import java.util.Map; import java.util.Objects; -import java.util.concurrent.atomic.AtomicInteger; -public class TransportUnfollowAction extends TransportMasterNodeAction { +public class TransportUnfollowAction extends TransportMasterNodeAction { private final Client client; @@ -75,19 +72,19 @@ protected String executor() { } @Override - protected AcknowledgedResponse newResponse() { - return new AcknowledgedResponse(); + protected UnfollowAction.Response newResponse() { + return new UnfollowAction.Response(); } @Override protected void masterOperation( final UnfollowAction.Request request, final ClusterState state, - final ActionListener listener) throws Exception { + final ActionListener listener) { clusterService.submitStateUpdateTask("unfollow_action", new ClusterStateUpdateTask() { @Override - public ClusterState execute(final ClusterState current) throws Exception { + public ClusterState execute(final ClusterState current) { String followerIndex = request.getFollowerIndex(); return unfollow(followerIndex, current); } @@ -112,6 +109,7 @@ public void clusterStateProcessed(final String source, final ClusterState oldSta remoteClusterName, leaderIndex); final int numberOfShards = IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.get(indexMetaData.getSettings()); + final GroupedActionListener groupListener = new GroupedActionListener<>( new ActionListener>() { @@ -121,7 +119,7 @@ public void onResponse(Collection responses) { "[{}] removed retention lease [{}] on all leader primary shards", indexMetaData.getIndex(), retentionLeaseId); - listener.onResponse(new AcknowledgedResponse(true)); + listener.onResponse(new UnfollowAction.Response(true, null)); } @Override @@ -131,7 +129,7 @@ public void onFailure(final Exception e) { indexMetaData.getIndex(), retentionLeaseId), e); - listener.onFailure(e); + listener.onResponse(new UnfollowAction.Response(false, e)); } }, @@ -140,7 +138,6 @@ public void onFailure(final Exception e) { for (int i = 0; i < numberOfShards; i++) { final ShardId followerShardId = new ShardId(indexMetaData.getIndex(), i); final ShardId leaderShardId = new ShardId(leaderIndex, i); - final AtomicInteger tryCounter = new AtomicInteger(1); removeRetentionLeaseForShard( followerShardId, leaderShardId, @@ -152,75 +149,54 @@ public void onFailure(final Exception e) { followerShardId, retentionLeaseId, leaderShardId, - remoteClient, - tryCounter, groupListener, e))); } } + private void removeRetentionLeaseForShard( + final ShardId followerShardId, + final ShardId leaderShardId, + final String retentionLeaseId, + final Client remoteClient, + final ActionListener listener) { + logger.trace("{} removing retention lease [{}] while unfollowing leader index", followerShardId, retentionLeaseId); + final ThreadContext threadContext = threadPool.getThreadContext(); + try (ThreadContext.StoredContext ignore = threadPool.getThreadContext().stashContext()) { + // we have to execute under the system context so that if security is enabled the removal is authorized + threadContext.markAsSystemContext(); + CcrRetentionLeases.asyncRemoveRetentionLease(leaderShardId, retentionLeaseId, remoteClient, listener); + } + } + private void handleException( final ShardId followerShardId, final String retentionLeaseId, final ShardId leaderShardId, - final Client remoteClient, - final AtomicInteger tryCounter, - final GroupedActionListener groupListener, + final ActionListener listener, final Exception e) { final Throwable cause = ExceptionsHelper.unwrapCause(e); assert cause instanceof ElasticsearchSecurityException == false : e; if (cause instanceof RetentionLeaseNotFoundException) { // treat as success - groupListener.onResponse(new RetentionLeaseActions.Response()); - } else if (TransportActions.isShardNotAvailableException(e) && tryCounter.get() < 16) { logger.trace(new ParameterizedMessage( - "{} leader primary shard {} not available on try [{}] while removing retention lease [{}]", - followerShardId, - leaderShardId, - tryCounter.get(), - retentionLeaseId), - e); - tryCounter.incrementAndGet(); - removeRetentionLeaseForShard( + "{} retention lease [{}] not found on {} while unfollowing", followerShardId, - leaderShardId, retentionLeaseId, - remoteClient, - ActionListener.wrap( - groupListener::onResponse, - // TODO: should this should exponentially backoff, or should we simply never retry? - inner -> handleException( - followerShardId, - retentionLeaseId, - leaderShardId, - remoteClient, - tryCounter, - groupListener, - inner))); + leaderShardId, + e)); + listener.onResponse(new RetentionLeaseActions.Response()); } else { logger.warn(new ParameterizedMessage( - "{} failed to remove retention lease [{}] on leader primary shard {}", - followerShardId, - leaderShardId, - retentionLeaseId), - e); + "{} failed to remove retention lease [{}] on {} while unfollowing", + followerShardId, + retentionLeaseId, + leaderShardId, + e)); + listener.onFailure(e); } } - private void removeRetentionLeaseForShard( - final ShardId followerShardId, - final ShardId leaderShardId, - final String retentionLeaseId, - final Client remoteClient, - final ActionListener listener) { - logger.trace("{} removing retention lease [{}] while unfollowing leader index", followerShardId, retentionLeaseId); - final ThreadContext threadContext = threadPool.getThreadContext(); - try (ThreadContext.StoredContext ignore = threadPool.getThreadContext().stashContext()) { - // we have to execute under the system context so that if security is enabled the removal is authorized - threadContext.markAsSystemContext(); - CcrRetentionLeases.asyncRemoveRetentionLease(leaderShardId, retentionLeaseId, remoteClient, listener); - } - } }); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/UnfollowAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/UnfollowAction.java index 9bb440cc0895d..a3f57387c7b39 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/UnfollowAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/UnfollowAction.java @@ -6,20 +6,25 @@ package org.elasticsearch.xpack.core.ccr.action; +import org.elasticsearch.Version; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; import java.io.IOException; +import java.util.Objects; import static org.elasticsearch.action.ValidateActions.addValidationError; -public class UnfollowAction extends Action { +public class UnfollowAction extends Action { public static final UnfollowAction INSTANCE = new UnfollowAction(); public static final String NAME = "indices:admin/xpack/ccr/unfollow"; @@ -29,8 +34,8 @@ private UnfollowAction() { } @Override - public AcknowledgedResponse newResponse() { - return new AcknowledgedResponse(); + public Response newResponse() { + return new Response(); } public static class Request extends AcknowledgedRequest implements IndicesRequest { @@ -76,4 +81,91 @@ public void writeTo(StreamOutput out) throws IOException { } } + public static class Response extends AcknowledgedResponse implements ToXContentObject { + + private boolean retentionLeasesRemoved; + @Nullable private Exception retentionLeasesRemovalFailureCause; + + public Response() { + + } + + public Response( + final boolean retentionLeasesRemoved, + final Exception retentionLeasesRemovalFailureCause) { + super(true); + this.retentionLeasesRemoved = retentionLeasesRemoved; + if (retentionLeasesRemoved && retentionLeasesRemovalFailureCause != null) { + throw new IllegalArgumentException( + "there should not be a failure cause when retention leases are removed", + retentionLeasesRemovalFailureCause); + } else if (retentionLeasesRemoved == false && retentionLeasesRemovalFailureCause == null) { + throw new IllegalArgumentException("there should be a failure cause when retention leases are not removed"); + } + this.retentionLeasesRemovalFailureCause = retentionLeasesRemovalFailureCause; + } + + @Override + public void readFrom(final StreamInput in) throws IOException { + super.readFrom(in); + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + retentionLeasesRemoved = in.readBoolean(); + // noinspection StatementWithEmptyBody + if (retentionLeasesRemoved) { + + } else { + retentionLeasesRemovalFailureCause = in.readException(); + } + } + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + super.writeTo(out); + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + if (retentionLeasesRemoved) { + out.writeBoolean(true); + } else { + out.writeBoolean(false); + assert retentionLeasesRemovalFailureCause != null; + out.writeException(retentionLeasesRemovalFailureCause); + } + } + } + + @Override + protected void addCustomFields(XContentBuilder builder, Params params) throws IOException { + builder.field("acknowledged", acknowledged); + builder.field("retention_leases_removed", retentionLeasesRemoved); + if (retentionLeasesRemovalFailureCause != null) { + builder.field("retention_leases_removal_failure_cause", retentionLeasesRemovalFailureCause); + } + } + + @Override + public boolean equals(final Object that) { + if (this == that) return true; + if (that == null || getClass() != that.getClass()) return false; + final UnfollowAction.Response response = (UnfollowAction.Response) that; + return acknowledged == response.acknowledged && + retentionLeasesRemoved == response.retentionLeasesRemoved && + Objects.equals(retentionLeasesRemovalFailureCause, response.retentionLeasesRemovalFailureCause); + } + + @Override + public int hashCode() { + return Objects.hash(acknowledged, retentionLeasesRemoved, retentionLeasesRemovalFailureCause); + } + + @Override + public String toString() { + return "Response{" + + "acknowledged=" + acknowledged + + ", retentionLeasesRemoved=" + retentionLeasesRemoved + + ", retentionLeasesRemovalFailureCause=" + retentionLeasesRemovalFailureCause + + '}'; + } + + } + } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/client/CcrClient.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/client/CcrClient.java index 43305b030be83..b4bc82392c210 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/client/CcrClient.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/client/CcrClient.java @@ -86,12 +86,12 @@ public ActionFuture pauseFollow(final PauseFollowAction.Re return listener; } - public void unfollow(final UnfollowAction.Request request, final ActionListener listener) { + public void unfollow(final UnfollowAction.Request request, final ActionListener listener) { client.execute(UnfollowAction.INSTANCE, request, listener); } - public ActionFuture unfollow(final UnfollowAction.Request request) { - final PlainActionFuture listener = PlainActionFuture.newFuture(); + public ActionFuture unfollow(final UnfollowAction.Request request) { + final PlainActionFuture listener = PlainActionFuture.newFuture(); client.execute(UnfollowAction.INSTANCE, request, listener); return listener; } From fd795a2dcf97dc450656fd1b4d95dbd9194460c2 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 19 Feb 2019 21:03:53 -0500 Subject: [PATCH 06/10] wip --- .../org/elasticsearch/client/CcrClient.java | 9 +- .../client/ccr/UnfollowResponse.java | 91 ++++++++++++++++++- .../java/org/elasticsearch/client/CCRIT.java | 3 +- .../client/ccr/UnfollowResponseTests.java | 63 +++++++++++++ .../documentation/CCRDocumentationIT.java | 24 +++-- .../high-level/ccr/unfollow.asciidoc | 5 +- .../ccr/action/TransportUnfollowAction.java | 4 +- .../xpack/core/ccr/action/UnfollowAction.java | 10 +- 8 files changed, 189 insertions(+), 20 deletions(-) create mode 100644 client/rest-high-level/src/test/java/org/elasticsearch/client/ccr/UnfollowResponseTests.java diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/CcrClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/CcrClient.java index 9a17dabf39504..2d0f55a4d1b88 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/CcrClient.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/CcrClient.java @@ -33,6 +33,7 @@ import org.elasticsearch.client.ccr.PutFollowResponse; import org.elasticsearch.client.ccr.ResumeFollowRequest; import org.elasticsearch.client.ccr.UnfollowRequest; +import org.elasticsearch.client.ccr.UnfollowResponse; import org.elasticsearch.client.core.AcknowledgedResponse; import java.io.IOException; @@ -197,12 +198,12 @@ public void resumeFollowAsync(ResumeFollowRequest request, * @return the response * @throws IOException in case there is a problem sending the request or parsing back the response */ - public AcknowledgedResponse unfollow(UnfollowRequest request, RequestOptions options) throws IOException { + public UnfollowResponse unfollow(UnfollowRequest request, RequestOptions options) throws IOException { return restHighLevelClient.performRequestAndParseEntity( request, CcrRequestConverters::unfollow, options, - AcknowledgedResponse::fromXContent, + UnfollowResponse::fromXContent, Collections.emptySet() ); } @@ -220,12 +221,12 @@ public AcknowledgedResponse unfollow(UnfollowRequest request, RequestOptions opt */ public void unfollowAsync(UnfollowRequest request, RequestOptions options, - ActionListener listener) { + ActionListener listener) { restHighLevelClient.performRequestAsyncAndParseEntity( request, CcrRequestConverters::unfollow, options, - AcknowledgedResponse::fromXContent, + UnfollowResponse::fromXContent, listener, Collections.emptySet() ); diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ccr/UnfollowResponse.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ccr/UnfollowResponse.java index d1fca766fc094..72759ef165c7f 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ccr/UnfollowResponse.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ccr/UnfollowResponse.java @@ -1,5 +1,94 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.elasticsearch.client.ccr; -public class UnfollowResponse { +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.XContentParser; + +import java.io.IOException; +import java.util.Objects; + +public final class UnfollowResponse { + + static final ParseField ACKNOWLEDGED = new ParseField("acknowledged"); + static final ParseField RETENTION_LEASES_REMOVED = new ParseField("retention_leases_removed"); + static final ParseField RETENTION_LEASES_REMOVAL_FAILURE_CAUSE = new ParseField("retention_leases_removal_failure_cause"); + + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "unfollow_response", true, args -> new UnfollowResponse((boolean) args[0], (boolean) args[1], (ElasticsearchException) args[2])); + + static { + PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), ACKNOWLEDGED); + PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), RETENTION_LEASES_REMOVED); + PARSER.declareObject( + ConstructingObjectParser.constructorArg(), + (p, c) -> ElasticsearchException.fromXContent(p), + RETENTION_LEASES_REMOVAL_FAILURE_CAUSE); + } + + public static UnfollowResponse fromXContent(final XContentParser parser) throws IOException { + return PARSER.parse(parser, null); + } + + private final boolean acknowledged; + + public boolean isAcknowledged() { + return acknowledged; + } + + private final boolean retentionLeasesRemoved; + + public boolean isRetentionLeasesRemoved() { + return retentionLeasesRemoved; + } + + private final ElasticsearchException retentionLeasesRemovalFailureCause; + + public ElasticsearchException retentionLeasesRemovalFailureCause() { + return retentionLeasesRemovalFailureCause; + } + + UnfollowResponse( + final boolean acknowledged, + final boolean retentionLeasesRemoved, + final ElasticsearchException retentionLeasesRemovalFailureCause) { + this.acknowledged = acknowledged; + this.retentionLeasesRemoved = retentionLeasesRemoved; + this.retentionLeasesRemovalFailureCause = retentionLeasesRemovalFailureCause; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + UnfollowResponse that = (UnfollowResponse) o; + return acknowledged == that.acknowledged && + retentionLeasesRemoved == that.retentionLeasesRemoved && + Objects.equals(retentionLeasesRemovalFailureCause, that.retentionLeasesRemovalFailureCause); + } + + @Override + public int hashCode() { + return Objects.hash(acknowledged, retentionLeasesRemoved, retentionLeasesRemovalFailureCause); + } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/CCRIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/CCRIT.java index ee2685dee6d92..cd9a8e14df138 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/CCRIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/CCRIT.java @@ -44,6 +44,7 @@ import org.elasticsearch.client.ccr.PutFollowResponse; import org.elasticsearch.client.ccr.ResumeFollowRequest; import org.elasticsearch.client.ccr.UnfollowRequest; +import org.elasticsearch.client.ccr.UnfollowResponse; import org.elasticsearch.client.core.AcknowledgedResponse; import org.elasticsearch.client.indices.CreateIndexRequest; import org.elasticsearch.client.indices.CreateIndexResponse; @@ -177,7 +178,7 @@ public void testIndexFollowing() throws Exception { assertThat(closeIndexReponse.isAcknowledged(), is(true)); UnfollowRequest unfollowRequest = new UnfollowRequest("follower"); - AcknowledgedResponse unfollowResponse = execute(unfollowRequest, ccrClient::unfollow, ccrClient::unfollowAsync); + UnfollowResponse unfollowResponse = execute(unfollowRequest, ccrClient::unfollow, ccrClient::unfollowAsync); assertThat(unfollowResponse.isAcknowledged(), is(true)); } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ccr/UnfollowResponseTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ccr/UnfollowResponseTests.java new file mode 100644 index 0000000000000..cef96fc81f0c7 --- /dev/null +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ccr/UnfollowResponseTests.java @@ -0,0 +1,63 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client.ccr; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; + +import static org.elasticsearch.test.AbstractXContentTestCase.xContentTester; + +public class UnfollowResponseTests extends ESTestCase { + + public void testFromXContent() throws IOException { + xContentTester( + this::createParser, + this::createTestInstance, + UnfollowResponseTests::toXContent, + UnfollowResponse::fromXContent).supportsUnknownFields(true) + .test(); + } + + private UnfollowResponse createTestInstance() { + final boolean retentionLeasesRemoved = randomBoolean(); + return new UnfollowResponse( + randomBoolean(), + retentionLeasesRemoved, + retentionLeasesRemoved ? null : new ElasticsearchException("failure")); + } + + public static void toXContent(final UnfollowResponse response, final XContentBuilder builder) throws IOException { + builder.startObject(); + { + builder.field(UnfollowResponse.ACKNOWLEDGED.getPreferredName(), response.isAcknowledged()); + builder.field(UnfollowResponse.RETENTION_LEASES_REMOVED.getPreferredName(), response.isRetentionLeasesRemoved()); + if (response.retentionLeasesRemovalFailureCause() != null) { + builder.field( + UnfollowResponse.RETENTION_LEASES_REMOVAL_FAILURE_CAUSE.getPreferredName(), + response.retentionLeasesRemovalFailureCause()); + } + } + builder.endObject(); + } + +} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CCRDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CCRDocumentationIT.java index 2e54d1c4a1a7c..808b2b0f0f90c 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CCRDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CCRDocumentationIT.java @@ -20,6 +20,7 @@ package org.elasticsearch.client.documentation; import org.apache.http.util.EntityUtils; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; @@ -48,6 +49,7 @@ import org.elasticsearch.client.ccr.PutFollowResponse; import org.elasticsearch.client.ccr.ResumeFollowRequest; import org.elasticsearch.client.ccr.UnfollowRequest; +import org.elasticsearch.client.ccr.UnfollowResponse; import org.elasticsearch.client.core.AcknowledgedResponse; import org.elasticsearch.client.indices.CreateIndexRequest; import org.elasticsearch.client.indices.CreateIndexResponse; @@ -338,12 +340,15 @@ public void testUnfollow() throws Exception { // end::ccr-unfollow-request // tag::ccr-unfollow-execute - AcknowledgedResponse response = - client.ccr().unfollow(request, RequestOptions.DEFAULT); + UnfollowResponse response = client.ccr().unfollow(request, RequestOptions.DEFAULT); // end::ccr-unfollow-execute // tag::ccr-unfollow-response boolean acknowledged = response.isAcknowledged(); // <1> + boolean retentionLeasesRemoved = response.isRetentionLeasesRemoved(); // <2> + if (retentionLeasesRemoved == false) { + ElasticsearchException retentionLeasesRemovalFailureCause = response.retentionLeasesRemovalFailureCause(); // <3> + } // end::ccr-unfollow-response // Delete, put follow index, pause and close, so that it can be unfollowed again: @@ -366,16 +371,20 @@ public void testUnfollow() throws Exception { } // tag::ccr-unfollow-execute-listener - ActionListener listener = - new ActionListener() { + ActionListener listener = + new ActionListener() { @Override - public void onResponse(AcknowledgedResponse response) { + public void onResponse(UnfollowResponse response) { boolean acknowledged = response.isAcknowledged(); // <1> + boolean retentionLeasesRemoved = response.isRetentionLeasesRemoved(); // <2> + if (retentionLeasesRemoved == false) { + ElasticsearchException retentionLeasesRemovalFailureCause = response.retentionLeasesRemovalFailureCause(); // <3> + } } @Override public void onFailure(Exception e) { - // <2> + // <4> } }; // end::ccr-unfollow-execute-listener @@ -385,8 +394,7 @@ public void onFailure(Exception e) { listener = new LatchedActionListener<>(listener, latch); // tag::ccr-unfollow-execute-async - client.ccr() - .unfollowAsync(request, RequestOptions.DEFAULT, listener); // <1> + client.ccr().unfollowAsync(request, RequestOptions.DEFAULT, listener); // <1> // end::ccr-unfollow-execute-async assertTrue(latch.await(30L, TimeUnit.SECONDS)); diff --git a/docs/java-rest/high-level/ccr/unfollow.asciidoc b/docs/java-rest/high-level/ccr/unfollow.asciidoc index 779b8c3f586c4..c9c8a51dbf365 100644 --- a/docs/java-rest/high-level/ccr/unfollow.asciidoc +++ b/docs/java-rest/high-level/ccr/unfollow.asciidoc @@ -29,7 +29,10 @@ The returned +{response}+ indicates if the unfollow request was received. -------------------------------------------------- include-tagged::{doc-tests-file}[{api}-response] -------------------------------------------------- -<1> Whether or not the unfollow was acknowledge. +<1> Whether or not the unfollow was acknowledged. +<2> Whether or not the retention leases on the leader were successfully removed. +<3> Indicates the cause of the failure to remove the retention leases on the + leader, in case of failure. include::../execution.asciidoc[] diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java index dc0a7f1139d04..eee098805fcea 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java @@ -119,7 +119,7 @@ public void onResponse(Collection responses) { "[{}] removed retention lease [{}] on all leader primary shards", indexMetaData.getIndex(), retentionLeaseId); - listener.onResponse(new UnfollowAction.Response(true, null)); + listener.onResponse(new UnfollowAction.Response(true, true, null)); } @Override @@ -129,7 +129,7 @@ public void onFailure(final Exception e) { indexMetaData.getIndex(), retentionLeaseId), e); - listener.onResponse(new UnfollowAction.Response(false, e)); + listener.onResponse(new UnfollowAction.Response(true, false, e)); } }, diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/UnfollowAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/UnfollowAction.java index a3f57387c7b39..211f8a2254b6b 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/UnfollowAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/UnfollowAction.java @@ -91,9 +91,10 @@ public Response() { } public Response( + final boolean acknowledged, final boolean retentionLeasesRemoved, final Exception retentionLeasesRemovalFailureCause) { - super(true); + super(acknowledged); this.retentionLeasesRemoved = retentionLeasesRemoved; if (retentionLeasesRemoved && retentionLeasesRemovalFailureCause != null) { throw new IllegalArgumentException( @@ -110,12 +111,15 @@ public void readFrom(final StreamInput in) throws IOException { super.readFrom(in); if (in.getVersion().onOrAfter(Version.V_8_0_0)) { retentionLeasesRemoved = in.readBoolean(); - // noinspection StatementWithEmptyBody if (retentionLeasesRemoved) { - + retentionLeasesRemovalFailureCause = null; } else { retentionLeasesRemovalFailureCause = in.readException(); } + } else { + // the response is from an old version that did not attempt to remove retention leases, treat as success + retentionLeasesRemoved = true; + retentionLeasesRemovalFailureCause = null; } } From 70ae2961f6c04bf980f7d39f1572b4160c8c311a Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 19 Feb 2019 22:59:53 -0500 Subject: [PATCH 07/10] Add tests --- .../org/elasticsearch/client/CcrClient.java | 9 +- .../client/ccr/UnfollowResponse.java | 94 ------- .../java/org/elasticsearch/client/CCRIT.java | 3 +- .../client/ccr/UnfollowResponseTests.java | 63 ----- .../documentation/CCRDocumentationIT.java | 24 +- .../xpack/ccr/CcrRetentionLeases.java | 50 ++++ .../ccr/action/TransportUnfollowAction.java | 18 +- .../xpack/ccr/CcrRetentionLeaseIT.java | 238 +++++++++++++++--- .../xpack/core/ccr/action/UnfollowAction.java | 102 +------- .../xpack/core/ccr/client/CcrClient.java | 14 +- 10 files changed, 290 insertions(+), 325 deletions(-) delete mode 100644 client/rest-high-level/src/main/java/org/elasticsearch/client/ccr/UnfollowResponse.java delete mode 100644 client/rest-high-level/src/test/java/org/elasticsearch/client/ccr/UnfollowResponseTests.java diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/CcrClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/CcrClient.java index 2d0f55a4d1b88..9a17dabf39504 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/CcrClient.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/CcrClient.java @@ -33,7 +33,6 @@ import org.elasticsearch.client.ccr.PutFollowResponse; import org.elasticsearch.client.ccr.ResumeFollowRequest; import org.elasticsearch.client.ccr.UnfollowRequest; -import org.elasticsearch.client.ccr.UnfollowResponse; import org.elasticsearch.client.core.AcknowledgedResponse; import java.io.IOException; @@ -198,12 +197,12 @@ public void resumeFollowAsync(ResumeFollowRequest request, * @return the response * @throws IOException in case there is a problem sending the request or parsing back the response */ - public UnfollowResponse unfollow(UnfollowRequest request, RequestOptions options) throws IOException { + public AcknowledgedResponse unfollow(UnfollowRequest request, RequestOptions options) throws IOException { return restHighLevelClient.performRequestAndParseEntity( request, CcrRequestConverters::unfollow, options, - UnfollowResponse::fromXContent, + AcknowledgedResponse::fromXContent, Collections.emptySet() ); } @@ -221,12 +220,12 @@ public UnfollowResponse unfollow(UnfollowRequest request, RequestOptions options */ public void unfollowAsync(UnfollowRequest request, RequestOptions options, - ActionListener listener) { + ActionListener listener) { restHighLevelClient.performRequestAsyncAndParseEntity( request, CcrRequestConverters::unfollow, options, - UnfollowResponse::fromXContent, + AcknowledgedResponse::fromXContent, listener, Collections.emptySet() ); diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ccr/UnfollowResponse.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ccr/UnfollowResponse.java deleted file mode 100644 index 72759ef165c7f..0000000000000 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ccr/UnfollowResponse.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.client.ccr; - -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.common.ParseField; -import org.elasticsearch.common.xcontent.ConstructingObjectParser; -import org.elasticsearch.common.xcontent.XContentParser; - -import java.io.IOException; -import java.util.Objects; - -public final class UnfollowResponse { - - static final ParseField ACKNOWLEDGED = new ParseField("acknowledged"); - static final ParseField RETENTION_LEASES_REMOVED = new ParseField("retention_leases_removed"); - static final ParseField RETENTION_LEASES_REMOVAL_FAILURE_CAUSE = new ParseField("retention_leases_removal_failure_cause"); - - private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( - "unfollow_response", true, args -> new UnfollowResponse((boolean) args[0], (boolean) args[1], (ElasticsearchException) args[2])); - - static { - PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), ACKNOWLEDGED); - PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), RETENTION_LEASES_REMOVED); - PARSER.declareObject( - ConstructingObjectParser.constructorArg(), - (p, c) -> ElasticsearchException.fromXContent(p), - RETENTION_LEASES_REMOVAL_FAILURE_CAUSE); - } - - public static UnfollowResponse fromXContent(final XContentParser parser) throws IOException { - return PARSER.parse(parser, null); - } - - private final boolean acknowledged; - - public boolean isAcknowledged() { - return acknowledged; - } - - private final boolean retentionLeasesRemoved; - - public boolean isRetentionLeasesRemoved() { - return retentionLeasesRemoved; - } - - private final ElasticsearchException retentionLeasesRemovalFailureCause; - - public ElasticsearchException retentionLeasesRemovalFailureCause() { - return retentionLeasesRemovalFailureCause; - } - - UnfollowResponse( - final boolean acknowledged, - final boolean retentionLeasesRemoved, - final ElasticsearchException retentionLeasesRemovalFailureCause) { - this.acknowledged = acknowledged; - this.retentionLeasesRemoved = retentionLeasesRemoved; - this.retentionLeasesRemovalFailureCause = retentionLeasesRemovalFailureCause; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - UnfollowResponse that = (UnfollowResponse) o; - return acknowledged == that.acknowledged && - retentionLeasesRemoved == that.retentionLeasesRemoved && - Objects.equals(retentionLeasesRemovalFailureCause, that.retentionLeasesRemovalFailureCause); - } - - @Override - public int hashCode() { - return Objects.hash(acknowledged, retentionLeasesRemoved, retentionLeasesRemovalFailureCause); - } - -} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/CCRIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/CCRIT.java index cd9a8e14df138..ee2685dee6d92 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/CCRIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/CCRIT.java @@ -44,7 +44,6 @@ import org.elasticsearch.client.ccr.PutFollowResponse; import org.elasticsearch.client.ccr.ResumeFollowRequest; import org.elasticsearch.client.ccr.UnfollowRequest; -import org.elasticsearch.client.ccr.UnfollowResponse; import org.elasticsearch.client.core.AcknowledgedResponse; import org.elasticsearch.client.indices.CreateIndexRequest; import org.elasticsearch.client.indices.CreateIndexResponse; @@ -178,7 +177,7 @@ public void testIndexFollowing() throws Exception { assertThat(closeIndexReponse.isAcknowledged(), is(true)); UnfollowRequest unfollowRequest = new UnfollowRequest("follower"); - UnfollowResponse unfollowResponse = execute(unfollowRequest, ccrClient::unfollow, ccrClient::unfollowAsync); + AcknowledgedResponse unfollowResponse = execute(unfollowRequest, ccrClient::unfollow, ccrClient::unfollowAsync); assertThat(unfollowResponse.isAcknowledged(), is(true)); } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ccr/UnfollowResponseTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ccr/UnfollowResponseTests.java deleted file mode 100644 index cef96fc81f0c7..0000000000000 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ccr/UnfollowResponseTests.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.client.ccr; - -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.test.ESTestCase; - -import java.io.IOException; - -import static org.elasticsearch.test.AbstractXContentTestCase.xContentTester; - -public class UnfollowResponseTests extends ESTestCase { - - public void testFromXContent() throws IOException { - xContentTester( - this::createParser, - this::createTestInstance, - UnfollowResponseTests::toXContent, - UnfollowResponse::fromXContent).supportsUnknownFields(true) - .test(); - } - - private UnfollowResponse createTestInstance() { - final boolean retentionLeasesRemoved = randomBoolean(); - return new UnfollowResponse( - randomBoolean(), - retentionLeasesRemoved, - retentionLeasesRemoved ? null : new ElasticsearchException("failure")); - } - - public static void toXContent(final UnfollowResponse response, final XContentBuilder builder) throws IOException { - builder.startObject(); - { - builder.field(UnfollowResponse.ACKNOWLEDGED.getPreferredName(), response.isAcknowledged()); - builder.field(UnfollowResponse.RETENTION_LEASES_REMOVED.getPreferredName(), response.isRetentionLeasesRemoved()); - if (response.retentionLeasesRemovalFailureCause() != null) { - builder.field( - UnfollowResponse.RETENTION_LEASES_REMOVAL_FAILURE_CAUSE.getPreferredName(), - response.retentionLeasesRemovalFailureCause()); - } - } - builder.endObject(); - } - -} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CCRDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CCRDocumentationIT.java index 808b2b0f0f90c..2e54d1c4a1a7c 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CCRDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CCRDocumentationIT.java @@ -20,7 +20,6 @@ package org.elasticsearch.client.documentation; import org.apache.http.util.EntityUtils; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; @@ -49,7 +48,6 @@ import org.elasticsearch.client.ccr.PutFollowResponse; import org.elasticsearch.client.ccr.ResumeFollowRequest; import org.elasticsearch.client.ccr.UnfollowRequest; -import org.elasticsearch.client.ccr.UnfollowResponse; import org.elasticsearch.client.core.AcknowledgedResponse; import org.elasticsearch.client.indices.CreateIndexRequest; import org.elasticsearch.client.indices.CreateIndexResponse; @@ -340,15 +338,12 @@ public void testUnfollow() throws Exception { // end::ccr-unfollow-request // tag::ccr-unfollow-execute - UnfollowResponse response = client.ccr().unfollow(request, RequestOptions.DEFAULT); + AcknowledgedResponse response = + client.ccr().unfollow(request, RequestOptions.DEFAULT); // end::ccr-unfollow-execute // tag::ccr-unfollow-response boolean acknowledged = response.isAcknowledged(); // <1> - boolean retentionLeasesRemoved = response.isRetentionLeasesRemoved(); // <2> - if (retentionLeasesRemoved == false) { - ElasticsearchException retentionLeasesRemovalFailureCause = response.retentionLeasesRemovalFailureCause(); // <3> - } // end::ccr-unfollow-response // Delete, put follow index, pause and close, so that it can be unfollowed again: @@ -371,20 +366,16 @@ public void testUnfollow() throws Exception { } // tag::ccr-unfollow-execute-listener - ActionListener listener = - new ActionListener() { + ActionListener listener = + new ActionListener() { @Override - public void onResponse(UnfollowResponse response) { + public void onResponse(AcknowledgedResponse response) { boolean acknowledged = response.isAcknowledged(); // <1> - boolean retentionLeasesRemoved = response.isRetentionLeasesRemoved(); // <2> - if (retentionLeasesRemoved == false) { - ElasticsearchException retentionLeasesRemovalFailureCause = response.retentionLeasesRemovalFailureCause(); // <3> - } } @Override public void onFailure(Exception e) { - // <4> + // <2> } }; // end::ccr-unfollow-execute-listener @@ -394,7 +385,8 @@ public void onFailure(Exception e) { listener = new LatchedActionListener<>(listener, latch); // tag::ccr-unfollow-execute-async - client.ccr().unfollowAsync(request, RequestOptions.DEFAULT, listener); // <1> + client.ccr() + .unfollowAsync(request, RequestOptions.DEFAULT, listener); // <1> // end::ccr-unfollow-execute-async assertTrue(latch.await(30L, TimeUnit.SECONDS)); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRetentionLeases.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRetentionLeases.java index 40ca030807867..6afef8c42aa8b 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRetentionLeases.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRetentionLeases.java @@ -48,6 +48,16 @@ public static String retentionLeaseId( leaderIndex.getUUID()); } + /** + * Synchronously requests to add a retention lease with the specified retention lease ID on the specified leader shard using the given + * remote client. Note that this method will block up to the specified timeout. + * + * @param leaderShardId the leader shard ID + * @param retentionLeaseId the retention lease ID + * @param remoteClient the remote client on which to execute this request + * @param timeout the timeout + * @return an optional exception indicating whether or not the retention lease already exists + */ public static Optional syncAddRetentionLease( final ShardId leaderShardId, final String retentionLeaseId, @@ -63,6 +73,16 @@ public static Optional syncAddRetentionLea } } + /** + * Asynchronously requests to add a retention lease with the specified retention lease ID on the specified leader shard using the given + * remote client. Note that this method will return immediately, with the specified listener callback invoked to indicate a response + * or failure. + * + * @param leaderShardId the leader shard ID + * @param retentionLeaseId the retention lease ID + * @param remoteClient the remote client on which to execute this request + * @param listener the listener + */ public static void asyncAddRetentionLease( final ShardId leaderShardId, final String retentionLeaseId, @@ -73,6 +93,16 @@ public static void asyncAddRetentionLease( remoteClient.execute(RetentionLeaseActions.Add.INSTANCE, request, listener); } + /** + * Synchronously requests to renew a retention lease with the specified retention lease ID on the specified leader shard using the given + * remote client. Note that this method will block up to the specified timeout. + * + * @param leaderShardId the leader shard ID + * @param retentionLeaseId the retention lease ID + * @param remoteClient the remote client on which to execute this request + * @param timeout the timeout + * @return an optional exception indicating whether or not the retention lease already exists + */ public static Optional syncRenewRetentionLease( final ShardId leaderShardId, final String retentionLeaseId, @@ -88,6 +118,16 @@ public static Optional syncRenewRetentionLease( } } + /** + * Asynchronously requests to renew a retention lease with the specified retention lease ID on the specified leader shard using the + * given remote client. Note that this method will return immediately, with the specified listener callback invoked to indicate a + * response or failure. + * + * @param leaderShardId the leader shard ID + * @param retentionLeaseId the retention lease ID + * @param remoteClient the remote client on which to execute this request + * @param listener the listener + */ public static void asyncRenewRetentionLease( final ShardId leaderShardId, final String retentionLeaseId, @@ -98,6 +138,16 @@ public static void asyncRenewRetentionLease( remoteClient.execute(RetentionLeaseActions.Renew.INSTANCE, request, listener); } + /** + * Asynchronously requests to remove a retention lease with the specified retention lease ID on the specified leader shard using the + * given remote client. Note that this method will return immediately, with the specified listener callback invoked to indicate a + * response or failure. + * + * @param leaderShardId the leader shard ID + * @param retentionLeaseId the retention lease ID + * @param remoteClient the remote client on which to execute this request + * @param listener the listener + */ public static void asyncRemoveRetentionLease( final ShardId leaderShardId, final String retentionLeaseId, diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java index eee098805fcea..ae14990064770 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java @@ -7,11 +7,13 @@ package org.elasticsearch.xpack.ccr.action; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchSecurityException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.GroupedActionListener; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; @@ -43,7 +45,7 @@ import java.util.Map; import java.util.Objects; -public class TransportUnfollowAction extends TransportMasterNodeAction { +public class TransportUnfollowAction extends TransportMasterNodeAction { private final Client client; @@ -72,15 +74,15 @@ protected String executor() { } @Override - protected UnfollowAction.Response newResponse() { - return new UnfollowAction.Response(); + protected AcknowledgedResponse newResponse() { + return new AcknowledgedResponse(); } @Override protected void masterOperation( final UnfollowAction.Request request, final ClusterState state, - final ActionListener listener) { + final ActionListener listener) { clusterService.submitStateUpdateTask("unfollow_action", new ClusterStateUpdateTask() { @Override @@ -114,12 +116,12 @@ public void clusterStateProcessed(final String source, final ClusterState oldSta new ActionListener>() { @Override - public void onResponse(Collection responses) { + public void onResponse(final Collection responses) { logger.trace( "[{}] removed retention lease [{}] on all leader primary shards", indexMetaData.getIndex(), retentionLeaseId); - listener.onResponse(new UnfollowAction.Response(true, true, null)); + listener.onResponse(new AcknowledgedResponse(true)); } @Override @@ -129,7 +131,9 @@ public void onFailure(final Exception e) { indexMetaData.getIndex(), retentionLeaseId), e); - listener.onResponse(new UnfollowAction.Response(true, false, e)); + final ElasticsearchException wrapper = new ElasticsearchException(e); + wrapper.addMetadata("es.failed_to_remove_retention_leases", retentionLeaseId); + listener.onFailure(wrapper); } }, diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java index c42887e6b52f1..e04a190d622e0 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java @@ -7,9 +7,12 @@ package org.elasticsearch.xpack.ccr; import com.carrotsearch.hppc.cursors.ObjectCursor; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.admin.indices.stats.ShardStats; @@ -29,15 +32,22 @@ import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.seqno.RetentionLease; +import org.elasticsearch.index.seqno.RetentionLeaseActions; import org.elasticsearch.index.seqno.RetentionLeases; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardClosedException; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.snapshots.RestoreInfo; import org.elasticsearch.snapshots.RestoreService; import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.CcrIntegTestCase; import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionAction; import org.elasticsearch.xpack.ccr.repository.CcrRepository; +import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; +import org.elasticsearch.xpack.core.ccr.action.UnfollowAction; import java.io.IOException; import java.util.ArrayList; @@ -45,17 +55,23 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import java.util.stream.IntStream; import java.util.stream.Stream; +import static java.util.Collections.singletonMap; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.xpack.ccr.CcrRetentionLeases.retentionLeaseId; import static org.hamcrest.Matchers.arrayWithSize; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.hasSize; @@ -148,26 +164,19 @@ public void testRetentionLeaseIsTakenAtTheStartOfRecovery() throws Exception { final PlainActionFuture future = PlainActionFuture.newFuture(); restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future)); - final ClusterStateResponse leaderIndexClusterState = - leaderClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(leaderIndex).get(); - final String leaderUUID = leaderIndexClusterState.getState().metaData().index(leaderIndex).getIndexUUID(); - // ensure that a retention lease has been put in place on each shard assertBusy(() -> { final IndicesStatsResponse stats = leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet(); assertNotNull(stats.getShards()); assertThat(stats.getShards(), arrayWithSize(numberOfShards * (1 + numberOfReplicas))); - final List shardStats = getShardStats(stats); + final List shardsStats = getShardsStats(stats); for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) { - final RetentionLeases currentRetentionLeases = shardStats.get(i).getRetentionLeaseStats().retentionLeases(); + final RetentionLeases currentRetentionLeases = shardsStats.get(i).getRetentionLeaseStats().retentionLeases(); assertThat(currentRetentionLeases.leases(), hasSize(1)); - final ClusterStateResponse followerIndexClusterState = - followerClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(followerIndex).get(); - final String followerUUID = followerIndexClusterState.getState().metaData().index(followerIndex).getIndexUUID(); final RetentionLease retentionLease = currentRetentionLeases.leases().iterator().next(); - assertThat(retentionLease.id(), equalTo(getRetentionLeaseId(followerIndex, followerUUID, leaderIndex, leaderUUID))); + assertThat(retentionLease.id(), equalTo(getRetentionLeaseId(followerIndex, leaderIndex))); } }); @@ -215,7 +224,6 @@ public void testRetentionLeaseIsRenewedDuringRecovery() throws Exception { } connection.sendRequest(requestId, action, request, options); }); - } } @@ -223,10 +231,6 @@ public void testRetentionLeaseIsRenewedDuringRecovery() throws Exception { final PlainActionFuture future = PlainActionFuture.newFuture(); restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future)); - final ClusterStateResponse leaderIndexClusterState = - leaderClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(leaderIndex).get(); - final String leaderUUID = leaderIndexClusterState.getState().metaData().index(leaderIndex).getIndexUUID(); - try { // ensure that a retention lease has been put in place on each shard, and grab a copy of them final List retentionLeases = new ArrayList<>(); @@ -236,16 +240,13 @@ public void testRetentionLeaseIsRenewedDuringRecovery() throws Exception { leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet(); assertNotNull(stats.getShards()); assertThat(stats.getShards(), arrayWithSize(numberOfShards * (1 + numberOfReplicas))); - final List shardStats = getShardStats(stats); + final List shardsStats = getShardsStats(stats); for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) { - final RetentionLeases currentRetentionLeases = shardStats.get(i).getRetentionLeaseStats().retentionLeases(); + final RetentionLeases currentRetentionLeases = shardsStats.get(i).getRetentionLeaseStats().retentionLeases(); assertThat(currentRetentionLeases.leases(), hasSize(1)); - final ClusterStateResponse followerIndexClusterState = - followerClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(followerIndex).get(); - final String followerUUID = followerIndexClusterState.getState().metaData().index(followerIndex).getIndexUUID(); final RetentionLease retentionLease = currentRetentionLeases.leases().iterator().next(); - assertThat(retentionLease.id(), equalTo(getRetentionLeaseId(followerIndex, followerUUID, leaderIndex, leaderUUID))); + assertThat(retentionLease.id(), equalTo(getRetentionLeaseId(followerIndex, leaderIndex))); retentionLeases.add(currentRetentionLeases); } }); @@ -256,16 +257,13 @@ public void testRetentionLeaseIsRenewedDuringRecovery() throws Exception { leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet(); assertNotNull(stats.getShards()); assertThat(stats.getShards(), arrayWithSize(numberOfShards * (1 + numberOfReplicas))); - final List shardStats = getShardStats(stats); + final List shardsStats = getShardsStats(stats); for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) { - final RetentionLeases currentRetentionLeases = shardStats.get(i).getRetentionLeaseStats().retentionLeases(); + final RetentionLeases currentRetentionLeases = shardsStats.get(i).getRetentionLeaseStats().retentionLeases(); assertThat(currentRetentionLeases.leases(), hasSize(1)); - final ClusterStateResponse followerIndexClusterState = - followerClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(followerIndex).get(); - final String followerUUID = followerIndexClusterState.getState().metaData().index(followerIndex).getIndexUUID(); final RetentionLease retentionLease = currentRetentionLeases.leases().iterator().next(); - assertThat(retentionLease.id(), equalTo(getRetentionLeaseId(followerIndex, followerUUID, leaderIndex, leaderUUID))); + assertThat(retentionLease.id(), equalTo(getRetentionLeaseId(followerIndex, leaderIndex))); // we assert that retention leases are being renewed by an increase in the timestamp assertThat(retentionLease.timestamp(), greaterThan(retentionLeases.get(i).leases().iterator().next().timestamp())); } @@ -321,9 +319,9 @@ public void testRetentionLeasesAreNotBeingRenewedAfterRecoveryCompletes() throws leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet(); assertNotNull(stats.getShards()); assertThat(stats.getShards(), arrayWithSize(numberOfShards * (1 + numberOfReplicas))); - final List shardStats = getShardStats(stats); + final List shardsStats = getShardsStats(stats); for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) { - final RetentionLeases currentRetentionLeases = shardStats.get(i).getRetentionLeaseStats().retentionLeases(); + final RetentionLeases currentRetentionLeases = shardsStats.get(i).getRetentionLeaseStats().retentionLeases(); assertThat(currentRetentionLeases.leases(), hasSize(1)); final ClusterStateResponse followerIndexClusterState = followerClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(followerIndex).get(); @@ -349,9 +347,9 @@ public void testRetentionLeasesAreNotBeingRenewedAfterRecoveryCompletes() throws leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet(); assertNotNull(stats.getShards()); assertThat(stats.getShards(), arrayWithSize(numberOfShards * (1 + numberOfReplicas))); - final List shardStats = getShardStats(stats); + final List shardsStats = getShardsStats(stats); for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) { - final RetentionLeases currentRetentionLeases = shardStats.get(i).getRetentionLeaseStats().retentionLeases(); + final RetentionLeases currentRetentionLeases = shardsStats.get(i).getRetentionLeaseStats().retentionLeases(); assertThat(currentRetentionLeases.leases(), hasSize(1)); final ClusterStateResponse followerIndexClusterState = followerClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(followerIndex).get(); @@ -371,6 +369,170 @@ public void testRetentionLeasesAreNotBeingRenewedAfterRecoveryCompletes() throws } } + public void testUnfollowRemovesRetentionLeases() throws Exception { + final String leaderIndex = "leader"; + final String followerIndex = "follower"; + final String leaderIndexSettings = + getIndexSettings(randomIntBetween(1, 4), 0, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON).get()); + final PutFollowAction.Request followRequest = putFollow(leaderIndex, followerIndex); + followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); + + ensureFollowerGreen(true, followerIndex); + + final String retentionLeaseId = getRetentionLeaseId(followerIndex, leaderIndex); + + final IndicesStatsResponse stats = + leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet(); + final List shardsStats = getShardsStats(stats); + for (final ShardStats shardStats : shardsStats) { + assertThat(shardStats.getRetentionLeaseStats().retentionLeases().leases(), hasSize(1)); + assertThat( + shardStats.getRetentionLeaseStats().retentionLeases().leases().iterator().next().id(), + equalTo(retentionLeaseId)); + } + + // we will sometimes fake that some of the retention leases are already removed on the leader shard + final Set shardIds = new HashSet<>(Arrays.asList(0, 1, 2, 3)); + // new HashSet<>(randomSubsetOf(randomIntBetween(0, 4), IntStream.range(0, 4).boxed().collect(Collectors.toSet()))); + + final ClusterStateResponse followerClusterState = followerClient().admin().cluster().prepareState().clear().setNodes(true).get(); + try { + for (final ObjectCursor senderNode : followerClusterState.getState().nodes().getDataNodes().values()) { + final MockTransportService senderTransportService = + (MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName()); + final ClusterStateResponse leaderClusterState = + leaderClient().admin().cluster().prepareState().clear().setNodes(true).get(); + for (final ObjectCursor receiverNode : leaderClusterState.getState().nodes().getDataNodes().values()) { + final MockTransportService receiverTransportService = + (MockTransportService) getLeaderCluster().getInstance(TransportService.class, receiverNode.value.getName()); + senderTransportService.addSendBehavior(receiverTransportService, + (connection, requestId, action, request, options) -> { + if (RetentionLeaseActions.Remove.ACTION_NAME.equals(action)) { + final RetentionLeaseActions.RemoveRequest removeRequest = (RetentionLeaseActions.RemoveRequest) request; + if (shardIds.contains(removeRequest.getShardId().id())) { + final String primaryShardNodeId = + getLeaderCluster() + .clusterService() + .state() + .routingTable() + .index(leaderIndex) + .shard(removeRequest.getShardId().id()) + .primaryShard() + .currentNodeId(); + final String primaryShardNodeName = + getLeaderCluster().clusterService().state().nodes().get(primaryShardNodeId).getName(); + final IndexShard primary = + getLeaderCluster() + .getInstance(IndicesService.class, primaryShardNodeName) + .getShardOrNull(removeRequest.getShardId()); + final CountDownLatch latch = new CountDownLatch(1); + primary.removeRetentionLease( + retentionLeaseId, + ActionListener.wrap(r -> latch.countDown(), e -> fail(e.toString()))); + try { + latch.await(); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + fail(e.toString()); + } + } + } + connection.sendRequest(requestId, action, request, options); + }); + } + + } + + + pauseFollow(followerIndex); + followerClient().admin().indices().close(new CloseIndexRequest(followerIndex)).actionGet(); + assertAcked(followerClient().execute(UnfollowAction.INSTANCE, new UnfollowAction.Request(followerIndex)).actionGet()); + + final IndicesStatsResponse afterUnfollowStats = + leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet(); + final List afterUnfollowShardsStats = getShardsStats(afterUnfollowStats); + for (final ShardStats shardStats : afterUnfollowShardsStats) { + assertThat(shardStats.getRetentionLeaseStats().retentionLeases().leases(), empty()); + } + } finally { + + } + } + + public void testUnfollowFailsToRemoveRetentionLeases() throws Exception { + final String leaderIndex = "leader"; + final String followerIndex = "follower"; + final int numberOfShards = randomIntBetween(1, 4); + final String leaderIndexSettings = + getIndexSettings(numberOfShards, 0, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON).get()); + final PutFollowAction.Request followRequest = putFollow(leaderIndex, followerIndex); + followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); + + ensureFollowerGreen(true, followerIndex); + + pauseFollow(followerIndex); + followerClient().admin().indices().close(new CloseIndexRequest(followerIndex)).actionGet(); + + // we will disrupt requests to remove retention leases for these random shards + final Set shardIds = + new HashSet<>(randomSubsetOf(randomIntBetween(1, 4), IntStream.range(0, 4).boxed().collect(Collectors.toSet()))); + + final ClusterStateResponse followerClusterState = followerClient().admin().cluster().prepareState().clear().setNodes(true).get(); + try { + for (final ObjectCursor senderNode : followerClusterState.getState().nodes().getDataNodes().values()) { + final MockTransportService senderTransportService = + (MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName()); + final ClusterStateResponse leaderClusterState = + leaderClient().admin().cluster().prepareState().clear().setNodes(true).get(); + for (final ObjectCursor receiverNode : leaderClusterState.getState().nodes().getDataNodes().values()) { + final MockTransportService receiverTransportService = + (MockTransportService) getLeaderCluster().getInstance(TransportService.class, receiverNode.value.getName()); + senderTransportService.addSendBehavior(receiverTransportService, + (connection, requestId, action, request, options) -> { + if (RetentionLeaseActions.Remove.ACTION_NAME.equals(action)) { + final RetentionLeaseActions.RemoveRequest removeRequest = (RetentionLeaseActions.RemoveRequest) request; + if (shardIds.contains(removeRequest.getShardId().id())) { + throw randomBoolean() + ? new ConnectTransportException(receiverNode.value, "connection failed") + : new IndexShardClosedException(removeRequest.getShardId()); + } + } + connection.sendRequest(requestId, action, request, options); + }); + } + + } + + final ElasticsearchException e = expectThrows( + ElasticsearchException.class, + () -> followerClient().execute(UnfollowAction.INSTANCE, new UnfollowAction.Request(followerIndex)).actionGet()); + + final ClusterStateResponse followerIndexClusterState = + followerClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(followerIndex).get(); + final String followerUUID = followerIndexClusterState.getState().metaData().index(followerIndex).getIndexUUID(); + + final ClusterStateResponse leaderIndexClusterState = + leaderClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(leaderIndex).get(); + final String leaderUUID = leaderIndexClusterState.getState().metaData().index(leaderIndex).getIndexUUID(); + + assertThat( + e.getMetadata("es.failed_to_remove_retention_leases"), + contains(retentionLeaseId( + getFollowerCluster().getClusterName(), + new Index(followerIndex, followerUUID), + getLeaderCluster().getClusterName(), + new Index(leaderIndex, leaderUUID)))); + } finally { + for (final ObjectCursor senderNode : followerClusterState.getState().nodes().getDataNodes().values()) { + final MockTransportService senderTransportService = + (MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName()); + senderTransportService.clearAllRules(); + } + } + } + /** * Extract the shard stats from an indices stats response, with the stats ordered by shard ID with primaries first. This is to have a * consistent ordering when comparing two responses. @@ -378,7 +540,7 @@ public void testRetentionLeasesAreNotBeingRenewedAfterRecoveryCompletes() throws * @param stats the indices stats * @return the shard stats in sorted order with (shard ID, primary) as the sort key */ - private List getShardStats(final IndicesStatsResponse stats) { + private List getShardsStats(final IndicesStatsResponse stats) { return Arrays.stream(stats.getShards()) .sorted((s, t) -> { if (s.getShardRouting().shardId().id() == t.getShardRouting().shardId().id()) { @@ -390,6 +552,18 @@ private List getShardStats(final IndicesStatsResponse stats) { .collect(Collectors.toList()); } + private String getRetentionLeaseId(final String followerIndex, final String leaderIndex) { + final ClusterStateResponse followerIndexClusterState = + followerClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(followerIndex).get(); + final String followerUUID = followerIndexClusterState.getState().metaData().index(followerIndex).getIndexUUID(); + + final ClusterStateResponse leaderIndexClusterState = + leaderClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(leaderIndex).get(); + final String leaderUUID = leaderIndexClusterState.getState().metaData().index(leaderIndex).getIndexUUID(); + + return getRetentionLeaseId(followerIndex, followerUUID, leaderIndex, leaderUUID); + } + private String getRetentionLeaseId(String followerIndex, String followerUUID, String leaderIndex, String leaderUUID) { return retentionLeaseId( getFollowerCluster().getClusterName(), diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/UnfollowAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/UnfollowAction.java index 211f8a2254b6b..9bb440cc0895d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/UnfollowAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/UnfollowAction.java @@ -6,25 +6,20 @@ package org.elasticsearch.xpack.core.ccr.action; -import org.elasticsearch.Version; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.action.support.master.AcknowledgedResponse; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.xcontent.ToXContentObject; -import org.elasticsearch.common.xcontent.XContentBuilder; import java.io.IOException; -import java.util.Objects; import static org.elasticsearch.action.ValidateActions.addValidationError; -public class UnfollowAction extends Action { +public class UnfollowAction extends Action { public static final UnfollowAction INSTANCE = new UnfollowAction(); public static final String NAME = "indices:admin/xpack/ccr/unfollow"; @@ -34,8 +29,8 @@ private UnfollowAction() { } @Override - public Response newResponse() { - return new Response(); + public AcknowledgedResponse newResponse() { + return new AcknowledgedResponse(); } public static class Request extends AcknowledgedRequest implements IndicesRequest { @@ -81,95 +76,4 @@ public void writeTo(StreamOutput out) throws IOException { } } - public static class Response extends AcknowledgedResponse implements ToXContentObject { - - private boolean retentionLeasesRemoved; - @Nullable private Exception retentionLeasesRemovalFailureCause; - - public Response() { - - } - - public Response( - final boolean acknowledged, - final boolean retentionLeasesRemoved, - final Exception retentionLeasesRemovalFailureCause) { - super(acknowledged); - this.retentionLeasesRemoved = retentionLeasesRemoved; - if (retentionLeasesRemoved && retentionLeasesRemovalFailureCause != null) { - throw new IllegalArgumentException( - "there should not be a failure cause when retention leases are removed", - retentionLeasesRemovalFailureCause); - } else if (retentionLeasesRemoved == false && retentionLeasesRemovalFailureCause == null) { - throw new IllegalArgumentException("there should be a failure cause when retention leases are not removed"); - } - this.retentionLeasesRemovalFailureCause = retentionLeasesRemovalFailureCause; - } - - @Override - public void readFrom(final StreamInput in) throws IOException { - super.readFrom(in); - if (in.getVersion().onOrAfter(Version.V_8_0_0)) { - retentionLeasesRemoved = in.readBoolean(); - if (retentionLeasesRemoved) { - retentionLeasesRemovalFailureCause = null; - } else { - retentionLeasesRemovalFailureCause = in.readException(); - } - } else { - // the response is from an old version that did not attempt to remove retention leases, treat as success - retentionLeasesRemoved = true; - retentionLeasesRemovalFailureCause = null; - } - } - - @Override - public void writeTo(final StreamOutput out) throws IOException { - super.writeTo(out); - if (out.getVersion().onOrAfter(Version.V_8_0_0)) { - if (retentionLeasesRemoved) { - out.writeBoolean(true); - } else { - out.writeBoolean(false); - assert retentionLeasesRemovalFailureCause != null; - out.writeException(retentionLeasesRemovalFailureCause); - } - } - } - - @Override - protected void addCustomFields(XContentBuilder builder, Params params) throws IOException { - builder.field("acknowledged", acknowledged); - builder.field("retention_leases_removed", retentionLeasesRemoved); - if (retentionLeasesRemovalFailureCause != null) { - builder.field("retention_leases_removal_failure_cause", retentionLeasesRemovalFailureCause); - } - } - - @Override - public boolean equals(final Object that) { - if (this == that) return true; - if (that == null || getClass() != that.getClass()) return false; - final UnfollowAction.Response response = (UnfollowAction.Response) that; - return acknowledged == response.acknowledged && - retentionLeasesRemoved == response.retentionLeasesRemoved && - Objects.equals(retentionLeasesRemovalFailureCause, response.retentionLeasesRemovalFailureCause); - } - - @Override - public int hashCode() { - return Objects.hash(acknowledged, retentionLeasesRemoved, retentionLeasesRemovalFailureCause); - } - - @Override - public String toString() { - return "Response{" + - "acknowledged=" + acknowledged + - ", retentionLeasesRemoved=" + retentionLeasesRemoved + - ", retentionLeasesRemovalFailureCause=" + retentionLeasesRemovalFailureCause + - '}'; - } - - } - } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/client/CcrClient.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/client/CcrClient.java index b4bc82392c210..f35a14314338c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/client/CcrClient.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/client/CcrClient.java @@ -12,13 +12,13 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction; -import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction; -import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; import org.elasticsearch.xpack.core.ccr.action.DeleteAutoFollowPatternAction; -import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction; +import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction; import org.elasticsearch.xpack.core.ccr.action.GetAutoFollowPatternAction; -import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction; import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction; +import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction; +import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; +import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction; import org.elasticsearch.xpack.core.ccr.action.UnfollowAction; import java.util.Objects; @@ -86,12 +86,12 @@ public ActionFuture pauseFollow(final PauseFollowAction.Re return listener; } - public void unfollow(final UnfollowAction.Request request, final ActionListener listener) { + public void unfollow(final UnfollowAction.Request request, final ActionListener listener) { client.execute(UnfollowAction.INSTANCE, request, listener); } - public ActionFuture unfollow(final UnfollowAction.Request request) { - final PlainActionFuture listener = PlainActionFuture.newFuture(); + public ActionFuture unfollow(final UnfollowAction.Request request) { + final PlainActionFuture listener = PlainActionFuture.newFuture(); client.execute(UnfollowAction.INSTANCE, request, listener); return listener; } From 0d5e93a416c18d79240ffc5d6b1ef3d131ca66ba Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 19 Feb 2019 23:03:54 -0500 Subject: [PATCH 08/10] Revert unnecessary change --- docs/java-rest/high-level/ccr/unfollow.asciidoc | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/docs/java-rest/high-level/ccr/unfollow.asciidoc b/docs/java-rest/high-level/ccr/unfollow.asciidoc index c9c8a51dbf365..779b8c3f586c4 100644 --- a/docs/java-rest/high-level/ccr/unfollow.asciidoc +++ b/docs/java-rest/high-level/ccr/unfollow.asciidoc @@ -29,10 +29,7 @@ The returned +{response}+ indicates if the unfollow request was received. -------------------------------------------------- include-tagged::{doc-tests-file}[{api}-response] -------------------------------------------------- -<1> Whether or not the unfollow was acknowledged. -<2> Whether or not the retention leases on the leader were successfully removed. -<3> Indicates the cause of the failure to remove the retention leases on the - leader, in case of failure. +<1> Whether or not the unfollow was acknowledge. include::../execution.asciidoc[] From d834e5e23701d9e986362817643a6fc1528975fb Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 19 Feb 2019 23:22:45 -0500 Subject: [PATCH 09/10] Fix logger calls --- .../xpack/ccr/action/TransportUnfollowAction.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java index ae14990064770..0e6b0cccefffb 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java @@ -187,16 +187,16 @@ private void handleException( "{} retention lease [{}] not found on {} while unfollowing", followerShardId, retentionLeaseId, - leaderShardId, - e)); + leaderShardId), + e); listener.onResponse(new RetentionLeaseActions.Response()); } else { logger.warn(new ParameterizedMessage( "{} failed to remove retention lease [{}] on {} while unfollowing", followerShardId, retentionLeaseId, - leaderShardId, - e)); + leaderShardId), + e); listener.onFailure(e); } } From 09ddc76a83b77bb759eaf3cdf480a02a1bf336b9 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 19 Feb 2019 23:24:32 -0500 Subject: [PATCH 10/10] Uncomment randomness --- .../java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java index e04a190d622e0..29430ff6d25cd 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java @@ -393,8 +393,8 @@ public void testUnfollowRemovesRetentionLeases() throws Exception { } // we will sometimes fake that some of the retention leases are already removed on the leader shard - final Set shardIds = new HashSet<>(Arrays.asList(0, 1, 2, 3)); - // new HashSet<>(randomSubsetOf(randomIntBetween(0, 4), IntStream.range(0, 4).boxed().collect(Collectors.toSet()))); + final Set shardIds = + new HashSet<>(randomSubsetOf(randomIntBetween(0, 4), IntStream.range(0, 4).boxed().collect(Collectors.toSet()))); final ClusterStateResponse followerClusterState = followerClient().admin().cluster().prepareState().clear().setNodes(true).get(); try {