Skip to content

Commit f429359

Browse files
Add CoolDown Period to S3 Repository (#51074)
Add a cool-down period after snapshot finalization and snapshot deletion so that the eventually consistent nature of AWS S3 cannot corrupt shard-level metadata for as long as the repository still uses the old shard-level metadata format.
1 parent 97b12c1 commit f429359

File tree

2 files changed

+169
-1
lines changed

2 files changed

+169
-1
lines changed

plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java

Lines changed: 115 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@
2121

2222
import org.apache.logging.log4j.LogManager;
2323
import org.apache.logging.log4j.Logger;
24+
import org.elasticsearch.action.ActionListener;
25+
import org.elasticsearch.action.ActionRunnable;
26+
import org.elasticsearch.cluster.metadata.MetaData;
2427
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
2528
import org.elasticsearch.cluster.service.ClusterService;
2629
import org.elasticsearch.common.Strings;
@@ -29,11 +32,23 @@
2932
import org.elasticsearch.common.settings.Setting;
3033
import org.elasticsearch.common.unit.ByteSizeUnit;
3134
import org.elasticsearch.common.unit.ByteSizeValue;
35+
import org.elasticsearch.common.unit.TimeValue;
3236
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
3337
import org.elasticsearch.monitor.jvm.JvmInfo;
3438
import org.elasticsearch.repositories.RepositoryException;
39+
import org.elasticsearch.repositories.ShardGenerations;
3540
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
36-
41+
import org.elasticsearch.snapshots.SnapshotId;
42+
import org.elasticsearch.snapshots.SnapshotInfo;
43+
import org.elasticsearch.snapshots.SnapshotShardFailure;
44+
import org.elasticsearch.snapshots.SnapshotsService;
45+
import org.elasticsearch.threadpool.Scheduler;
46+
import org.elasticsearch.threadpool.ThreadPool;
47+
48+
import java.util.List;
49+
import java.util.Map;
50+
import java.util.concurrent.TimeUnit;
51+
import java.util.concurrent.atomic.AtomicReference;
3752
import java.util.function.Function;
3853

3954
/**
@@ -126,6 +141,23 @@ class S3Repository extends BlobStoreRepository {
126141

127142
static final Setting<String> CLIENT_NAME = new Setting<>("client", "default", Function.identity());
128143

144+
/**
145+
* Artificial delay to introduce after a snapshot finalization or delete has finished so long as the repository is still using the
146+
* backwards compatible snapshot format from before
147+
* {@link org.elasticsearch.snapshots.SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION} ({@link org.elasticsearch.Version#V_7_6_0}).
148+
* This delay is necessary so that the eventually consistent nature of AWS S3 does not randomly result in repository corruption when
149+
* doing repository operations in rapid succession on a repository in the old metadata format.
150+
* This setting should not be adjusted in production when working with an AWS S3 backed repository. Doing so risks the repository
151+
* becoming silently corrupted. To get rid of this waiting period, either create a new S3 repository or remove all snapshots older than
152+
* {@link org.elasticsearch.Version#V_7_6_0} from the repository which will trigger an upgrade of the repository metadata to the new
153+
* format and disable the cooldown period.
154+
*/
155+
static final Setting<TimeValue> COOLDOWN_PERIOD = Setting.timeSetting(
156+
"cooldown_period",
157+
new TimeValue(3, TimeUnit.MINUTES),
158+
new TimeValue(0, TimeUnit.MILLISECONDS),
159+
Setting.Property.Dynamic);
160+
129161
/**
130162
* Specifies the path within bucket to repository data. Defaults to root directory.
131163
*/
@@ -145,6 +177,12 @@ class S3Repository extends BlobStoreRepository {
145177

146178
private final String cannedACL;
147179

180+
/**
181+
* Time period to delay repository operations by after finalizing or deleting a snapshot.
182+
* See {@link #COOLDOWN_PERIOD} for details.
183+
*/
184+
private final TimeValue coolDown;
185+
148186
/**
149187
* Constructs an s3 backed repository
150188
*/
@@ -176,6 +214,8 @@ class S3Repository extends BlobStoreRepository {
176214
this.storageClass = STORAGE_CLASS_SETTING.get(metadata.settings());
177215
this.cannedACL = CANNED_ACL_SETTING.get(metadata.settings());
178216

217+
coolDown = COOLDOWN_PERIOD.get(metadata.settings());
218+
179219
logger.debug(
180220
"using bucket [{}], chunk_size [{}], server_side_encryption [{}], buffer_size [{}], cannedACL [{}], storageClass [{}]",
181221
bucket,
@@ -186,6 +226,70 @@ class S3Repository extends BlobStoreRepository {
186226
storageClass);
187227
}
188228

229+
/**
230+
* Holds the {@link Scheduler.Cancellable} of the currently delayed repository operation so it can be cancelled should the repository be
231+
* closed concurrently.
232+
*/
233+
private final AtomicReference<Scheduler.Cancellable> finalizationFuture = new AtomicReference<>();
234+
235+
@Override
236+
public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure, int totalShards,
237+
List<SnapshotShardFailure> shardFailures, long repositoryStateId, boolean includeGlobalState,
238+
MetaData clusterMetaData, Map<String, Object> userMetadata, boolean writeShardGens,
239+
ActionListener<SnapshotInfo> listener) {
240+
if (writeShardGens == false) {
241+
listener = delayedListener(listener);
242+
}
243+
super.finalizeSnapshot(snapshotId, shardGenerations, startTime, failure, totalShards, shardFailures, repositoryStateId,
244+
includeGlobalState, clusterMetaData, userMetadata, writeShardGens, listener);
245+
}
246+
247+
@Override
248+
public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolean writeShardGens, ActionListener<Void> listener) {
249+
if (writeShardGens == false) {
250+
listener = delayedListener(listener);
251+
}
252+
super.deleteSnapshot(snapshotId, repositoryStateId, writeShardGens, listener);
253+
}
254+
255+
/**
256+
* Wraps given listener such that it is executed with a delay of {@link #coolDown} on the snapshot thread-pool after being invoked.
257+
* See {@link #COOLDOWN_PERIOD} for details.
258+
*/
259+
private <T> ActionListener<T> delayedListener(ActionListener<T> listener) {
260+
final ActionListener<T> wrappedListener = ActionListener.runBefore(listener, () -> {
261+
final Scheduler.Cancellable cancellable = finalizationFuture.getAndSet(null);
262+
assert cancellable != null;
263+
});
264+
return new ActionListener<>() {
265+
@Override
266+
public void onResponse(T response) {
267+
logCooldownInfo();
268+
final Scheduler.Cancellable existing = finalizationFuture.getAndSet(
269+
threadPool.schedule(ActionRunnable.wrap(wrappedListener, l -> l.onResponse(response)),
270+
coolDown, ThreadPool.Names.SNAPSHOT));
271+
assert existing == null : "Already have an ongoing finalization " + finalizationFuture;
272+
}
273+
274+
@Override
275+
public void onFailure(Exception e) {
276+
logCooldownInfo();
277+
final Scheduler.Cancellable existing = finalizationFuture.getAndSet(
278+
threadPool.schedule(ActionRunnable.wrap(wrappedListener, l -> l.onFailure(e)), coolDown, ThreadPool.Names.SNAPSHOT));
279+
assert existing == null : "Already have an ongoing finalization " + finalizationFuture;
280+
}
281+
};
282+
}
283+
284+
private void logCooldownInfo() {
285+
logger.info("Sleeping for [{}] after modifying repository [{}] because it contains snapshots older than version [{}]" +
286+
" and therefore is using a backwards compatible metadata format that requires this cooldown period to avoid " +
287+
"repository corruption. To get rid of this message and move to the new repository metadata format, either remove " +
288+
"all snapshots older than version [{}] from the repository or create a new repository at an empty location.",
289+
coolDown, metadata.name(), SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION,
290+
SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION);
291+
}
292+
189293
private static BlobPath buildBasePath(RepositoryMetaData metadata) {
190294
final String basePath = BASE_PATH_SETTING.get(metadata.settings());
191295
if (Strings.hasLength(basePath)) {
@@ -210,4 +314,14 @@ protected BlobStore getBlobStore() {
210314
protected ByteSizeValue chunkSize() {
211315
return chunkSize;
212316
}
317+
318+
@Override
319+
protected void doClose() {
320+
final Scheduler.Cancellable cancellable = finalizationFuture.getAndSet(null);
321+
if (cancellable != null) {
322+
logger.debug("Repository [{}] closed during cool-down period", metadata.name());
323+
cancellable.cancel();
324+
}
325+
super.doClose();
326+
}
213327
}

plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,30 +22,48 @@
2222
import com.sun.net.httpserver.HttpExchange;
2323
import com.sun.net.httpserver.HttpHandler;
2424
import fixture.s3.S3HttpHandler;
25+
import org.elasticsearch.action.ActionRunnable;
26+
import org.elasticsearch.action.support.PlainActionFuture;
2527
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
2628
import org.elasticsearch.cluster.service.ClusterService;
2729
import org.elasticsearch.common.SuppressForbidden;
2830
import org.elasticsearch.common.blobstore.BlobContainer;
2931
import org.elasticsearch.common.blobstore.BlobPath;
3032
import org.elasticsearch.common.blobstore.BlobStore;
33+
import org.elasticsearch.common.bytes.BytesReference;
3134
import org.elasticsearch.common.settings.MockSecureSettings;
3235
import org.elasticsearch.common.settings.Setting;
3336
import org.elasticsearch.common.settings.Settings;
3437
import org.elasticsearch.common.unit.ByteSizeUnit;
38+
import org.elasticsearch.common.unit.TimeValue;
3539
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
40+
import org.elasticsearch.common.xcontent.XContentFactory;
3641
import org.elasticsearch.plugins.Plugin;
42+
import org.elasticsearch.repositories.RepositoriesService;
43+
import org.elasticsearch.repositories.RepositoryData;
44+
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
3745
import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase;
46+
import org.elasticsearch.snapshots.SnapshotId;
47+
import org.elasticsearch.snapshots.SnapshotsService;
3848
import org.elasticsearch.snapshots.mockstore.BlobStoreWrapper;
49+
import org.elasticsearch.threadpool.ThreadPool;
3950

51+
import java.io.IOException;
52+
import java.io.InputStream;
4053
import java.util.ArrayList;
4154
import java.util.Collection;
4255
import java.util.Collections;
4356
import java.util.List;
4457
import java.util.Map;
4558

59+
import static org.hamcrest.Matchers.greaterThan;
60+
import static org.hamcrest.Matchers.lessThan;
61+
4662
@SuppressForbidden(reason = "this test uses a HttpServer to emulate an S3 endpoint")
4763
public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTestCase {
4864

65+
private static final TimeValue TEST_COOLDOWN_PERIOD = TimeValue.timeValueSeconds(5L);
66+
4967
@Override
5068
protected String repositoryType() {
5169
return S3Repository.TYPE;
@@ -82,6 +100,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
82100
secureSettings.setString(S3ClientSettings.SECRET_KEY_SETTING.getConcreteSettingForNamespace("test").getKey(), "secret");
83101

84102
return Settings.builder()
103+
.put(ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING.getKey(), 0) // We have tests that verify an exact wait time
85104
.put(S3ClientSettings.ENDPOINT_SETTING.getConcreteSettingForNamespace("test").getKey(), httpServerUrl())
86105
// Disable chunked encoding as it simplifies a lot the request parsing on the httpServer side
87106
.put(S3ClientSettings.DISABLE_CHUNKED_ENCODING.getConcreteSettingForNamespace("test").getKey(), true)
@@ -92,6 +111,41 @@ protected Settings nodeSettings(int nodeOrdinal) {
92111
.build();
93112
}
94113

114+
public void testEnforcedCooldownPeriod() throws IOException {
115+
final String repoName = createRepository(randomName(), Settings.builder().put(repositorySettings())
116+
.put(S3Repository.COOLDOWN_PERIOD.getKey(), TEST_COOLDOWN_PERIOD).build());
117+
118+
final SnapshotId fakeOldSnapshot = client().admin().cluster().prepareCreateSnapshot(repoName, "snapshot-old")
119+
.setWaitForCompletion(true).setIndices().get().getSnapshotInfo().snapshotId();
120+
final RepositoriesService repositoriesService = internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class);
121+
final BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(repoName);
122+
final RepositoryData repositoryData =
123+
PlainActionFuture.get(f -> repository.threadPool().generic().execute(() -> repository.getRepositoryData(f)));
124+
final RepositoryData modifiedRepositoryData = repositoryData.withVersions(Collections.singletonMap(fakeOldSnapshot,
125+
SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION.minimumCompatibilityVersion()));
126+
final BytesReference serialized =
127+
BytesReference.bytes(modifiedRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), false));
128+
PlainActionFuture.get(f -> repository.threadPool().generic().execute(ActionRunnable.run(f, () -> {
129+
try (InputStream stream = serialized.streamInput()) {
130+
repository.blobStore().blobContainer(repository.basePath()).writeBlobAtomic(
131+
BlobStoreRepository.INDEX_FILE_PREFIX + modifiedRepositoryData.getGenId(), stream, serialized.length(), true);
132+
}
133+
})));
134+
135+
final String newSnapshotName = "snapshot-new";
136+
final long beforeThrottledSnapshot = repository.threadPool().relativeTimeInNanos();
137+
client().admin().cluster().prepareCreateSnapshot(repoName, newSnapshotName).setWaitForCompletion(true).setIndices().get();
138+
assertThat(repository.threadPool().relativeTimeInNanos() - beforeThrottledSnapshot, greaterThan(TEST_COOLDOWN_PERIOD.getNanos()));
139+
140+
final long beforeThrottledDelete = repository.threadPool().relativeTimeInNanos();
141+
client().admin().cluster().prepareDeleteSnapshot(repoName, newSnapshotName).get();
142+
assertThat(repository.threadPool().relativeTimeInNanos() - beforeThrottledDelete, greaterThan(TEST_COOLDOWN_PERIOD.getNanos()));
143+
144+
final long beforeFastDelete = repository.threadPool().relativeTimeInNanos();
145+
client().admin().cluster().prepareDeleteSnapshot(repoName, fakeOldSnapshot.getName()).get();
146+
assertThat(repository.threadPool().relativeTimeInNanos() - beforeFastDelete, lessThan(TEST_COOLDOWN_PERIOD.getNanos()));
147+
}
148+
95149
/**
96150
* S3RepositoryPlugin that allows to disable chunked encoding and to set a low threshold between single upload and multipart upload.
97151
*/

0 commit comments

Comments
 (0)