diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java index 317890edb4206..c4651e877fadd 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java @@ -23,7 +23,7 @@ public class ClearCcrRestoreSessionAction extends Action { public static final ClearCcrRestoreSessionAction INSTANCE = new ClearCcrRestoreSessionAction(); - private static final String NAME = "internal:admin/ccr/restore/session/clear"; + public static final String NAME = "internal:admin/ccr/restore/session/clear"; private ClearCcrRestoreSessionAction() { super(NAME); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java index eceacc1d926d8..393548225a0c2 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java @@ -33,7 +33,7 @@ public class PutCcrRestoreSessionAction extends Action { public static final PutCcrRestoreSessionAction INSTANCE = new PutCcrRestoreSessionAction(); - private static final String NAME = "internal:admin/ccr/restore/session/put"; + public static final String NAME = "internal:admin/ccr/restore/session/put"; private PutCcrRestoreSessionAction() { super(NAME); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java index 9f061b9c33099..d39262208e795 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java @@ -40,6 +40,7 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.CcrIntegTestCase; import org.elasticsearch.xpack.ccr.action.repositories.GetCcrRestoreFileChunkAction; +import org.elasticsearch.xpack.ccr.action.repositories.PutCcrRestoreSessionAction; import org.elasticsearch.xpack.ccr.repository.CcrRepository; import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService; @@ -48,6 +49,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -59,8 +61,6 @@ import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.lessThan; -// TODO: Fold this integration test into a more expansive integration test as more bootstrap from remote work -// TODO: is completed. public class CcrRepositoryIT extends CcrIntegTestCase { private final IndicesOptions indicesOptions = IndicesOptions.strictSingleIndexNoExpandForbidClosed(); @@ -197,36 +197,6 @@ public void testDocsAreRecovered() throws Exception { leaderClient().admin().indices().prepareFlush(leaderIndex).setForce(true).setWaitIfOngoing(true).get(); - AtomicBoolean isRunning = new AtomicBoolean(true); - - // Concurrently index new docs with mapping changes - Thread thread = new Thread(() -> { - char[] chars = "abcdeghijklmnopqrstuvwxyz".toCharArray(); - for (char c : chars) { - if (isRunning.get() == false) { - break; - } - final String source; - long l = randomLongBetween(0, 50000); - if (randomBoolean()) { - source = String.format(Locale.ROOT, "{\"%c\":%d}", c, l); - } else { - source = String.format(Locale.ROOT, "{\"%c\":\"%d\"}", c, l); - } - for (int i = 64; i < 150; i++) { - if (isRunning.get() == false) { - break; - } - leaderClient().prepareIndex("index1", "doc", Long.toString(i)).setSource(source, XContentType.JSON).get(); - if (rarely()) { - leaderClient().admin().indices().prepareFlush(leaderIndex).setForce(true).get(); - } - } - leaderClient().admin().indices().prepareFlush(leaderIndex).setForce(true).setWaitIfOngoing(true).get(); - } - }); - thread.start(); - Settings.Builder settingsBuilder = Settings.builder() .put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, followerIndex) .put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true); @@ -245,9 +215,6 @@ public void testDocsAreRecovered() throws Exception { assertExpectedDocument(followerIndex, i); } - isRunning.set(false); - thread.join(); - settingsRequest = new ClusterUpdateSettingsRequest(); ByteSizeValue defaultValue = CcrSettings.RECOVERY_CHUNK_SIZE.getDefault(Settings.EMPTY); settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.RECOVERY_CHUNK_SIZE.getKey(), defaultValue)); @@ -421,23 +388,60 @@ public void testFollowerMappingIsUpdated() throws IOException { .renameReplacement(followerIndex).masterNodeTimeout(new TimeValue(1L, TimeUnit.HOURS)) .indexSettings(settingsBuilder); - final String source = String.format(Locale.ROOT, "{\"k\":%d}", 1); - leaderClient().prepareIndex("index1", "doc", Long.toString(1)).setSource(source, XContentType.JSON).get(); - PlainActionFuture future = PlainActionFuture.newFuture(); - restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future)); - RestoreInfo restoreInfo = future.actionGet(); + List transportServices = new ArrayList<>(); + CountDownLatch latch = new CountDownLatch(1); + AtomicBoolean updateSent = new AtomicBoolean(false); + Runnable updateMappings = () -> { + if (updateSent.compareAndSet(false, true)) { + leaderClient() + .admin() + .indices() + .preparePutMapping(leaderIndex) + .setType("doc") + .setSource("{\"properties\":{\"k\":{\"type\":\"long\"}}}", XContentType.JSON) + .execute(ActionListener.wrap(latch::countDown)); + } + try { + latch.await(); + } catch (InterruptedException e) { + throw ExceptionsHelper.convertToRuntime(e); + } + }; - assertEquals(restoreInfo.totalShards(), restoreInfo.successfulShards()); - assertEquals(0, restoreInfo.failedShards()); + for (TransportService transportService : getFollowerCluster().getDataOrMasterNodeInstances(TransportService.class)) { + MockTransportService mockTransportService = (MockTransportService) transportService; + transportServices.add(mockTransportService); + mockTransportService.addSendBehavior((connection, requestId, action, request, options) -> { + if (action.equals(PutCcrRestoreSessionAction.NAME)) { + updateMappings.run(); + connection.sendRequest(requestId, action, request, options); + } else { + connection.sendRequest(requestId, action, request, options); + } + }); + } + + try { + PlainActionFuture future = PlainActionFuture.newFuture(); + restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future)); + RestoreInfo restoreInfo = future.actionGet(); - ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); - clusterStateRequest.clear(); - clusterStateRequest.metaData(true); - clusterStateRequest.indices(followerIndex); - MappingMetaData mappingMetaData = followerClient().admin().indices().prepareGetMappings("index2").get().getMappings() - .get("index2").get("doc"); - assertThat(XContentMapValues.extractValue("properties.k.type", mappingMetaData.sourceAsMap()), equalTo("long")); + assertEquals(restoreInfo.totalShards(), restoreInfo.successfulShards()); + assertEquals(0, restoreInfo.failedShards()); + + ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); + clusterStateRequest.clear(); + clusterStateRequest.metaData(true); + clusterStateRequest.indices(followerIndex); + MappingMetaData mappingMetaData = followerClient().admin().indices().prepareGetMappings("index2").get().getMappings() + .get("index2").get("doc"); + assertThat(XContentMapValues.extractValue("properties.k.type", mappingMetaData.sourceAsMap()), equalTo("long")); + } finally { + for (MockTransportService transportService : transportServices) { + transportService.clearAllRules(); + } + } } private void assertExpectedDocument(String followerIndex, final int value) {