77
88import org .apache .logging .log4j .LogManager ;
99import org .apache .logging .log4j .Logger ;
10+ import org .apache .logging .log4j .message .ParameterizedMessage ;
11+ import org .elasticsearch .ElasticsearchException ;
1012import org .elasticsearch .action .ActionListener ;
1113import org .elasticsearch .action .admin .cluster .state .ClusterStateRequest ;
1214import org .elasticsearch .client .Client ;
1719import org .elasticsearch .cluster .metadata .IndexMetaData ;
1820import org .elasticsearch .cluster .metadata .MetaData ;
1921import org .elasticsearch .cluster .service .ClusterService ;
22+ import org .elasticsearch .common .collect .Tuple ;
2023import org .elasticsearch .common .settings .Settings ;
2124import org .elasticsearch .common .unit .TimeValue ;
25+ import org .elasticsearch .common .util .concurrent .AtomicArray ;
2226import org .elasticsearch .common .util .concurrent .CountDown ;
2327import org .elasticsearch .index .Index ;
2428import org .elasticsearch .license .LicenseUtils ;
2731import org .elasticsearch .xpack .ccr .CcrSettings ;
2832import org .elasticsearch .xpack .core .ccr .AutoFollowMetadata ;
2933import org .elasticsearch .xpack .core .ccr .AutoFollowMetadata .AutoFollowPattern ;
34+ import org .elasticsearch .xpack .core .ccr .AutoFollowStats ;
3035import org .elasticsearch .xpack .core .ccr .action .CreateAndFollowIndexAction ;
3136import org .elasticsearch .xpack .core .ccr .action .FollowIndexAction ;
3237
3338import java .util .ArrayList ;
39+ import java .util .Collections ;
3440import java .util .HashMap ;
41+ import java .util .LinkedHashMap ;
3542import java .util .List ;
3643import java .util .Map ;
3744import java .util .Objects ;
38- import java .util .concurrent . atomic . AtomicReference ;
45+ import java .util .TreeMap ;
3946import java .util .function .BiConsumer ;
4047import java .util .function .Consumer ;
4148import java .util .function .Function ;
4754public class AutoFollowCoordinator implements ClusterStateApplier {
4855
4956 private static final Logger LOGGER = LogManager .getLogger (AutoFollowCoordinator .class );
57+ private static final int MAX_AUTO_FOLLOW_ERRORS = 256 ;
5058
5159 private final Client client ;
5260 private final TimeValue pollInterval ;
@@ -56,6 +64,12 @@ public class AutoFollowCoordinator implements ClusterStateApplier {
5664
5765 private volatile boolean localNodeMaster = false ;
5866
67+ // The following fields are read and updated under a lock:
68+ private long numberOfSuccessfulIndicesAutoFollowed = 0 ;
69+ private long numberOfFailedIndicesAutoFollowed = 0 ;
70+ private long numberOfFailedRemoteClusterStateRequests = 0 ;
71+ private final LinkedHashMap <String , ElasticsearchException > recentAutoFollowErrors ;
72+
5973 public AutoFollowCoordinator (
6074 Settings settings ,
6175 Client client ,
@@ -69,6 +83,47 @@ public AutoFollowCoordinator(
6983
7084 this .pollInterval = CcrSettings .CCR_AUTO_FOLLOW_POLL_INTERVAL .get (settings );
7185 clusterService .addStateApplier (this );
86+
87+ this .recentAutoFollowErrors = new LinkedHashMap <String , ElasticsearchException >() {
88+ @ Override
89+ protected boolean removeEldestEntry (final Map .Entry <String , ElasticsearchException > eldest ) {
90+ return size () > MAX_AUTO_FOLLOW_ERRORS ;
91+ }
92+ };
93+ }
94+
95+ public synchronized AutoFollowStats getStats () {
96+ return new AutoFollowStats (
97+ numberOfFailedIndicesAutoFollowed ,
98+ numberOfFailedRemoteClusterStateRequests ,
99+ numberOfSuccessfulIndicesAutoFollowed ,
100+ new TreeMap <>(recentAutoFollowErrors )
101+ );
102+ }
103+
104+ synchronized void updateStats (List <AutoFollowResult > results ) {
105+ for (AutoFollowResult result : results ) {
106+ if (result .clusterStateFetchException != null ) {
107+ recentAutoFollowErrors .put (result .clusterAlias ,
108+ new ElasticsearchException (result .clusterStateFetchException ));
109+ numberOfFailedRemoteClusterStateRequests ++;
110+ LOGGER .warn (new ParameterizedMessage ("failure occurred while fetching cluster state in leader cluster [{}]" ,
111+ result .clusterAlias ), result .clusterStateFetchException );
112+ } else {
113+ for (Map .Entry <Index , Exception > entry : result .autoFollowExecutionResults .entrySet ()) {
114+ if (entry .getValue () != null ) {
115+ numberOfFailedIndicesAutoFollowed ++;
116+ recentAutoFollowErrors .put (result .clusterAlias + ":" + entry .getKey ().getName (),
117+ new ElasticsearchException (entry .getValue ()));
118+ LOGGER .warn (new ParameterizedMessage ("failure occurred while auto following index [{}] in leader cluster [{}]" ,
119+ entry .getKey (), result .clusterAlias ), entry .getValue ());
120+ } else {
121+ numberOfSuccessfulIndicesAutoFollowed ++;
122+ }
123+ }
124+ }
125+
126+ }
72127 }
73128
74129 private void doAutoFollow () {
@@ -94,10 +149,8 @@ private void doAutoFollow() {
94149 return ;
95150 }
96151
97- Consumer <Exception > handler = e -> {
98- if (e != null ) {
99- LOGGER .warn ("failure occurred during auto-follower coordination" , e );
100- }
152+ Consumer <List <AutoFollowResult >> handler = results -> {
153+ updateStats (results );
101154 threadPool .schedule (pollInterval , ThreadPool .Names .SAME , this ::doAutoFollow );
102155 };
103156 AutoFollower operation = new AutoFollower (handler , followerClusterState ) {
@@ -178,101 +231,97 @@ public void applyClusterState(ClusterChangedEvent event) {
178231
179232 abstract static class AutoFollower {
180233
181- private final Consumer <Exception > handler ;
234+ private final Consumer <List < AutoFollowResult > > handler ;
182235 private final ClusterState followerClusterState ;
183236 private final AutoFollowMetadata autoFollowMetadata ;
184237
185238 private final CountDown autoFollowPatternsCountDown ;
186- private final AtomicReference < Exception > autoFollowPatternsErrorHolder = new AtomicReference <>() ;
239+ private final AtomicArray < AutoFollowResult > autoFollowResults ;
187240
188- AutoFollower (final Consumer <Exception > handler , final ClusterState followerClusterState ) {
241+ AutoFollower (final Consumer <List < AutoFollowResult > > handler , final ClusterState followerClusterState ) {
189242 this .handler = handler ;
190243 this .followerClusterState = followerClusterState ;
191244 this .autoFollowMetadata = followerClusterState .getMetaData ().custom (AutoFollowMetadata .TYPE );
192245 this .autoFollowPatternsCountDown = new CountDown (autoFollowMetadata .getPatterns ().size ());
246+ this .autoFollowResults = new AtomicArray <>(autoFollowMetadata .getPatterns ().size ());
193247 }
194248
195249 void autoFollowIndices () {
250+ int i = 0 ;
196251 for (Map .Entry <String , AutoFollowPattern > entry : autoFollowMetadata .getPatterns ().entrySet ()) {
197- String clusterAlias = entry . getKey () ;
198- AutoFollowPattern autoFollowPattern = entry .getValue ();
199- List < String > followedIndices = autoFollowMetadata . getFollowedLeaderIndexUUIDs (). get ( clusterAlias );
252+ final int slot = i ;
253+ final String clusterAlias = entry .getKey ();
254+ final AutoFollowPattern autoFollowPattern = entry . getValue ( );
200255
201256 getLeaderClusterState (autoFollowPattern .getHeaders (), clusterAlias , (leaderClusterState , e ) -> {
202257 if (leaderClusterState != null ) {
203258 assert e == null ;
204- handleClusterAlias (clusterAlias , autoFollowPattern , followedIndices , leaderClusterState );
259+ final List <String > followedIndices = autoFollowMetadata .getFollowedLeaderIndexUUIDs ().get (clusterAlias );
260+ final List <Index > leaderIndicesToFollow =
261+ getLeaderIndicesToFollow (autoFollowPattern , leaderClusterState , followerClusterState , followedIndices );
262+ if (leaderIndicesToFollow .isEmpty ()) {
263+ finalise (slot , new AutoFollowResult (clusterAlias ));
264+ } else {
265+ Consumer <AutoFollowResult > resultHandler = result -> finalise (slot , result );
266+ checkAutoFollowPattern (clusterAlias , autoFollowPattern , leaderIndicesToFollow , resultHandler );
267+ }
205268 } else {
206- finalise (e );
269+ finalise (slot , new AutoFollowResult ( clusterAlias , e ) );
207270 }
208271 });
272+ i ++;
209273 }
210274 }
211275
212- private void handleClusterAlias (String clusterAlias , AutoFollowPattern autoFollowPattern ,
213- List <String > followedIndexUUIDs , ClusterState leaderClusterState ) {
214- final List <Index > leaderIndicesToFollow =
215- getLeaderIndicesToFollow (autoFollowPattern , leaderClusterState , followerClusterState , followedIndexUUIDs );
216- if (leaderIndicesToFollow .isEmpty ()) {
217- finalise (null );
218- } else {
219- final CountDown leaderIndicesCountDown = new CountDown (leaderIndicesToFollow .size ());
220- final AtomicReference <Exception > leaderIndicesErrorHolder = new AtomicReference <>();
221- for (Index indexToFollow : leaderIndicesToFollow ) {
222- final String leaderIndexName = indexToFollow .getName ();
223- final String followIndexName = getFollowerIndexName (autoFollowPattern , leaderIndexName );
224-
225- String leaderIndexNameWithClusterAliasPrefix = clusterAlias .equals ("_local_" ) ? leaderIndexName :
226- clusterAlias + ":" + leaderIndexName ;
227- FollowIndexAction .Request followRequest =
228- new FollowIndexAction .Request (leaderIndexNameWithClusterAliasPrefix , followIndexName ,
229- autoFollowPattern .getMaxBatchOperationCount (), autoFollowPattern .getMaxConcurrentReadBatches (),
230- autoFollowPattern .getMaxOperationSizeInBytes (), autoFollowPattern .getMaxConcurrentWriteBatches (),
231- autoFollowPattern .getMaxWriteBufferSize (), autoFollowPattern .getMaxRetryDelay (),
232- autoFollowPattern .getIdleShardRetryDelay ());
233-
234- // Execute if the create and follow api call succeeds:
235- Runnable successHandler = () -> {
236- LOGGER .info ("Auto followed leader index [{}] as follow index [{}]" , leaderIndexName , followIndexName );
237-
238- // This function updates the auto follow metadata in the cluster to record that the leader index has been followed:
239- // (so that we do not try to follow it in subsequent auto follow runs)
240- Function <ClusterState , ClusterState > function = recordLeaderIndexAsFollowFunction (clusterAlias , indexToFollow );
241- // The coordinator always runs on the elected master node, so we can update cluster state here:
242- updateAutoFollowMetadata (function , updateError -> {
243- if (updateError != null ) {
244- LOGGER .error ("Failed to mark leader index [" + leaderIndexName + "] as auto followed" , updateError );
245- if (leaderIndicesErrorHolder .compareAndSet (null , updateError ) == false ) {
246- leaderIndicesErrorHolder .get ().addSuppressed (updateError );
247- }
248- } else {
249- LOGGER .debug ("Successfully marked leader index [{}] as auto followed" , leaderIndexName );
250- }
251- if (leaderIndicesCountDown .countDown ()) {
252- finalise (leaderIndicesErrorHolder .get ());
253- }
254- });
255- };
256- // Execute if the create and follow apu call fails:
257- Consumer <Exception > failureHandler = followError -> {
258- assert followError != null ;
259- LOGGER .warn ("Failed to auto follow leader index [" + leaderIndexName + "]" , followError );
260- if (leaderIndicesCountDown .countDown ()) {
261- finalise (followError );
262- }
263- };
264- createAndFollow (autoFollowPattern .getHeaders (), followRequest , successHandler , failureHandler );
265- }
276+ private void checkAutoFollowPattern (String clusterAlias , AutoFollowPattern autoFollowPattern ,
277+ List <Index > leaderIndicesToFollow , Consumer <AutoFollowResult > resultHandler ) {
278+
279+ final CountDown leaderIndicesCountDown = new CountDown (leaderIndicesToFollow .size ());
280+ final AtomicArray <Tuple <Index , Exception >> results = new AtomicArray <>(leaderIndicesToFollow .size ());
281+ for (int i = 0 ; i < leaderIndicesToFollow .size (); i ++) {
282+ final Index indexToFollow = leaderIndicesToFollow .get (i );
283+ final int slot = i ;
284+ followLeaderIndex (clusterAlias , indexToFollow , autoFollowPattern , error -> {
285+ results .set (slot , new Tuple <>(indexToFollow , error ));
286+ if (leaderIndicesCountDown .countDown ()) {
287+ resultHandler .accept (new AutoFollowResult (clusterAlias , results .asList ()));
288+ }
289+ });
266290 }
267291 }
268292
269- private void finalise (Exception failure ) {
270- if (autoFollowPatternsErrorHolder .compareAndSet (null , failure ) == false ) {
271- autoFollowPatternsErrorHolder .get ().addSuppressed (failure );
272- }
293+ private void followLeaderIndex (String clusterAlias , Index indexToFollow ,
294+ AutoFollowPattern pattern , Consumer <Exception > onResult ) {
295+ final String leaderIndexName = indexToFollow .getName ();
296+ final String followIndexName = getFollowerIndexName (pattern , leaderIndexName );
297+
298+ String leaderIndexNameWithClusterAliasPrefix = clusterAlias .equals ("_local_" ) ? leaderIndexName :
299+ clusterAlias + ":" + leaderIndexName ;
300+ FollowIndexAction .Request request =
301+ new FollowIndexAction .Request (leaderIndexNameWithClusterAliasPrefix , followIndexName ,
302+ pattern .getMaxBatchOperationCount (), pattern .getMaxConcurrentReadBatches (),
303+ pattern .getMaxOperationSizeInBytes (), pattern .getMaxConcurrentWriteBatches (),
304+ pattern .getMaxWriteBufferSize (), pattern .getMaxRetryDelay (),
305+ pattern .getIdleShardRetryDelay ());
306+
307+ // Execute if the create and follow api call succeeds:
308+ Runnable successHandler = () -> {
309+ LOGGER .info ("Auto followed leader index [{}] as follow index [{}]" , leaderIndexName , followIndexName );
310+
311+ // This function updates the auto follow metadata in the cluster to record that the leader index has been followed:
312+ // (so that we do not try to follow it in subsequent auto follow runs)
313+ Function <ClusterState , ClusterState > function = recordLeaderIndexAsFollowFunction (clusterAlias , indexToFollow );
314+ // The coordinator always runs on the elected master node, so we can update cluster state here:
315+ updateAutoFollowMetadata (function , onResult );
316+ };
317+ createAndFollow (pattern .getHeaders (), request , successHandler , onResult );
318+ }
273319
320+ private void finalise (int slot , AutoFollowResult result ) {
321+ assert autoFollowResults .get (slot ) == null ;
322+ autoFollowResults .set (slot , result );
274323 if (autoFollowPatternsCountDown .countDown ()) {
275- handler .accept (autoFollowPatternsErrorHolder . get ());
324+ handler .accept (autoFollowResults . asList ());
276325 }
277326 }
278327
@@ -347,4 +396,33 @@ abstract void updateAutoFollowMetadata(
347396 );
348397
349398 }
399+
400+ static class AutoFollowResult {
401+
402+ final String clusterAlias ;
403+ final Exception clusterStateFetchException ;
404+ final Map <Index , Exception > autoFollowExecutionResults ;
405+
406+ AutoFollowResult (String clusterAlias , List <Tuple <Index , Exception >> results ) {
407+ this .clusterAlias = clusterAlias ;
408+
409+ Map <Index , Exception > autoFollowExecutionResults = new HashMap <>();
410+ for (Tuple <Index , Exception > result : results ) {
411+ autoFollowExecutionResults .put (result .v1 (), result .v2 ());
412+ }
413+
414+ this .clusterStateFetchException = null ;
415+ this .autoFollowExecutionResults = Collections .unmodifiableMap (autoFollowExecutionResults );
416+ }
417+
418+ AutoFollowResult (String clusterAlias , Exception e ) {
419+ this .clusterAlias = clusterAlias ;
420+ this .clusterStateFetchException = e ;
421+ this .autoFollowExecutionResults = Collections .emptyMap ();
422+ }
423+
424+ AutoFollowResult (String clusterAlias ) {
425+ this (clusterAlias , (Exception ) null );
426+ }
427+ }
350428}
0 commit comments