2222import com .carrotsearch .hppc .cursors .ObjectObjectCursor ;
2323import org .apache .logging .log4j .LogManager ;
2424import org .apache .logging .log4j .Logger ;
25- import org .apache .logging .log4j .message .ParameterizedMessage ;
2625import org .apache .lucene .util .SetOnce ;
2726import org .elasticsearch .ElasticsearchException ;
2827import org .elasticsearch .Version ;
4342import org .elasticsearch .common .collect .Tuple ;
4443import org .elasticsearch .common .settings .Settings ;
4544import org .elasticsearch .common .unit .TimeValue ;
45+ import org .elasticsearch .core .internal .io .IOUtils ;
4646import org .elasticsearch .index .Index ;
4747import org .elasticsearch .plugins .MetaDataUpgrader ;
4848import org .elasticsearch .transport .TransportService ;
4949
50+ import java .io .Closeable ;
5051import java .io .IOException ;
5152import java .util .HashMap ;
5253import java .util .Map ;
6364 * ClusterState#metaData()} because it might be stale or incomplete. Master-eligible nodes must perform an election to find a complete and
6465 * non-stale state, and master-ineligible nodes receive the real cluster state from the elected master after joining the cluster.
6566 */
66- public class GatewayMetaState {
67+ public class GatewayMetaState implements Closeable {
6768 private static final Logger logger = LogManager .getLogger (GatewayMetaState .class );
6869
6970 // Set by calling start()
@@ -81,49 +82,46 @@ public MetaData getMetaData() {
8182
8283 public void start (Settings settings , TransportService transportService , ClusterService clusterService ,
8384 MetaStateService metaStateService , MetaDataIndexUpgradeService metaDataIndexUpgradeService ,
84- MetaDataUpgrader metaDataUpgrader ) {
85+ MetaDataUpgrader metaDataUpgrader , LucenePersistedStateFactory lucenePersistedStateFactory ) {
8586 assert persistedState .get () == null : "should only start once, but already have " + persistedState .get ();
8687
87- final Tuple <Manifest , ClusterState > manifestClusterStateTuple ;
88- try {
89- upgradeMetaData (settings , metaStateService , metaDataIndexUpgradeService , metaDataUpgrader );
90- manifestClusterStateTuple = loadStateAndManifest (ClusterName .CLUSTER_NAME_SETTING .get (settings ), metaStateService );
91- } catch (IOException e ) {
92- throw new ElasticsearchException ("failed to load metadata" , e );
88+ if (DiscoveryNode .isMasterNode (settings )) {
89+ try {
90+ persistedState .set (lucenePersistedStateFactory .loadPersistedState ((version , metadata ) ->
91+ prepareInitialClusterState (transportService , clusterService ,
92+ ClusterState .builder (ClusterName .CLUSTER_NAME_SETTING .get (settings ))
93+ .version (version )
94+ .metaData (upgradeMetaDataForMasterEligibleNode (metadata , metaDataIndexUpgradeService , metaDataUpgrader ))
95+ .build ())));
96+ } catch (IOException e ) {
97+ throw new ElasticsearchException ("failed to load metadata" , e );
98+ }
9399 }
94100
95- final IncrementalClusterStateWriter incrementalClusterStateWriter
96- = new IncrementalClusterStateWriter (settings , clusterService .getClusterSettings (), metaStateService ,
101+ if (DiscoveryNode .isDataNode (settings )) {
102+ final Tuple <Manifest , ClusterState > manifestClusterStateTuple ;
103+ try {
104+ upgradeMetaData (settings , metaStateService , metaDataIndexUpgradeService , metaDataUpgrader );
105+ manifestClusterStateTuple = loadStateAndManifest (ClusterName .CLUSTER_NAME_SETTING .get (settings ), metaStateService );
106+ } catch (IOException e ) {
107+ throw new ElasticsearchException ("failed to load metadata" , e );
108+ }
109+
110+ final IncrementalClusterStateWriter incrementalClusterStateWriter
111+ = new IncrementalClusterStateWriter (settings , clusterService .getClusterSettings (), metaStateService ,
97112 manifestClusterStateTuple .v1 (),
98113 prepareInitialClusterState (transportService , clusterService , manifestClusterStateTuple .v2 ()),
99114 transportService .getThreadPool ()::relativeTimeInMillis );
100- if (DiscoveryNode .isMasterNode (settings ) == false ) {
101- if (DiscoveryNode .isDataNode (settings )) {
102- // Master-eligible nodes persist index metadata for all indices regardless of whether they hold any shards or not. It's
103- // vitally important to the safety of the cluster coordination system that master-eligible nodes persist this metadata when
104- // _accepting_ the cluster state (i.e. before it is committed). This persistence happens on the generic threadpool.
105- //
106- // In contrast, master-ineligible data nodes only persist the index metadata for shards that they hold. When all shards of
107- // an index are moved off such a node the IndicesStore is responsible for removing the corresponding index directory,
108- // including the metadata, and does so on the cluster applier thread.
109- //
110- // This presents a problem: if a shard is unassigned from a node and then reassigned back to it again then there is a race
111- // between the IndicesStore deleting the index folder and the CoordinationState concurrently trying to write the updated
112- // metadata into it. We could probably solve this with careful synchronization, but in fact there is no need. The persisted
113- // state on master-ineligible data nodes is mostly ignored - it's only there to support dangling index imports, which is
114- // inherently unsafe anyway. Thus we can safely delay metadata writes on master-ineligible data nodes until applying the
115- // cluster state, which is what this does:
116- clusterService .addLowPriorityApplier (new GatewayClusterApplier (incrementalClusterStateWriter ));
117- }
118115
119- // Master-ineligible nodes do not need to persist the cluster state when accepting it because they are not in the voting
120- // configuration, so it's ok if they have a stale or incomplete cluster state when restarted. We track the latest cluster state
121- // in memory instead.
122- persistedState .set (new InMemoryPersistedState (manifestClusterStateTuple .v1 ().getCurrentTerm (), manifestClusterStateTuple .v2 ()));
123- } else {
124- // Master-eligible nodes must persist the cluster state when accepting it because they must reload the (complete, fresh)
125- // last-accepted cluster state when restarted.
126- persistedState .set (new GatewayPersistedState (incrementalClusterStateWriter ));
116+ clusterService .addLowPriorityApplier (new GatewayClusterApplier (incrementalClusterStateWriter ));
117+
118+ if (DiscoveryNode .isMasterNode (settings ) == false ) {
119+ persistedState .set (
120+ new InMemoryPersistedState (manifestClusterStateTuple .v1 ().getCurrentTerm (), manifestClusterStateTuple .v2 ()));
121+ }
122+ } else if (DiscoveryNode .isMasterNode (settings ) == false ) {
123+ persistedState .set (
124+ new InMemoryPersistedState (0L , ClusterState .builder (ClusterName .CLUSTER_NAME_SETTING .get (settings )).build ()));
127125 }
128126 }
129127
@@ -139,6 +137,13 @@ ClusterState prepareInitialClusterState(TransportService transportService, Clust
139137 .apply (clusterState );
140138 }
141139
140+ // exposed so it can be overridden by tests; start() applies it to the metadata handed to the loadPersistedState callback on master-eligible nodes
141+ MetaData upgradeMetaDataForMasterEligibleNode (MetaData metaData ,
142+ MetaDataIndexUpgradeService metaDataIndexUpgradeService ,
143+ MetaDataUpgrader metaDataUpgrader ) {
144+ return upgradeMetaData (metaData , metaDataIndexUpgradeService , metaDataUpgrader ); // plain delegation to the three-argument upgradeMetaData overload, which is defined outside this hunk
145+ }
146+
142147 // exposed so it can be overridden by tests
143148 void upgradeMetaData (Settings settings , MetaStateService metaStateService , MetaDataIndexUpgradeService metaDataIndexUpgradeService ,
144149 MetaDataUpgrader metaDataUpgrader ) throws IOException {
@@ -252,6 +257,10 @@ private static boolean applyPluginUpgraders(ImmutableOpenMap<String, IndexTempla
252257 return false ;
253258 }
254259
260+ @ Override
261+ public void close () throws IOException { // Closeable hook: releases whatever persisted state start() stored in persistedState
262+ IOUtils .close (persistedState .get ()); // get() is still null if start() never ran; presumably IOUtils.close tolerates null - TODO confirm
263+ }
255264
256265 private static class GatewayClusterApplier implements ClusterStateApplier {
257266
@@ -285,48 +294,4 @@ public void applyClusterState(ClusterChangedEvent event) {
285294
286295 }
287296
288- private static class GatewayPersistedState implements PersistedState {
289-
290- private final IncrementalClusterStateWriter incrementalClusterStateWriter ;
291-
292- GatewayPersistedState (IncrementalClusterStateWriter incrementalClusterStateWriter ) {
293- this .incrementalClusterStateWriter = incrementalClusterStateWriter ;
294- }
295-
296- @ Override
297- public long getCurrentTerm () {
298- return incrementalClusterStateWriter .getPreviousManifest ().getCurrentTerm ();
299- }
300-
301- @ Override
302- public ClusterState getLastAcceptedState () {
303- final ClusterState previousClusterState = incrementalClusterStateWriter .getPreviousClusterState ();
304- assert previousClusterState .nodes ().getLocalNode () != null : "Cluster state is not fully built yet" ;
305- return previousClusterState ;
306- }
307-
308- @ Override
309- public void setCurrentTerm (long currentTerm ) {
310- try {
311- incrementalClusterStateWriter .setCurrentTerm (currentTerm );
312- } catch (WriteStateException e ) {
313- logger .error (new ParameterizedMessage ("Failed to set current term to {}" , currentTerm ), e );
314- e .rethrowAsErrorOrUncheckedException ();
315- }
316- }
317-
318- @ Override
319- public void setLastAcceptedState (ClusterState clusterState ) {
320- try {
321- incrementalClusterStateWriter .setIncrementalWrite (
322- incrementalClusterStateWriter .getPreviousClusterState ().term () == clusterState .term ());
323- incrementalClusterStateWriter .updateClusterState (clusterState );
324- } catch (WriteStateException e ) {
325- logger .error (new ParameterizedMessage ("Failed to set last accepted state with version {}" , clusterState .version ()), e );
326- e .rethrowAsErrorOrUncheckedException ();
327- }
328- }
329-
330- }
331-
332297}
0 commit comments