-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Write state also on data nodes if not master eligible #9952
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
647eb22
0f5afcd
aa66bb9
dd28a5d
39359a6
a3a9831
a72a297
c680f84
d15e732
c461fe6
18cf4e1
4e25695
8652750
8b58004
a792ad6
a4baa80
9c1df62
c394971
e7994b3
bf906fe
32dc21a
82505cf
aeedb29
92d1f40
ab90dbe
5770828
f88e821
7c44299
5cb39b8
7569a51
d2abcfa
9f6f0e1
06d2b59
8e8f8d1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,6 +19,7 @@ | |
|
|
||
| package org.elasticsearch.gateway; | ||
|
|
||
| import com.google.common.collect.ImmutableSet; | ||
| import org.elasticsearch.ElasticsearchException; | ||
| import org.elasticsearch.Version; | ||
| import org.elasticsearch.cluster.ClusterChangedEvent; | ||
|
|
@@ -27,9 +28,7 @@ | |
| import org.elasticsearch.cluster.metadata.IndexMetaData; | ||
| import org.elasticsearch.cluster.metadata.MetaData; | ||
| import org.elasticsearch.cluster.node.DiscoveryNode; | ||
| import org.elasticsearch.cluster.routing.DjbHashFunction; | ||
| import org.elasticsearch.cluster.routing.HashFunction; | ||
| import org.elasticsearch.cluster.routing.SimpleHashFunction; | ||
| import org.elasticsearch.cluster.routing.*; | ||
| import org.elasticsearch.common.Nullable; | ||
| import org.elasticsearch.common.component.AbstractComponent; | ||
| import org.elasticsearch.common.inject.Inject; | ||
|
|
@@ -43,6 +42,7 @@ | |
| import java.nio.file.DirectoryStream; | ||
| import java.nio.file.Files; | ||
| import java.nio.file.Path; | ||
| import java.util.*; | ||
|
|
||
| /** | ||
| * | ||
|
|
@@ -57,7 +57,9 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL | |
| private final DanglingIndicesState danglingIndicesState; | ||
|
|
||
| @Nullable | ||
| private volatile MetaData currentMetaData; | ||
| private volatile MetaData previousMetaData; | ||
|
|
||
| private volatile ImmutableSet<String> previouslyWrittenIndices = ImmutableSet.of(); | ||
|
|
||
| @Inject | ||
| public GatewayMetaState(Settings settings, NodeEnvironment nodeEnv, MetaStateService metaStateService, | ||
|
|
@@ -76,7 +78,7 @@ public GatewayMetaState(Settings settings, NodeEnvironment nodeEnv, MetaStateSer | |
| if (DiscoveryNode.masterNode(settings) || DiscoveryNode.dataNode(settings)) { | ||
| nodeEnv.ensureAtomicMoveSupported(); | ||
| } | ||
| if (DiscoveryNode.masterNode(settings)) { | ||
| if (DiscoveryNode.masterNode(settings) || DiscoveryNode.dataNode(settings)) { | ||
| try { | ||
| ensureNoPre019State(); | ||
| pre20Upgrade(); | ||
|
|
@@ -96,55 +98,62 @@ public MetaData loadMetaState() throws Exception { | |
|
|
||
| @Override | ||
| public void clusterChanged(ClusterChangedEvent event) { | ||
| Set<String> relevantIndices = new HashSet<>(); | ||
| final ClusterState state = event.state(); | ||
| if (state.blocks().disableStatePersistence()) { | ||
| // reset the current metadata, we need to start fresh... | ||
| this.currentMetaData = null; | ||
| this.previousMetaData = null; | ||
| previouslyWrittenIndices = ImmutableSet.of(); | ||
| return; | ||
| } | ||
|
|
||
| MetaData newMetaData = state.metaData(); | ||
| // we don't check if metaData changed, since we might be called several times and we need to check dangling... | ||
|
|
||
| boolean success = true; | ||
| // only applied to master node, writing the global and index level states | ||
| if (state.nodes().localNode().masterNode()) { | ||
| // write the state if this node is a master eligible node or if it is a data node and has shards allocated on it | ||
| if (state.nodes().localNode().masterNode() || state.nodes().localNode().dataNode()) { | ||
| if (previousMetaData == null) { | ||
| try { | ||
| // we determine if or if not we write meta data on data only nodes by looking at the shard routing | ||
| // and only write if a shard of this index is allocated on this node | ||
| // however, closed indices do not appear in the shard routing. if the meta data for a closed index is | ||
| // updated it will therefore not be written in case the list of previouslyWrittenIndices is empty (because state | ||
| // persistence was disabled or the node was restarted), see getRelevantIndicesOnDataOnlyNode(). | ||
| // we therefore have to check here if we have shards on disk and add their indices to the previouslyWrittenIndices list | ||
| if (isDataOnlyNode(state)) { | ||
| ImmutableSet.Builder<String> previouslyWrittenIndicesBuilder = ImmutableSet.builder(); | ||
| for (IndexMetaData indexMetaData : newMetaData) { | ||
| IndexMetaData indexMetaDataOnDisk = null; | ||
| if (indexMetaData.state().equals(IndexMetaData.State.CLOSE)) { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we skip this loading from disk? I made the change so that we still load (see
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. as far as I am concerned we could just skip it since we write the new one anyway. Yet, we might have a new meta on disk but that would be a bug too no? so I think we can drop it?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, removed this check now. |
||
| indexMetaDataOnDisk = metaStateService.loadIndexState(indexMetaData.index()); | ||
| } | ||
| if (indexMetaDataOnDisk != null) { | ||
| previouslyWrittenIndicesBuilder.add(indexMetaDataOnDisk.index()); | ||
| } | ||
| } | ||
| previouslyWrittenIndices = previouslyWrittenIndicesBuilder.addAll(previouslyWrittenIndices).build(); | ||
| } | ||
| } catch (Throwable e) { | ||
| success = false; | ||
| } | ||
| } | ||
| // check if the global state changed? | ||
| if (currentMetaData == null || !MetaData.isGlobalStateEquals(currentMetaData, newMetaData)) { | ||
| if (previousMetaData == null || !MetaData.isGlobalStateEquals(previousMetaData, newMetaData)) { | ||
| try { | ||
| metaStateService.writeGlobalState("changed", newMetaData); | ||
| } catch (Throwable e) { | ||
| success = false; | ||
| } | ||
| } | ||
|
|
||
| Iterable<IndexMetaWriteInfo> writeInfo; | ||
| relevantIndices = getRelevantIndices(event.state(), previouslyWrittenIndices); | ||
| writeInfo = resolveStatesToBeWritten(previouslyWrittenIndices, relevantIndices, previousMetaData, event.state().metaData()); | ||
| // check and write changes in indices | ||
| for (IndexMetaData indexMetaData : newMetaData) { | ||
| String writeReason = null; | ||
| IndexMetaData currentIndexMetaData; | ||
| if (currentMetaData == null) { | ||
| // a new event..., check from the state stored | ||
| try { | ||
| currentIndexMetaData = metaStateService.loadIndexState(indexMetaData.index()); | ||
| } catch (IOException ex) { | ||
| throw new ElasticsearchException("failed to load index state", ex); | ||
| } | ||
| } else { | ||
| currentIndexMetaData = currentMetaData.index(indexMetaData.index()); | ||
| } | ||
| if (currentIndexMetaData == null) { | ||
| writeReason = "freshly created"; | ||
| } else if (currentIndexMetaData.version() != indexMetaData.version()) { | ||
| writeReason = "version changed from [" + currentIndexMetaData.version() + "] to [" + indexMetaData.version() + "]"; | ||
| } | ||
|
|
||
| // we update the writeReason only if we really need to write it | ||
| if (writeReason == null) { | ||
| continue; | ||
| } | ||
|
|
||
| for (IndexMetaWriteInfo indexMetaWrite : writeInfo) { | ||
| try { | ||
| metaStateService.writeIndex(writeReason, indexMetaData, currentIndexMetaData); | ||
| metaStateService.writeIndex(indexMetaWrite.reason, indexMetaWrite.newMetaData, indexMetaWrite.previousMetaData); | ||
| } catch (Throwable e) { | ||
| success = false; | ||
| } | ||
|
|
@@ -154,8 +163,27 @@ public void clusterChanged(ClusterChangedEvent event) { | |
| danglingIndicesState.processDanglingIndices(newMetaData); | ||
|
|
||
| if (success) { | ||
| currentMetaData = newMetaData; | ||
| previousMetaData = newMetaData; | ||
| ImmutableSet.Builder<String> builder = ImmutableSet.builder(); | ||
| previouslyWrittenIndices = builder.addAll(relevantIndices).build(); | ||
| } | ||
| } | ||
|
|
||
| public static Set<String> getRelevantIndices(ClusterState state, ImmutableSet<String> previouslyWrittenIndices) { | ||
| Set<String> relevantIndices; | ||
| if (isDataOnlyNode(state)) { | ||
| relevantIndices = getRelevantIndicesOnDataOnlyNode(state, previouslyWrittenIndices); | ||
| } else if (state.nodes().localNode().masterNode() == true) { | ||
| relevantIndices = getRelevantIndicesForMasterEligibleNode(state); | ||
| } else { | ||
| relevantIndices = Collections.emptySet(); | ||
| } | ||
| return relevantIndices; | ||
| } | ||
|
|
||
|
|
||
| protected static boolean isDataOnlyNode(ClusterState state) { | ||
| return ((state.nodes().localNode().masterNode() == false) && state.nodes().localNode().dataNode()); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -229,7 +257,7 @@ private void pre20Upgrade() throws Exception { | |
| } | ||
| } | ||
| } | ||
| if (hasCustomPre20HashFunction|| pre20UseType != null) { | ||
| if (hasCustomPre20HashFunction || pre20UseType != null) { | ||
| logger.warn("Settings [{}] and [{}] are deprecated. Index settings from your old indices have been updated to record the fact that they " | ||
| + "used some custom routing logic, you can now remove these settings from your `elasticsearch.yml` file", DEPRECATED_SETTING_ROUTING_HASH_FUNCTION, DEPRECATED_SETTING_ROUTING_USE_TYPE); | ||
| } | ||
|
|
@@ -251,4 +279,82 @@ private void ensureNoPre019ShardState(NodeEnvironment nodeEnv) throws Exception | |
| } | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Loads the current meta state for each index in the new cluster state and checks if it has to be persisted. | ||
| * Each index state that should be written to disk will be returned. This is only run for data only nodes. | ||
| * It will return only the states for indices that actually have a shard allocated on the current node. | ||
| * | ||
| * @param previouslyWrittenIndices A list of indices for which the state was already written before | ||
| * @param potentiallyUnwrittenIndices The list of indices for which state should potentially be written | ||
| * @param previousMetaData The last meta data we know of. meta data for all indices in previouslyWrittenIndices list is persisted now | ||
| * @param newMetaData The new metadata | ||
| * @return iterable over all indices states that should be written to disk | ||
| */ | ||
| public static Iterable<GatewayMetaState.IndexMetaWriteInfo> resolveStatesToBeWritten(ImmutableSet<String> previouslyWrittenIndices, Set<String> potentiallyUnwrittenIndices, MetaData previousMetaData, MetaData newMetaData) { | ||
| List<GatewayMetaState.IndexMetaWriteInfo> indicesToWrite = new ArrayList<>(); | ||
| for (String index : potentiallyUnwrittenIndices) { | ||
| IndexMetaData newIndexMetaData = newMetaData.index(index); | ||
| IndexMetaData previousIndexMetaData = previousMetaData == null ? null : previousMetaData.index(index); | ||
| String writeReason = null; | ||
| if (previouslyWrittenIndices.contains(index) == false || previousIndexMetaData == null) { | ||
| writeReason = "freshly created"; | ||
| } else if (previousIndexMetaData.version() != newIndexMetaData.version()) { | ||
| writeReason = "version changed from [" + previousIndexMetaData.version() + "] to [" + newIndexMetaData.version() + "]"; | ||
| } | ||
| if (writeReason != null) { | ||
| indicesToWrite.add(new GatewayMetaState.IndexMetaWriteInfo(newIndexMetaData, previousIndexMetaData, writeReason)); | ||
| } | ||
| } | ||
| return indicesToWrite; | ||
| } | ||
|
|
||
| public static Set<String> getRelevantIndicesOnDataOnlyNode(ClusterState state, ImmutableSet<String> previouslyWrittenIndices) { | ||
| RoutingNode newRoutingNode = state.getRoutingNodes().node(state.nodes().localNodeId()); | ||
| if (newRoutingNode == null) { | ||
| throw new IllegalStateException("cluster state does not contain this node - cannot write index meta state"); | ||
| } | ||
| Set<String> indices = new HashSet<>(); | ||
| for (MutableShardRouting routing : newRoutingNode) { | ||
| indices.add(routing.index()); | ||
| } | ||
| // we have to check the meta data also: closed indices will not appear in the routing table, but we must still write the state if we have it written on disk previously | ||
| for (IndexMetaData indexMetaData : state.metaData()) { | ||
| if (previouslyWrittenIndices.contains(indexMetaData.getIndex()) && state.metaData().getIndices().get(indexMetaData.getIndex()).state().equals(IndexMetaData.State.CLOSE)) { | ||
| indices.add(indexMetaData.getIndex()); | ||
| } | ||
| } | ||
| return indices; | ||
| } | ||
|
|
||
| public static Set<String> getRelevantIndicesForMasterEligibleNode(ClusterState state) { | ||
| Set<String> relevantIndices; | ||
| relevantIndices = new HashSet<>(); | ||
| // we have to iterate over the metadata to make sure we also capture closed indices | ||
| for (IndexMetaData indexMetaData : state.metaData()) { | ||
| relevantIndices.add(indexMetaData.getIndex()); | ||
| } | ||
| return relevantIndices; | ||
| } | ||
|
|
||
|
|
||
| public static class IndexMetaWriteInfo { | ||
| final IndexMetaData newMetaData; | ||
| final String reason; | ||
| final IndexMetaData previousMetaData; | ||
|
|
||
| public IndexMetaWriteInfo(IndexMetaData newMetaData, IndexMetaData previousMetaData, String reason) { | ||
| this.newMetaData = newMetaData; | ||
| this.reason = reason; | ||
| this.previousMetaData = previousMetaData; | ||
| } | ||
|
|
||
| public IndexMetaData getNewMetaData() { | ||
| return newMetaData; | ||
| } | ||
|
|
||
| public String getReason() { | ||
| return reason; | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wondering how long we will need the BWC handling below? (not saying we should remove it now)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The pre20Upgrade() we should still do, because this change is going into 1.x as well. But we probably do not need to check for pre-0.19 state on data nodes. Would it do any harm to keep this check, or should I explicitly check for pre-0.19 state only on master nodes and for pre-2.0 state on master and data nodes?