2020package org .elasticsearch .gateway ;
2121
2222import com .carrotsearch .hppc .cursors .ObjectObjectCursor ;
23- import org .apache .logging .log4j .LogManager ;
24- import org .apache .logging .log4j .Logger ;
2523import org .apache .lucene .util .SetOnce ;
2624import org .elasticsearch .ElasticsearchException ;
2725import org .elasticsearch .Version ;
28- import org .elasticsearch .cluster .ClusterChangedEvent ;
2926import org .elasticsearch .cluster .ClusterName ;
3027import org .elasticsearch .cluster .ClusterState ;
31- import org .elasticsearch .cluster .ClusterStateApplier ;
3228import org .elasticsearch .cluster .coordination .CoordinationState .PersistedState ;
3329import org .elasticsearch .cluster .coordination .InMemoryPersistedState ;
3430import org .elasticsearch .cluster .metadata .IndexMetaData ;
3531import org .elasticsearch .cluster .metadata .IndexTemplateMetaData ;
36- import org .elasticsearch .cluster .metadata .Manifest ;
3732import org .elasticsearch .cluster .metadata .MetaData ;
3833import org .elasticsearch .cluster .metadata .MetaDataIndexUpgradeService ;
3934import org .elasticsearch .cluster .node .DiscoveryNode ;
4035import org .elasticsearch .cluster .service .ClusterService ;
4136import org .elasticsearch .common .collect .ImmutableOpenMap ;
42- import org .elasticsearch .common .collect .Tuple ;
4337import org .elasticsearch .common .settings .Settings ;
44- import org .elasticsearch .common .unit .TimeValue ;
4538import org .elasticsearch .core .internal .io .IOUtils ;
46- import org .elasticsearch .index .Index ;
4739import org .elasticsearch .plugins .MetaDataUpgrader ;
4840import org .elasticsearch .transport .TransportService ;
4941
6557 * non-stale state, and master-ineligible nodes receive the real cluster state from the elected master after joining the cluster.
6658 */
6759public class GatewayMetaState implements Closeable {
68- private static final Logger logger = LogManager .getLogger (GatewayMetaState .class );
6960
7061 // Set by calling start()
7162 private final SetOnce <PersistedState > persistedState = new SetOnce <>();
@@ -81,45 +72,23 @@ public MetaData getMetaData() {
8172 }
8273
8374 public void start (Settings settings , TransportService transportService , ClusterService clusterService ,
84- MetaStateService metaStateService , MetaDataIndexUpgradeService metaDataIndexUpgradeService ,
75+ MetaDataIndexUpgradeService metaDataIndexUpgradeService ,
8576 MetaDataUpgrader metaDataUpgrader , LucenePersistedStateFactory lucenePersistedStateFactory ) {
8677 assert persistedState .get () == null : "should only start once, but already have " + persistedState .get ();
8778
88- if (DiscoveryNode .isMasterNode (settings )) {
79+ if (DiscoveryNode .isMasterNode (settings ) || DiscoveryNode . isDataNode ( settings ) ) {
8980 try {
9081 persistedState .set (lucenePersistedStateFactory .loadPersistedState ((version , metadata ) ->
9182 prepareInitialClusterState (transportService , clusterService ,
9283 ClusterState .builder (ClusterName .CLUSTER_NAME_SETTING .get (settings ))
9384 .version (version )
94- .metaData (upgradeMetaDataForMasterEligibleNode (metadata , metaDataIndexUpgradeService , metaDataUpgrader ))
95- .build ())));
85+ .metaData (upgradeMetaDataForNode (metadata , metaDataIndexUpgradeService , metaDataUpgrader ))
86+ .build ()))
87+ );
9688 } catch (IOException e ) {
9789 throw new ElasticsearchException ("failed to load metadata" , e );
9890 }
99- }
100-
101- if (DiscoveryNode .isDataNode (settings )) {
102- final Tuple <Manifest , ClusterState > manifestClusterStateTuple ;
103- try {
104- upgradeMetaData (settings , metaStateService , metaDataIndexUpgradeService , metaDataUpgrader );
105- manifestClusterStateTuple = loadStateAndManifest (ClusterName .CLUSTER_NAME_SETTING .get (settings ), metaStateService );
106- } catch (IOException e ) {
107- throw new ElasticsearchException ("failed to load metadata" , e );
108- }
109-
110- final IncrementalClusterStateWriter incrementalClusterStateWriter
111- = new IncrementalClusterStateWriter (settings , clusterService .getClusterSettings (), metaStateService ,
112- manifestClusterStateTuple .v1 (),
113- prepareInitialClusterState (transportService , clusterService , manifestClusterStateTuple .v2 ()),
114- transportService .getThreadPool ()::relativeTimeInMillis );
115-
116- clusterService .addLowPriorityApplier (new GatewayClusterApplier (incrementalClusterStateWriter ));
117-
118- if (DiscoveryNode .isMasterNode (settings ) == false ) {
119- persistedState .set (
120- new InMemoryPersistedState (manifestClusterStateTuple .v1 ().getCurrentTerm (), manifestClusterStateTuple .v2 ()));
121- }
122- } else if (DiscoveryNode .isMasterNode (settings ) == false ) {
91+ } else {
12392 persistedState .set (
12493 new InMemoryPersistedState (0L , ClusterState .builder (ClusterName .CLUSTER_NAME_SETTING .get (settings )).build ()));
12594 }
@@ -138,76 +107,12 @@ ClusterState prepareInitialClusterState(TransportService transportService, Clust
138107 }
139108
140109 // exposed so it can be overridden by tests
141- MetaData upgradeMetaDataForMasterEligibleNode (MetaData metaData ,
142- MetaDataIndexUpgradeService metaDataIndexUpgradeService ,
143- MetaDataUpgrader metaDataUpgrader ) {
110+ MetaData upgradeMetaDataForNode (MetaData metaData ,
111+ MetaDataIndexUpgradeService metaDataIndexUpgradeService ,
112+ MetaDataUpgrader metaDataUpgrader ) {
144113 return upgradeMetaData (metaData , metaDataIndexUpgradeService , metaDataUpgrader );
145114 }
146115
147- // exposed so it can be overridden by tests
148- void upgradeMetaData (Settings settings , MetaStateService metaStateService , MetaDataIndexUpgradeService metaDataIndexUpgradeService ,
149- MetaDataUpgrader metaDataUpgrader ) throws IOException {
150- if (isMasterOrDataNode (settings )) {
151- try {
152- final Tuple <Manifest , MetaData > metaStateAndData = metaStateService .loadFullState ();
153- final Manifest manifest = metaStateAndData .v1 ();
154- final MetaData metaData = metaStateAndData .v2 ();
155-
156- // We finished global state validation and successfully checked all indices for backward compatibility
157- // and found no non-upgradable indices, which means the upgrade can continue.
158- // Now it's safe to overwrite global and index metadata.
159- // We don't re-write metadata if it's not upgraded by upgrade plugins, because
160- // if there is manifest file, it means metadata is properly persisted to all data paths
161- // if there is no manifest file (upgrade from 6.x to 7.x) metadata might be missing on some data paths,
162- // but anyway we will re-write it as soon as we receive first ClusterState
163- final IncrementalClusterStateWriter .AtomicClusterStateWriter writer
164- = new IncrementalClusterStateWriter .AtomicClusterStateWriter (metaStateService , manifest );
165- final MetaData upgradedMetaData = upgradeMetaData (metaData , metaDataIndexUpgradeService , metaDataUpgrader );
166-
167- final long globalStateGeneration ;
168- if (MetaData .isGlobalStateEquals (metaData , upgradedMetaData ) == false ) {
169- globalStateGeneration = writer .writeGlobalState ("upgrade" , upgradedMetaData );
170- } else {
171- globalStateGeneration = manifest .getGlobalGeneration ();
172- }
173-
174- Map <Index , Long > indices = new HashMap <>(manifest .getIndexGenerations ());
175- for (IndexMetaData indexMetaData : upgradedMetaData ) {
176- if (metaData .hasIndexMetaData (indexMetaData ) == false ) {
177- final long generation = writer .writeIndex ("upgrade" , indexMetaData );
178- indices .put (indexMetaData .getIndex (), generation );
179- }
180- }
181-
182- final Manifest newManifest = new Manifest (manifest .getCurrentTerm (), manifest .getClusterStateVersion (),
183- globalStateGeneration , indices );
184- writer .writeManifestAndCleanup ("startup" , newManifest );
185- } catch (Exception e ) {
186- logger .error ("failed to read or upgrade local state, exiting..." , e );
187- throw e ;
188- }
189- }
190- }
191-
192- private static Tuple <Manifest ,ClusterState > loadStateAndManifest (ClusterName clusterName ,
193- MetaStateService metaStateService ) throws IOException {
194- final long startNS = System .nanoTime ();
195- final Tuple <Manifest , MetaData > manifestAndMetaData = metaStateService .loadFullState ();
196- final Manifest manifest = manifestAndMetaData .v1 ();
197-
198- final ClusterState clusterState = ClusterState .builder (clusterName )
199- .version (manifest .getClusterStateVersion ())
200- .metaData (manifestAndMetaData .v2 ()).build ();
201-
202- logger .debug ("took {} to load state" , TimeValue .timeValueMillis (TimeValue .nsecToMSec (System .nanoTime () - startNS )));
203-
204- return Tuple .tuple (manifest , clusterState );
205- }
206-
207- private static boolean isMasterOrDataNode (Settings settings ) {
208- return DiscoveryNode .isMasterNode (settings ) || DiscoveryNode .isDataNode (settings );
209- }
210-
211116 /**
212117 * Elasticsearch 2.0 removed several deprecated features and as well as support for Lucene 3.x. This method calls
213118 * {@link MetaDataIndexUpgradeService} to makes sure that indices are compatible with the current version. The
@@ -262,36 +167,4 @@ public void close() throws IOException {
262167 IOUtils .close (persistedState .get ());
263168 }
264169
265- private static class GatewayClusterApplier implements ClusterStateApplier {
266-
267- private final IncrementalClusterStateWriter incrementalClusterStateWriter ;
268-
269- private GatewayClusterApplier (IncrementalClusterStateWriter incrementalClusterStateWriter ) {
270- this .incrementalClusterStateWriter = incrementalClusterStateWriter ;
271- }
272-
273- @ Override
274- public void applyClusterState (ClusterChangedEvent event ) {
275- if (event .state ().blocks ().disableStatePersistence ()) {
276- incrementalClusterStateWriter .setIncrementalWrite (false );
277- return ;
278- }
279-
280- try {
281- // Hack: This is to ensure that non-master-eligible Zen2 nodes always store a current term
282- // that's higher than the last accepted term.
283- // TODO: can we get rid of this hack?
284- if (event .state ().term () > incrementalClusterStateWriter .getPreviousManifest ().getCurrentTerm ()) {
285- incrementalClusterStateWriter .setCurrentTerm (event .state ().term ());
286- }
287-
288- incrementalClusterStateWriter .updateClusterState (event .state ());
289- incrementalClusterStateWriter .setIncrementalWrite (true );
290- } catch (WriteStateException e ) {
291- logger .warn ("Exception occurred when storing new meta data" , e );
292- }
293- }
294-
295- }
296-
297170}
0 commit comments