Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions server/src/main/java/org/elasticsearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public final class IndexModule {

private final IndexSettings indexSettings;
private final AnalysisRegistry analysisRegistry;
private final EngineFactory engineFactory;
private final Function<IndexSettings, EngineFactory> engineFactoryFn;
private SetOnce<IndexSearcherWrapperFactory> indexSearcherWrapper = new SetOnce<>();
private final Set<IndexEventListener> indexEventListeners = new HashSet<>();
private final Map<String, TriFunction<Settings, Version, ScriptService, Similarity>> similarities = new HashMap<>();
Expand All @@ -121,12 +121,13 @@ public final class IndexModule {
*
* @param indexSettings the index settings
* @param analysisRegistry the analysis registry
* @param engineFactory the engine factory
* @param engineFactoryFn a function from IndexSettings to the engine factory to use
*/
public IndexModule(final IndexSettings indexSettings, final AnalysisRegistry analysisRegistry, final EngineFactory engineFactory) {
public IndexModule(final IndexSettings indexSettings, final AnalysisRegistry analysisRegistry,
final Function<IndexSettings, EngineFactory> engineFactoryFn) {
this.indexSettings = indexSettings;
this.analysisRegistry = analysisRegistry;
this.engineFactory = Objects.requireNonNull(engineFactory);
this.engineFactoryFn = Objects.requireNonNull(engineFactoryFn);
this.searchOperationListeners.add(new SearchSlowLog(indexSettings));
this.indexOperationListeners.add(new IndexingSlowLog(indexSettings));
}
Expand Down Expand Up @@ -172,8 +173,8 @@ public Index getIndex() {
*
* @return the engine factory
*/
EngineFactory getEngineFactory() {
return engineFactory;
Function<IndexSettings, EngineFactory> getEngineFactoryFn() {
return engineFactoryFn;
}

/**
Expand Down Expand Up @@ -382,7 +383,7 @@ public IndexService newIndexService(
}
return new IndexService(indexSettings, environment, xContentRegistry,
new SimilarityService(indexSettings, scriptService, similarities),
shardStoreDeleter, analysisRegistry, engineFactory, circuitBreakerService, bigArrays, threadPool, scriptService,
shardStoreDeleter, analysisRegistry, engineFactoryFn, circuitBreakerService, bigArrays, threadPool, scriptService,
client, queryCache, store, eventListener, searcherWrapperFactory, mapperRegistry,
indicesFieldDataCache, searchOperationListeners, indexOperationListeners, namedWriteableRegistry);
}
Expand Down
13 changes: 7 additions & 6 deletions server/src/main/java/org/elasticsearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

Expand All @@ -109,7 +110,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final NamedXContentRegistry xContentRegistry;
private final NamedWriteableRegistry namedWriteableRegistry;
private final SimilarityService similarityService;
private final EngineFactory engineFactory;
private final Function<IndexSettings, EngineFactory> engineFactoryFn;
private final IndexWarmer warmer;
private volatile Map<Integer, IndexShard> shards = emptyMap();
private final AtomicBoolean closed = new AtomicBoolean(false);
Expand Down Expand Up @@ -139,7 +140,7 @@ public IndexService(
SimilarityService similarityService,
ShardStoreDeleter shardStoreDeleter,
AnalysisRegistry registry,
EngineFactory engineFactory,
Function<IndexSettings, EngineFactory> engineFactoryFn,
CircuitBreakerService circuitBreakerService,
BigArrays bigArrays,
ThreadPool threadPool,
Expand Down Expand Up @@ -188,7 +189,7 @@ public IndexService(
this.warmer = new IndexWarmer(indexSettings.getSettings(), threadPool, indexFieldData,
bitsetFilterCache.createListener(threadPool));
this.indexCache = new IndexCache(indexSettings, queryCache, bitsetFilterCache);
this.engineFactory = Objects.requireNonNull(engineFactory);
this.engineFactoryFn = Objects.requireNonNull(engineFactoryFn);
// initialize this last -- otherwise if the wrapper requires any other member to be non-null we fail with an NPE
this.searcherWrapper = wrapperFactory.newWrapper(this);
this.searchOperationListeners = Collections.unmodifiableList(searchOperationListeners);
Expand Down Expand Up @@ -380,7 +381,7 @@ public synchronized IndexShard createShard(ShardRouting routing, Consumer<ShardI
store = new Store(shardId, this.indexSettings, indexStore.newDirectoryService(path), lock,
new StoreCloseListener(shardId, () -> eventListener.onStoreClosed(shardId)));
indexShard = new IndexShard(routing, this.indexSettings, path, store, indexSortSupplier,
indexCache, mapperService, similarityService, engineFactory,
indexCache, mapperService, similarityService, engineFactoryFn,
eventListener, searcherWrapper, threadPool, bigArrays, engineWarmer,
searchOperationListeners, indexingOperationListeners, () -> globalCheckpointSyncer.accept(shardId),
circuitBreakerService);
Expand Down Expand Up @@ -681,8 +682,8 @@ public interface ShardStoreDeleter {
void addPendingDelete(ShardId shardId, IndexSettings indexSettings);
}

public final EngineFactory getEngineFactory() {
return engineFactory;
public final Function<IndexSettings, EngineFactory> getEngineFactoryFn() {
return engineFactoryFn;
}

final IndexSearcherWrapper getSearcherWrapper() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
Expand Down Expand Up @@ -193,7 +194,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
protected volatile IndexShardState state;
protected volatile long primaryTerm;
protected final AtomicReference<Engine> currentEngineReference = new AtomicReference<>();
final EngineFactory engineFactory;
final Function<IndexSettings, EngineFactory> engineFactoryFn;

private final IndexingOperationListener indexingOperationListeners;
private final Runnable globalCheckpointSyncer;
Expand Down Expand Up @@ -248,7 +249,7 @@ public IndexShard(
IndexCache indexCache,
MapperService mapperService,
SimilarityService similarityService,
@Nullable EngineFactory engineFactory,
Function<IndexSettings, EngineFactory> engineFactoryFn,
IndexEventListener indexEventListener,
IndexSearcherWrapper indexSearcherWrapper,
ThreadPool threadPool,
Expand All @@ -266,7 +267,7 @@ public IndexShard(
this.warmer = warmer;
this.similarityService = similarityService;
Objects.requireNonNull(store, "Store must be provided to the index shard");
this.engineFactory = Objects.requireNonNull(engineFactory);
this.engineFactoryFn = Objects.requireNonNull(engineFactoryFn);
this.store = store;
this.indexSortSupplier = indexSortSupplier;
this.indexEventListener = indexEventListener;
Expand Down Expand Up @@ -2115,7 +2116,7 @@ private Engine createNewEngine(EngineConfig config) {
}

protected Engine newEngine(EngineConfig config) {
return engineFactory.newReadWriteEngine(config);
return engineFactoryFn.apply(this.indexSettings).newReadWriteEngine(config);
}

private static void persistMetadata(
Expand Down Expand Up @@ -2445,8 +2446,8 @@ public ShardFailure(ShardRouting routing, String reason, @Nullable Exception cau
}
}

EngineFactory getEngineFactory() {
return engineFactory;
Function<IndexSettings, EngineFactory> getEngineFactoryFn() {
return engineFactoryFn;
}

// for tests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ private synchronized IndexService createIndexService(final String reason,
idxSettings.getNumberOfReplicas(),
reason);

final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry, getEngineFactory(idxSettings));
final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry, this::getEngineFactory);
for (IndexingOperationListener operationListener : indexingOperationListeners) {
indexModule.addIndexOperationListener(operationListener);
}
Expand All @@ -475,7 +475,8 @@ private synchronized IndexService createIndexService(final String reason,
);
}

private EngineFactory getEngineFactory(final IndexSettings idxSettings) {
// Visible for testing
EngineFactory getEngineFactory(final IndexSettings idxSettings) {
final List<Optional<EngineFactory>> engineFactories =
engineFactoryProviders
.stream()
Expand Down Expand Up @@ -511,7 +512,7 @@ private EngineFactory getEngineFactory(final IndexSettings idxSettings) {
*/
public synchronized MapperService createIndexMapperService(IndexMetaData indexMetaData) throws IOException {
final IndexSettings idxSettings = new IndexSettings(indexMetaData, this.settings, indexScopedSettings);
final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry, getEngineFactory(idxSettings));
final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry, this::getEngineFactory);
pluginsService.onIndexModule(indexModule);
return indexModule.newIndexMapperService(xContentRegistry, mapperRegistry, scriptService);
}
Expand Down
38 changes: 24 additions & 14 deletions server/src/test/java/org/elasticsearch/index/IndexModuleTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,13 @@ private IndexService newIndexService(IndexModule module) throws IOException {
}

public void testWrapperIsBound() throws IOException {
IndexModule module = new IndexModule(indexSettings, emptyAnalysisRegistry, new MockEngineFactory(AssertingDirectoryReader.class));
IndexModule module = new IndexModule(indexSettings, emptyAnalysisRegistry,
s -> new MockEngineFactory(AssertingDirectoryReader.class));
module.setSearcherWrapper((s) -> new Wrapper());

IndexService indexService = newIndexService(module);
assertTrue(indexService.getSearcherWrapper() instanceof Wrapper);
assertSame(indexService.getEngineFactory(), module.getEngineFactory());
assertSame(indexService.getEngineFactoryFn(), module.getEngineFactoryFn());
indexService.close("simon says", false);
}

Expand All @@ -165,7 +166,7 @@ public void testRegisterIndexStore() throws IOException {
.put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), "foo_store")
.build();
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(index, settings);
IndexModule module = new IndexModule(indexSettings, emptyAnalysisRegistry, new InternalEngineFactory());
IndexModule module = new IndexModule(indexSettings, emptyAnalysisRegistry, s -> new InternalEngineFactory());
module.addIndexStore("foo_store", FooStore::new);
try {
module.addIndexStore("foo_store", FooStore::new);
Expand All @@ -189,7 +190,7 @@ public void beforeIndexRemoved(IndexService indexService, IndexRemovalReason rea
}
};
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(index, settings);
IndexModule module = new IndexModule(indexSettings, emptyAnalysisRegistry, new InternalEngineFactory());
IndexModule module = new IndexModule(indexSettings, emptyAnalysisRegistry, s -> new InternalEngineFactory());
module.addIndexEventListener(eventListener);
IndexService indexService = newIndexService(module);
IndexSettings x = indexService.getIndexSettings();
Expand All @@ -204,7 +205,7 @@ public void beforeIndexRemoved(IndexService indexService, IndexRemovalReason rea
public void testListener() throws IOException {
Setting<Boolean> booleanSetting = Setting.boolSetting("index.foo.bar", false, Property.Dynamic, Property.IndexScope);
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(index, settings, booleanSetting);
IndexModule module = new IndexModule(indexSettings, emptyAnalysisRegistry, new InternalEngineFactory());
IndexModule module = new IndexModule(indexSettings, emptyAnalysisRegistry, s -> new InternalEngineFactory());
Setting<Boolean> booleanSetting2 = Setting.boolSetting("index.foo.bar.baz", false, Property.Dynamic, Property.IndexScope);
AtomicBoolean atomicBoolean = new AtomicBoolean(false);
module.addSettingsUpdateConsumer(booleanSetting, atomicBoolean::set);
Expand All @@ -224,7 +225,8 @@ public void testListener() throws IOException {

public void testAddIndexOperationListener() throws IOException {
IndexModule module =
new IndexModule(IndexSettingsModule.newIndexSettings(index, settings), emptyAnalysisRegistry, new InternalEngineFactory());
new IndexModule(IndexSettingsModule.newIndexSettings(index, settings),
emptyAnalysisRegistry, s -> new InternalEngineFactory());
AtomicBoolean executed = new AtomicBoolean(false);
IndexingOperationListener listener = new IndexingOperationListener() {
@Override
Expand Down Expand Up @@ -255,7 +257,8 @@ public Engine.Index preIndex(ShardId shardId, Engine.Index operation) {

public void testAddSearchOperationListener() throws IOException {
IndexModule module =
new IndexModule(IndexSettingsModule.newIndexSettings(index, settings), emptyAnalysisRegistry, new InternalEngineFactory());
new IndexModule(IndexSettingsModule.newIndexSettings(index, settings),
emptyAnalysisRegistry, s -> new InternalEngineFactory());
AtomicBoolean executed = new AtomicBoolean(false);
SearchOperationListener listener = new SearchOperationListener() {

Expand Down Expand Up @@ -289,7 +292,8 @@ public void testAddSimilarity() throws IOException {
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString())
.build();
IndexModule module =
new IndexModule(IndexSettingsModule.newIndexSettings("foo", settings), emptyAnalysisRegistry, new InternalEngineFactory());
new IndexModule(IndexSettingsModule.newIndexSettings("foo", settings),
emptyAnalysisRegistry, s -> new InternalEngineFactory());
module.addSimilarity("test_similarity",
(providerSettings, indexCreatedVersion, scriptService) -> new TestSimilarity(providerSettings.get("key")));

Expand All @@ -304,7 +308,8 @@ public void testAddSimilarity() throws IOException {

public void testFrozen() {
IndexModule module =
new IndexModule(IndexSettingsModule.newIndexSettings(index, settings), emptyAnalysisRegistry, new InternalEngineFactory());
new IndexModule(IndexSettingsModule.newIndexSettings(index, settings),
emptyAnalysisRegistry, s -> new InternalEngineFactory());
module.freeze();
String msg = "Can't modify IndexModule once the index service has been created";
assertEquals(msg, expectThrows(IllegalStateException.class, () -> module.addSearchOperationListener(null)).getMessage());
Expand All @@ -323,7 +328,8 @@ public void testSetupUnknownSimilarity() throws IOException {
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString())
.build();
IndexModule module =
new IndexModule(IndexSettingsModule.newIndexSettings("foo", settings), emptyAnalysisRegistry, new InternalEngineFactory());
new IndexModule(IndexSettingsModule.newIndexSettings("foo", settings),
emptyAnalysisRegistry, s -> new InternalEngineFactory());
Exception ex = expectThrows(IllegalArgumentException.class, () -> newIndexService(module));
assertEquals("Unknown Similarity type [test_similarity] for [my_similarity]", ex.getMessage());
}
Expand All @@ -335,7 +341,8 @@ public void testSetupWithoutType() throws IOException {
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.build();
IndexModule module =
new IndexModule(IndexSettingsModule.newIndexSettings("foo", settings), emptyAnalysisRegistry, new InternalEngineFactory());
new IndexModule(IndexSettingsModule.newIndexSettings("foo", settings),
emptyAnalysisRegistry, s -> new InternalEngineFactory());
Exception ex = expectThrows(IllegalArgumentException.class, () -> newIndexService(module));
assertEquals("Similarity [my_similarity] must have an associated type", ex.getMessage());
}
Expand All @@ -345,7 +352,8 @@ public void testForceCustomQueryCache() throws IOException {
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString())
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build();
IndexModule module =
new IndexModule(IndexSettingsModule.newIndexSettings("foo", settings), emptyAnalysisRegistry, new InternalEngineFactory());
new IndexModule(IndexSettingsModule.newIndexSettings("foo", settings),
emptyAnalysisRegistry, s -> new InternalEngineFactory());
module.forceQueryCacheProvider((a, b) -> new CustomQueryCache());
expectThrows(AlreadySetException.class, () -> module.forceQueryCacheProvider((a, b) -> new CustomQueryCache()));
IndexService indexService = newIndexService(module);
Expand All @@ -358,7 +366,8 @@ public void testDefaultQueryCacheImplIsSelected() throws IOException {
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString())
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build();
IndexModule module =
new IndexModule(IndexSettingsModule.newIndexSettings("foo", settings), emptyAnalysisRegistry, new InternalEngineFactory());
new IndexModule(IndexSettingsModule.newIndexSettings("foo", settings),
emptyAnalysisRegistry, s -> new InternalEngineFactory());
IndexService indexService = newIndexService(module);
assertTrue(indexService.cache().query() instanceof IndexQueryCache);
indexService.close("simon says", false);
Expand All @@ -370,7 +379,8 @@ public void testDisableQueryCacheHasPrecedenceOverForceQueryCache() throws IOExc
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString())
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build();
IndexModule module =
new IndexModule(IndexSettingsModule.newIndexSettings("foo", settings), emptyAnalysisRegistry, new InternalEngineFactory());
new IndexModule(IndexSettingsModule.newIndexSettings("foo", settings),
emptyAnalysisRegistry, s -> new InternalEngineFactory());
module.forceQueryCacheProvider((a, b) -> new CustomQueryCache());
IndexService indexService = newIndexService(module);
assertTrue(indexService.cache().query() instanceof DisabledQueryCache);
Expand Down
Loading