From b7a0c89c6c08c7388559db410d7fc31c8449da0a Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 12 Feb 2019 13:57:44 -0700 Subject: [PATCH] Improve CcrRepositoryIT mappings tests Currently we index documents concurrently to attempt to ensure that we update mappings during the restore process. However, this does not actually test that the mapping will be correct and is dangerous as it can lead to a misalignment between the max sequence number and the local checkpoint. If these are not aligned, peer recovery cannot be completed without initiating following which this test does not do. That causes teardown assertions to fail. This commit removes the concurrent indexing and flushes after the documents are indexed. Additionally it modifies the mapping specific test to ensure that there is a mapping update when the restore session is initiated. This mapping update is picked up at the end of the restore by the follower. --- .../ClearCcrRestoreSessionAction.java | 2 +- .../PutCcrRestoreSessionAction.java | 2 +- .../xpack/ccr/CcrRepositoryIT.java | 102 +++++++++--------- 3 files changed, 55 insertions(+), 51 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java index 317890edb4206..c4651e877fadd 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java @@ -23,7 +23,7 @@ public class ClearCcrRestoreSessionAction extends Action { public static final ClearCcrRestoreSessionAction INSTANCE = new ClearCcrRestoreSessionAction(); - private static final String NAME = "internal:admin/ccr/restore/session/clear"; + public static final String NAME = "internal:admin/ccr/restore/session/clear"; private ClearCcrRestoreSessionAction() { super(NAME); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java index eceacc1d926d8..393548225a0c2 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java @@ -33,7 +33,7 @@ public class PutCcrRestoreSessionAction extends Action { public static final PutCcrRestoreSessionAction INSTANCE = new PutCcrRestoreSessionAction(); - private static final String NAME = "internal:admin/ccr/restore/session/put"; + public static final String NAME = "internal:admin/ccr/restore/session/put"; private PutCcrRestoreSessionAction() { super(NAME); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java index 9f061b9c33099..d39262208e795 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java @@ -40,6 +40,7 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.CcrIntegTestCase; import org.elasticsearch.xpack.ccr.action.repositories.GetCcrRestoreFileChunkAction; +import org.elasticsearch.xpack.ccr.action.repositories.PutCcrRestoreSessionAction; import org.elasticsearch.xpack.ccr.repository.CcrRepository; import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService; @@ -48,6 +49,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -59,8 +61,6 @@ import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.lessThan; -// TODO: Fold this integration test into a more expansive integration test as more bootstrap from remote work -// TODO: is completed. public class CcrRepositoryIT extends CcrIntegTestCase { private final IndicesOptions indicesOptions = IndicesOptions.strictSingleIndexNoExpandForbidClosed(); @@ -197,36 +197,6 @@ public void testDocsAreRecovered() throws Exception { leaderClient().admin().indices().prepareFlush(leaderIndex).setForce(true).setWaitIfOngoing(true).get(); - AtomicBoolean isRunning = new AtomicBoolean(true); - - // Concurrently index new docs with mapping changes - Thread thread = new Thread(() -> { - char[] chars = "abcdeghijklmnopqrstuvwxyz".toCharArray(); - for (char c : chars) { - if (isRunning.get() == false) { - break; - } - final String source; - long l = randomLongBetween(0, 50000); - if (randomBoolean()) { - source = String.format(Locale.ROOT, "{\"%c\":%d}", c, l); - } else { - source = String.format(Locale.ROOT, "{\"%c\":\"%d\"}", c, l); - } - for (int i = 64; i < 150; i++) { - if (isRunning.get() == false) { - break; - } - leaderClient().prepareIndex("index1", "doc", Long.toString(i)).setSource(source, XContentType.JSON).get(); - if (rarely()) { - leaderClient().admin().indices().prepareFlush(leaderIndex).setForce(true).get(); - } - } - leaderClient().admin().indices().prepareFlush(leaderIndex).setForce(true).setWaitIfOngoing(true).get(); - } - }); - thread.start(); - Settings.Builder settingsBuilder = Settings.builder() .put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, followerIndex) .put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true); @@ -245,9 +215,6 @@ public void testDocsAreRecovered() throws Exception { assertExpectedDocument(followerIndex, i); } - isRunning.set(false); - thread.join(); - settingsRequest = new ClusterUpdateSettingsRequest(); ByteSizeValue defaultValue = CcrSettings.RECOVERY_CHUNK_SIZE.getDefault(Settings.EMPTY); settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.RECOVERY_CHUNK_SIZE.getKey(), defaultValue)); @@ -421,23 +388,60 @@ public void testFollowerMappingIsUpdated() throws IOException { .renameReplacement(followerIndex).masterNodeTimeout(new TimeValue(1L, TimeUnit.HOURS)) .indexSettings(settingsBuilder); - final String source = String.format(Locale.ROOT, "{\"k\":%d}", 1); - leaderClient().prepareIndex("index1", "doc", Long.toString(1)).setSource(source, XContentType.JSON).get(); - PlainActionFuture future = PlainActionFuture.newFuture(); - restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future)); - RestoreInfo restoreInfo = future.actionGet(); + List transportServices = new ArrayList<>(); + CountDownLatch latch = new CountDownLatch(1); + AtomicBoolean updateSent = new AtomicBoolean(false); + Runnable updateMappings = () -> { + if (updateSent.compareAndSet(false, true)) { + leaderClient() + .admin() + .indices() + .preparePutMapping(leaderIndex) + .setType("doc") + .setSource("{\"properties\":{\"k\":{\"type\":\"long\"}}}", XContentType.JSON) + .execute(ActionListener.wrap(latch::countDown)); + } + try { + latch.await(); + } catch (InterruptedException e) { + throw ExceptionsHelper.convertToRuntime(e); + } + }; - assertEquals(restoreInfo.totalShards(), restoreInfo.successfulShards()); - assertEquals(0, restoreInfo.failedShards()); + for (TransportService transportService : getFollowerCluster().getDataOrMasterNodeInstances(TransportService.class)) { + MockTransportService mockTransportService = (MockTransportService) transportService; + transportServices.add(mockTransportService); + mockTransportService.addSendBehavior((connection, requestId, action, request, options) -> { + if (action.equals(PutCcrRestoreSessionAction.NAME)) { + updateMappings.run(); + connection.sendRequest(requestId, action, request, options); + } else { + connection.sendRequest(requestId, action, request, options); + } + }); + } + + try { + PlainActionFuture future = PlainActionFuture.newFuture(); + restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future)); + RestoreInfo restoreInfo = future.actionGet(); - ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); - clusterStateRequest.clear(); - clusterStateRequest.metaData(true); - clusterStateRequest.indices(followerIndex); - MappingMetaData mappingMetaData = followerClient().admin().indices().prepareGetMappings("index2").get().getMappings() - .get("index2").get("doc"); - assertThat(XContentMapValues.extractValue("properties.k.type", mappingMetaData.sourceAsMap()), equalTo("long")); + assertEquals(restoreInfo.totalShards(), restoreInfo.successfulShards()); + assertEquals(0, restoreInfo.failedShards()); + + ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); + clusterStateRequest.clear(); + clusterStateRequest.metaData(true); + clusterStateRequest.indices(followerIndex); + MappingMetaData mappingMetaData = followerClient().admin().indices().prepareGetMappings("index2").get().getMappings() + .get("index2").get("doc"); + assertThat(XContentMapValues.extractValue("properties.k.type", mappingMetaData.sourceAsMap()), equalTo("long")); + } finally { + for (MockTransportService transportService : transportServices) { + transportService.clearAllRules(); + } + } } private void assertExpectedDocument(String followerIndex, final int value) {