1717 */
1818package org .apache .hadoop .hbase .master .replication ;
1919
20+ import static org .apache .hadoop .hbase .shaded .protobuf .generated .MasterProcedureProtos .MigrateReplicationQueueFromZkToTableState .MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER ;
2021import static org .apache .hadoop .hbase .shaded .protobuf .generated .MasterProcedureProtos .MigrateReplicationQueueFromZkToTableState .MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER ;
22+ import static org .apache .hadoop .hbase .shaded .protobuf .generated .MasterProcedureProtos .MigrateReplicationQueueFromZkToTableState .MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER ;
2123import static org .apache .hadoop .hbase .shaded .protobuf .generated .MasterProcedureProtos .MigrateReplicationQueueFromZkToTableState .MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER ;
2224import static org .apache .hadoop .hbase .shaded .protobuf .generated .MasterProcedureProtos .MigrateReplicationQueueFromZkToTableState .MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE ;
2325import static org .apache .hadoop .hbase .shaded .protobuf .generated .MasterProcedureProtos .MigrateReplicationQueueFromZkToTableState .MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE ;
@@ -111,6 +113,26 @@ private void shutdownExecutorService() {
111113 }
112114 }
113115
116+ private void disableReplicationLogCleaner (MasterProcedureEnv env )
117+ throws ProcedureSuspendedException {
118+ if (!env .getReplicationPeerManager ().getReplicationLogCleanerBarrier ().disable ()) {
119+ // it is not likely that we can reach here as we will schedule this procedure immediately
120+ // after master restarting, where ReplicationLogCleaner should have not started its first run
121+ // yet. But anyway, let's make the code more robust. And it is safe to wait a bit here since
122+ // there will be no data in the new replication queue storage before we execute this procedure
123+ // so ReplicationLogCleaner will quit immediately without doing anything.
124+ throw suspend (env .getMasterConfiguration (),
125+ backoff -> LOG .info (
126+ "Can not disable replication log cleaner, sleep {} secs and retry later" ,
127+ backoff / 1000 ));
128+ }
129+ resetRetry ();
130+ }
131+
132+ private void enableReplicationLogCleaner (MasterProcedureEnv env ) {
133+ env .getReplicationPeerManager ().getReplicationLogCleanerBarrier ().enable ();
134+ }
135+
114136 private void waitUntilNoPeerProcedure (MasterProcedureEnv env ) throws ProcedureSuspendedException {
115137 long peerProcCount ;
116138 try {
@@ -136,6 +158,10 @@ protected Flow executeFromState(MasterProcedureEnv env,
136158 MigrateReplicationQueueFromZkToTableState state )
137159 throws ProcedureSuspendedException , ProcedureYieldException , InterruptedException {
138160 switch (state ) {
161+ case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER :
162+ disableReplicationLogCleaner (env );
163+ setNextState (MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE );
164+ return Flow .HAS_MORE_STATE ;
139165 case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE :
140166 waitUntilNoPeerProcedure (env );
141167 List <ReplicationPeerDescription > peers = env .getReplicationPeerManager ().listPeers (null );
@@ -152,7 +178,8 @@ protected Flow executeFromState(MasterProcedureEnv env,
152178 "failed to delete old replication queue data, sleep {} secs and retry later" ,
153179 backoff / 1000 , e ));
154180 }
155- return Flow .NO_MORE_STATE ;
181+ setNextState (MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER );
182+ return Flow .HAS_MORE_STATE ;
156183 }
157184 // here we do not care the peers which have already been disabled, as later we do not need
158185 // to enable them
@@ -232,6 +259,10 @@ protected Flow executeFromState(MasterProcedureEnv env,
232259 for (String peerId : disabledPeerIds ) {
233260 addChildProcedure (new EnablePeerProcedure (peerId ));
234261 }
262+ setNextState (MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER );
263+ return Flow .HAS_MORE_STATE ;
264+ case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER :
265+ enableReplicationLogCleaner (env );
235266 return Flow .NO_MORE_STATE ;
236267 default :
237268 throw new UnsupportedOperationException ("unhandled state=" + state );
@@ -263,7 +294,19 @@ protected int getStateId(MigrateReplicationQueueFromZkToTableState state) {
263294
// Returns the state this procedure's state machine starts in. This change moves the starting
// state from PREPARE to DISABLE_CLEANER: the removed ('-') line below is the previous return
// value and the added ('+') line is the new one.
264295 @ Override
265296 protected MigrateReplicationQueueFromZkToTableState getInitialState () {
266- return MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE ;
297+ return MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER ;
298+ }
299+
300+ @ Override
301+ protected void afterReplay (MasterProcedureEnv env ) {
302+ if (getCurrentState () == getInitialState ()) {
303+ // do not need to disable log cleaner or acquire lock if we are in the initial state, later
304+ // when executing the procedure we will try to disable and acquire.
305+ return ;
306+ }
307+ if (!env .getReplicationPeerManager ().getReplicationLogCleanerBarrier ().disable ()) {
308+ throw new IllegalStateException ("can not disable log cleaner, this should not happen" );
309+ }
267310 }
268311
269312 @ Override
0 commit comments