Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.ShardLock;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
Expand Down Expand Up @@ -653,7 +654,8 @@ public static final IndexShard newIndexShard(
Arrays.asList(listeners),
() -> {},
RetentionLeaseSyncer.EMPTY,
cbs);
cbs,
IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER);
}

private static ShardRouting getInitializingShardRouting(ShardRouting existingShardRouting) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ EngineConfig engineConfigWithLargerIndexingMemory(EngineConfig config) {
config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(),
config.getExternalRefreshListener(), config.getInternalRefreshListener(), config.getIndexSort(),
config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(),
config.getPrimaryTermSupplier(), config.getTombstoneDocSupplier());
config.getPrimaryTermSupplier(), config.getTombstoneDocSupplier(), config.getSnapshotCommitSupplier());
}

@Override
Expand Down
19 changes: 17 additions & 2 deletions server/src/main/java/org/elasticsearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -414,13 +414,16 @@ public IndexService newIndexService(IndexService.IndexCreationContext indexCreat
NamedWriteableRegistry namedWriteableRegistry,
BooleanSupplier idFieldDataEnabled,
ValuesSourceRegistry valuesSourceRegistry,
IndexStorePlugin.IndexFoldersDeletionListener indexFoldersDeletionListener) throws IOException {
IndexStorePlugin.IndexFoldersDeletionListener indexFoldersDeletionListener,
Map<String, IndexStorePlugin.SnapshotCommitSupplier> snapshotCommitSuppliers) throws IOException {
final IndexEventListener eventListener = freeze();
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> readerWrapperFactory =
indexReaderWrapper.get() == null ? (shard) -> null : indexReaderWrapper.get();
eventListener.beforeIndexCreated(indexSettings.getIndex(), indexSettings.getSettings());
final IndexStorePlugin.DirectoryFactory directoryFactory = getDirectoryFactory(indexSettings, directoryFactories);
final IndexStorePlugin.RecoveryStateFactory recoveryStateFactory = getRecoveryStateFactory(indexSettings, recoveryStateFactories);
final IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier
= getSnapshotCommitSupplier(indexSettings, snapshotCommitSuppliers);
QueryCache queryCache = null;
IndexAnalyzers indexAnalyzers = null;
boolean success = false;
Expand All @@ -443,7 +446,7 @@ public IndexService newIndexService(IndexService.IndexCreationContext indexCreat
engineFactory, circuitBreakerService, bigArrays, threadPool, scriptService, clusterService, client, queryCache,
directoryFactory, eventListener, readerWrapperFactory, mapperRegistry, indicesFieldDataCache, searchOperationListeners,
indexOperationListeners, namedWriteableRegistry, idFieldDataEnabled, allowExpensiveQueries, expressionResolver,
valuesSourceRegistry, recoveryStateFactory, indexFoldersDeletionListener);
valuesSourceRegistry, recoveryStateFactory, indexFoldersDeletionListener, snapshotCommitSupplier);
success = true;
return indexService;
} finally {
Expand Down Expand Up @@ -498,6 +501,18 @@ private static IndexStorePlugin.RecoveryStateFactory getRecoveryStateFactory(
return factory;
}

public static final IndexStorePlugin.SnapshotCommitSupplier DEFAULT_SNAPSHOT_COMMIT_SUPPLIER
= e -> e.acquireLastIndexCommit(true); // by default we flush first so that the snapshot is as up-to-date as possible.

private static IndexStorePlugin.SnapshotCommitSupplier getSnapshotCommitSupplier(
final IndexSettings indexSettings,
final Map<String, IndexStorePlugin.SnapshotCommitSupplier> snapshotCommitSuppliers) {
final String storeType = indexSettings.getValue(INDEX_STORE_TYPE_SETTING);
// we check that storeType refers to a valid store type in getDirectoryFactory() so there's no need for strictness here too.
final IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier = snapshotCommitSuppliers.get(storeType);
return snapshotCommitSupplier == null ? DEFAULT_SNAPSHOT_COMMIT_SUPPLIER : snapshotCommitSupplier;
}

/**
* creates a new mapper service to do administrative work like mapping updates. This *should not* be used for document parsing.
* doing so will result in an exception.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final IndexStorePlugin.IndexFoldersDeletionListener indexFoldersDeletionListener;
private final IndexStorePlugin.DirectoryFactory directoryFactory;
private final IndexStorePlugin.RecoveryStateFactory recoveryStateFactory;
private final IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier;
private final CheckedFunction<DirectoryReader, DirectoryReader, IOException> readerWrapper;
private final IndexCache indexCache;
private final MapperService mapperService;
Expand Down Expand Up @@ -180,8 +181,8 @@ public IndexService(
IndexNameExpressionResolver expressionResolver,
ValuesSourceRegistry valuesSourceRegistry,
IndexStorePlugin.RecoveryStateFactory recoveryStateFactory,
IndexStorePlugin.IndexFoldersDeletionListener indexFoldersDeletionListener
) {
IndexStorePlugin.IndexFoldersDeletionListener indexFoldersDeletionListener,
IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier) {
super(indexSettings);
this.allowExpensiveQueries = allowExpensiveQueries;
this.indexSettings = indexSettings;
Expand All @@ -191,6 +192,7 @@ public IndexService(
this.circuitBreakerService = circuitBreakerService;
this.expressionResolver = expressionResolver;
this.valuesSourceRegistry = valuesSourceRegistry;
this.snapshotCommitSupplier = snapshotCommitSupplier;
if (needsMapperService(indexSettings, indexCreationContext)) {
assert indexAnalyzers != null;
this.mapperService = new MapperService(indexSettings, indexAnalyzers, xContentRegistry, similarityService, mapperRegistry,
Expand Down Expand Up @@ -484,7 +486,8 @@ public synchronized IndexShard createShard(
indexingOperationListeners,
() -> globalCheckpointSyncer.accept(shardId),
retentionLeaseSyncer,
circuitBreakerService);
circuitBreakerService,
snapshotCommitSupplier);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
shards = Maps.copyMapWithAddedEntry(shards, shardId.id(), indexShard);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1076,6 +1076,13 @@ public abstract void forceMerge(boolean flush, int maxNumSegments, boolean onlyE
*/
public abstract IndexCommitRef acquireSafeIndexCommit() throws EngineException;

/**
* Acquires the index commit that should be included in a snapshot.
*/
public final IndexCommitRef acquireIndexCommitForSnapshot() throws EngineException {
return engineConfig.getSnapshotCommitSupplier().acquireIndexCommitForSnapshot(this);
}

/**
* @return a summary of the contents of the current safe commit
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.indices.IndexingMemoryController;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.plugins.IndexStorePlugin;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.List;
Expand All @@ -60,6 +61,7 @@ public final class EngineConfig {
private volatile boolean enableGcDeletes = true;
private final TimeValue flushMergesAfter;
private final String codecName;
private final IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier;
private final ThreadPool threadPool;
private final Engine.Warmer warmer;
private final Store store;
Expand Down Expand Up @@ -120,18 +122,30 @@ public Supplier<RetentionLeases> retentionLeasesSupplier() {
/**
* Creates a new {@link org.elasticsearch.index.engine.EngineConfig}
*/
public EngineConfig(ShardId shardId, ThreadPool threadPool,
IndexSettings indexSettings, Engine.Warmer warmer, Store store,
MergePolicy mergePolicy, Analyzer analyzer,
Similarity similarity, CodecService codecService, Engine.EventListener eventListener,
QueryCache queryCache, QueryCachingPolicy queryCachingPolicy,
TranslogConfig translogConfig, TimeValue flushMergesAfter,
List<ReferenceManager.RefreshListener> externalRefreshListener,
List<ReferenceManager.RefreshListener> internalRefreshListener, Sort indexSort,
CircuitBreakerService circuitBreakerService, LongSupplier globalCheckpointSupplier,
Supplier<RetentionLeases> retentionLeasesSupplier,
LongSupplier primaryTermSupplier,
TombstoneDocSupplier tombstoneDocSupplier) {
public EngineConfig(
ShardId shardId,
ThreadPool threadPool,
IndexSettings indexSettings,
Engine.Warmer warmer,
Store store,
MergePolicy mergePolicy,
Analyzer analyzer,
Similarity similarity,
CodecService codecService,
Engine.EventListener eventListener,
QueryCache queryCache,
QueryCachingPolicy queryCachingPolicy,
TranslogConfig translogConfig,
TimeValue flushMergesAfter,
List<ReferenceManager.RefreshListener> externalRefreshListener,
List<ReferenceManager.RefreshListener> internalRefreshListener,
Sort indexSort,
CircuitBreakerService circuitBreakerService,
LongSupplier globalCheckpointSupplier,
Supplier<RetentionLeases> retentionLeasesSupplier,
LongSupplier primaryTermSupplier,
TombstoneDocSupplier tombstoneDocSupplier,
IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier) {
this.shardId = shardId;
this.indexSettings = indexSettings;
this.threadPool = threadPool;
Expand Down Expand Up @@ -169,6 +183,7 @@ public EngineConfig(ShardId shardId, ThreadPool threadPool,
this.retentionLeasesSupplier = Objects.requireNonNull(retentionLeasesSupplier);
this.primaryTermSupplier = primaryTermSupplier;
this.tombstoneDocSupplier = tombstoneDocSupplier;
this.snapshotCommitSupplier = snapshotCommitSupplier;
}

/**
Expand Down Expand Up @@ -370,4 +385,8 @@ public interface TombstoneDocSupplier {
public TombstoneDocSupplier getTombstoneDocSupplier() {
return tombstoneDocSupplier;
}

public IndexStorePlugin.SnapshotCommitSupplier getSnapshotCommitSupplier() {
return snapshotCommitSupplier;
}
}
45 changes: 38 additions & 7 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@
import org.elasticsearch.indices.recovery.RecoveryFailedException;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.plugins.IndexStorePlugin;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.rest.RestStatus;
Expand Down Expand Up @@ -225,6 +226,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
private final GlobalCheckpointListeners globalCheckpointListeners;
private final PendingReplicationActions pendingReplicationActions;
private final ReplicationTracker replicationTracker;
private final IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier;

protected volatile ShardRouting shardRouting;
protected volatile IndexShardState state;
Expand Down Expand Up @@ -305,7 +307,8 @@ public IndexShard(
final List<IndexingOperationListener> listeners,
final Runnable globalCheckpointSyncer,
final RetentionLeaseSyncer retentionLeaseSyncer,
final CircuitBreakerService circuitBreakerService) throws IOException {
final CircuitBreakerService circuitBreakerService,
final IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier) throws IOException {
super(shardRouting.shardId(), indexSettings);
assert shardRouting.initializing();
this.shardRouting = shardRouting;
Expand All @@ -315,6 +318,7 @@ public IndexShard(
this.similarityService = similarityService;
Objects.requireNonNull(store, "Store must be provided to the index shard");
this.engineFactory = Objects.requireNonNull(engineFactory);
this.snapshotCommitSupplier = Objects.requireNonNull(snapshotCommitSupplier);
this.store = store;
this.indexSortSupplier = indexSortSupplier;
this.indexEventListener = indexEventListener;
Expand Down Expand Up @@ -1169,6 +1173,19 @@ public Engine.IndexCommitRef acquireLastIndexCommit(boolean flushFirst) throws E
}
}

/**
* Acquires the {@link IndexCommit} which should be included in a snapshot.
*/
public Engine.IndexCommitRef acquireIndexCommitForSnapshot() throws EngineException {
final IndexShardState state = this.state; // one time volatile read
if (state == IndexShardState.STARTED) {
// unlike acquireLastIndexCommit(), there's no need to acquire a snapshot on a shard that is shutting down
return getEngine().acquireIndexCommitForSnapshot();
} else {
throw new IllegalIndexShardStateException(shardId, state, "snapshot is not allowed");
}
}

/**
* Snapshots the most recent safe index commit from the currently running engine.
* All index files referenced by this index commit won't be freed until the commit/snapshot is closed.
Expand Down Expand Up @@ -2836,16 +2853,30 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
this.warmer.warm(reader);
}
};
return new EngineConfig(shardId,
threadPool, indexSettings, warmer, store, indexSettings.getMergePolicy(),
return new EngineConfig(
shardId,
threadPool,
indexSettings,
warmer,
store,
indexSettings.getMergePolicy(),
buildIndexAnalyzer(mapperService),
similarityService.similarity(mapperService == null ? null : mapperService::fieldType), codecService, shardEventListener,
indexCache != null ? indexCache.query() : null, cachingPolicy, translogConfig,
similarityService.similarity(mapperService == null ? null : mapperService::fieldType),
codecService,
shardEventListener,
indexCache != null ? indexCache.query() : null,
cachingPolicy,
translogConfig,
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()),
List.of(refreshListeners, refreshPendingLocationListener),
Collections.singletonList(new RefreshMetricUpdater(refreshMetric)),
indexSort, circuitBreakerService, globalCheckpointSupplier, replicationTracker::getRetentionLeases,
() -> getOperationPrimaryTerm(), tombstoneDocSupplier());
indexSort,
circuitBreakerService,
globalCheckpointSupplier,
replicationTracker::getRetentionLeases,
this::getOperationPrimaryTerm,
tombstoneDocSupplier(),
snapshotCommitSupplier);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,4 +128,9 @@ private FileInfo findPhysicalIndexFile(String physicalName) {
return physicalFiles.get(physicalName);
}

@Override
public String toString() {
return "SnapshotFiles{snapshot=[" + snapshot + "], shardStateIdentifier=[" + shardStateIdentifier + "], indexFiles=" + indexFiles
+ "}";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index.store;

/**
* Exception thrown if trying to mutate files in an immutable directory.
*/
public class ImmutableDirectoryException extends IllegalArgumentException {
public ImmutableDirectoryException(String message) {
super(message);
}
}
Loading