Merged
1 change: 0 additions & 1 deletion buildSrc/src/main/resources/checkstyle_suppressions.xml
@@ -419,7 +419,6 @@
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]env[/\\]NodeEnvironment.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]AsyncShardFetch.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]DanglingIndicesState.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]Gateway.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]GatewayAllocator.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]GatewayMetaState.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]GatewayService.java" checks="LineLength" />
core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java
@@ -19,6 +19,7 @@

package org.elasticsearch.cluster.metadata;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.close.CloseIndexClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.open.OpenIndexClusterStateUpdateRequest;
@@ -37,6 +38,8 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.NodeServicesProvider;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.snapshots.RestoreService;
import org.elasticsearch.snapshots.SnapshotsService;
@@ -59,10 +62,16 @@ public class MetaDataIndexStateService extends AbstractComponent {
private final AllocationService allocationService;

private final MetaDataIndexUpgradeService metaDataIndexUpgradeService;
private final NodeServicesProvider nodeServiceProvider;
private final IndicesService indicesService;

@Inject
public MetaDataIndexStateService(Settings settings, ClusterService clusterService, AllocationService allocationService, MetaDataIndexUpgradeService metaDataIndexUpgradeService) {
public MetaDataIndexStateService(Settings settings, ClusterService clusterService, AllocationService allocationService,
MetaDataIndexUpgradeService metaDataIndexUpgradeService,
NodeServicesProvider nodeServicesProvider, IndicesService indicesService) {
super(settings);
this.nodeServiceProvider = nodeServicesProvider;
this.indicesService = indicesService;
this.clusterService = clusterService;
this.allocationService = allocationService;
this.metaDataIndexUpgradeService = metaDataIndexUpgradeService;
@@ -162,6 +171,12 @@ public ClusterState execute(ClusterState currentState) {
// The index might be closed because we couldn't import it due to old incompatible version
// We need to check that this index can be upgraded to the current version
indexMetaData = metaDataIndexUpgradeService.upgradeIndexMetaData(indexMetaData);
try {
indicesService.verifyIndexMetadata(nodeServiceProvider, indexMetaData);
} catch (Exception e) {
throw new ElasticsearchException("Failed to verify index " + indexMetaData.getIndex(), e);
}

mdBuilder.put(indexMetaData, true);
blocksBuilder.removeIndexBlock(indexName, INDEX_CLOSED_BLOCK);
}
29 changes: 25 additions & 4 deletions core/src/main/java/org/elasticsearch/gateway/Gateway.java
@@ -35,8 +35,14 @@
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.NodeServicesProvider;
import org.elasticsearch.indices.IndicesService;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.function.Supplier;

/**
@@ -53,10 +59,15 @@ public class Gateway extends AbstractComponent implements ClusterStateListener {
private final TransportNodesListGatewayMetaState listGatewayMetaState;

private final Supplier<Integer> minimumMasterNodesProvider;
private final IndicesService indicesService;
private final NodeServicesProvider nodeServicesProvider;

public Gateway(Settings settings, ClusterService clusterService, NodeEnvironment nodeEnv, GatewayMetaState metaState,
TransportNodesListGatewayMetaState listGatewayMetaState, Discovery discovery) {
TransportNodesListGatewayMetaState listGatewayMetaState, Discovery discovery,
NodeServicesProvider nodeServicesProvider, IndicesService indicesService) {
super(settings);
this.nodeServicesProvider = nodeServicesProvider;
this.indicesService = indicesService;
this.clusterService = clusterService;
this.nodeEnv = nodeEnv;
this.metaState = metaState;
@@ -66,9 +77,9 @@ public Gateway(Settings settings, ClusterService clusterService, NodeEnvironment
}

public void performStateRecovery(final GatewayStateRecoveredListener listener) throws GatewayException {
ObjectHashSet<String> nodesIds = new ObjectHashSet<>(clusterService.state().nodes().masterNodes().keys());
logger.trace("performing state recovery from {}", nodesIds);
TransportNodesListGatewayMetaState.NodesGatewayMetaState nodesState = listGatewayMetaState.list(nodesIds.toArray(String.class), null).actionGet();
String[] nodesIds = clusterService.state().nodes().masterNodes().keys().toArray(String.class);
logger.trace("performing state recovery from {}", Arrays.toString(nodesIds));
TransportNodesListGatewayMetaState.NodesGatewayMetaState nodesState = listGatewayMetaState.list(nodesIds, null).actionGet();


int requiredAllocation = Math.max(1, minimumMasterNodesProvider.get());
@@ -129,7 +140,17 @@ public void performStateRecovery(final GatewayStateRecoveredListener listener) t
if (electedIndexMetaData != null) {
if (indexMetaDataCount < requiredAllocation) {
logger.debug("[{}] found [{}], required [{}], not adding", index, indexMetaDataCount, requiredAllocation);
} // TODO if this logging statement is correct then we are missing an else here
Contributor:

yeah, looks wrong.

Contributor:

that logging message is useless - it is also relevant for the case where the state was fetched from enough nodes, but some index isn't present on requiredAllocation - which is extremely rare. IMO we should just remove all index level "selection". We elect a global state and that's it. I'll do this as a follow up.

Contributor Author:

alright - I'll keep the TODO just in case


try {
if (electedIndexMetaData.getState() == IndexMetaData.State.OPEN) {
// verify that we can actually create this index - if not we recover it as closed with lots of warn logs
indicesService.verifyIndexMetadata(nodeServicesProvider, electedIndexMetaData);
}
} catch (Exception e) {
logger.warn("recovering index {} failed - recovering as closed", e, electedIndexMetaData.getIndex());
electedIndexMetaData = IndexMetaData.builder(electedIndexMetaData).state(IndexMetaData.State.CLOSE).build();
}

metaDataBuilder.put(electedIndexMetaData, false);
}
}
core/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java
@@ -86,8 +86,8 @@ public GatewayMetaState(Settings settings, NodeEnvironment nodeEnv, MetaStateSer
if (DiscoveryNode.masterNode(settings) || DiscoveryNode.dataNode(settings)) {
try {
ensureNoPre019State();
pre20Upgrade();
IndexFolderUpgrader.upgradeIndicesIfNeeded(settings, nodeEnv);
upgradeMetaData();
long startNS = System.nanoTime();
metaStateService.loadFullState();
logger.debug("took {} to load state", TimeValue.timeValueMillis(TimeValue.nsecToMSec(System.nanoTime() - startNS)));
@@ -222,7 +222,7 @@ private void ensureNoPre019State() throws Exception {
* MetaDataIndexUpgradeService might also update obsolete settings if needed. When this happens we rewrite
* index metadata with new settings.
*/
private void pre20Upgrade() throws Exception {
private void upgradeMetaData() throws Exception {
MetaData metaData = loadMetaState();
List<IndexMetaData> updateIndexMetaData = new ArrayList<>();
for (IndexMetaData indexMetaData : metaData) {
@@ -235,7 +235,7 @@ private void pre20Upgrade() throws Exception {
// means the upgrade can continue. Now it's safe to overwrite index metadata with the new version.
for (IndexMetaData indexMetaData : updateIndexMetaData) {
// since we still haven't upgraded the index folders, we write index state in the old folder
metaStateService.writeIndex("upgrade", indexMetaData, nodeEnv.resolveIndexFolder(indexMetaData.getIndex().getName()));
metaStateService.writeIndex("upgrade", indexMetaData, nodeEnv.resolveIndexFolder(indexMetaData.getIndex().getUUID()));
}
}

core/src/main/java/org/elasticsearch/gateway/GatewayService.java
@@ -43,6 +43,8 @@
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.NodeServicesProvider;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;

@@ -95,9 +97,11 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
@Inject
public GatewayService(Settings settings, AllocationService allocationService, ClusterService clusterService,
ThreadPool threadPool, NodeEnvironment nodeEnvironment, GatewayMetaState metaState,
TransportNodesListGatewayMetaState listGatewayMetaState, Discovery discovery) {
TransportNodesListGatewayMetaState listGatewayMetaState, Discovery discovery,
NodeServicesProvider nodeServicesProvider, IndicesService indicesService) {
super(settings);
this.gateway = new Gateway(settings, clusterService, nodeEnvironment, metaState, listGatewayMetaState, discovery);
this.gateway = new Gateway(settings, clusterService, nodeEnvironment, metaState, listGatewayMetaState, discovery,
nodeServicesProvider, indicesService);
this.allocationService = allocationService;
this.clusterService = clusterService;
this.threadPool = threadPool;
79 changes: 59 additions & 20 deletions core/src/main/java/org/elasticsearch/indices/IndicesService.java
@@ -19,6 +19,7 @@

package org.elasticsearch.indices;

import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.CollectionUtil;
@@ -33,6 +34,7 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.breaker.CircuitBreaker;
@@ -66,6 +68,7 @@
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.get.GetStats;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.recovery.RecoveryStats;
import org.elasticsearch.index.refresh.RefreshStats;
@@ -74,6 +77,7 @@
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.IndexingOperationListener;
import org.elasticsearch.index.shard.IndexingStats;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.IndexStoreConfig;
@@ -88,9 +92,11 @@
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
@@ -324,44 +330,30 @@ public IndexService indexServiceSafe(Index index) {
* @throws IndexAlreadyExistsException if the index already exists.
*/
public synchronized IndexService createIndex(final NodeServicesProvider nodeServicesProvider, IndexMetaData indexMetaData, List<IndexEventListener> builtInListeners) throws IOException {

if (!lifecycle.started()) {
throw new IllegalStateException("Can't create an index [" + indexMetaData.getIndex() + "], node is closed");
}
if (indexMetaData.getIndexUUID().equals(IndexMetaData.INDEX_UUID_NA_VALUE)) {
throw new IllegalArgumentException("index must have a real UUID found value: [" + indexMetaData.getIndexUUID() + "]");
}
final Index index = indexMetaData.getIndex();
final Predicate<String> indexNameMatcher = (indexExpression) -> indexNameExpressionResolver.matchesIndex(index.getName(), indexExpression, clusterService.state());
final IndexSettings idxSettings = new IndexSettings(indexMetaData, this.settings, indexNameMatcher, indexScopeSetting);
if (hasIndex(index)) {
throw new IndexAlreadyExistsException(index);
}
logger.debug("creating Index [{}], shards [{}]/[{}{}]",
indexMetaData.getIndex(),
idxSettings.getNumberOfShards(),
idxSettings.getNumberOfReplicas(),
idxSettings.isShadowReplicaIndex() ? "s" : "");

final IndexModule indexModule = new IndexModule(idxSettings, indexStoreConfig, analysisRegistry);
pluginsService.onIndexModule(indexModule);
for (IndexEventListener listener : builtInListeners) {
indexModule.addIndexEventListener(listener);
}
List<IndexEventListener> finalListeners = new ArrayList<>(builtInListeners);
final IndexEventListener onStoreClose = new IndexEventListener() {
@Override
public void onStoreClosed(ShardId shardId) {
indicesQueryCache.onClose(shardId);
}
};
indexModule.addIndexEventListener(onStoreClose);
indexModule.addIndexEventListener(oldShardsStats);
final IndexEventListener listener = indexModule.freeze();
listener.beforeIndexCreated(index, idxSettings.getSettings());
final IndexService indexService = indexModule.newIndexService(nodeEnv, this, nodeServicesProvider, indicesQueryCache, mapperRegistry, indicesFieldDataCache, indexingMemoryController);
finalListeners.add(onStoreClose);
finalListeners.add(oldShardsStats);
final IndexService indexService = createIndexService("create index", nodeServicesProvider, indexMetaData, indicesQueryCache, indicesFieldDataCache, finalListeners, indexingMemoryController);
boolean success = false;
try {
assert indexService.getIndexEventListener() == listener;
listener.afterIndexCreated(indexService);
indexService.getIndexEventListener().afterIndexCreated(indexService);
indices = newMapBuilder(indices).put(index.getUUID(), indexService).immutableMap();
success = true;
return indexService;
@@ -370,7 +362,54 @@ public void onStoreClosed(ShardId shardId) {
indexService.close("plugins_failed", true);
}
}
}

/**
* This creates a new IndexService without registering it
*/
private synchronized IndexService createIndexService(final String reason, final NodeServicesProvider nodeServicesProvider, IndexMetaData indexMetaData, IndicesQueryCache indicesQueryCache, IndicesFieldDataCache indicesFieldDataCache, List<IndexEventListener> builtInListeners, IndexingOperationListener... indexingOperationListeners) throws IOException {
final Index index = indexMetaData.getIndex();
final Predicate<String> indexNameMatcher = (indexExpression) -> indexNameExpressionResolver.matchesIndex(index.getName(), indexExpression, clusterService.state());
final IndexSettings idxSettings = new IndexSettings(indexMetaData, this.settings, indexNameMatcher, indexScopeSetting);
logger.debug("creating Index [{}], shards [{}]/[{}{}] - reason [{}]",
indexMetaData.getIndex(),
idxSettings.getNumberOfShards(),
idxSettings.getNumberOfReplicas(),
idxSettings.isShadowReplicaIndex() ? "s" : "", reason);

final IndexModule indexModule = new IndexModule(idxSettings, indexStoreConfig, analysisRegistry);
pluginsService.onIndexModule(indexModule);
for (IndexEventListener listener : builtInListeners) {
indexModule.addIndexEventListener(listener);
}
final IndexEventListener listener = indexModule.freeze();
listener.beforeIndexCreated(index, idxSettings.getSettings());
return indexModule.newIndexService(nodeEnv, this, nodeServicesProvider, indicesQueryCache, mapperRegistry, indicesFieldDataCache, indexingOperationListeners);
}

/**
* This method verifies that the given {@link IndexMetaData} holds sane values to create an {@link IndexService}. This method will throw an
* exception if the creation fails. The created {@link IndexService} will not be registered and will be closed immediately.
*/
public synchronized void verifyIndexMetadata(final NodeServicesProvider nodeServicesProvider, IndexMetaData metaData) throws IOException {
final List<Closeable> closeables = new ArrayList<>();
try {
IndicesFieldDataCache indicesFieldDataCache = new IndicesFieldDataCache(settings, new IndexFieldDataCache.Listener() {});
closeables.add(indicesFieldDataCache);
IndicesQueryCache indicesQueryCache = new IndicesQueryCache(settings);
closeables.add(indicesQueryCache);
// this will also fail if some plugin fails etc. which is nice since we can verify that early
final IndexService service = createIndexService("metadata verification", nodeServicesProvider,
metaData, indicesQueryCache, indicesFieldDataCache, Collections.emptyList());
for (ObjectCursor<MappingMetaData> typeMapping : metaData.getMappings().values()) {
// don't apply the default mapping, it has been applied when the mapping was created
service.mapperService().merge(typeMapping.value.type(), typeMapping.value.source(),
MapperService.MergeReason.MAPPING_RECOVERY, true);
}
closeables.add(() -> service.close("metadata verification", false));
} finally {
IOUtils.close(closeables);
}
}

/**
Expand Down
Loading