2121
2222import org .apache .logging .log4j .LogManager ;
2323import org .apache .logging .log4j .Logger ;
24+ import org .elasticsearch .action .ActionListener ;
25+ import org .elasticsearch .action .ActionRunnable ;
26+ import org .elasticsearch .cluster .metadata .MetaData ;
2427import org .elasticsearch .cluster .metadata .RepositoryMetaData ;
2528import org .elasticsearch .cluster .service .ClusterService ;
2629import org .elasticsearch .common .Strings ;
3235import org .elasticsearch .common .settings .Setting ;
3336import org .elasticsearch .common .unit .ByteSizeUnit ;
3437import org .elasticsearch .common .unit .ByteSizeValue ;
38+ import org .elasticsearch .common .unit .TimeValue ;
3539import org .elasticsearch .common .xcontent .NamedXContentRegistry ;
3640import org .elasticsearch .monitor .jvm .JvmInfo ;
3741import org .elasticsearch .repositories .RepositoryException ;
42+ import org .elasticsearch .repositories .ShardGenerations ;
3843import org .elasticsearch .repositories .blobstore .BlobStoreRepository ;
39-
44+ import org .elasticsearch .snapshots .SnapshotId ;
45+ import org .elasticsearch .snapshots .SnapshotInfo ;
46+ import org .elasticsearch .snapshots .SnapshotShardFailure ;
47+ import org .elasticsearch .snapshots .SnapshotsService ;
48+ import org .elasticsearch .threadpool .Scheduler ;
49+ import org .elasticsearch .threadpool .ThreadPool ;
50+
51+ import java .util .List ;
52+ import java .util .Map ;
53+ import java .util .concurrent .TimeUnit ;
54+ import java .util .concurrent .atomic .AtomicReference ;
4055import java .util .function .Function ;
4156
4257/**
@@ -142,6 +157,23 @@ class S3Repository extends BlobStoreRepository {
142157
143158 static final Setting <String > CLIENT_NAME = new Setting <>("client" , "default" , Function .identity ());
144159
160+ /**
161+ * Artificial delay to introduce after a snapshot finalization or delete has finished so long as the repository is still using the
162+ * backwards compatible snapshot format from before
163+ * {@link org.elasticsearch.snapshots.SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION} ({@link org.elasticsearch.Version#V_7_6_0}).
164+ * This delay is necessary so that the eventually consistent nature of AWS S3 does not randomly result in repository corruption when
165+ * doing repository operations in rapid succession on a repository in the old metadata format.
166+ * This setting should not be adjusted in production when working with an AWS S3 backed repository. Doing so risks the repository
167+ * becoming silently corrupted. To get rid of this waiting period, either create a new S3 repository or remove all snapshots older than
168+ * {@link org.elasticsearch.Version#V_7_6_0} from the repository which will trigger an upgrade of the repository metadata to the new
169+ * format and disable the cooldown period.
170+ */
171+ static final Setting <TimeValue > COOLDOWN_PERIOD = Setting .timeSetting (
172+ "cooldown_period" ,
173+ new TimeValue (3 , TimeUnit .MINUTES ),
174+ new TimeValue (0 , TimeUnit .MILLISECONDS ),
175+ Setting .Property .Dynamic );
176+
145177 /**
146178 * Specifies the path within bucket to repository data. Defaults to root directory.
147179 */
@@ -165,6 +197,12 @@ class S3Repository extends BlobStoreRepository {
165197
166198 private final RepositoryMetaData repositoryMetaData ;
167199
200+ /**
201+ * Time period to delay repository operations by after finalizing or deleting a snapshot.
202+ * See {@link #COOLDOWN_PERIOD} for details.
203+ */
204+ private final TimeValue coolDown ;
205+
168206 /**
169207 * Constructs an s3 backed repository
170208 */
@@ -211,6 +249,8 @@ class S3Repository extends BlobStoreRepository {
211249 + "store these in named clients and the elasticsearch keystore for secure settings." );
212250 }
213251
252+ coolDown = COOLDOWN_PERIOD .get (metadata .settings ());
253+
214254 logger .debug (
215255 "using bucket [{}], chunk_size [{}], server_side_encryption [{}], buffer_size [{}], cannedACL [{}], storageClass [{}]" ,
216256 bucket ,
@@ -221,6 +261,70 @@ class S3Repository extends BlobStoreRepository {
221261 storageClass );
222262 }
223263
264+ /**
265+ * Holds a reference to delayed repository operation {@link Scheduler.Cancellable} so it can be cancelled should the repository be
266+ * closed concurrently.
267+ */
268+ private final AtomicReference <Scheduler .Cancellable > finalizationFuture = new AtomicReference <>();
269+
270+ @ Override
271+ public void finalizeSnapshot (SnapshotId snapshotId , ShardGenerations shardGenerations , long startTime , String failure , int totalShards ,
272+ List <SnapshotShardFailure > shardFailures , long repositoryStateId , boolean includeGlobalState ,
273+ MetaData clusterMetaData , Map <String , Object > userMetadata , boolean writeShardGens ,
274+ ActionListener <SnapshotInfo > listener ) {
275+ if (writeShardGens == false ) {
276+ listener = delayedListener (listener );
277+ }
278+ super .finalizeSnapshot (snapshotId , shardGenerations , startTime , failure , totalShards , shardFailures , repositoryStateId ,
279+ includeGlobalState , clusterMetaData , userMetadata , writeShardGens , listener );
280+ }
281+
282+ @ Override
283+ public void deleteSnapshot (SnapshotId snapshotId , long repositoryStateId , boolean writeShardGens , ActionListener <Void > listener ) {
284+ if (writeShardGens == false ) {
285+ listener = delayedListener (listener );
286+ }
287+ super .deleteSnapshot (snapshotId , repositoryStateId , writeShardGens , listener );
288+ }
289+
290+ /**
291+ * Wraps given listener such that it is executed with a delay of {@link #coolDown} on the snapshot thread-pool after being invoked.
292+ * See {@link #COOLDOWN_PERIOD} for details.
293+ */
294+ private <T > ActionListener <T > delayedListener (ActionListener <T > listener ) {
295+ final ActionListener <T > wrappedListener = ActionListener .runBefore (listener , () -> {
296+ final Scheduler .Cancellable cancellable = finalizationFuture .getAndSet (null );
297+ assert cancellable != null ;
298+ });
299+ return new ActionListener <T >() {
300+ @ Override
301+ public void onResponse (T response ) {
302+ logCooldownInfo ();
303+ final Scheduler .Cancellable existing = finalizationFuture .getAndSet (
304+ threadPool .schedule (ActionRunnable .wrap (wrappedListener , l -> l .onResponse (response )),
305+ coolDown , ThreadPool .Names .SNAPSHOT ));
306+ assert existing == null : "Already have an ongoing finalization " + finalizationFuture ;
307+ }
308+
309+ @ Override
310+ public void onFailure (Exception e ) {
311+ logCooldownInfo ();
312+ final Scheduler .Cancellable existing = finalizationFuture .getAndSet (
313+ threadPool .schedule (ActionRunnable .wrap (wrappedListener , l -> l .onFailure (e )), coolDown , ThreadPool .Names .SNAPSHOT ));
314+ assert existing == null : "Already have an ongoing finalization " + finalizationFuture ;
315+ }
316+ };
317+ }
318+
319+ private void logCooldownInfo () {
320+ logger .info ("Sleeping for [{}] after modifying repository [{}] because it contains snapshots older than version [{}]" +
321+ " and therefore is using a backwards compatible metadata format that requires this cooldown period to avoid " +
322+ "repository corruption. To get rid of this message and move to the new repository metadata format, either remove " +
323+ "all snapshots older than version [{}] from the repository or create a new repository at an empty location." ,
324+ coolDown , metadata .name (), SnapshotsService .SHARD_GEN_IN_REPO_DATA_VERSION ,
325+ SnapshotsService .SHARD_GEN_IN_REPO_DATA_VERSION );
326+ }
327+
224328 @ Override
225329 protected S3BlobStore createBlobStore () {
226330 return new S3BlobStore (service , bucket , serverSideEncryption , bufferSize , cannedACL , storageClass , repositoryMetaData );
@@ -241,4 +345,14 @@ public BlobPath basePath() {
241345 protected ByteSizeValue chunkSize () {
242346 return chunkSize ;
243347 }
348+
349+ @ Override
350+ protected void doClose () {
351+ final Scheduler .Cancellable cancellable = finalizationFuture .getAndSet (null );
352+ if (cancellable != null ) {
353+ logger .debug ("Repository [{}] closed during cool-down period" , metadata .name ());
354+ cancellable .cancel ();
355+ }
356+ super .doClose ();
357+ }
244358}
0 commit comments