From 6f24de17dec82ed8dfe8b1dab7ef41ad3a02bc73 Mon Sep 17 00:00:00 2001 From: Armin Date: Thu, 4 Apr 2019 21:00:17 +0200 Subject: [PATCH 01/18] Add Eventually Consistent Mock Repository * Reproduce S3 semantics exactly --- .../common/blobstore/BlobPath.java | 16 ++ .../snapshots/SnapshotResiliencyTests.java | 47 ++-- .../MockEventuallyConsistentRepository.java | 210 ++++++++++++++++++ 3 files changed, 260 insertions(+), 13 deletions(-) create mode 100644 server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/BlobPath.java b/server/src/main/java/org/elasticsearch/common/blobstore/BlobPath.java index ea02aebb0aaad..d4d15850771fb 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/BlobPath.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/BlobPath.java @@ -76,4 +76,20 @@ public String toString() { } return sb.toString(); } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + return paths.equals(((BlobPath) o).paths); + } + + @Override + public int hashCode() { + return paths.hashCode(); + } } diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 32cb9f4a9d8f5..ad141cdee4aaa 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -148,6 +148,7 @@ import org.elasticsearch.search.SearchService; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.fetch.FetchPhase; +import org.elasticsearch.snapshots.mockstore.MockEventuallyConsistentRepository; import org.elasticsearch.test.ESTestCase; import 
org.elasticsearch.test.disruption.DisruptableMockTransport; import org.elasticsearch.test.disruption.NetworkDisruption; @@ -199,9 +200,19 @@ public class SnapshotResiliencyTests extends ESTestCase { private Path tempDir; + /** + * Whether to use eventually consistent blob store in tests. + */ + private boolean eventuallyConsistent; + + private MockEventuallyConsistentRepository.Context blobStoreContext; @Before public void createServices() { tempDir = createTempDir(); + eventuallyConsistent = randomBoolean(); + if (eventuallyConsistent) { + blobStoreContext = new MockEventuallyConsistentRepository.Context(); + } deterministicTaskQueue = new DeterministicTaskQueue(Settings.builder().put(NODE_NAME_SETTING.getKey(), "shared").build(), random()); } @@ -892,19 +903,7 @@ public void onFailure(final Exception e) { final IndexNameExpressionResolver indexNameExpressionResolver = new IndexNameExpressionResolver(); repositoriesService = new RepositoriesService( settings, clusterService, transportService, - Collections.singletonMap(FsRepository.TYPE, metaData -> { - final Repository repository = new FsRepository(metaData, environment, xContentRegistry()) { - @Override - protected void assertSnapshotOrGenericThread() { - // eliminate thread name check as we create repo in the test thread - } - }; - repository.start(); - return repository; - } - ), - emptyMap(), - threadPool + Collections.singletonMap(FsRepository.TYPE, getRepoFactory(environment)), emptyMap(), threadPool ); snapshotsService = new SnapshotsService(settings, clusterService, indexNameExpressionResolver, repositoriesService, threadPool); @@ -1085,6 +1084,28 @@ searchTransportService, new SearchPhaseController(searchService::createReduceCon client.initialize(actions, () -> clusterService.localNode().getId(), transportService.getRemoteClusterService()); } + private Repository.Factory getRepoFactory(final Environment environment) { + // Run half the tests with the eventually consistent repository + if 
(eventuallyConsistent) { + return metaData -> { + final Repository repository = new MockEventuallyConsistentRepository( + metaData, environment, xContentRegistry(), deterministicTaskQueue, blobStoreContext); + repository.start(); + return repository; + }; + } else { + return metaData -> { + final Repository repository = new FsRepository(metaData, environment, xContentRegistry()) { + @Override + protected void assertSnapshotOrGenericThread() { + // eliminate thread name check as we create repo in the test thread + } + }; + repository.start(); + return repository; + }; + } + } public void restart() { testClusterNodes.disconnectNode(this); final ClusterState oldState = this.clusterService.state(); diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java new file mode 100644 index 0000000000000..58d4983f4b306 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java @@ -0,0 +1,210 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.snapshots.mockstore; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.cluster.coordination.DeterministicTaskQueue; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.metadata.RepositoryMetaData; +import org.elasticsearch.common.blobstore.BlobContainer; +import org.elasticsearch.common.blobstore.BlobMetaData; +import org.elasticsearch.common.blobstore.BlobPath; +import org.elasticsearch.common.blobstore.BlobStore; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.io.PathUtils; +import org.elasticsearch.common.io.Streams; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.env.Environment; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.repositories.fs.FsRepository; +import org.elasticsearch.snapshots.SnapshotId; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.NoSuchFileException; +import java.nio.file.Path; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Mock Repository that simulates the mechanics of an eventually consistent blobstore. 
+ */ +public class MockEventuallyConsistentRepository extends FsRepository { + + private final DeterministicTaskQueue deterministicTaskQueue; + + private final Context context; + + public MockEventuallyConsistentRepository(RepositoryMetaData metadata, Environment environment, + NamedXContentRegistry namedXContentRegistry, DeterministicTaskQueue deterministicTaskQueue, Context context) { + super(overrideSettings(metadata, environment), environment, namedXContentRegistry); + this.deterministicTaskQueue = deterministicTaskQueue; + this.context = context; + } + + @Override + public void initializeSnapshot(SnapshotId snapshotId, List indices, MetaData clusterMetadata) { + super.initializeSnapshot(snapshotId, indices, clusterMetadata); + } + + @Override + protected void assertSnapshotOrGenericThread() { + // eliminate thread name check as we create repo in the test thread + } + + private static RepositoryMetaData overrideSettings(RepositoryMetaData metadata, Environment environment) { + // TODO: use another method of testing not being able to read the test file written by the master... 
+ // this is super duper hacky + if (metadata.settings().getAsBoolean("localize_location", false)) { + Path location = PathUtils.get(metadata.settings().get("location")); + location = location.resolve(Integer.toString(environment.hashCode())); + return new RepositoryMetaData(metadata.name(), metadata.type(), + Settings.builder().put(metadata.settings()).put("location", location.toAbsolutePath()).build()); + } else { + return metadata; + } + } + + @Override + protected void doStop() { + super.doStop(); + } + + @Override + protected BlobStore createBlobStore() throws Exception { + return new MockBlobStore(super.createBlobStore()); + } + + public static final class Context { + + private final Map, Map>> state = new HashMap<>(); + + public Tuple, Map> getState(BlobPath path) { + return state.computeIfAbsent(path, p -> new Tuple<>(new HashSet<>(), new HashMap<>())); + } + + } + + private class MockBlobStore extends BlobStoreWrapper { + + MockBlobStore(BlobStore delegate) { + super(delegate); + } + + @Override + public BlobContainer blobContainer(BlobPath path) { + return new MockBlobContainer(super.blobContainer(path), context.getState(path)); + } + + private class MockBlobContainer extends BlobContainerWrapper { + + private final Set cachedMisses; + + private final Map pendingWrites; + + MockBlobContainer(BlobContainer delegate, Tuple, Map> state) { + super(delegate); + cachedMisses = state.v1(); + pendingWrites = state.v2(); + } + + @Override + public boolean blobExists(String blobName) { + ensureReadAfterWrite(blobName); + final boolean result = super.blobExists(blobName); + if (result == false) { + cachedMisses.add(blobName); + } + return result; + } + + @Override + public InputStream readBlob(String name) throws IOException { + ensureReadAfterWrite(name); + return super.readBlob(name); + } + + private void ensureReadAfterWrite(String blobName) { + if (cachedMisses.contains(blobName) == false && pendingWrites.containsKey(blobName)) { + 
pendingWrites.remove(blobName).run(); + } + } + + @Override + public void deleteBlob(String blobName) throws IOException { + super.deleteBlob(blobName); + } + + @Override + public void deleteBlobIgnoringIfNotExists(String blobName) throws IOException { + super.deleteBlobIgnoringIfNotExists(blobName); + } + + @Override + public Map listBlobs() throws IOException { + return super.listBlobs(); + } + + @Override + public Map listBlobsByPrefix(String blobNamePrefix) throws IOException { + return super.listBlobsByPrefix(blobNamePrefix); + } + + @Override + public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) + throws IOException { + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + Streams.copy(inputStream, baos); + pendingWrites.put(blobName, () -> { + try { + super.writeBlob(blobName, new ByteArrayInputStream(baos.toByteArray()), blobSize, failIfAlreadyExists); + if (cachedMisses.contains(blobName)) { + deterministicTaskQueue.scheduleNow(() -> cachedMisses.remove(blobName)); + } + } catch (NoSuchFileException | FileAlreadyExistsException e) { + // Ignoring, assuming a previous concurrent delete removed the parent path and that overwrites are not + // detectable with this kind of store + } catch (IOException e) { + throw new AssertionError(e); + } + }); + deterministicTaskQueue.scheduleNow(() -> { + if (pendingWrites.containsKey(blobName)) { + pendingWrites.remove(blobName).run(); + } + }); + } + + @Override + public void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize, + final boolean failIfAlreadyExists) throws IOException { + writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists); + } + } + } +} From 15fe43f1e185e749d6ce1590ddb58dfe9ddf642c Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Thu, 4 Apr 2019 21:35:00 +0200 Subject: [PATCH 02/18] bck --- .../snapshots/mockstore/MockEventuallyConsistentRepository.java | 2 -- 1 file changed, 2 
deletions(-) diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java index 58d4983f4b306..06b2a084eac50 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java @@ -19,8 +19,6 @@ package org.elasticsearch.snapshots.mockstore; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.elasticsearch.cluster.coordination.DeterministicTaskQueue; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; From c346c3d8381b850136b40eb25ed0af6418f7c76b Mon Sep 17 00:00:00 2001 From: Armin Date: Fri, 5 Apr 2019 07:20:37 +0200 Subject: [PATCH 03/18] better --- .../MockEventuallyConsistentRepository.java | 61 ++++--------------- 1 file changed, 12 insertions(+), 49 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java index df04627f971d5..562425446fdad 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java @@ -20,32 +20,25 @@ package org.elasticsearch.snapshots.mockstore; import org.elasticsearch.cluster.coordination.DeterministicTaskQueue; -import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.common.blobstore.BlobContainer; -import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobPath; import 
org.elasticsearch.common.blobstore.BlobStore; import org.elasticsearch.common.collect.Tuple; -import org.elasticsearch.common.io.PathUtils; import org.elasticsearch.common.io.Streams; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; -import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.fs.FsRepository; -import org.elasticsearch.snapshots.SnapshotId; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; +import java.nio.file.DirectoryNotEmptyException; import java.nio.file.FileAlreadyExistsException; import java.nio.file.NoSuchFileException; -import java.nio.file.Path; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Set; @@ -60,39 +53,16 @@ public class MockEventuallyConsistentRepository extends FsRepository { public MockEventuallyConsistentRepository(RepositoryMetaData metadata, Environment environment, NamedXContentRegistry namedXContentRegistry, DeterministicTaskQueue deterministicTaskQueue, Context context) { - super(overrideSettings(metadata, environment), environment, namedXContentRegistry, deterministicTaskQueue.getThreadPool()); + super(metadata, environment, namedXContentRegistry, deterministicTaskQueue.getThreadPool()); this.deterministicTaskQueue = deterministicTaskQueue; this.context = context; } - @Override - public void initializeSnapshot(SnapshotId snapshotId, List indices, MetaData clusterMetadata) { - super.initializeSnapshot(snapshotId, indices, clusterMetadata); - } - @Override protected void assertSnapshotOrGenericThread() { // eliminate thread name check as we create repo in the test thread } - private static RepositoryMetaData overrideSettings(RepositoryMetaData metadata, Environment environment) { - // TODO: use another method of testing not being able to read the 
test file written by the master... - // this is super duper hacky - if (metadata.settings().getAsBoolean("localize_location", false)) { - Path location = PathUtils.get(metadata.settings().get("location")); - location = location.resolve(Integer.toString(environment.hashCode())); - return new RepositoryMetaData(metadata.name(), metadata.type(), - Settings.builder().put(metadata.settings()).put("location", location.toAbsolutePath()).build()); - } else { - return metadata; - } - } - - @Override - protected void doStop() { - super.doStop(); - } - @Override protected BlobStore createBlobStore() throws Exception { return new MockBlobStore(super.createBlobStore()); @@ -154,23 +124,16 @@ private void ensureReadAfterWrite(String blobName) { } @Override - public void deleteBlob(String blobName) throws IOException { - super.deleteBlob(blobName); - } - - @Override - public void deleteBlobIgnoringIfNotExists(String blobName) throws IOException { - super.deleteBlobIgnoringIfNotExists(blobName); - } - - @Override - public Map listBlobs() throws IOException { - return super.listBlobs(); - } - - @Override - public Map listBlobsByPrefix(String blobNamePrefix) throws IOException { - return super.listBlobsByPrefix(blobNamePrefix); + public void deleteBlob(String blobName) { + deterministicTaskQueue.scheduleNow(() -> { + try { + super.deleteBlob(blobName); + } catch (DirectoryNotEmptyException | NoSuchFileException e) { + // ignored + } catch (IOException e) { + throw new AssertionError(e); + } + }); } @Override From ab8da427506ca08598584e179eaa6b17e86f23cb Mon Sep 17 00:00:00 2001 From: Armin Date: Fri, 5 Apr 2019 07:23:37 +0200 Subject: [PATCH 04/18] repeat 5000 --- .../snapshots/mockstore/MockEventuallyConsistentRepository.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java index 
562425446fdad..b402c97a91c31 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java @@ -19,6 +19,7 @@ package org.elasticsearch.snapshots.mockstore; +import com.carrotsearch.randomizedtesting.annotations.Repeat; import org.elasticsearch.cluster.coordination.DeterministicTaskQueue; import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.common.blobstore.BlobContainer; @@ -45,6 +46,7 @@ /** * Mock Repository that simulates the mechanics of an eventually consistent blobstore. */ +@Repeat(iterations = 5000) public class MockEventuallyConsistentRepository extends FsRepository { private final DeterministicTaskQueue deterministicTaskQueue; From 8713682d0f044a1c7fdc805642c530a25d5d99a5 Mon Sep 17 00:00:00 2001 From: Armin Date: Fri, 5 Apr 2019 07:26:21 +0200 Subject: [PATCH 05/18] repeat 5000 --- .../org/elasticsearch/snapshots/SnapshotResiliencyTests.java | 2 ++ .../snapshots/mockstore/MockEventuallyConsistentRepository.java | 2 -- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index d4f8024fb02d5..2029ead4f6380 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.snapshots; +import com.carrotsearch.randomizedtesting.annotations.Repeat; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.Version; @@ -192,6 +193,7 @@ import static org.hamcrest.Matchers.hasSize; import static org.mockito.Mockito.mock; +@Repeat(iterations = 5000) public class SnapshotResiliencyTests extends ESTestCase { private 
DeterministicTaskQueue deterministicTaskQueue; diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java index b402c97a91c31..562425446fdad 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java @@ -19,7 +19,6 @@ package org.elasticsearch.snapshots.mockstore; -import com.carrotsearch.randomizedtesting.annotations.Repeat; import org.elasticsearch.cluster.coordination.DeterministicTaskQueue; import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.common.blobstore.BlobContainer; @@ -46,7 +45,6 @@ /** * Mock Repository that simulates the mechanics of an eventually consistent blobstore. */ -@Repeat(iterations = 5000) public class MockEventuallyConsistentRepository extends FsRepository { private final DeterministicTaskQueue deterministicTaskQueue; From 86e3d627b1466874108edd939efd0c9244171817 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Fri, 5 Apr 2019 12:51:41 +0200 Subject: [PATCH 06/18] bck --- .../snapshots/SnapshotResiliencyTests.java | 30 +++++++++---------- .../MockEventuallyConsistentRepository.java | 21 +++++++++++-- 2 files changed, 32 insertions(+), 19 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 2029ead4f6380..0b6b18bc61e4a 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -19,7 +19,6 @@ package org.elasticsearch.snapshots; -import com.carrotsearch.randomizedtesting.annotations.Repeat; import org.apache.logging.log4j.LogManager; import 
org.apache.logging.log4j.Logger; import org.elasticsearch.Version; @@ -107,6 +106,7 @@ import org.elasticsearch.cluster.service.ClusterApplierService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.MasterService; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.ClusterSettings; @@ -193,7 +193,6 @@ import static org.hamcrest.Matchers.hasSize; import static org.mockito.Mockito.mock; -@Repeat(iterations = 5000) public class SnapshotResiliencyTests extends ESTestCase { private DeterministicTaskQueue deterministicTaskQueue; @@ -203,16 +202,15 @@ public class SnapshotResiliencyTests extends ESTestCase { private Path tempDir; /** - * Whether to use eventually consistent blob store in tests. + * Context shared by all the node's {@link Repository} instances if the eventually consistent blobstore is to be used. + * {@code null} if not using the eventually consistent blobstore. 
*/ - private boolean eventuallyConsistent; + @Nullable private MockEventuallyConsistentRepository.Context blobStoreContext; - private MockEventuallyConsistentRepository.Context blobStoreContext; @Before public void createServices() { tempDir = createTempDir(); - eventuallyConsistent = randomBoolean(); - if (eventuallyConsistent) { + if (randomBoolean()) { blobStoreContext = new MockEventuallyConsistentRepository.Context(); } deterministicTaskQueue = @@ -1086,16 +1084,9 @@ searchTransportService, new SearchPhaseController(searchService::createReduceCon client.initialize(actions, () -> clusterService.localNode().getId(), transportService.getRemoteClusterService()); } - private Repository.Factory getRepoFactory(final Environment environment) { + private Repository.Factory getRepoFactory(Environment environment) { // Run half the tests with the eventually consistent repository - if (eventuallyConsistent) { - return metaData -> { - final Repository repository = new MockEventuallyConsistentRepository( - metaData, environment, xContentRegistry(), deterministicTaskQueue, blobStoreContext); - repository.start(); - return repository; - }; - } else { + if (blobStoreContext == null) { return metaData -> { final Repository repository = new FsRepository(metaData, environment, xContentRegistry(), threadPool) { @Override @@ -1106,6 +1097,13 @@ protected void assertSnapshotOrGenericThread() { repository.start(); return repository; }; + } else { + return metaData -> { + final Repository repository = new MockEventuallyConsistentRepository( + metaData, environment, xContentRegistry(), deterministicTaskQueue, blobStoreContext); + repository.start(); + return repository; + }; } } public void restart() { diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java index 562425446fdad..17202e8a6f4dd 100644 --- 
a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java @@ -43,7 +43,14 @@ import java.util.Set; /** - * Mock Repository that simulates the mechanics of an eventually consistent blobstore. + * Mock Repository that simulates the eventually consistent behaviour of AWS S3 as documented in the + * AWS S3 docs. + * Specifically this implementation simulates: + *
+ * <ul>
+ * <li>First read after write is consistent for each blob. (see S3 docs for specifics)</li>
+ * <li>Deletes and updates to a blob can become visible with a delay.</li>
+ * <li>Blobs can become visible to list operations with a delay.</li>
+ * </ul>
*/ public class MockEventuallyConsistentRepository extends FsRepository { @@ -68,14 +75,20 @@ protected BlobStore createBlobStore() throws Exception { return new MockBlobStore(super.createBlobStore()); } + /** + * Context that must be shared between all instances of {@link MockEventuallyConsistentRepository} in a test run. + */ public static final class Context { + /** + * Map of blob path to a tuple of cached non-existent blobs in them and a map of child blob name to {@link Runnable} that when + * executed will create the blob. + */ private final Map, Map>> state = new HashMap<>(); public Tuple, Map> getState(BlobPath path) { return state.computeIfAbsent(path, p -> new Tuple<>(new HashSet<>(), new HashMap<>())); } - } private class MockBlobStore extends BlobStoreWrapper { @@ -125,11 +138,12 @@ private void ensureReadAfterWrite(String blobName) { @Override public void deleteBlob(String blobName) { + // TODO: simulate longer delays here once the S3 blob store implementation can handle them deterministicTaskQueue.scheduleNow(() -> { try { super.deleteBlob(blobName); } catch (DirectoryNotEmptyException | NoSuchFileException e) { - // ignored + // ignored since neither of these exceptions would occur on S3 } catch (IOException e) { throw new AssertionError(e); } @@ -154,6 +168,7 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b throw new AssertionError(e); } }); + // TODO: simulate longer delays here once the S3 blob store implementation can handle them deterministicTaskQueue.scheduleNow(() -> { if (pendingWrites.containsKey(blobName)) { pendingWrites.remove(blobName).run(); From e320ebcebdb81650c67a1cefe17c551dcaec4caa Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 17 Apr 2019 12:59:42 +0200 Subject: [PATCH 07/18] CR: Fix incorrect consistency and deleting of empty trees --- .../snapshots/SnapshotResiliencyTests.java | 34 +++++++++++++++++++ .../MockEventuallyConsistentRepository.java | 6 +++- 2 files changed, 39 insertions(+), 
1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 955d951ad077f..bf4ab08b7b5c8 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -171,8 +171,12 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.file.DirectoryNotEmptyException; +import java.nio.file.FileVisitResult; import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; import java.util.Collection; import java.util.Collections; import java.util.Comparator; @@ -538,6 +542,7 @@ private void assertNoStaleRepositoryData() throws IOException { repos = reposDir.filter(s -> s.getFileName().toString().startsWith("extra") == false).collect(Collectors.toList()); } for (Path repoRoot : repos) { + cleanupEmptyTrees(repoRoot); final Path latestIndexGenBlob = repoRoot.resolve("index.latest"); assertTrue("Could not find index.latest blob for repo at [" + repoRoot + ']', Files.exists(latestIndexGenBlob)); final long latestGen = ByteBuffer.wrap(Files.readAllBytes(latestIndexGenBlob)).getLong(0); @@ -553,6 +558,35 @@ private void assertNoStaleRepositoryData() throws IOException { } } + // Lucene's mock file system randomly generates empty `extra0` files that break the deletion of blob-store directories. + // We clean those up here before checking a blob-store for stale files in this test. 
+ private void cleanupEmptyTrees(Path repoPath) { + try { + Files.walkFileTree(repoPath, new SimpleFileVisitor<>() { + + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { + if (file.getFileName().toString().startsWith("extra")) { + Files.delete(file); + } + return FileVisitResult.CONTINUE; + } + + @Override + public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException { + try { + Files.delete(dir); + } catch (DirectoryNotEmptyException e) { + // We're only interested in deleting empty trees here, just ignore directories with content + } + return FileVisitResult.CONTINUE; + } + }); + } catch (IOException e) { + throw new AssertionError(e); + } + } + private static void assertIndexGenerations(Path repoRoot, long latestGen) throws IOException { try (Stream repoRootBlobs = Files.list(repoRoot)) { final long[] indexGenerations = repoRootBlobs.filter(p -> p.getFileName().toString().startsWith("index-")) diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java index 17202e8a6f4dd..f4034fa56879c 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java @@ -131,13 +131,17 @@ public InputStream readBlob(String name) throws IOException { } private void ensureReadAfterWrite(String blobName) { - if (cachedMisses.contains(blobName) == false && pendingWrites.containsKey(blobName)) { + // TODO: Make this even less consistent by keeping track of blobs that existed before instead of simply ensuring + // read after first write again if the blob doesn't exist + if (cachedMisses.contains(blobName) == false && pendingWrites.containsKey(blobName) + && super.blobExists(blobName) == false) { 
pendingWrites.remove(blobName).run(); } } @Override public void deleteBlob(String blobName) { + ensureReadAfterWrite(blobName); // TODO: simulate longer delays here once the S3 blob store implementation can handle them deterministicTaskQueue.scheduleNow(() -> { try { From 7b5c2e612200eeb092773029aeb4d53f7a269ffb Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 17 Apr 2019 13:17:43 +0200 Subject: [PATCH 08/18] CR: Fix incorrect consistency and deleting of empty trees --- .../org/elasticsearch/snapshots/SnapshotResiliencyTests.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index bf4ab08b7b5c8..75a4f2e8e94c9 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -575,7 +575,10 @@ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IO @Override public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException { try { - Files.delete(dir); + // don't delete the root of the repository + if (repoPath.equals(dir) == false) { + Files.delete(dir); + } } catch (DirectoryNotEmptyException e) { // We're only interested in deleting empty trees here, just ignore directories with content } From dfffe74de5a190e1e123b47b57443baf2c1e0c5a Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 17 Apr 2019 13:51:37 +0200 Subject: [PATCH 09/18] handle empty dirs like on S3/GCS --- .../snapshots/SnapshotResiliencyTests.java | 29 ++++++++++++++----- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 75a4f2e8e94c9..68872afdc6bc5 100644 --- 
a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -169,11 +169,13 @@ import org.junit.After; import org.junit.Before; +import java.io.FileNotFoundException; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.file.DirectoryNotEmptyException; import java.nio.file.FileVisitResult; import java.nio.file.Files; +import java.nio.file.NoSuchFileException; import java.nio.file.Path; import java.nio.file.SimpleFileVisitor; import java.nio.file.attribute.BasicFileAttributes; @@ -538,7 +540,7 @@ public void run() { private void assertNoStaleRepositoryData() throws IOException { final Path repoPath = tempDir.resolve("repo").toAbsolutePath(); final List repos; - try (Stream reposDir = Files.list(repoPath)) { + try (Stream reposDir = repoFilesByPrefix(repoPath)) { repos = reposDir.filter(s -> s.getFileName().toString().startsWith("extra") == false).collect(Collectors.toList()); } for (Path repoRoot : repos) { @@ -575,10 +577,7 @@ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IO @Override public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException { try { - // don't delete the root of the repository - if (repoPath.equals(dir) == false) { - Files.delete(dir); - } + Files.delete(dir); } catch (DirectoryNotEmptyException e) { // We're only interested in deleting empty trees here, just ignore directories with content } @@ -591,7 +590,7 @@ public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOEx } private static void assertIndexGenerations(Path repoRoot, long latestGen) throws IOException { - try (Stream repoRootBlobs = Files.list(repoRoot)) { + try (Stream repoRootBlobs = repoFilesByPrefix(repoRoot)) { final long[] indexGenerations = repoRootBlobs.filter(p -> p.getFileName().toString().startsWith("index-")) .map(p -> p.getFileName().toString().replace("index-", "")) 
.mapToLong(Long::parseLong).sorted().toArray(); @@ -603,7 +602,7 @@ private static void assertIndexGenerations(Path repoRoot, long latestGen) throws private static void assertIndexUUIDs(Path repoRoot, RepositoryData repositoryData) throws IOException { final List expectedIndexUUIDs = repositoryData.getIndices().values().stream().map(IndexId::getId).collect(Collectors.toList()); - try (Stream indexRoots = Files.list(repoRoot.resolve("indices"))) { + try (Stream indexRoots = repoFilesByPrefix(repoRoot.resolve("indices"))) { final List foundIndexUUIDs = indexRoots.filter(s -> s.getFileName().toString().startsWith("extra") == false) .map(p -> p.getFileName().toString()).collect(Collectors.toList()); assertThat(foundIndexUUIDs, containsInAnyOrder(expectedIndexUUIDs.toArray(Strings.EMPTY_ARRAY))); @@ -614,7 +613,7 @@ private static void assertSnapshotUUIDs(Path repoRoot, RepositoryData repository final List expectedSnapshotUUIDs = repositoryData.getSnapshotIds().stream().map(SnapshotId::getUUID).collect(Collectors.toList()); for (String prefix : new String[]{"snap-", "meta-"}) { - try (Stream repoRootBlobs = Files.list(repoRoot)) { + try (Stream repoRootBlobs = repoFilesByPrefix(repoRoot)) { final Collection foundSnapshotUUIDs = repoRootBlobs.filter(p -> p.getFileName().toString().startsWith(prefix)) .map(p -> p.getFileName().toString().replace(prefix, "").replace(".dat", "")) .collect(Collectors.toSet()); @@ -623,6 +622,20 @@ private static void assertSnapshotUUIDs(Path repoRoot, RepositoryData repository } } + /** + * List contents of a blob path and return an empty stream if the path doesn't exist. 
+ * @param prefix Path to find children for + * @return stream of child paths + * @throws IOException on failure + */ + private static Stream repoFilesByPrefix(Path prefix) throws IOException { + try { + return Files.list(prefix); + } catch (FileNotFoundException | NoSuchFileException e) { + return Stream.empty(); + } + } + private void clearDisruptionsAndAwaitSync() { testClusterNodes.clearNetworkDisruptions(); runUntil(() -> { From 0f35d334d3e52dbaf1abcd3f7723a7640874c694 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Fri, 19 Apr 2019 13:31:26 +0200 Subject: [PATCH 10/18] CR: fix failing to use cached misses in read and exists calls --- .../snapshots/SnapshotResiliencyTests.java | 1 - .../MockEventuallyConsistentRepository.java | 15 +++++++++++---- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index e09baa3add691..3fcd98cc44a2a 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -444,7 +444,6 @@ public void testConcurrentSnapshotCreateAndDelete() { * Simulates concurrent restarts of data and master nodes as well as relocating a primary shard, while starting and subsequently * deleting a snapshot. 
*/ - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/41326") public void testSnapshotPrimaryRelocations() { final int masterNodeCount = randomFrom(1, 3, 5); setupTestCluster(masterNodeCount, randomIntBetween(2, 10)); diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java index f4034fa56879c..1f93a0710a68f 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java @@ -116,6 +116,9 @@ private class MockBlobContainer extends BlobContainerWrapper { @Override public boolean blobExists(String blobName) { + if (cachedMisses.contains(blobName)) { + return false; + } ensureReadAfterWrite(blobName); final boolean result = super.blobExists(blobName); if (result == false) { @@ -126,15 +129,15 @@ public boolean blobExists(String blobName) { @Override public InputStream readBlob(String name) throws IOException { + if (cachedMisses.contains(name)) { + throw new NoSuchFileException(name); + } ensureReadAfterWrite(name); return super.readBlob(name); } private void ensureReadAfterWrite(String blobName) { - // TODO: Make this even less consistent by keeping track of blobs that existed before instead of simply ensuring - // read after first write again if the blob doesn't exist - if (cachedMisses.contains(blobName) == false && pendingWrites.containsKey(blobName) - && super.blobExists(blobName) == false) { + if (cachedMisses.contains(blobName) == false && pendingWrites.containsKey(blobName)) { pendingWrites.remove(blobName).run(); } } @@ -157,6 +160,10 @@ public void deleteBlob(String blobName) { @Override public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException { + // TODO: Add an 
assertion that no blob except index.latest is ever written to twice with different data here + // Currently this is not possible because master failovers in SnapshotResiliencyTests.testSnapshotWithNodeDisconnects + // will lead to snap-{uuid}.dat being written two repeatedly with different content during snapshot finalization + // which should be fixed. final ByteArrayOutputStream baos = new ByteArrayOutputStream(); Streams.copy(inputStream, baos); pendingWrites.put(blobName, () -> { From cf9a39d1bba6c1098f51ac6667e1d4869ff216b4 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 24 Apr 2019 15:47:42 +0200 Subject: [PATCH 11/18] Make things nicer loooking --- .../MockEventuallyConsistentRepository.java | 26 ++++++++++--------- 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java index 1f93a0710a68f..aaf1d599960e9 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java @@ -24,7 +24,6 @@ import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobStore; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; @@ -80,14 +79,16 @@ protected BlobStore createBlobStore() throws Exception { */ public static final class Context { - /** - * Map of blob path to a tuple of cached non-existent blobs in them and a map of child blob name to {@link Runnable} that when - * executed will create the blob. 
- */ - private final Map, Map>> state = new HashMap<>(); + private final Map> cachedMisses = new HashMap<>(); - public Tuple, Map> getState(BlobPath path) { - return state.computeIfAbsent(path, p -> new Tuple<>(new HashSet<>(), new HashMap<>())); + private final Map> pendingWriteActions = new HashMap<>(); + + private Map pendingActions(BlobPath path) { + return pendingWriteActions.computeIfAbsent(path, p -> new HashMap<>()); + } + + private Set cachedMisses(BlobPath path) { + return cachedMisses.computeIfAbsent(path, p -> new HashSet<>()); } } @@ -99,7 +100,7 @@ private class MockBlobStore extends BlobStoreWrapper { @Override public BlobContainer blobContainer(BlobPath path) { - return new MockBlobContainer(super.blobContainer(path), context.getState(path)); + return new MockBlobContainer(super.blobContainer(path), context.cachedMisses(path), context.pendingActions(path)); } private class MockBlobContainer extends BlobContainerWrapper { @@ -108,10 +109,10 @@ private class MockBlobContainer extends BlobContainerWrapper { private final Map pendingWrites; - MockBlobContainer(BlobContainer delegate, Tuple, Map> state) { + MockBlobContainer(BlobContainer delegate, Set cachedMisses, Map pendingWrites) { super(delegate); - cachedMisses = state.v1(); - pendingWrites = state.v2(); + this.cachedMisses = cachedMisses; + this.pendingWrites = pendingWrites; } @Override @@ -170,6 +171,7 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b try { super.writeBlob(blobName, new ByteArrayInputStream(baos.toByteArray()), blobSize, failIfAlreadyExists); if (cachedMisses.contains(blobName)) { + // Remove cached missing blob later to simulate inconsistency between list and get calls. 
deterministicTaskQueue.scheduleNow(() -> cachedMisses.remove(blobName)); } } catch (NoSuchFileException | FileAlreadyExistsException e) { From f8df21787a63c29597cc9cb403db7d94e892e005 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 7 May 2019 16:38:32 +0200 Subject: [PATCH 12/18] repro #41898 --- .../repositories/blobstore/BlobStoreRepository.java | 3 --- .../mockstore/MockEventuallyConsistentRepository.java | 5 ++++- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 39a4011783a55..0ef1d3ab149f0 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -395,9 +395,6 @@ public void initializeSnapshot(SnapshotId snapshotId, List indices, Met if (repositoryData.getAllSnapshotIds().stream().anyMatch(s -> s.getName().equals(snapshotName))) { throw new InvalidSnapshotNameException(metadata.name(), snapshotId.getName(), "snapshot with the same name already exists"); } - if (snapshotFormat.exists(blobContainer(), snapshotId.getUUID())) { - throw new InvalidSnapshotNameException(metadata.name(), snapshotId.getName(), "snapshot with the same name already exists"); - } // Write Global MetaData globalMetaDataFormat.write(clusterMetaData, blobContainer(), snapshotId.getUUID()); diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java index aaf1d599960e9..f55a636cfd643 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java @@ -172,7 
+172,10 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b super.writeBlob(blobName, new ByteArrayInputStream(baos.toByteArray()), blobSize, failIfAlreadyExists); if (cachedMisses.contains(blobName)) { // Remove cached missing blob later to simulate inconsistency between list and get calls. - deterministicTaskQueue.scheduleNow(() -> cachedMisses.remove(blobName)); + // Just scheduling at the current time since we get randomized future execution from the deterministic + // task queue's jitter anyway. + deterministicTaskQueue.scheduleAt( + deterministicTaskQueue.getCurrentTimeMillis(), () -> cachedMisses.remove(blobName)); } } catch (NoSuchFileException | FileAlreadyExistsException e) { // Ignoring, assuming a previous concurrent delete removed the parent path and that overwrites are not From 0f310ad85f72ea2badc81ee7c9a04cfe80d79e32 Mon Sep 17 00:00:00 2001 From: Armin Date: Mon, 24 Jun 2019 10:27:01 +0200 Subject: [PATCH 13/18] CR: Adjust implementation to in memory list of all operations --- .../snapshots/SnapshotResiliencyTests.java | 5 +- .../MockEventuallyConsistentRepository.java | 287 ++++++++++++------ .../coordination/DeterministicTaskQueue.java | 4 + 3 files changed, 197 insertions(+), 99 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index debcff1743fab..80213d5540597 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -108,8 +108,6 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.MasterService; import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.Strings; -import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import 
org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.ClusterSettings; @@ -225,6 +223,9 @@ public void createServices() { @After public void verifyReposThenStopServices() { try { + if (blobStoreContext != null) { + blobStoreContext.forceConsistent(); + } BlobStoreTestUtil.assertConsistency( (BlobStoreRepository) testClusterNodes.randomMasterNodeSafe().repositoriesService.repository("repo"), Runnable::run); diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java index f55a636cfd643..f26b8675c1038 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java @@ -21,57 +21,67 @@ import org.elasticsearch.cluster.coordination.DeterministicTaskQueue; import org.elasticsearch.cluster.metadata.RepositoryMetaData; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.blobstore.BlobContainer; +import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobStore; -import org.elasticsearch.common.io.Streams; +import org.elasticsearch.common.blobstore.support.PlainBlobMetaData; +import org.elasticsearch.common.util.Maps; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; -import org.elasticsearch.repositories.fs.FsRepository; +import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; -import java.nio.file.DirectoryNotEmptyException; -import java.nio.file.FileAlreadyExistsException; import java.nio.file.NoSuchFileException; +import 
java.util.ArrayList; import java.util.HashMap; -import java.util.HashSet; +import java.util.List; import java.util.Map; -import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; +import java.util.stream.Collectors; /** - * Mock Repository that simulates the eventually consistent behaviour of AWS S3 as documented in the + * Mock Repository that allows testing the eventually consistent behaviour of AWS S3 as documented in the * AWS S3 docs. - * Specifically this implementation simulates: - *
    - *
  • First read after write is consistent for each blob. (see S3 docs for specifics)
  • - *
  • Deletes and updates to a blob can become visible with a delay.
  • - *
  • Blobs can become visible to list operations with a delay.
  • - *
+ * Currently, the repository asserts that no inconsistent reads are made. + * TODO: Resolve todos on list and overwrite operation consistency to fully cover S3's behavior. */ -public class MockEventuallyConsistentRepository extends FsRepository { - - private final DeterministicTaskQueue deterministicTaskQueue; +public class MockEventuallyConsistentRepository extends BlobStoreRepository { private final Context context; public MockEventuallyConsistentRepository(RepositoryMetaData metadata, Environment environment, - NamedXContentRegistry namedXContentRegistry, DeterministicTaskQueue deterministicTaskQueue, Context context) { - super(metadata, environment, namedXContentRegistry, deterministicTaskQueue.getThreadPool()); - this.deterministicTaskQueue = deterministicTaskQueue; + NamedXContentRegistry namedXContentRegistry, DeterministicTaskQueue deterministicTaskQueue, Context context) { + super(metadata, environment.settings(), namedXContentRegistry, deterministicTaskQueue.getThreadPool(), BlobPath.cleanPath()); this.context = context; } + // Filters out all actions that are super-seeded by subsequent actions + // TODO: Remove all usages of this method, snapshots should not depend on consistent list operations + private static List consistentView(List actions) { + final Map lastActions = new HashMap<>(); + for (BlobStoreAction action : actions) { + if (action.operation == Operation.PUT) { + lastActions.put(action.path, action); + } else if (action.operation == Operation.DELETE) { + lastActions.remove(action.path); + } + } + return List.copyOf(lastActions.values()); + } + @Override protected void assertSnapshotOrGenericThread() { // eliminate thread name check as we create repo in the test thread } @Override - protected BlobStore createBlobStore() throws Exception { - return new MockBlobStore(super.createBlobStore()); + protected BlobStore createBlobStore() { + return new MockBlobStore(); } /** @@ -79,122 +89,205 @@ protected BlobStore createBlobStore() throws Exception 
{ */ public static final class Context { - private final Map> cachedMisses = new HashMap<>(); + private final List actions = new ArrayList<>(); + + /** + * Force the repository into a consistent end state so that its eventual state can be examined. + */ + public void forceConsistent() { + synchronized (actions) { + final List consistentActions = consistentView(actions); + actions.clear(); + actions.addAll(consistentActions); + } + } + } + + private enum Operation { + PUT, GET, DELETE + } + + private static final class BlobStoreAction { + + private final Operation operation; - private final Map> pendingWriteActions = new HashMap<>(); + @Nullable + private final byte[] data; - private Map pendingActions(BlobPath path) { - return pendingWriteActions.computeIfAbsent(path, p -> new HashMap<>()); + private final String path; + + private BlobStoreAction(Operation operation, String path, byte[] data) { + this.operation = operation; + this.path = path; + this.data = data; } - private Set cachedMisses(BlobPath path) { - return cachedMisses.computeIfAbsent(path, p -> new HashSet<>()); + private BlobStoreAction(Operation operation, String path) { + this(operation, path, null); } } - private class MockBlobStore extends BlobStoreWrapper { + private class MockBlobStore implements BlobStore { - MockBlobStore(BlobStore delegate) { - super(delegate); - } + private AtomicBoolean closed = new AtomicBoolean(false); @Override public BlobContainer blobContainer(BlobPath path) { - return new MockBlobContainer(super.blobContainer(path), context.cachedMisses(path), context.pendingActions(path)); + return new MockBlobContainer(path); } - private class MockBlobContainer extends BlobContainerWrapper { + @Override + public void close() { + closed.set(true); + } - private final Set cachedMisses; + private class MockBlobContainer implements BlobContainer { - private final Map pendingWrites; + private final BlobPath path; - MockBlobContainer(BlobContainer delegate, Set cachedMisses, Map 
pendingWrites) { - super(delegate); - this.cachedMisses = cachedMisses; - this.pendingWrites = pendingWrites; + MockBlobContainer(BlobPath path) { + this.path = path; + } + + @Override + public BlobPath path() { + return path; } @Override public boolean blobExists(String blobName) { - if (cachedMisses.contains(blobName)) { + if (closed.get()) { + throw new AssertionError("Blobstore is closed already"); + } + try { + readBlob(blobName); + return true; + } catch (NoSuchFileException e) { return false; } - ensureReadAfterWrite(blobName); - final boolean result = super.blobExists(blobName); - if (result == false) { - cachedMisses.add(blobName); + } + + @Override + public InputStream readBlob(String name) throws NoSuchFileException { + if (closed.get()) { + throw new AssertionError("Blobstore is closed already"); + } + final String blobPath = path.buildAsString() + name; + synchronized (context.actions) { + context.actions.add(new BlobStoreAction(Operation.GET, blobPath)); + final List relevantActions = + context.actions.stream().filter(action -> blobPath.equals(action.path)).collect(Collectors.toList()); + final List writes = new ArrayList<>(); + boolean readBeforeWrite = false; + for (BlobStoreAction relevantAction : relevantActions) { + if (relevantAction.operation == Operation.PUT) { + writes.add(relevantAction.data); + } + if (writes.isEmpty() && relevantAction.operation == Operation.GET) { + readBeforeWrite = true; + } + } + if (writes.isEmpty()) { + throw new NoSuchFileException(blobPath); + } + if (readBeforeWrite == false && writes.size() == 1) { + // Consistent read after write + return new ByteArrayInputStream(writes.get(0)); + } + if ("incompatible-snapshots".equals(blobPath) == false && "index.latest".equals(blobPath) == false) { + throw new AssertionError("Inconsistent read on [" + blobPath + ']'); + } + return consistentView(relevantActions).stream() + .filter(action -> action.path.equals(blobPath) && action.operation == Operation.PUT) + .findAny().map( + 
action -> new ByteArrayInputStream(action.data)).orElseThrow(() -> new NoSuchFileException(blobPath)); + } + } + + @Override + public void deleteBlob(String blobName) { + if (closed.get()) { + throw new AssertionError("Blobstore is closed already"); + } + synchronized (context.actions) { + context.actions.add(new BlobStoreAction(Operation.DELETE, path.buildAsString() + blobName)); + } + } + + @Override + public void delete() { + if (closed.get()) { + throw new AssertionError("Blobstore is closed already"); + } + final String thisPath = path.buildAsString(); + synchronized (context.actions) { + consistentView(context.actions).stream().filter(action -> action.path.startsWith(thisPath)) + .forEach(a -> context.actions.add(new BlobStoreAction(Operation.DELETE, a.path))); } - return result; } @Override - public InputStream readBlob(String name) throws IOException { - if (cachedMisses.contains(name)) { - throw new NoSuchFileException(name); + public Map listBlobs() { + if (closed.get()) { + throw new AssertionError("Blobstore is closed already"); + } + final String thisPath = path.buildAsString(); + synchronized (context.actions) { + return consistentView(context.actions).stream() + .filter( + action -> action.path.startsWith(thisPath) && action.path.substring(thisPath.length()).indexOf('/') == -1 + && action.operation == Operation.PUT) + .collect( + Collectors.toMap( + action -> action.path.substring(thisPath.length()), + action -> new PlainBlobMetaData(action.path.substring(thisPath.length()), action.data.length))); } - ensureReadAfterWrite(name); - return super.readBlob(name); } - private void ensureReadAfterWrite(String blobName) { - if (cachedMisses.contains(blobName) == false && pendingWrites.containsKey(blobName)) { - pendingWrites.remove(blobName).run(); + @Override + public Map children() { + if (closed.get()) { + throw new AssertionError("Blobstore is closed already"); + } + final String thisPath = path.buildAsString(); + synchronized (context.actions) { + 
return consistentView(context.actions).stream() + .filter(action -> + action.operation == Operation.PUT + && action.path.startsWith(thisPath) && action.path.substring(thisPath.length()).indexOf('/') != -1) + .map(action -> action.path.substring(thisPath.length()).split("/")[0]) + .distinct() + .collect(Collectors.toMap(Function.identity(), name -> new MockBlobContainer(path.add(name)))); } } @Override - public void deleteBlob(String blobName) { - ensureReadAfterWrite(blobName); - // TODO: simulate longer delays here once the S3 blob store implementation can handle them - deterministicTaskQueue.scheduleNow(() -> { - try { - super.deleteBlob(blobName); - } catch (DirectoryNotEmptyException | NoSuchFileException e) { - // ignored since neither of these exceptions would occur on S3 - } catch (IOException e) { - throw new AssertionError(e); - } - }); + public Map listBlobsByPrefix(String blobNamePrefix) { + return Maps.ofEntries( + listBlobs().entrySet().stream().filter(entry -> entry.getKey().startsWith(blobNamePrefix)).collect(Collectors.toList()) + ); } @Override public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) - throws IOException { - // TODO: Add an assertion that no blob except index.latest is ever written to twice with different data here - // Currently this is not possible because master failovers in SnapshotResiliencyTests.testSnapshotWithNodeDisconnects - // will lead to snap-{uuid}.dat being written two repeatedly with different content during snapshot finalization - // which should be fixed. - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - Streams.copy(inputStream, baos); - pendingWrites.put(blobName, () -> { - try { - super.writeBlob(blobName, new ByteArrayInputStream(baos.toByteArray()), blobSize, failIfAlreadyExists); - if (cachedMisses.contains(blobName)) { - // Remove cached missing blob later to simulate inconsistency between list and get calls. 
- // Just scheduling at the current time since we get randomized future execution from the deterministic - // task queue's jitter anyway. - deterministicTaskQueue.scheduleAt( - deterministicTaskQueue.getCurrentTimeMillis(), () -> cachedMisses.remove(blobName)); - } - } catch (NoSuchFileException | FileAlreadyExistsException e) { - // Ignoring, assuming a previous concurrent delete removed the parent path and that overwrites are not - // detectable with this kind of store - } catch (IOException e) { - throw new AssertionError(e); - } - }); - // TODO: simulate longer delays here once the S3 blob store implementation can handle them - deterministicTaskQueue.scheduleNow(() -> { - if (pendingWrites.containsKey(blobName)) { - pendingWrites.remove(blobName).run(); - } - }); + throws IOException { + if (closed.get()) { + throw new AssertionError("Blobstore is closed already"); + } + // TODO: Throw if we try to overwrite any blob other than incompatible_snapshots or index.latest with different content + // than it already contains. 
+ assert blobSize < Integer.MAX_VALUE; + final byte[] data = new byte[(int) blobSize]; + final int read = inputStream.read(data); + assert read == data.length; + synchronized (context.actions) { + context.actions.add(new BlobStoreAction(Operation.PUT, path.buildAsString() + blobName, data)); + } } @Override public void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize, - final boolean failIfAlreadyExists) throws IOException { + final boolean failIfAlreadyExists) throws IOException { writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists); } } diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java index 4567b97700604..6ef2cc746d1cf 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java @@ -282,6 +282,10 @@ public ThreadPool getThreadPool() { return getThreadPool(Function.identity()); } + public Random getRandom() { + return random; + } + /** * @return A ThreadPool that uses this task queue and wraps Runnables in the given wrapper. 
*/ From 1e50b85ae92679c435b887b644f8f24cb436fd89 Mon Sep 17 00:00:00 2001 From: Armin Date: Mon, 24 Jun 2019 14:09:29 +0200 Subject: [PATCH 14/18] shorter diff --- .../elasticsearch/common/blobstore/BlobPath.java | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/BlobPath.java b/server/src/main/java/org/elasticsearch/common/blobstore/BlobPath.java index 611da1c04faf2..d3acd02a06d1f 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/BlobPath.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/BlobPath.java @@ -92,20 +92,4 @@ public String toString() { } return sb.toString(); } - - @Override - public boolean equals(final Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - return paths.equals(((BlobPath) o).paths); - } - - @Override - public int hashCode() { - return paths.hashCode(); - } } From f0427aa31be81f3c40637bcb4ec1b490de9e4ffc Mon Sep 17 00:00:00 2001 From: Armin Date: Mon, 24 Jun 2019 14:09:58 +0200 Subject: [PATCH 15/18] shorter diff --- .../cluster/coordination/DeterministicTaskQueue.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java index 6ef2cc746d1cf..4567b97700604 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java @@ -282,10 +282,6 @@ public ThreadPool getThreadPool() { return getThreadPool(Function.identity()); } - public Random getRandom() { - return random; - } - /** * @return A ThreadPool that uses this task queue and wraps Runnables in the given wrapper. 
*/ From d1f1e798735c6b43bffe0077d65f3ee09c8bdd13 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 16 Jul 2019 08:05:09 +0200 Subject: [PATCH 16/18] CR: Test, fix delete case, dry up ensure open --- .../snapshots/SnapshotResiliencyTests.java | 2 +- .../MockEventuallyConsistentRepository.java | 61 ++++----- ...ckEventuallyConsistentRepositoryTests.java | 126 ++++++++++++++++++ 3 files changed, 154 insertions(+), 35 deletions(-) create mode 100644 server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 9bca3d41371d9..289d707a1e4d3 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -1095,7 +1095,7 @@ protected void assertSnapshotOrGenericThread() { } else { return metaData -> { final Repository repository = new MockEventuallyConsistentRepository( - metaData, environment, xContentRegistry(), deterministicTaskQueue, blobStoreContext); + metaData, environment, xContentRegistry(), deterministicTaskQueue.getThreadPool(), blobStoreContext); repository.start(); return repository; }; diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java index f26b8675c1038..9481276393f3f 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java @@ -19,7 +19,6 @@ package org.elasticsearch.snapshots.mockstore; -import org.elasticsearch.cluster.coordination.DeterministicTaskQueue; import 
org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.blobstore.BlobContainer; @@ -31,6 +30,7 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; +import org.elasticsearch.threadpool.ThreadPool; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -55,8 +55,8 @@ public class MockEventuallyConsistentRepository extends BlobStoreRepository { private final Context context; public MockEventuallyConsistentRepository(RepositoryMetaData metadata, Environment environment, - NamedXContentRegistry namedXContentRegistry, DeterministicTaskQueue deterministicTaskQueue, Context context) { - super(metadata, environment.settings(), namedXContentRegistry, deterministicTaskQueue.getThreadPool(), BlobPath.cleanPath()); + NamedXContentRegistry namedXContentRegistry, ThreadPool threadPool, Context context) { + super(metadata, environment.settings(), namedXContentRegistry, threadPool, BlobPath.cleanPath()); this.context = context; } @@ -141,6 +141,12 @@ public void close() { closed.set(true); } + private void ensureNotClosed() { + if (closed.get()) { + throw new AssertionError("Blobstore is closed already"); + } + } + private class MockBlobContainer implements BlobContainer { private final BlobPath path; @@ -156,9 +162,7 @@ public BlobPath path() { @Override public boolean blobExists(String blobName) { - if (closed.get()) { - throw new AssertionError("Blobstore is closed already"); - } + ensureNotClosed(); try { readBlob(blobName); return true; @@ -169,14 +173,19 @@ public boolean blobExists(String blobName) { @Override public InputStream readBlob(String name) throws NoSuchFileException { - if (closed.get()) { - throw new AssertionError("Blobstore is closed already"); - } + ensureNotClosed(); final String blobPath = path.buildAsString() + name; synchronized (context.actions) 
{ + final List relevantActions = new ArrayList<>( + context.actions.stream().filter(action -> blobPath.equals(action.path)).collect(Collectors.toList())); context.actions.add(new BlobStoreAction(Operation.GET, blobPath)); - final List relevantActions = - context.actions.stream().filter(action -> blobPath.equals(action.path)).collect(Collectors.toList()); + for (int i = relevantActions.size() - 1; i > 0; i--) { + if (relevantActions.get(i).operation == Operation.GET) { + relevantActions.remove(i); + } else { + break; + } + } final List writes = new ArrayList<>(); boolean readBeforeWrite = false; for (BlobStoreAction relevantAction : relevantActions) { @@ -190,25 +199,17 @@ public InputStream readBlob(String name) throws NoSuchFileException { if (writes.isEmpty()) { throw new NoSuchFileException(blobPath); } - if (readBeforeWrite == false && writes.size() == 1) { + if (readBeforeWrite == false && relevantActions.size() == 1) { // Consistent read after write return new ByteArrayInputStream(writes.get(0)); } - if ("incompatible-snapshots".equals(blobPath) == false && "index.latest".equals(blobPath) == false) { - throw new AssertionError("Inconsistent read on [" + blobPath + ']'); - } - return consistentView(relevantActions).stream() - .filter(action -> action.path.equals(blobPath) && action.operation == Operation.PUT) - .findAny().map( - action -> new ByteArrayInputStream(action.data)).orElseThrow(() -> new NoSuchFileException(blobPath)); + throw new AssertionError("Inconsistent read on [" + blobPath + ']'); } } @Override public void deleteBlob(String blobName) { - if (closed.get()) { - throw new AssertionError("Blobstore is closed already"); - } + ensureNotClosed(); synchronized (context.actions) { context.actions.add(new BlobStoreAction(Operation.DELETE, path.buildAsString() + blobName)); } @@ -216,9 +217,7 @@ public void deleteBlob(String blobName) { @Override public void delete() { - if (closed.get()) { - throw new AssertionError("Blobstore is closed already"); - } 
+ ensureNotClosed(); final String thisPath = path.buildAsString(); synchronized (context.actions) { consistentView(context.actions).stream().filter(action -> action.path.startsWith(thisPath)) @@ -228,9 +227,7 @@ public void delete() { @Override public Map listBlobs() { - if (closed.get()) { - throw new AssertionError("Blobstore is closed already"); - } + ensureNotClosed(); final String thisPath = path.buildAsString(); synchronized (context.actions) { return consistentView(context.actions).stream() @@ -246,9 +243,7 @@ public Map listBlobs() { @Override public Map children() { - if (closed.get()) { - throw new AssertionError("Blobstore is closed already"); - } + ensureNotClosed(); final String thisPath = path.buildAsString(); synchronized (context.actions) { return consistentView(context.actions).stream() @@ -271,9 +266,7 @@ public Map listBlobsByPrefix(String blobNamePrefix) { @Override public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException { - if (closed.get()) { - throw new AssertionError("Blobstore is closed already"); - } + ensureNotClosed(); // TODO: Throw if we try to overwrite any blob other than incompatible_snapshots or index.latest with different content // than it already contains. assert blobSize < Integer.MAX_VALUE; diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java new file mode 100644 index 0000000000000..c3f51e9e64614 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java @@ -0,0 +1,126 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. 
Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.snapshots.mockstore; + +import org.elasticsearch.cluster.metadata.RepositoryMetaData; +import org.elasticsearch.common.blobstore.BlobContainer; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.env.Environment; +import org.elasticsearch.env.TestEnvironment; +import org.elasticsearch.repositories.blobstore.BlobStoreRepository; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.NoSuchFileException; +import java.nio.file.Path; +import java.util.Arrays; + +import static org.elasticsearch.env.Environment.PATH_HOME_SETTING; +import static org.elasticsearch.node.Node.NODE_NAME_SETTING; +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.mock; + +public class MockEventuallyConsistentRepositoryTests extends ESTestCase { + + private Environment environment; + + @Override + public void setUp() throws Exception { + super.setUp(); + final Path tempDir = createTempDir(); + final String nodeName = "testNode"; + environment = TestEnvironment.newEnvironment(Settings.builder() + .put(NODE_NAME_SETTING.getKey(), nodeName) + .put(PATH_HOME_SETTING.getKey(), tempDir.resolve(nodeName).toAbsolutePath()) + .put(Environment.PATH_REPO_SETTING.getKey(), 
tempDir.resolve("repo").toAbsolutePath()) + .build()); + } + + public void testReadAfterWriteConsistently() throws IOException { + MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context(); + try (BlobStoreRepository repository = new MockEventuallyConsistentRepository( + new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY), environment, + xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) { + repository.start(); + final BlobContainer blobContainer = repository.blobStore().blobContainer(repository.basePath()); + final String blobName = randomAlphaOfLength(10); + final int lengthWritten = randomIntBetween(1, 100); + final byte[] blobData = randomByteArrayOfLength(lengthWritten); + blobContainer.writeBlob(blobName, new ByteArrayInputStream(blobData), lengthWritten, true); + try (InputStream in = blobContainer.readBlob(blobName)) { + final byte[] readBytes = new byte[lengthWritten + 1]; + final int lengthSeen = in.read(readBytes); + assertThat(lengthSeen, equalTo(lengthWritten)); + assertArrayEquals(blobData, Arrays.copyOf(readBytes, lengthWritten)); + } + } + } + + public void testReadAfterWriteAfterReadThrows() throws IOException { + MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context(); + try (BlobStoreRepository repository = new MockEventuallyConsistentRepository( + new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY), environment, + xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) { + repository.start(); + final BlobContainer blobContainer = repository.blobStore().blobContainer(repository.basePath()); + final String blobName = randomAlphaOfLength(10); + final int lengthWritten = randomIntBetween(1, 100); + final byte[] blobData = randomByteArrayOfLength(lengthWritten); + assertMissing(blobContainer, blobName); + blobContainer.writeBlob(blobName, new 
ByteArrayInputStream(blobData), lengthWritten, true); + assertThrowsOnInconsistentRead(blobContainer, blobName); + } + } + + public void testReadAfterDeleteAfterWriteThrows() throws IOException { + MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context(); + try (BlobStoreRepository repository = new MockEventuallyConsistentRepository( + new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY), environment, + xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) { + repository.start(); + final BlobContainer blobContainer = repository.blobStore().blobContainer(repository.basePath()); + final String blobName = randomAlphaOfLength(10); + final int lengthWritten = randomIntBetween(1, 100); + final byte[] blobData = randomByteArrayOfLength(lengthWritten); + blobContainer.writeBlob(blobName, new ByteArrayInputStream(blobData), lengthWritten, true); + blobContainer.deleteBlob(blobName); + assertThrowsOnInconsistentRead(blobContainer, blobName); + blobStoreContext.forceConsistent(); + assertMissing(blobContainer, blobName); + } + } + + private static void assertThrowsOnInconsistentRead(BlobContainer blobContainer, String blobName) throws IOException { + try (InputStream in = blobContainer.readBlob(blobName)) { + fail("Inconsistent read should throw"); + } catch (AssertionError assertionError) { + assertThat(assertionError.getMessage(), equalTo("Inconsistent read on [" + blobName + ']')); + } + } + + private static void assertMissing(BlobContainer container, String blobName) throws IOException { + try (InputStream in = container.readBlob(blobName)) { + fail("Reading a non-existent blob should throw"); + } catch (NoSuchFileException expected) { + } + } +} From 58a9d4234b6fe151743bb3bbf842b5ef704e6ba9 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 16 Jul 2019 15:57:59 +0200 Subject: [PATCH 17/18] CR: much stronger overwrite blob assertions, some cleanups, cleaner tests --- 
.../blobstore/BlobStoreRepository.java | 8 +- .../MockEventuallyConsistentRepository.java | 109 +++++++++++++----- ...ckEventuallyConsistentRepositoryTests.java | 77 +++++++++++-- 3 files changed, 153 insertions(+), 41 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 7648c158ff2aa..86466eb4b50ec 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -129,13 +129,13 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp private static final int BUFFER_SIZE = 4096; - private static final String SNAPSHOT_PREFIX = "snap-"; + public static final String SNAPSHOT_PREFIX = "snap-"; - private static final String SNAPSHOT_CODEC = "snapshot"; + public static final String SNAPSHOT_CODEC = "snapshot"; private static final String INDEX_FILE_PREFIX = "index-"; - private static final String INDEX_LATEST_BLOB = "index.latest"; + public static final String INDEX_LATEST_BLOB = "index.latest"; private static final String TESTS_FILE = "tests-"; @@ -180,7 +180,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp private final ChecksumBlobStoreFormat indexMetaDataFormat; - private final ChecksumBlobStoreFormat snapshotFormat; + protected final ChecksumBlobStoreFormat snapshotFormat; private final boolean readOnly; diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java index 9481276393f3f..7c1f8e4b8cd05 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java +++ 
b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java @@ -19,6 +19,7 @@ package org.elasticsearch.snapshots.mockstore; +import org.apache.lucene.codecs.CodecUtil; import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.blobstore.BlobContainer; @@ -26,10 +27,16 @@ import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobStore; import org.elasticsearch.common.blobstore.support.PlainBlobMetaData; +import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.util.Maps; +import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.env.Environment; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; +import org.elasticsearch.snapshots.SnapshotInfo; +import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import java.io.ByteArrayInputStream; @@ -44,6 +51,10 @@ import java.util.function.Function; import java.util.stream.Collectors; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.equalTo; + /** * Mock Repository that allows testing the eventually consistent behaviour of AWS S3 as documented in the * AWS S3 docs. 
@@ -54,10 +65,13 @@ public class MockEventuallyConsistentRepository extends BlobStoreRepository { private final Context context; + private final NamedXContentRegistry namedXContentRegistry; + public MockEventuallyConsistentRepository(RepositoryMetaData metadata, Environment environment, NamedXContentRegistry namedXContentRegistry, ThreadPool threadPool, Context context) { super(metadata, environment.settings(), namedXContentRegistry, threadPool, BlobPath.cleanPath()); this.context = context; + this.namedXContentRegistry = namedXContentRegistry; } // Filters out all actions that are super-seeded by subsequent actions @@ -176,37 +190,33 @@ public InputStream readBlob(String name) throws NoSuchFileException { ensureNotClosed(); final String blobPath = path.buildAsString() + name; synchronized (context.actions) { - final List relevantActions = new ArrayList<>( - context.actions.stream().filter(action -> blobPath.equals(action.path)).collect(Collectors.toList())); + final List relevantActions = relevantActions(blobPath); context.actions.add(new BlobStoreAction(Operation.GET, blobPath)); - for (int i = relevantActions.size() - 1; i > 0; i--) { - if (relevantActions.get(i).operation == Operation.GET) { - relevantActions.remove(i); - } else { - break; - } - } - final List writes = new ArrayList<>(); - boolean readBeforeWrite = false; - for (BlobStoreAction relevantAction : relevantActions) { - if (relevantAction.operation == Operation.PUT) { - writes.add(relevantAction.data); - } - if (writes.isEmpty() && relevantAction.operation == Operation.GET) { - readBeforeWrite = true; - } - } - if (writes.isEmpty()) { + if (relevantActions.stream().noneMatch(a -> a.operation == Operation.PUT)) { throw new NoSuchFileException(blobPath); } - if (readBeforeWrite == false && relevantActions.size() == 1) { + if (relevantActions.size() == 1 && relevantActions.get(0).operation == Operation.PUT) { // Consistent read after write - return new ByteArrayInputStream(writes.get(0)); + return new 
ByteArrayInputStream(relevantActions.get(0).data); } throw new AssertionError("Inconsistent read on [" + blobPath + ']'); } } + private List relevantActions(String blobPath) { + assert Thread.holdsLock(context.actions); + final List relevantActions = new ArrayList<>( + context.actions.stream().filter(action -> blobPath.equals(action.path)).collect(Collectors.toList())); + for (int i = relevantActions.size() - 1; i > 0; i--) { + if (relevantActions.get(i).operation == Operation.GET) { + relevantActions.remove(i); + } else { + break; + } + } + return relevantActions; + } + @Override public void deleteBlob(String blobName) { ensureNotClosed(); @@ -265,16 +275,63 @@ public Map listBlobsByPrefix(String blobNamePrefix) { @Override public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) - throws IOException { + throws IOException { ensureNotClosed(); - // TODO: Throw if we try to overwrite any blob other than incompatible_snapshots or index.latest with different content - // than it already contains. assert blobSize < Integer.MAX_VALUE; final byte[] data = new byte[(int) blobSize]; final int read = inputStream.read(data); assert read == data.length; + final String blobPath = path.buildAsString() + blobName; synchronized (context.actions) { - context.actions.add(new BlobStoreAction(Operation.PUT, path.buildAsString() + blobName, data)); + final List relevantActions = relevantActions(blobPath); + // We do some checks in case there is a consistent state for a blob to prevent turning it inconsistent. 
+ final boolean hasConsistentContent = + relevantActions.size() == 1 && relevantActions.get(0).operation == Operation.PUT; + if (BlobStoreRepository.INDEX_LATEST_BLOB.equals(blobName)) { + // TODO: Ensure that it is impossible to ever decrement the generation id stored in index.latest then assert that + // it never decrements here + } else if (blobName.startsWith(BlobStoreRepository.SNAPSHOT_PREFIX)) { + if (hasConsistentContent) { + if (basePath().buildAsString().equals(path().buildAsString())) { + try { + // TODO: dry up the logic for reading SnapshotInfo here against the code in ChecksumBlobStoreFormat + final int offset = CodecUtil.headerLength(BlobStoreRepository.SNAPSHOT_CODEC); + final SnapshotInfo updatedInfo = SnapshotInfo.fromXContentInternal( + XContentHelper.createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, + new BytesArray(data, offset, data.length - offset - CodecUtil.footerLength()), + XContentType.SMILE)); + // If the existing snapshotInfo differs only in the timestamps it stores, then the overwrite is not + // a problem and could be the result of a correctly handled master failover. 
+ final SnapshotInfo existingInfo = snapshotFormat.readBlob(this, blobName); + assertThat(existingInfo.snapshotId(), equalTo(updatedInfo.snapshotId())); + assertThat(existingInfo.reason(), equalTo(updatedInfo.reason())); + assertThat(existingInfo.state(), equalTo(updatedInfo.state())); + assertThat(existingInfo.totalShards(), equalTo(updatedInfo.totalShards())); + assertThat(existingInfo.successfulShards(), equalTo(updatedInfo.successfulShards())); + assertThat( + existingInfo.shardFailures(), containsInAnyOrder(updatedInfo.shardFailures().toArray())); + assertThat(existingInfo.indices(), equalTo(updatedInfo.indices())); + return; // No need to add a write for this since we didn't change content + } catch (Exception e) { + // Rethrow as AssertionError here since this kind of exception might otherwise be swallowed and logged by + // the blob store repository. + // Since we are not doing any actual IO we don't expect this to throw ever and an exception would + // signal broken SnapshotInfo bytes or unexpected behavior of SnapshotInfo otherwise. + throw new AssertionError("Failed to deserialize SnapshotInfo", e); + } + } else { + // Primaries never retry so any shard level snap- blob retry/overwrite even with the same content is + not expected. 
+ throw new AssertionError("Shard level snap-{uuid} blobs should never be overwritten"); + } + } + } else { + if (hasConsistentContent) { + ESTestCase.assertArrayEquals("Tried to overwrite blob [" + blobName + "]", relevantActions.get(0).data, data); + return; // No need to add a write for this since we didn't change content + } + } + context.actions.add(new BlobStoreAction(Operation.PUT, blobPath, data)); } } diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java index c3f51e9e64614..81934fe93bd8a 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java @@ -19,11 +19,13 @@ package org.elasticsearch.snapshots.mockstore; import org.elasticsearch.cluster.metadata.RepositoryMetaData; +import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.env.Environment; import org.elasticsearch.env.TestEnvironment; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; +import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; @@ -33,10 +35,12 @@ import java.nio.file.NoSuchFileException; import java.nio.file.Path; import java.util.Arrays; +import java.util.Collections; import static org.elasticsearch.env.Environment.PATH_HOME_SETTING; import static org.elasticsearch.node.Node.NODE_NAME_SETTING; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.startsWith; import static org.mockito.Mockito.mock; public class MockEventuallyConsistentRepositoryTests extends ESTestCase { @@ -85,7 +89,7 @@ public void 
testReadAfterWriteAfterReadThrows() throws IOException { final String blobName = randomAlphaOfLength(10); final int lengthWritten = randomIntBetween(1, 100); final byte[] blobData = randomByteArrayOfLength(lengthWritten); - assertMissing(blobContainer, blobName); + expectThrows(NoSuchFileException.class, () -> blobContainer.readBlob(blobName)); blobContainer.writeBlob(blobName, new ByteArrayInputStream(blobData), lengthWritten, true); assertThrowsOnInconsistentRead(blobContainer, blobName); } @@ -105,22 +109,73 @@ public void testReadAfterDeleteAfterWriteThrows() throws IOException { blobContainer.deleteBlob(blobName); assertThrowsOnInconsistentRead(blobContainer, blobName); blobStoreContext.forceConsistent(); - assertMissing(blobContainer, blobName); + expectThrows(NoSuchFileException.class, () -> blobContainer.readBlob(blobName)); } } - private static void assertThrowsOnInconsistentRead(BlobContainer blobContainer, String blobName) throws IOException { - try (InputStream in = blobContainer.readBlob(blobName)) { - fail("Inconsistent read should throw"); - } catch (AssertionError assertionError) { - assertThat(assertionError.getMessage(), equalTo("Inconsistent read on [" + blobName + ']')); + public void testOverwriteRandomBlobFails() throws IOException { + MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context(); + try (BlobStoreRepository repository = new MockEventuallyConsistentRepository( + new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY), environment, + xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) { + repository.start(); + final BlobContainer container = repository.blobStore().blobContainer(repository.basePath()); + final String blobName = randomAlphaOfLength(10); + final int lengthWritten = randomIntBetween(1, 100); + final byte[] blobData = randomByteArrayOfLength(lengthWritten); + container.writeBlob(blobName, new ByteArrayInputStream(blobData), 
lengthWritten, false); + final AssertionError assertionError = expectThrows(AssertionError.class, + () -> container.writeBlob(blobName, new ByteArrayInputStream(blobData), lengthWritten - 1, false)); + assertThat(assertionError.getMessage(), startsWith("Tried to overwrite blob [" + blobName +"]")); + } + } + + public void testOverwriteShardSnapBlobFails() throws IOException { + MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context(); + try (BlobStoreRepository repository = new MockEventuallyConsistentRepository( + new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY), environment, + xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) { + repository.start(); + final BlobContainer container = + repository.blobStore().blobContainer(repository.basePath().add("indices").add("someindex").add("0")); + final String blobName = BlobStoreRepository.SNAPSHOT_PREFIX + UUIDs.randomBase64UUID(); + final int lengthWritten = randomIntBetween(1, 100); + final byte[] blobData = randomByteArrayOfLength(lengthWritten); + container.writeBlob(blobName, new ByteArrayInputStream(blobData), lengthWritten, false); + final AssertionError assertionError = expectThrows(AssertionError.class, + () -> container.writeBlob(blobName, new ByteArrayInputStream(blobData), lengthWritten, false)); + assertThat(assertionError.getMessage(), equalTo("Shard level snap-{uuid} blobs should never be overwritten")); } } - private static void assertMissing(BlobContainer container, String blobName) throws IOException { - try (InputStream in = container.readBlob(blobName)) { - fail("Reading a non-existent blob should throw"); - } catch (NoSuchFileException expected) { + public void testOverwriteSnapshotInfoBlob() { + MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context(); + try (BlobStoreRepository repository = new MockEventuallyConsistentRepository( + new 
RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY), environment, + xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) { + repository.start(); + + // We create a snap- blob for snapshot "foo" in the first generation + final SnapshotId snapshotId = new SnapshotId("foo", UUIDs.randomBase64UUID()); + repository.finalizeSnapshot(snapshotId, Collections.emptyList(), 1L, null, 5, Collections.emptyList(), + -1L, false, Collections.emptyMap()); + + // We try to write another snap- blob for "foo" in the next generation. It fails because the content differs. + final AssertionError assertionError = expectThrows(AssertionError.class, + () -> repository.finalizeSnapshot( + snapshotId, Collections.emptyList(), 1L, null, 6, Collections.emptyList(), + 0, false, Collections.emptyMap())); + assertThat(assertionError.getMessage(), equalTo("\nExpected: <6>\n but: was <5>")); + + // We try to write yet another snap- blob for "foo" in the next generation. + // It passes cleanly because the content of the blob matches the existing blob except for the timestamps. 
+ repository.finalizeSnapshot(snapshotId, Collections.emptyList(), 1L, null, 5, Collections.emptyList(), + 0, false, Collections.emptyMap()); } } + + private static void assertThrowsOnInconsistentRead(BlobContainer blobContainer, String blobName) { + final AssertionError assertionError = expectThrows(AssertionError.class, () -> blobContainer.readBlob(blobName)); + assertThat(assertionError.getMessage(), equalTo("Inconsistent read on [" + blobName + ']')); + } } From 0afcaa30d2b84faf4d20ab3db1d8d6fad93c6bb5 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 17 Jul 2019 09:27:33 +0200 Subject: [PATCH 18/18] blobExists is gone :) --- .../mockstore/MockEventuallyConsistentRepository.java | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java index 7c1f8e4b8cd05..d21f3db81e69c 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java @@ -174,17 +174,6 @@ public BlobPath path() { return path; } - @Override - public boolean blobExists(String blobName) { - ensureNotClosed(); - try { - readBlob(blobName); - return true; - } catch (NoSuchFileException e) { - return false; - } - } - @Override public InputStream readBlob(String name) throws NoSuchFileException { ensureNotClosed();