@@ -78,9 +78,13 @@ public void add(InlineChore chore) {
}

public void add(Procedure<TEnvironment> procedure) {
LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, procedure.getTimeout(),
procedure.getTimeoutTimestamp());
queue.add(new DelayedProcedure<>(procedure));
if (procedure.getTimeout() > 0) {
LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, procedure.getTimeout(),
procedure.getTimeoutTimestamp());
queue.add(new DelayedProcedure<>(procedure));
} else {
LOG.info("Got negative timeout {} for {}, skip adding", procedure.getTimeout(), procedure);
}
}

public boolean remove(Procedure<TEnvironment> procedure) {
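The guard above matters for the migration procedure changed further down: that procedure now deliberately parks itself with setTimeout(-1) so that only its completion callback can wake it, so a non-positive timeout must never be placed on the timeout queue. A minimal, self-contained sketch (plain java.util.concurrent, not HBase code) of why: an element whose delay has already expired is handed out by a DelayQueue immediately, which would wake the parked procedure right away instead of leaving it suspended.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class NegativeTimeoutDemo {
  // Simplified stand-in for a delayed procedure wrapper: expires "timeoutMs" from now.
  static final class Timed implements Delayed {
    final long deadline;
    Timed(long timeoutMs) {
      this.deadline = System.currentTimeMillis() + timeoutMs;
    }
    @Override
    public long getDelay(TimeUnit unit) {
      return unit.convert(deadline - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }
    @Override
    public int compareTo(Delayed other) {
      return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
    }
  }

  public static void main(String[] args) {
    DelayQueue<Timed> queue = new DelayQueue<>();
    queue.add(new Timed(-1)); // analogous to enqueuing a procedure with timeout -1
    // The element is already expired, so it is available immediately instead of waiting.
    System.out.println("taken immediately: " + (queue.poll() != null)); // prints true
  }
}
```

Skipping the add keeps such a procedure suspended until something explicitly reschedules it.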
@@ -25,19 +25,25 @@

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.LongConsumer;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.master.procedure.GlobalProcedureInterface;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.IdLock;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
@@ -65,18 +71,34 @@ public class MigrateReplicationQueueFromZkToTableProcedure

private List<String> disabledPeerIds;

private List<Future<?>> futures;
private CompletableFuture<?> future;

private ExecutorService executor;

private RetryCounter retryCounter;

@Override
public String getGlobalId() {
return getClass().getSimpleName();
}

private ProcedureSuspendedException suspend(Configuration conf, LongConsumer backoffConsumer)
throws ProcedureSuspendedException {
if (retryCounter == null) {
retryCounter = ProcedureUtil.createRetryCounter(conf);
}
long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
backoffConsumer.accept(backoff);
throw suspend(Math.toIntExact(backoff), true);
}

private void resetRetry() {
retryCounter = null;
}

private ExecutorService getExecutorService() {
if (executor == null) {
executor = Executors.newFixedThreadPool(3, new ThreadFactoryBuilder()
executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
.setNameFormat(getClass().getSimpleName() + "-%d").setDaemon(true).build());
}
return executor;
@@ -95,14 +117,17 @@ private void waitUntilNoPeerProcedure(MasterProcedureEnv env) throws ProcedureSu
peerProcCount = env.getMasterServices().getProcedures().stream()
.filter(p -> p instanceof PeerProcedureInterface).filter(p -> !p.isFinished()).count();
} catch (IOException e) {
LOG.warn("failed to check peer procedure status", e);
throw suspend(5000, true);
throw suspend(env.getMasterConfiguration(),
backoff -> LOG.warn("failed to check peer procedure status, sleep {} secs and retry later",
backoff / 1000, e));
}
if (peerProcCount > 0) {
LOG.info("There are still {} pending peer procedures, will sleep and check later",
peerProcCount);
throw suspend(10_000, true);
throw suspend(env.getMasterConfiguration(),
backoff -> LOG.info(
"There are still {} pending peer procedures, sleep {} secs and retry later",
peerProcCount, backoff / 1000));
}
resetRetry();
LOG.info("No pending peer procedures found, continue...");
}
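The method above is the first user of the new suspend(Configuration, LongConsumer) helper: instead of the old fixed 5s/10s sleeps, each failed attempt pulls a growing backoff from a lazily created retry counter (ProcedureUtil.createRetryCounter), hands it to the call site purely for logging, and resetRetry() clears the counter once the step finally succeeds. A rough, illustrative sketch of that shape (simplified, not the HBase classes; the real backoff policy comes from the configuration):

```java
import java.util.function.LongConsumer;

// Illustrative stand-in for the RetryCounter-backed helper, not the HBase implementation.
final class BackoffOnRetry {
  private int attempts = -1; // -1 mirrors "retryCounter == null": no retry in progress

  /** Returns the next backoff in ms; the caller only decides how to log it. */
  long nextBackoffMs(LongConsumer logBackoff) {
    attempts = attempts < 0 ? 0 : attempts + 1;
    long backoffMs = Math.min(60_000L, 1_000L << Math.min(attempts, 10)); // 1s, 2s, 4s, ... capped
    logBackoff.accept(backoffMs);
    return backoffMs;
  }

  /** Mirrors resetRetry(): forget the attempt count once the guarded step succeeds. */
  void reset() {
    attempts = -1;
  }
}
```

Each call site then throws ProcedureSuspendedException with that backoff as the timeout, in the same way the replaced suspend(5000, true) and suspend(10_000, true) calls did with fixed values.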

@@ -122,8 +147,10 @@ protected Flow executeFromState(MasterProcedureEnv env,
try {
oldStorage.deleteAllData();
} catch (KeeperException e) {
LOG.warn("failed to delete old replication queue data, sleep and retry later", e);
suspend(10_000, true);
throw suspend(env.getMasterConfiguration(),
backoff -> LOG.warn(
"failed to delete old replication queue data, sleep {} secs and retry later",
backoff / 1000, e));
}
return Flow.NO_MORE_STATE;
}
@@ -132,6 +159,7 @@ protected Flow executeFromState(MasterProcedureEnv env,
disabledPeerIds = peers.stream().filter(ReplicationPeerDescription::isEnabled)
.map(ReplicationPeerDescription::getPeerId).collect(Collectors.toList());
setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER);
resetRetry();
return Flow.HAS_MORE_STATE;
case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER:
for (String peerId : disabledPeerIds) {
@@ -140,39 +168,52 @@ protected Flow executeFromState(MasterProcedureEnv env,
setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE);
return Flow.HAS_MORE_STATE;
case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE:
if (futures != null) {
// wait until all futures done
long notDone = futures.stream().filter(f -> !f.isDone()).count();
if (notDone == 0) {
boolean succ = true;
for (Future<?> future : futures) {
try {
future.get();
} catch (Exception e) {
succ = false;
LOG.warn("Failed to migrate", e);
}
}
if (succ) {
shutdownExecutorService();
setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING);
return Flow.HAS_MORE_STATE;
}
// reschedule to retry migration again
futures = null;
} else {
LOG.info("There still {} pending migration tasks, will sleep and check later", notDone);
throw suspend(10_000, true);
if (future != null) {
// should have finished when we arrive here
assert future.isDone();
try {
future.get();
} catch (Exception e) {
future = null;
throw suspend(env.getMasterConfiguration(),
backoff -> LOG.warn("failed to migrate queue data, sleep {} secs and retry later",
backoff / 1000, e));
}
shutdownExecutorService();
setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING);
resetRetry();
return Flow.HAS_MORE_STATE;
}
try {
futures = env.getReplicationPeerManager()
.migrateQueuesFromZk(env.getMasterServices().getZooKeeper(), getExecutorService());
} catch (IOException e) {
LOG.warn("failed to submit migration tasks", e);
throw suspend(10_000, true);
}
throw suspend(10_000, true);
future = env.getReplicationPeerManager()
.migrateQueuesFromZk(env.getMasterServices().getZooKeeper(), getExecutorService());
FutureUtils.addListener(future, (r, e) -> {
// should acquire procedure execution lock to make sure that the procedure executor has
// finished putting this procedure to the WAITING_TIMEOUT state, otherwise there could be
// race and cause unexpected result
IdLock procLock =
env.getMasterServices().getMasterProcedureExecutor().getProcExecutionLock();
IdLock.Entry lockEntry;
try {
lockEntry = procLock.getLockEntry(getProcId());
} catch (IOException ioe) {
LOG.error("Error while acquiring execution lock for procedure {}"
+ " when trying to wake it up, aborting...", ioe);
env.getMasterServices().abort("Can not acquire procedure execution lock", e);
return;
}
try {
setTimeoutFailure(env);
} finally {
procLock.releaseLockEntry(lockEntry);
}
});
// here we set timeout to -1 so the ProcedureExecutor will not schedule a Timer for us
setTimeout(-1);
setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
// skip persistence is a must now since when restarting, if the procedure is in
// WAITING_TIMEOUT state and has -1 as timeout, it will block there forever...
skipPersistence();
throw new ProcedureSuspendedException();
case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING:
long rsWithLowerVersion =
env.getMasterServices().getServerManager().getOnlineServers().values().stream()
@@ -181,9 +222,11 @@ protected Flow executeFromState(MasterProcedureEnv env,
setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER);
return Flow.HAS_MORE_STATE;
} else {
LOG.info("There are still {} region servers which have a major version less than {}, "
+ "will sleep and check later", rsWithLowerVersion, MIN_MAJOR_VERSION);
throw suspend(10_000, true);
throw suspend(env.getMasterConfiguration(),
backoff -> LOG.warn(
"There are still {} region servers which have a major version"
+ " less than {}, sleep {} secs and check later",
rsWithLowerVersion, MIN_MAJOR_VERSION, backoff / 1000));
}
case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER:
for (String peerId : disabledPeerIds) {
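The MIGRATE state in MigrateReplicationQueueFromZkToTableProcedure above replaces the poll-every-10-seconds loop with an event-driven wake-up: migrateQueuesFromZk now returns a single CompletableFuture, the procedure parks itself in WAITING_TIMEOUT with timeout -1 (so neither the ProcedureExecutor nor the timeout thread, see the first hunk, reschedules it), and the future's completion listener takes the procedure execution lock and calls setTimeoutFailure to put it back on the run queue. A minimal analogue of that park-and-wake shape, with plain JDK types standing in for the procedure machinery (assumed names, not the HBase API):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ParkUntilCallback {
  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newCachedThreadPool();
    CountDownLatch wakeUp = new CountDownLatch(1); // stands in for setTimeoutFailure(env)

    // The background work, i.e. the queue data migration.
    CompletableFuture<Void> migration = CompletableFuture.runAsync(() -> {
      // migrate queues, last pushed sequence ids, hfile refs ...
    }, pool);

    // Like FutureUtils.addListener: the callback fires on success or failure, and is the
    // only thing that wakes the parked worker; the result itself is re-checked afterwards.
    migration.whenComplete((result, error) -> wakeUp.countDown());

    wakeUp.await(); // the "WAITING_TIMEOUT with timeout -1" park
    System.out.println("woken up; failed = " + migration.isCompletedExceptionally());
    pool.shutdown();
  }
}
```

As the in-line comment notes, skipPersistence() is required with this scheme: a procedure persisted in WAITING_TIMEOUT with a -1 timeout would block there forever after a master restart, whereas skipping persistence lets a restart replay the state and resubmit the migration.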
@@ -21,18 +21,17 @@
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
@@ -70,6 +69,7 @@
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkLastPushedSeqId;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkReplicationQueueData;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleanerBarrier;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
@@ -778,25 +778,38 @@ private void migrateHFileRefs(ZKReplicationQueueStorageForMigration oldQueueStor
}
}

private interface ExceptionalRunnable {
void run() throws Exception;
}

private CompletableFuture<?> runAsync(ExceptionalRunnable task, ExecutorService executor) {
CompletableFuture<?> future = new CompletableFuture<>();
executor.execute(() -> {
try {
task.run();
future.complete(null);
} catch (Exception e) {
future.completeExceptionally(e);
}
});
return future;
}

/**
* Submit the migration tasks to the given {@code executor} and return the futures.
* Submit the migration tasks to the given {@code executor}.
*/
List<Future<?>> migrateQueuesFromZk(ZKWatcher zookeeper, ExecutorService executor)
throws IOException {
CompletableFuture<?> migrateQueuesFromZk(ZKWatcher zookeeper, ExecutorService executor) {
// the replication queue table creation is asynchronous and will be triggered by addPeer, so
// here we need to manually initialize it since we will not call addPeer.
initializeQueueStorage();
try {
initializeQueueStorage();
} catch (IOException e) {
return FutureUtils.failedFuture(e);
}
ZKReplicationQueueStorageForMigration oldStorage =
new ZKReplicationQueueStorageForMigration(zookeeper, conf);
return Arrays.asList(executor.submit(() -> {
migrateQueues(oldStorage);
return null;
}), executor.submit(() -> {
migrateLastPushedSeqIds(oldStorage);
return null;
}), executor.submit(() -> {
migrateHFileRefs(oldStorage);
return null;
}));
return CompletableFuture.allOf(runAsync(() -> migrateQueues(oldStorage), executor),
runAsync(() -> migrateLastPushedSeqIds(oldStorage), executor),
runAsync(() -> migrateHFileRefs(oldStorage), executor));
}
}
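migrateQueuesFromZk now hands back one combined future instead of a list: each sub-task is wrapped by runAsync so that its checked exceptions complete the future exceptionally, and CompletableFuture.allOf makes any single failure fail the whole migration. A self-contained sketch of how that composition behaves (illustrative names and messages only):

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class RunAsyncSketch {
  interface ExceptionalRunnable {
    void run() throws Exception;
  }

  // Same shape as the helper above: checked exceptions become an exceptional completion.
  static CompletableFuture<?> runAsync(ExceptionalRunnable task, ExecutorService executor) {
    CompletableFuture<Object> future = new CompletableFuture<>();
    executor.execute(() -> {
      try {
        task.run();
        future.complete(null);
      } catch (Exception e) {
        future.completeExceptionally(e);
      }
    });
    return future;
  }

  public static void main(String[] args) {
    ExecutorService pool = Executors.newCachedThreadPool();
    CompletableFuture<Void> all = CompletableFuture.allOf(
      runAsync(() -> System.out.println("migrate queues"), pool),
      runAsync(() -> { throw new IOException("zk read failed"); }, pool), // one task fails
      runAsync(() -> System.out.println("migrate hfile refs"), pool));
    System.out.println("whole migration failed: " + all.handle((r, e) -> e != null).join());
    pool.shutdown();
  }
}
```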
@@ -34,7 +34,6 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
@@ -146,9 +145,7 @@ private Map<String, Set<String>> prepareData() throws Exception {
@Test
public void testNoPeers() throws Exception {
prepareData();
for (Future<?> future : manager.migrateQueuesFromZk(UTIL.getZooKeeperWatcher(), EXECUTOR)) {
future.get(1, TimeUnit.MINUTES);
}
manager.migrateQueuesFromZk(UTIL.getZooKeeperWatcher(), EXECUTOR).get(1, TimeUnit.MINUTES);
// should have called initializer
verify(queueStorageInitializer).initialize();
// should have not migrated any data since there is no peer
@@ -165,9 +162,7 @@ public void testMigrate() throws Exception {
// value is not used in this test, so just add a mock
peers.put("peer_" + i, mock(ReplicationPeerDescription.class));
}
for (Future<?> future : manager.migrateQueuesFromZk(UTIL.getZooKeeperWatcher(), EXECUTOR)) {
future.get(1, TimeUnit.MINUTES);
}
manager.migrateQueuesFromZk(UTIL.getZooKeeperWatcher(), EXECUTOR).get(1, TimeUnit.MINUTES);
// should have called initializer
verify(queueStorageInitializer).initialize();
List<ReplicationQueueData> queueDatas = queueStorage.listAllQueues();
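The tests now wait on that single combined future with one bounded get() instead of looping over a list of Future<?>: a failure in any migration sub-task surfaces as an ExecutionException, and a hang becomes a TimeoutException once the minute elapses. A tiny sketch of the behaviour being relied on (plain JDK, illustrative values):

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class CombinedFutureWait {
  public static void main(String[] args) throws Exception {
    CompletableFuture<Void> ok = CompletableFuture.completedFuture(null);
    CompletableFuture<Void> failed = new CompletableFuture<>();
    failed.completeExceptionally(new IOException("migration failed"));

    try {
      // One bounded wait on the combined future, as in the updated tests.
      CompletableFuture.allOf(ok, failed).get(1, TimeUnit.MINUTES);
    } catch (ExecutionException e) {
      System.out.println("surfaced failure: " + e.getCause()); // the IOException above
    }
  }
}
```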