Skip to content

Commit daef544

Browse files
committed
Snapshot of a searchable snapshot should be empty (#66162)
Today if you take a snapshot of a searchable snapshot index then we treat it like a normal index and copy (any unchanged parts of) its contents the the repository. This is often a complete copy, doubling the snapshot storage required, since a searchable snapshot index typically has a different name from the original index; it may also be that we are taking a snapshot into a different repository. The content of a searchable snapshot is already held in a snapshot, and its index metadata indicates how to find this content, so it is wasteful to copy anything new into the repository. This commit adjusts things so that a searchable snapshot shard presents itself to the snapshotter as if it contained no segments, and adjusts things to account for the consequent mismatch at restore time. Closes #66110
1 parent 325239b commit daef544

File tree

41 files changed

+525
-107
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+525
-107
lines changed

server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import org.elasticsearch.env.NodeEnvironment;
5959
import org.elasticsearch.env.ShardLock;
6060
import org.elasticsearch.index.Index;
61+
import org.elasticsearch.index.IndexModule;
6162
import org.elasticsearch.index.IndexService;
6263
import org.elasticsearch.index.IndexSettings;
6364
import org.elasticsearch.index.VersionType;
@@ -673,7 +674,8 @@ public static final IndexShard newIndexShard(
673674
Arrays.asList(listeners),
674675
() -> {},
675676
RetentionLeaseSyncer.EMPTY,
676-
cbs);
677+
cbs,
678+
IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER);
677679
}
678680

679681
private static ShardRouting getInitializingShardRouting(ShardRouting existingShardRouting) {

server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ EngineConfig engineConfigWithLargerIndexingMemory(EngineConfig config) {
6868
config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(),
6969
config.getExternalRefreshListener(), config.getInternalRefreshListener(), config.getIndexSort(),
7070
config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(),
71-
config.getPrimaryTermSupplier(), config.getTombstoneDocSupplier());
71+
config.getPrimaryTermSupplier(), config.getTombstoneDocSupplier(), config.getSnapshotCommitSupplier());
7272
}
7373

7474
@Override

server/src/main/java/org/elasticsearch/index/IndexModule.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -414,13 +414,16 @@ public IndexService newIndexService(IndexService.IndexCreationContext indexCreat
414414
NamedWriteableRegistry namedWriteableRegistry,
415415
BooleanSupplier idFieldDataEnabled,
416416
ValuesSourceRegistry valuesSourceRegistry,
417-
IndexStorePlugin.IndexFoldersDeletionListener indexFoldersDeletionListener) throws IOException {
417+
IndexStorePlugin.IndexFoldersDeletionListener indexFoldersDeletionListener,
418+
Map<String, IndexStorePlugin.SnapshotCommitSupplier> snapshotCommitSuppliers) throws IOException {
418419
final IndexEventListener eventListener = freeze();
419420
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> readerWrapperFactory =
420421
indexReaderWrapper.get() == null ? (shard) -> null : indexReaderWrapper.get();
421422
eventListener.beforeIndexCreated(indexSettings.getIndex(), indexSettings.getSettings());
422423
final IndexStorePlugin.DirectoryFactory directoryFactory = getDirectoryFactory(indexSettings, directoryFactories);
423424
final IndexStorePlugin.RecoveryStateFactory recoveryStateFactory = getRecoveryStateFactory(indexSettings, recoveryStateFactories);
425+
final IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier
426+
= getSnapshotCommitSupplier(indexSettings, snapshotCommitSuppliers);
424427
QueryCache queryCache = null;
425428
IndexAnalyzers indexAnalyzers = null;
426429
boolean success = false;
@@ -443,7 +446,7 @@ public IndexService newIndexService(IndexService.IndexCreationContext indexCreat
443446
engineFactory, circuitBreakerService, bigArrays, threadPool, scriptService, clusterService, client, queryCache,
444447
directoryFactory, eventListener, readerWrapperFactory, mapperRegistry, indicesFieldDataCache, searchOperationListeners,
445448
indexOperationListeners, namedWriteableRegistry, idFieldDataEnabled, allowExpensiveQueries, expressionResolver,
446-
valuesSourceRegistry, recoveryStateFactory, indexFoldersDeletionListener);
449+
valuesSourceRegistry, recoveryStateFactory, indexFoldersDeletionListener, snapshotCommitSupplier);
447450
success = true;
448451
return indexService;
449452
} finally {
@@ -498,6 +501,18 @@ private static IndexStorePlugin.RecoveryStateFactory getRecoveryStateFactory(
498501
return factory;
499502
}
500503

504+
public static final IndexStorePlugin.SnapshotCommitSupplier DEFAULT_SNAPSHOT_COMMIT_SUPPLIER
505+
= e -> e.acquireLastIndexCommit(true); // by default we flush first so that the snapshot is as up-to-date as possible.
506+
507+
private static IndexStorePlugin.SnapshotCommitSupplier getSnapshotCommitSupplier(
508+
final IndexSettings indexSettings,
509+
final Map<String, IndexStorePlugin.SnapshotCommitSupplier> snapshotCommitSuppliers) {
510+
final String storeType = indexSettings.getValue(INDEX_STORE_TYPE_SETTING);
511+
// we check that storeType refers to a valid store type in getDirectoryFactory() so there's no need for strictness here too.
512+
final IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier = snapshotCommitSuppliers.get(storeType);
513+
return snapshotCommitSupplier == null ? DEFAULT_SNAPSHOT_COMMIT_SUPPLIER : snapshotCommitSupplier;
514+
}
515+
501516
/**
502517
* creates a new mapper service to do administrative work like mapping updates. This *should not* be used for document parsing.
503518
* doing so will result in an exception.

server/src/main/java/org/elasticsearch/index/IndexService.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
119119
private final IndexStorePlugin.IndexFoldersDeletionListener indexFoldersDeletionListener;
120120
private final IndexStorePlugin.DirectoryFactory directoryFactory;
121121
private final IndexStorePlugin.RecoveryStateFactory recoveryStateFactory;
122+
private final IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier;
122123
private final CheckedFunction<DirectoryReader, DirectoryReader, IOException> readerWrapper;
123124
private final IndexCache indexCache;
124125
private final MapperService mapperService;
@@ -181,8 +182,8 @@ public IndexService(
181182
IndexNameExpressionResolver expressionResolver,
182183
ValuesSourceRegistry valuesSourceRegistry,
183184
IndexStorePlugin.RecoveryStateFactory recoveryStateFactory,
184-
IndexStorePlugin.IndexFoldersDeletionListener indexFoldersDeletionListener
185-
) {
185+
IndexStorePlugin.IndexFoldersDeletionListener indexFoldersDeletionListener,
186+
IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier) {
186187
super(indexSettings);
187188
this.allowExpensiveQueries = allowExpensiveQueries;
188189
this.indexSettings = indexSettings;
@@ -192,6 +193,7 @@ public IndexService(
192193
this.circuitBreakerService = circuitBreakerService;
193194
this.expressionResolver = expressionResolver;
194195
this.valuesSourceRegistry = valuesSourceRegistry;
196+
this.snapshotCommitSupplier = snapshotCommitSupplier;
195197
if (needsMapperService(indexSettings, indexCreationContext)) {
196198
assert indexAnalyzers != null;
197199
this.mapperService = new MapperService(indexSettings, indexAnalyzers, xContentRegistry, similarityService, mapperRegistry,
@@ -485,7 +487,8 @@ public synchronized IndexShard createShard(
485487
indexingOperationListeners,
486488
() -> globalCheckpointSyncer.accept(shardId),
487489
retentionLeaseSyncer,
488-
circuitBreakerService);
490+
circuitBreakerService,
491+
snapshotCommitSupplier);
489492
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
490493
eventListener.afterIndexShardCreated(indexShard);
491494
shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap();

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1112,6 +1112,13 @@ public abstract void forceMerge(boolean flush, int maxNumSegments, boolean onlyE
11121112
*/
11131113
public abstract IndexCommitRef acquireSafeIndexCommit() throws EngineException;
11141114

1115+
/**
1116+
* Acquires the index commit that should be included in a snapshot.
1117+
*/
1118+
public final IndexCommitRef acquireIndexCommitForSnapshot() throws EngineException {
1119+
return engineConfig.getSnapshotCommitSupplier().acquireIndexCommitForSnapshot(this);
1120+
}
1121+
11151122
/**
11161123
* @return a summary of the contents of the current safe commit
11171124
*/

server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.elasticsearch.index.translog.TranslogConfig;
4242
import org.elasticsearch.indices.IndexingMemoryController;
4343
import org.elasticsearch.indices.breaker.CircuitBreakerService;
44+
import org.elasticsearch.plugins.IndexStorePlugin;
4445
import org.elasticsearch.threadpool.ThreadPool;
4546

4647
import java.util.List;
@@ -60,6 +61,7 @@ public final class EngineConfig {
6061
private volatile boolean enableGcDeletes = true;
6162
private final TimeValue flushMergesAfter;
6263
private final String codecName;
64+
private final IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier;
6365
private final ThreadPool threadPool;
6466
private final Engine.Warmer warmer;
6567
private final Store store;
@@ -130,18 +132,30 @@ public Supplier<RetentionLeases> retentionLeasesSupplier() {
130132
/**
131133
* Creates a new {@link org.elasticsearch.index.engine.EngineConfig}
132134
*/
133-
public EngineConfig(ShardId shardId, ThreadPool threadPool,
134-
IndexSettings indexSettings, Engine.Warmer warmer, Store store,
135-
MergePolicy mergePolicy, Analyzer analyzer,
136-
Similarity similarity, CodecService codecService, Engine.EventListener eventListener,
137-
QueryCache queryCache, QueryCachingPolicy queryCachingPolicy,
138-
TranslogConfig translogConfig, TimeValue flushMergesAfter,
139-
List<ReferenceManager.RefreshListener> externalRefreshListener,
140-
List<ReferenceManager.RefreshListener> internalRefreshListener, Sort indexSort,
141-
CircuitBreakerService circuitBreakerService, LongSupplier globalCheckpointSupplier,
142-
Supplier<RetentionLeases> retentionLeasesSupplier,
143-
LongSupplier primaryTermSupplier,
144-
TombstoneDocSupplier tombstoneDocSupplier) {
135+
public EngineConfig(
136+
ShardId shardId,
137+
ThreadPool threadPool,
138+
IndexSettings indexSettings,
139+
Engine.Warmer warmer,
140+
Store store,
141+
MergePolicy mergePolicy,
142+
Analyzer analyzer,
143+
Similarity similarity,
144+
CodecService codecService,
145+
Engine.EventListener eventListener,
146+
QueryCache queryCache,
147+
QueryCachingPolicy queryCachingPolicy,
148+
TranslogConfig translogConfig,
149+
TimeValue flushMergesAfter,
150+
List<ReferenceManager.RefreshListener> externalRefreshListener,
151+
List<ReferenceManager.RefreshListener> internalRefreshListener,
152+
Sort indexSort,
153+
CircuitBreakerService circuitBreakerService,
154+
LongSupplier globalCheckpointSupplier,
155+
Supplier<RetentionLeases> retentionLeasesSupplier,
156+
LongSupplier primaryTermSupplier,
157+
TombstoneDocSupplier tombstoneDocSupplier,
158+
IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier) {
145159
this.shardId = shardId;
146160
this.indexSettings = indexSettings;
147161
this.threadPool = threadPool;
@@ -179,6 +193,7 @@ public EngineConfig(ShardId shardId, ThreadPool threadPool,
179193
this.retentionLeasesSupplier = Objects.requireNonNull(retentionLeasesSupplier);
180194
this.primaryTermSupplier = primaryTermSupplier;
181195
this.tombstoneDocSupplier = tombstoneDocSupplier;
196+
this.snapshotCommitSupplier = snapshotCommitSupplier;
182197
}
183198

184199
/**
@@ -388,4 +403,8 @@ public interface TombstoneDocSupplier {
388403
public TombstoneDocSupplier getTombstoneDocSupplier() {
389404
return tombstoneDocSupplier;
390405
}
406+
407+
public IndexStorePlugin.SnapshotCommitSupplier getSnapshotCommitSupplier() {
408+
return snapshotCommitSupplier;
409+
}
391410
}

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@
152152
import org.elasticsearch.indices.recovery.RecoveryFailedException;
153153
import org.elasticsearch.indices.recovery.RecoveryState;
154154
import org.elasticsearch.indices.recovery.RecoveryTarget;
155+
import org.elasticsearch.plugins.IndexStorePlugin;
155156
import org.elasticsearch.repositories.RepositoriesService;
156157
import org.elasticsearch.repositories.Repository;
157158
import org.elasticsearch.rest.RestStatus;
@@ -226,6 +227,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
226227
private final GlobalCheckpointListeners globalCheckpointListeners;
227228
private final PendingReplicationActions pendingReplicationActions;
228229
private final ReplicationTracker replicationTracker;
230+
private final IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier;
229231

230232
protected volatile ShardRouting shardRouting;
231233
protected volatile IndexShardState state;
@@ -306,7 +308,8 @@ public IndexShard(
306308
final List<IndexingOperationListener> listeners,
307309
final Runnable globalCheckpointSyncer,
308310
final RetentionLeaseSyncer retentionLeaseSyncer,
309-
final CircuitBreakerService circuitBreakerService) throws IOException {
311+
final CircuitBreakerService circuitBreakerService,
312+
final IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier) throws IOException {
310313
super(shardRouting.shardId(), indexSettings);
311314
assert shardRouting.initializing();
312315
this.shardRouting = shardRouting;
@@ -316,6 +319,7 @@ public IndexShard(
316319
this.similarityService = similarityService;
317320
Objects.requireNonNull(store, "Store must be provided to the index shard");
318321
this.engineFactory = Objects.requireNonNull(engineFactory);
322+
this.snapshotCommitSupplier = Objects.requireNonNull(snapshotCommitSupplier);
319323
this.store = store;
320324
this.indexSortSupplier = indexSortSupplier;
321325
this.indexEventListener = indexEventListener;
@@ -1236,6 +1240,19 @@ public Engine.IndexCommitRef acquireLastIndexCommit(boolean flushFirst) throws E
12361240
}
12371241
}
12381242

1243+
/**
1244+
* Acquires the {@link IndexCommit} which should be included in a snapshot.
1245+
*/
1246+
public Engine.IndexCommitRef acquireIndexCommitForSnapshot() throws EngineException {
1247+
final IndexShardState state = this.state; // one time volatile read
1248+
if (state == IndexShardState.STARTED) {
1249+
// unlike acquireLastIndexCommit(), there's no need to acquire a snapshot on a shard that is shutting down
1250+
return getEngine().acquireIndexCommitForSnapshot();
1251+
} else {
1252+
throw new IllegalIndexShardStateException(shardId, state, "snapshot is not allowed");
1253+
}
1254+
}
1255+
12391256
/**
12401257
* Snapshots the most recent safe index commit from the currently running engine.
12411258
* All index files referenced by this index commit won't be freed until the commit/snapshot is closed.
@@ -2936,16 +2953,30 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
29362953
this.warmer.warm(reader);
29372954
}
29382955
};
2939-
return new EngineConfig(shardId,
2940-
threadPool, indexSettings, warmer, store, indexSettings.getMergePolicy(),
2956+
return new EngineConfig(
2957+
shardId,
2958+
threadPool,
2959+
indexSettings,
2960+
warmer,
2961+
store,
2962+
indexSettings.getMergePolicy(),
29412963
buildIndexAnalyzer(mapperService),
2942-
similarityService.similarity(mapperService == null ? null : mapperService::fieldType), codecService, shardEventListener,
2943-
indexCache != null ? indexCache.query() : null, cachingPolicy, translogConfig,
2964+
similarityService.similarity(mapperService == null ? null : mapperService::fieldType),
2965+
codecService,
2966+
shardEventListener,
2967+
indexCache != null ? indexCache.query() : null,
2968+
cachingPolicy,
2969+
translogConfig,
29442970
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()),
29452971
Arrays.asList(refreshListeners, refreshPendingLocationListener),
29462972
Collections.singletonList(new RefreshMetricUpdater(refreshMetric)),
2947-
indexSort, circuitBreakerService, globalCheckpointSupplier, replicationTracker::getRetentionLeases,
2948-
() -> getOperationPrimaryTerm(), tombstoneDocSupplier());
2973+
indexSort,
2974+
circuitBreakerService,
2975+
globalCheckpointSupplier,
2976+
replicationTracker::getRetentionLeases,
2977+
this::getOperationPrimaryTerm,
2978+
tombstoneDocSupplier(),
2979+
snapshotCommitSupplier);
29492980
}
29502981

29512982
/**

server/src/main/java/org/elasticsearch/index/snapshots/blobstore/SnapshotFiles.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,4 +128,9 @@ public FileInfo findPhysicalIndexFile(String physicalName) {
128128
return physicalFiles.get(physicalName);
129129
}
130130

131+
@Override
132+
public String toString() {
133+
return "SnapshotFiles{snapshot=[" + snapshot + "], shardStateIdentifier=[" + shardStateIdentifier + "], indexFiles=" + indexFiles
134+
+ "}";
135+
}
131136
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.index.store;
21+
22+
/**
23+
* Exception thrown if trying to mutate files in an immutable directory.
24+
*/
25+
public class ImmutableDirectoryException extends IllegalArgumentException {
26+
public ImmutableDirectoryException(String message) {
27+
super(message);
28+
}
29+
}

0 commit comments

Comments
 (0)