diff --git a/core/src/test/java/org/elasticsearch/action/admin/indices/upgrade/UpgradeIT.java b/core/src/test/java/org/elasticsearch/action/admin/indices/upgrade/UpgradeIT.java index 620cef31f9a08..9af2289c8daa3 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/indices/upgrade/UpgradeIT.java +++ b/core/src/test/java/org/elasticsearch/action/admin/indices/upgrade/UpgradeIT.java @@ -21,28 +21,23 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; -import org.elasticsearch.action.admin.indices.segments.IndexSegments; -import org.elasticsearch.action.admin.indices.segments.IndexShardSegments; -import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse; -import org.elasticsearch.action.admin.indices.segments.ShardSegments; import org.elasticsearch.action.admin.indices.upgrade.get.IndexUpgradeStatus; -import org.elasticsearch.action.admin.indices.upgrade.get.UpgradeStatusResponse; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.routing.allocation.decider.ConcurrentRebalanceAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; -import org.elasticsearch.common.logging.ESLogger; -import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.engine.Segment; import org.elasticsearch.test.ESBackcompatTestCase; import org.elasticsearch.test.ESIntegTestCase; import org.junit.BeforeClass; import java.util.ArrayList; -import java.util.Collection; import java.util.List; +import static org.elasticsearch.test.OldIndexUtils.assertNotUpgraded; +import static org.elasticsearch.test.OldIndexUtils.assertUpgraded; +import static org.elasticsearch.test.OldIndexUtils.getUpgradeStatus; +import static org.elasticsearch.test.OldIndexUtils.isUpgraded; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; @@ -152,30 +147,6 @@ public void testUpgrade() throws Exception { assertUpgraded(client()); } - public static void assertNotUpgraded(Client client, String... index) throws Exception { - for (IndexUpgradeStatus status : getUpgradeStatus(client, index)) { - assertTrue("index " + status.getIndex() + " should not be zero sized", status.getTotalBytes() != 0); - // TODO: it would be better for this to be strictly greater, but sometimes an extra flush - // mysteriously happens after the second round of docs are indexed - assertTrue("index " + status.getIndex() + " should have recovered some segments from transaction log", - status.getTotalBytes() >= status.getToUpgradeBytes()); - assertTrue("index " + status.getIndex() + " should need upgrading", status.getToUpgradeBytes() != 0); - } - } - - public static void assertNoAncientSegments(Client client, String... index) throws Exception { - for (IndexUpgradeStatus status : getUpgradeStatus(client, index)) { - assertTrue("index " + status.getIndex() + " should not be zero sized", status.getTotalBytes() != 0); - // TODO: it would be better for this to be strictly greater, but sometimes an extra flush - // mysteriously happens after the second round of docs are indexed - assertTrue("index " + status.getIndex() + " should not have any ancient segments", - status.getToUpgradeBytesAncient() == 0); - assertTrue("index " + status.getIndex() + " should have recovered some segments from transaction log", - status.getTotalBytes() >= status.getToUpgradeBytes()); - assertTrue("index " + status.getIndex() + " should need upgrading", status.getToUpgradeBytes() != 0); - } - } - /** Returns true if there are any ancient segments. */ public static boolean hasAncientSegments(Client client, String index) throws Exception { for (IndexUpgradeStatus status : getUpgradeStatus(client, index)) { @@ -196,44 +167,6 @@ public static boolean hasOldButNotAncientSegments(Client client, String index) t return false; } - public static void assertUpgraded(Client client, String... index) throws Exception { - for (IndexUpgradeStatus status : getUpgradeStatus(client, index)) { - assertTrue("index " + status.getIndex() + " should not be zero sized", status.getTotalBytes() != 0); - assertEquals("index " + status.getIndex() + " should be upgraded", - 0, status.getToUpgradeBytes()); - } - - // double check using the segments api that all segments are actually upgraded - IndicesSegmentResponse segsRsp; - if (index == null) { - segsRsp = client().admin().indices().prepareSegments().execute().actionGet(); - } else { - segsRsp = client().admin().indices().prepareSegments(index).execute().actionGet(); - } - for (IndexSegments indexSegments : segsRsp.getIndices().values()) { - for (IndexShardSegments shard : indexSegments) { - for (ShardSegments segs : shard.getShards()) { - for (Segment seg : segs.getSegments()) { - assertEquals("Index " + indexSegments.getIndex() + " has unupgraded segment " + seg.toString(), - Version.CURRENT.luceneVersion.major, seg.version.major); - assertEquals("Index " + indexSegments.getIndex() + " has unupgraded segment " + seg.toString(), - Version.CURRENT.luceneVersion.minor, seg.version.minor); - } - } - } - } - } - - static boolean isUpgraded(Client client, String index) throws Exception { - ESLogger logger = Loggers.getLogger(UpgradeIT.class); - int toUpgrade = 0; - for (IndexUpgradeStatus status : getUpgradeStatus(client, index)) { - logger.info("Index: {}, total: {}, toUpgrade: {}", status.getIndex(), status.getTotalBytes(), status.getToUpgradeBytes()); - toUpgrade += status.getToUpgradeBytes(); - } - return toUpgrade == 0; - } - static class UpgradeStatus { public final String indexName; public final int totalBytes; @@ -249,10 +182,4 @@ public UpgradeStatus(String indexName, int totalBytes, int toUpgradeBytes, int t } } - @SuppressWarnings("unchecked") - static Collection getUpgradeStatus(Client client, String... indices) throws Exception { - UpgradeStatusResponse upgradeStatusResponse = client.admin().indices().prepareUpgradeStatus(indices).get(); - assertNoFailures(upgradeStatusResponse); - return upgradeStatusResponse.getIndices().values(); - } } diff --git a/core/src/test/java/org/elasticsearch/bwcompat/OldIndexBackwardsCompatibilityIT.java b/core/src/test/java/org/elasticsearch/bwcompat/OldIndexBackwardsCompatibilityIT.java index c4d2f4e0ec9b9..246bc69c2b304 100644 --- a/core/src/test/java/org/elasticsearch/bwcompat/OldIndexBackwardsCompatibilityIT.java +++ b/core/src/test/java/org/elasticsearch/bwcompat/OldIndexBackwardsCompatibilityIT.java @@ -19,10 +19,8 @@ package org.elasticsearch.bwcompat; -import org.apache.lucene.index.IndexWriter; import org.apache.lucene.search.Explanation; import org.apache.lucene.util.LuceneTestCase; -import org.apache.lucene.util.SmallFloat; import org.apache.lucene.util.TestUtil; import org.elasticsearch.Version; import org.elasticsearch.action.admin.indices.get.GetIndexResponse; @@ -30,18 +28,14 @@ import org.elasticsearch.action.admin.indices.segments.IndexShardSegments; import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse; import org.elasticsearch.action.admin.indices.segments.ShardSegments; -import org.elasticsearch.action.admin.indices.upgrade.UpgradeIT; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; import org.elasticsearch.common.io.FileSystemUtils; -import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.IndexFolderUpgrader; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentParser; @@ -50,7 +44,6 @@ import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.gateway.MetaDataStateFormat; import org.elasticsearch.index.IndexSettings; -import org.elasticsearch.index.MergePolicyConfig; import org.elasticsearch.index.engine.Segment; import org.elasticsearch.index.mapper.string.StringFieldMapperPositionIncrementGapTests; import org.elasticsearch.index.query.QueryBuilders; @@ -64,6 +57,7 @@ import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalSettingsPlugin; import org.elasticsearch.test.InternalTestCluster; +import org.elasticsearch.test.OldIndexUtils; import org.elasticsearch.test.VersionUtils; import org.elasticsearch.test.hamcrest.ElasticsearchAssertions; import org.hamcrest.Matchers; @@ -72,13 +66,8 @@ import java.io.IOException; import java.io.InputStream; -import java.nio.file.DirectoryStream; -import java.nio.file.FileVisitResult; import java.nio.file.Files; import java.nio.file.Path; -import java.nio.file.SimpleFileVisitor; -import java.nio.file.attribute.BasicFileAttributes; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -87,8 +76,8 @@ import java.util.SortedSet; import java.util.TreeSet; +import static org.elasticsearch.test.OldIndexUtils.assertUpgradeWorks; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.hamcrest.Matchers.greaterThanOrEqualTo; // needs at least 2 nodes since it bumps replicas to 1 @@ -113,19 +102,8 @@ protected Collection> nodePlugins() { @Before public void initIndexesList() throws Exception { - indexes = loadIndexesList("index"); - unsupportedIndexes = loadIndexesList("unsupported"); - } - - private List loadIndexesList(String prefix) throws IOException { - List indexes = new ArrayList<>(); - try (DirectoryStream stream = Files.newDirectoryStream(getBwcIndicesPath(), prefix + "-*.zip")) { - for (Path path : stream) { - indexes.add(path.getFileName().toString()); - } - } - Collections.sort(indexes); - return indexes; + indexes = OldIndexUtils.loadIndexesList("index", getBwcIndicesPath()); + unsupportedIndexes = OldIndexUtils.loadIndexesList("unsupported", getBwcIndicesPath()); } @AfterClass @@ -138,11 +116,7 @@ public static void tearDownStatics() { @Override public Settings nodeSettings(int ord) { - return Settings.builder() - .put(MergePolicyConfig.INDEX_MERGE_ENABLED, false) // disable merging so no segments will be upgraded - .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(), 30) // speed up recoveries - .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), 30) - .build(); + return OldIndexUtils.getSettings(); } void setupCluster() throws Exception { @@ -151,14 +125,15 @@ void setupCluster() throws Exception { Path baseTempDir = createTempDir(); // start single data path node Settings.Builder nodeSettings = Settings.builder() - .put(Environment.PATH_DATA_SETTING.getKey(), baseTempDir.resolve("single-path").toAbsolutePath()) - .put(Node.NODE_MASTER_SETTING.getKey(), false); // workaround for dangling index loading issue when node is master + .put(Environment.PATH_DATA_SETTING.getKey(), baseTempDir.resolve("single-path").toAbsolutePath()) + .put(Node.NODE_MASTER_SETTING.getKey(), false); // workaround for dangling index loading issue when node is master InternalTestCluster.Async singleDataPathNode = internalCluster().startNodeAsync(nodeSettings.build()); // start multi data path node nodeSettings = Settings.builder() - .put(Environment.PATH_DATA_SETTING.getKey(), baseTempDir.resolve("multi-path1").toAbsolutePath() + "," + baseTempDir.resolve("multi-path2").toAbsolutePath()) - .put(Node.NODE_MASTER_SETTING.getKey(), false); // workaround for dangling index loading issue when node is master + .put(Environment.PATH_DATA_SETTING.getKey(), baseTempDir.resolve("multi-path1").toAbsolutePath() + "," + baseTempDir + .resolve("multi-path2").toAbsolutePath()) + .put(Node.NODE_MASTER_SETTING.getKey(), false); // workaround for dangling index loading issue when node is master InternalTestCluster.Async multiDataPathNode = internalCluster().startNodeAsync(nodeSettings.build()); // find single data path dir @@ -174,8 +149,8 @@ void setupCluster() throws Exception { multiDataPathNodeName = multiDataPathNode.get(); nodePaths = internalCluster().getInstance(NodeEnvironment.class, multiDataPathNodeName).nodeDataPaths(); assertEquals(2, nodePaths.length); - multiDataPath = new Path[] {nodePaths[0].resolve(NodeEnvironment.INDICES_FOLDER), - nodePaths[1].resolve(NodeEnvironment.INDICES_FOLDER)}; + multiDataPath = new Path[]{nodePaths[0].resolve(NodeEnvironment.INDICES_FOLDER), + nodePaths[1].resolve(NodeEnvironment.INDICES_FOLDER)}; assertFalse(Files.exists(multiDataPath[0])); assertFalse(Files.exists(multiDataPath[1])); Files.createDirectories(multiDataPath[0]); @@ -186,42 +161,8 @@ void setupCluster() throws Exception { } void upgradeIndexFolder() throws Exception { - final NodeEnvironment nodeEnvironment = internalCluster().getInstance(NodeEnvironment.class, singleDataPathNodeName); - IndexFolderUpgrader.upgradeIndicesIfNeeded(Settings.EMPTY, nodeEnvironment); - final NodeEnvironment nodeEnv = internalCluster().getInstance(NodeEnvironment.class, multiDataPathNodeName); - IndexFolderUpgrader.upgradeIndicesIfNeeded(Settings.EMPTY, nodeEnv); - } - - String loadIndex(String indexFile) throws Exception { - Path unzipDir = createTempDir(); - Path unzipDataDir = unzipDir.resolve("data"); - String indexName = indexFile.replace(".zip", "").toLowerCase(Locale.ROOT).replace("unsupported-", "index-"); - - // decompress the index - Path backwardsIndex = getBwcIndicesPath().resolve(indexFile); - try (InputStream stream = Files.newInputStream(backwardsIndex)) { - TestUtil.unzip(stream, unzipDir); - } - - // check it is unique - assertTrue(Files.exists(unzipDataDir)); - Path[] list = FileSystemUtils.files(unzipDataDir); - if (list.length != 1) { - throw new IllegalStateException("Backwards index must contain exactly one cluster"); - } - - // the bwc scripts packs the indices under this path - Path src = list[0].resolve("nodes/0/indices/" + indexName); - assertTrue("[" + indexFile + "] missing index dir: " + src.toString(), Files.exists(src)); - - if (randomBoolean()) { - logger.info("--> injecting index [{}] into single data path", indexName); - copyIndex(logger, src, indexName, singleDataPath); - } else { - logger.info("--> injecting index [{}] into multi data path", indexName); - copyIndex(logger, src, indexName, multiDataPath); - } - return indexName; + OldIndexUtils.upgradeIndexFolder(internalCluster(), singleDataPathNodeName); + OldIndexUtils.upgradeIndexFolder(internalCluster(), multiDataPathNodeName); } void importIndex(String indexName) throws IOException { @@ -230,44 +171,6 @@ void importIndex(String indexName) throws IOException { ensureGreen(indexName); } - // randomly distribute the files from src over dests paths - public static void copyIndex(final ESLogger logger, final Path src, final String indexName, final Path... dests) throws IOException { - Path destinationDataPath = dests[randomInt(dests.length - 1)]; - for (Path dest : dests) { - Path indexDir = dest.resolve(indexName); - assertFalse(Files.exists(indexDir)); - Files.createDirectories(indexDir); - } - Files.walkFileTree(src, new SimpleFileVisitor() { - @Override - public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException { - Path relativeDir = src.relativize(dir); - for (Path dest : dests) { - Path destDir = dest.resolve(indexName).resolve(relativeDir); - Files.createDirectories(destDir); - } - return FileVisitResult.CONTINUE; - } - - @Override - public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { - if (file.getFileName().toString().equals(IndexWriter.WRITE_LOCK_NAME)) { - // skip lock file, we don't need it - logger.trace("Skipping lock file: {}", file); - return FileVisitResult.CONTINUE; - } - - Path relativeFile = src.relativize(file); - Path destFile = destinationDataPath.resolve(indexName).resolve(relativeFile); - logger.trace("--> Moving {} to {}", relativeFile, destFile); - Files.move(file, destFile); - assertFalse(Files.exists(file)); - assertTrue(Files.exists(destFile)); - return FileVisitResult.CONTINUE; - } - }); - } - void unloadIndex(String indexName) throws Exception { assertAcked(client().admin().indices().prepareDelete(indexName).get()); } @@ -295,7 +198,7 @@ public void testAllVersionsTested() throws Exception { fail(msg.toString()); } } - + public void testOldIndexes() throws Exception { setupCluster(); @@ -309,8 +212,18 @@ public void testOldIndexes() throws Exception { } void assertOldIndexWorks(String index) throws Exception { - Version version = extractVersion(index); - String indexName = loadIndex(index); + Version version = OldIndexUtils.extractVersion(index); + Path[] paths; + if (randomBoolean()) { + logger.info("--> injecting index [{}] into single data path", index); + paths = new Path[]{singleDataPath}; + } else { + logger.info("--> injecting index [{}] into multi data path", index); + paths = multiDataPath; + } + + String indexName = index.replace(".zip", "").toLowerCase(Locale.ROOT).replace("unsupported-", "index-"); + OldIndexUtils.loadIndex(indexName, index, createTempDir(), getBwcIndicesPath(), logger, paths); // we explicitly upgrade the index folders as these indices // are imported as dangling indices and not available on // node startup @@ -322,21 +235,12 @@ void assertOldIndexWorks(String index) throws Exception { assertBasicAggregationWorks(indexName); assertRealtimeGetWorks(indexName); assertNewReplicasWork(indexName); - assertUpgradeWorks(indexName, isLatestLuceneVersion(version)); + assertUpgradeWorks(client(), indexName, version); assertDeleteByQueryWorked(indexName, version); assertPositionIncrementGapDefaults(indexName, version); unloadIndex(indexName); } - Version extractVersion(String index) { - return Version.fromString(index.substring(index.indexOf('-') + 1, index.lastIndexOf('.'))); - } - - boolean isLatestLuceneVersion(Version version) { - return version.luceneVersion.major == Version.CURRENT.luceneVersion.major && - version.luceneVersion.minor == Version.CURRENT.luceneVersion.minor; - } - void assertIndexSanity(String indexName, Version indexCreated) { GetIndexResponse getIndexResponse = client().admin().indices().prepareGetIndex().addIndices(indexName).get(); assertEquals(1, getIndexResponse.indices().length); @@ -411,7 +315,8 @@ void assertAllSearchWorks(String indexName) { void assertBasicAggregationWorks(String indexName) { // histogram on a long - SearchResponse searchRsp = client().prepareSearch(indexName).addAggregation(AggregationBuilders.histogram("histo").field("long_sort").interval(10)).get(); + SearchResponse searchRsp = client().prepareSearch(indexName).addAggregation(AggregationBuilders.histogram("histo").field + ("long_sort").interval(10)).get(); ElasticsearchAssertions.assertSearchResponse(searchRsp); Histogram histo = searchRsp.getAggregations().get("histo"); assertNotNull(histo); @@ -454,7 +359,7 @@ void assertNewReplicasWork(String indexName) throws Exception { final long startTime = System.currentTimeMillis(); logger.debug("--> creating [{}] replicas for index [{}]", numReplicas, indexName); assertAcked(client().admin().indices().prepareUpdateSettings(indexName).setSettings(Settings.builder() - .put("number_of_replicas", numReplicas) + .put("number_of_replicas", numReplicas) ).execute().actionGet()); ensureGreen(TimeValue.timeValueMinutes(2), indexName); logger.debug("--> index [{}] is green, took [{}]", indexName, TimeValue.timeValueMillis(System.currentTimeMillis() - startTime)); @@ -482,14 +387,6 @@ void assertPositionIncrementGapDefaults(String indexName, Version version) throw } } - void assertUpgradeWorks(String indexName, boolean alreadyLatest) throws Exception { - if (alreadyLatest == false) { - UpgradeIT.assertNotUpgraded(client(), indexName); - } - assertNoFailures(client().admin().indices().prepareUpgrade(indexName).get()); - UpgradeIT.assertUpgraded(client(), indexName); - } - private Path getNodeDir(String indexFile) throws IOException { Path unzipDir = createTempDir(); Path unzipDataDir = unzipDir.resolve("data"); diff --git a/core/src/test/java/org/elasticsearch/common/util/IndexFolderUpgraderTests.java b/core/src/test/java/org/elasticsearch/common/util/IndexFolderUpgraderTests.java index 5302ba8d55c1b..94fc0d8875229 100644 --- a/core/src/test/java/org/elasticsearch/common/util/IndexFolderUpgraderTests.java +++ b/core/src/test/java/org/elasticsearch/common/util/IndexFolderUpgraderTests.java @@ -23,7 +23,6 @@ import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.util.TestUtil; import org.elasticsearch.Version; -import org.elasticsearch.bwcompat.OldIndexBackwardsCompatibilityIT; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.AllocationId; import org.elasticsearch.common.UUIDs; @@ -39,6 +38,7 @@ import org.elasticsearch.index.shard.ShardPath; import org.elasticsearch.index.shard.ShardStateMetaData; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.OldIndexUtils; import java.io.BufferedWriter; import java.io.FileNotFoundException; @@ -215,7 +215,7 @@ public void testUpgradeRealIndex() throws IOException, URISyntaxException { assertTrue("[" + path + "] missing index dir: " + src.toString(), Files.exists(src)); final Path indicesPath = randomFrom(nodeEnvironment.nodePaths()).indicesPath; logger.info("--> injecting index [{}] into [{}]", indexName, indicesPath); - OldIndexBackwardsCompatibilityIT.copyIndex(logger, src, indexName, indicesPath); + OldIndexUtils.copyIndex(logger, src, indexName, indicesPath); IndexFolderUpgrader.upgradeIndicesIfNeeded(Settings.EMPTY, nodeEnvironment); // ensure old index folder is deleted diff --git a/test/framework/src/main/java/org/elasticsearch/test/OldIndexUtils.java b/test/framework/src/main/java/org/elasticsearch/test/OldIndexUtils.java new file mode 100644 index 0000000000000..e1967256ddbff --- /dev/null +++ b/test/framework/src/main/java/org/elasticsearch/test/OldIndexUtils.java @@ -0,0 +1,222 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.test; + +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.util.TestUtil; +import org.elasticsearch.Version; +import org.elasticsearch.action.admin.indices.segments.IndexSegments; +import org.elasticsearch.action.admin.indices.segments.IndexShardSegments; +import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse; +import org.elasticsearch.action.admin.indices.segments.ShardSegments; +import org.elasticsearch.action.admin.indices.upgrade.get.IndexUpgradeStatus; +import org.elasticsearch.action.admin.indices.upgrade.get.UpgradeStatusResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; +import org.elasticsearch.common.io.FileSystemUtils; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.IndexFolderUpgrader; +import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.index.MergePolicyConfig; +import org.elasticsearch.index.engine.Segment; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.DirectoryStream; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import static junit.framework.TestCase.assertFalse; +import static junit.framework.TestCase.assertTrue; +import static org.elasticsearch.test.ESTestCase.randomInt; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; +import static org.junit.Assert.assertEquals; + + +public class OldIndexUtils { + + public static List loadIndexesList(String prefix, Path bwcIndicesPath) throws IOException { + List indexes = new ArrayList<>(); + try (DirectoryStream stream = Files.newDirectoryStream(bwcIndicesPath, prefix + "-*.zip")) { + for (Path path : stream) { + indexes.add(path.getFileName().toString()); + } + } + Collections.sort(indexes); + return indexes; + } + + public static Settings getSettings() { + return Settings.builder() + .put(MergePolicyConfig.INDEX_MERGE_ENABLED, false) // disable merging so no segments will be upgraded + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(), 30) // + // speed up recoveries + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), 30) + .build(); + } + + public static void upgradeIndexFolder(InternalTestCluster cluster, String nodeName) throws Exception { + final NodeEnvironment nodeEnvironment = cluster.getInstance(NodeEnvironment.class, nodeName); + IndexFolderUpgrader.upgradeIndicesIfNeeded(Settings.EMPTY, nodeEnvironment); + } + + public static void loadIndex(String indexName, String indexFile, Path unzipDir, Path bwcPath, ESLogger logger, Path... paths) throws + Exception { + Path unzipDataDir = unzipDir.resolve("data"); + + Path backwardsIndex = bwcPath.resolve(indexFile); + // decompress the index + try (InputStream stream = Files.newInputStream(backwardsIndex)) { + TestUtil.unzip(stream, unzipDir); + } + + // check it is unique + assertTrue(Files.exists(unzipDataDir)); + Path[] list = FileSystemUtils.files(unzipDataDir); + if (list.length != 1) { + throw new IllegalStateException("Backwards index must contain exactly one cluster"); + } + + // the bwc scripts packs the indices under this path + Path src = list[0].resolve("nodes/0/indices/" + indexName); + assertTrue("[" + indexFile + "] missing index dir: " + src.toString(), Files.exists(src)); + copyIndex(logger, src, indexName, paths); + } + + public static void assertNotUpgraded(Client client, String... index) throws Exception { + for (IndexUpgradeStatus status : getUpgradeStatus(client, index)) { + assertTrue("index " + status.getIndex() + " should not be zero sized", status.getTotalBytes() != 0); + // TODO: it would be better for this to be strictly greater, but sometimes an extra flush + // mysteriously happens after the second round of docs are indexed + assertTrue("index " + status.getIndex() + " should have recovered some segments from transaction log", + status.getTotalBytes() >= status.getToUpgradeBytes()); + assertTrue("index " + status.getIndex() + " should need upgrading", status.getToUpgradeBytes() != 0); + } + } + + @SuppressWarnings("unchecked") + public static Collection getUpgradeStatus(Client client, String... indices) throws Exception { + UpgradeStatusResponse upgradeStatusResponse = client.admin().indices().prepareUpgradeStatus(indices).get(); + assertNoFailures(upgradeStatusResponse); + return upgradeStatusResponse.getIndices().values(); + } + + // randomly distribute the files from src over dests paths + public static void copyIndex(final ESLogger logger, final Path src, final String indexName, final Path... dests) throws IOException { + Path destinationDataPath = dests[randomInt(dests.length - 1)]; + for (Path dest : dests) { + Path indexDir = dest.resolve(indexName); + assertFalse(Files.exists(indexDir)); + Files.createDirectories(indexDir); + } + Files.walkFileTree(src, new SimpleFileVisitor() { + @Override + public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException { + Path relativeDir = src.relativize(dir); + for (Path dest : dests) { + Path destDir = dest.resolve(indexName).resolve(relativeDir); + Files.createDirectories(destDir); + } + return FileVisitResult.CONTINUE; + } + + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { + if (file.getFileName().toString().equals(IndexWriter.WRITE_LOCK_NAME)) { + // skip lock file, we don't need it + logger.trace("Skipping lock file: {}", file); + return FileVisitResult.CONTINUE; + } + + Path relativeFile = src.relativize(file); + Path destFile = destinationDataPath.resolve(indexName).resolve(relativeFile); + logger.trace("--> Moving {} to {}", relativeFile, destFile); + Files.move(file, destFile); + assertFalse(Files.exists(file)); + assertTrue(Files.exists(destFile)); + return FileVisitResult.CONTINUE; + } + }); + } + + public static void assertUpgraded(Client client, String... index) throws Exception { + for (IndexUpgradeStatus status : getUpgradeStatus(client, index)) { + assertTrue("index " + status.getIndex() + " should not be zero sized", status.getTotalBytes() != 0); + assertEquals("index " + status.getIndex() + " should be upgraded", + 0, status.getToUpgradeBytes()); + } + + // double check using the segments api that all segments are actually upgraded + IndicesSegmentResponse segsRsp; + if (index == null) { + segsRsp = client.admin().indices().prepareSegments().execute().actionGet(); + } else { + segsRsp = client.admin().indices().prepareSegments(index).execute().actionGet(); + } + for (IndexSegments indexSegments : segsRsp.getIndices().values()) { + for (IndexShardSegments shard : indexSegments) { + for (ShardSegments segs : shard.getShards()) { + for (Segment seg : segs.getSegments()) { + assertEquals("Index " + indexSegments.getIndex() + " has unupgraded segment " + seg.toString(), + Version.CURRENT.luceneVersion.major, seg.version.major); + assertEquals("Index " + indexSegments.getIndex() + " has unupgraded segment " + seg.toString(), + Version.CURRENT.luceneVersion.minor, seg.version.minor); + } + } + } + } + } + + public static boolean isUpgraded(Client client, String index) throws Exception { + ESLogger logger = Loggers.getLogger(OldIndexUtils.class); + int toUpgrade = 0; + for (IndexUpgradeStatus status : getUpgradeStatus(client, index)) { + logger.info("Index: {}, total: {}, toUpgrade: {}", status.getIndex(), status.getTotalBytes(), status.getToUpgradeBytes()); + toUpgrade += status.getToUpgradeBytes(); + } + return toUpgrade == 0; + } + + public static void assertUpgradeWorks(Client client, String indexName, Version version) throws Exception { + if (OldIndexUtils.isLatestLuceneVersion(version) == false) { + OldIndexUtils.assertNotUpgraded(client, indexName); + } + assertNoFailures(client.admin().indices().prepareUpgrade(indexName).get()); + assertUpgraded(client, indexName); + } + + public static Version extractVersion(String index) { + return Version.fromString(index.substring(index.indexOf('-') + 1, index.lastIndexOf('.'))); + } + + public static boolean isLatestLuceneVersion(Version version) { + return version.luceneVersion.major == Version.CURRENT.luceneVersion.major && + version.luceneVersion.minor == Version.CURRENT.luceneVersion.minor; + } +}