2020package org .elasticsearch .gateway ;
2121
2222import com .carrotsearch .hppc .cursors .ObjectObjectCursor ;
23+ import org .apache .logging .log4j .LogManager ;
24+ import org .apache .logging .log4j .Logger ;
2325import org .apache .lucene .util .SetOnce ;
2426import org .elasticsearch .ElasticsearchException ;
2527import org .elasticsearch .Version ;
2628import org .elasticsearch .cluster .ClusterName ;
2729import org .elasticsearch .cluster .ClusterState ;
30+ import org .elasticsearch .cluster .coordination .CoordinationMetaData ;
2831import org .elasticsearch .cluster .coordination .CoordinationState .PersistedState ;
2932import org .elasticsearch .cluster .coordination .InMemoryPersistedState ;
3033import org .elasticsearch .cluster .metadata .IndexMetaData ;
3740import org .elasticsearch .common .collect .ImmutableOpenMap ;
3841import org .elasticsearch .common .collect .Tuple ;
3942import org .elasticsearch .common .settings .Settings ;
43+ import org .elasticsearch .common .util .concurrent .AbstractRunnable ;
44+ import org .elasticsearch .common .util .concurrent .EsExecutors ;
45+ import org .elasticsearch .common .util .concurrent .EsRejectedExecutionException ;
46+ import org .elasticsearch .common .util .concurrent .EsThreadPoolExecutor ;
4047import org .elasticsearch .core .internal .io .IOUtils ;
4148import org .elasticsearch .env .NodeMetaData ;
49+ import org .elasticsearch .node .Node ;
4250import org .elasticsearch .plugins .MetaDataUpgrader ;
51+ import org .elasticsearch .threadpool .ThreadPool ;
4352import org .elasticsearch .transport .TransportService ;
4453
4554import java .io .Closeable ;
4655import java .io .IOException ;
4756import java .io .UncheckedIOException ;
57+ import java .util .Collections ;
4858import java .util .HashMap ;
4959import java .util .Map ;
60+ import java .util .Objects ;
61+ import java .util .concurrent .TimeUnit ;
5062import java .util .function .BiConsumer ;
5163import java .util .function .Consumer ;
5264import java .util .function .Function ;
5365import java .util .function .UnaryOperator ;
5466
67+ import static org .elasticsearch .common .util .concurrent .EsExecutors .daemonThreadFactory ;
68+
5569/**
5670 * Loads (and maybe upgrades) cluster metadata at startup, and persistently stores cluster metadata for future restarts.
5771 *
@@ -100,16 +114,20 @@ public void start(Settings settings, TransportService transportService, ClusterS
100114 }
101115
102116 final PersistedClusterStateService .Writer persistenceWriter = persistedClusterStateService .createWriter ();
103- final LucenePersistedState lucenePersistedState ;
117+ final PersistedState persistedState ;
104118 boolean success = false ;
105119 try {
106120 final ClusterState clusterState = prepareInitialClusterState (transportService , clusterService ,
107121 ClusterState .builder (ClusterName .CLUSTER_NAME_SETTING .get (settings ))
108122 .version (lastAcceptedVersion )
109123 .metaData (upgradeMetaDataForNode (metaData , metaDataIndexUpgradeService , metaDataUpgrader ))
110124 .build ());
111- lucenePersistedState = new LucenePersistedState (
112- persistenceWriter , currentTerm , clusterState );
125+ if (DiscoveryNode .isMasterNode (settings )) {
126+ persistedState = new LucenePersistedState (persistenceWriter , currentTerm , clusterState );
127+ } else {
128+ persistedState = new AsyncLucenePersistedState (settings , transportService .getThreadPool (),
129+ new LucenePersistedState (persistenceWriter , currentTerm , clusterState ));
130+ }
113131 if (DiscoveryNode .isDataNode (settings )) {
114132 metaStateService .unreferenceAll (); // unreference legacy files (only keep them for dangling indices functionality)
115133 } else {
@@ -125,7 +143,7 @@ public void start(Settings settings, TransportService transportService, ClusterS
125143 }
126144 }
127145
128- persistedState .set (lucenePersistedState );
146+ this . persistedState .set (persistedState );
129147 } catch (IOException e ) {
130148 throw new ElasticsearchException ("failed to load metadata" , e );
131149 }
@@ -227,6 +245,146 @@ public void close() throws IOException {
227245 IOUtils .close (persistedState .get ());
228246 }
229247
248+ // visible for testing
249+ public boolean allPendingAsyncStatesWritten () {
250+ final PersistedState ps = persistedState .get ();
251+ if (ps instanceof AsyncLucenePersistedState ) {
252+ return ((AsyncLucenePersistedState ) ps ).allPendingAsyncStatesWritten ();
253+ } else {
254+ return true ;
255+ }
256+ }
257+
258+ static class AsyncLucenePersistedState extends InMemoryPersistedState {
259+
260+ private static final Logger logger = LogManager .getLogger (AsyncLucenePersistedState .class );
261+
262+ static final String THREAD_NAME = "AsyncLucenePersistedState#updateTask" ;
263+
264+ private final EsThreadPoolExecutor threadPoolExecutor ;
265+ private final PersistedState persistedState ;
266+
267+ boolean newCurrentTermQueued = false ;
268+ boolean newStateQueued = false ;
269+
270+ private final Object mutex = new Object ();
271+
272+ AsyncLucenePersistedState (Settings settings , ThreadPool threadPool , PersistedState persistedState ) {
273+ super (persistedState .getCurrentTerm (), persistedState .getLastAcceptedState ());
274+ final String nodeName = Objects .requireNonNull (Node .NODE_NAME_SETTING .get (settings ));
275+ threadPoolExecutor = EsExecutors .newFixed (
276+ nodeName + "/" + THREAD_NAME ,
277+ 1 , 1 ,
278+ daemonThreadFactory (nodeName , THREAD_NAME ),
279+ threadPool .getThreadContext ());
280+ this .persistedState = persistedState ;
281+ }
282+
283+ @ Override
284+ public void setCurrentTerm (long currentTerm ) {
285+ synchronized (mutex ) {
286+ super .setCurrentTerm (currentTerm );
287+ if (newCurrentTermQueued ) {
288+ logger .trace ("term update already queued (setting term to {})" , currentTerm );
289+ } else {
290+ logger .trace ("queuing term update (setting term to {})" , currentTerm );
291+ newCurrentTermQueued = true ;
292+ scheduleUpdate ();
293+ }
294+ }
295+ }
296+
297+ @ Override
298+ public void setLastAcceptedState (ClusterState clusterState ) {
299+ synchronized (mutex ) {
300+ super .setLastAcceptedState (clusterState );
301+ if (newStateQueued ) {
302+ logger .trace ("cluster state update already queued (setting cluster state to {})" , clusterState .version ());
303+ } else {
304+ logger .trace ("queuing cluster state update (setting cluster state to {})" , clusterState .version ());
305+ newStateQueued = true ;
306+ scheduleUpdate ();
307+ }
308+ }
309+ }
310+
311+ private void scheduleUpdate () {
312+ assert Thread .holdsLock (mutex );
313+ try {
314+ threadPoolExecutor .execute (new AbstractRunnable () {
315+
316+ @ Override
317+ public void onFailure (Exception e ) {
318+ logger .error ("Exception occurred when storing new meta data" , e );
319+ }
320+
321+ @ Override
322+ protected void doRun () {
323+ final Long term ;
324+ final ClusterState clusterState ;
325+ synchronized (mutex ) {
326+ if (newCurrentTermQueued ) {
327+ term = getCurrentTerm ();
328+ newCurrentTermQueued = false ;
329+ } else {
330+ term = null ;
331+ }
332+ if (newStateQueued ) {
333+ clusterState = getLastAcceptedState ();
334+ newStateQueued = false ;
335+ } else {
336+ clusterState = null ;
337+ }
338+ }
339+ // write current term before last accepted state so that it is never below term in last accepted state
340+ if (term != null ) {
341+ persistedState .setCurrentTerm (term );
342+ }
343+ if (clusterState != null ) {
344+ persistedState .setLastAcceptedState (resetVotingConfiguration (clusterState ));
345+ }
346+ }
347+ });
348+ } catch (EsRejectedExecutionException e ) {
349+ // ignore cases where we are shutting down..., there is really nothing interesting to be done here...
350+ if (threadPoolExecutor .isShutdown () == false ) {
351+ assert false : "only expect rejections when shutting down" ;
352+ throw e ;
353+ }
354+ }
355+ }
356+
357+ static final CoordinationMetaData .VotingConfiguration staleStateConfiguration =
358+ new CoordinationMetaData .VotingConfiguration (Collections .singleton ("STALE_STATE_CONFIG" ));
359+
360+ static ClusterState resetVotingConfiguration (ClusterState clusterState ) {
361+ CoordinationMetaData newCoordinationMetaData = CoordinationMetaData .builder (clusterState .coordinationMetaData ())
362+ .lastAcceptedConfiguration (staleStateConfiguration )
363+ .lastCommittedConfiguration (staleStateConfiguration )
364+ .build ();
365+ return ClusterState .builder (clusterState ).metaData (MetaData .builder (clusterState .metaData ())
366+ .coordinationMetaData (newCoordinationMetaData ).build ()).build ();
367+ }
368+
369+ @ Override
370+ public void close () throws IOException {
371+ try {
372+ ThreadPool .terminate (threadPoolExecutor , 10 , TimeUnit .SECONDS );
373+ } finally {
374+ persistedState .close ();
375+ }
376+ }
377+
378+ boolean allPendingAsyncStatesWritten () {
379+ synchronized (mutex ) {
380+ if (newCurrentTermQueued || newStateQueued ) {
381+ return false ;
382+ }
383+ return threadPoolExecutor .getActiveCount () == 0 ;
384+ }
385+ }
386+ }
387+
230388 /**
231389 * Encapsulates the incremental writing of metadata to a {@link PersistedClusterStateService.Writer}.
232390 */
0 commit comments