diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java index f21ced9bf2ff..2293fd4f8149 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java @@ -25,7 +25,6 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.hadoop.hbase.TableName; @@ -36,7 +35,6 @@ import org.apache.hadoop.hbase.backup.BackupRequest; import org.apache.hadoop.hbase.backup.BackupRestoreFactory; import org.apache.hadoop.hbase.backup.BackupType; -import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager; import org.apache.hadoop.hbase.backup.util.BackupUtils; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; @@ -158,10 +156,7 @@ public void execute() throws IOException { // snapshots for the same reason as the log rolls. List bulkLoadsToDelete = backupManager.readBulkloadRows(tableList); - Map props = new HashMap<>(); - props.put("backupRoot", backupInfo.getBackupRootDir()); - admin.execProcedure(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE, - LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, props); + BackupUtils.logRoll(conn, backupInfo.getBackupRootDir(), conf); newTimestamps = backupManager.readRegionServerLastLogRollResult(); diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java index c92c0747e83c..20884edf836e 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java @@ -19,7 +19,6 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.hadoop.conf.Configuration; @@ -29,9 +28,7 @@ import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager; import org.apache.hadoop.hbase.backup.util.BackupUtils; -import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; import org.apache.hadoop.hbase.util.CommonFSUtils; @@ -84,13 +81,8 @@ public Map getIncrBackupLogFileMap() throws IOException { } LOG.info("Execute roll log procedure for incremental backup ..."); - HashMap props = new HashMap<>(); - props.put("backupRoot", backupInfo.getBackupRootDir()); + BackupUtils.logRoll(conn, backupInfo.getBackupRootDir(), conf); - try (Admin admin = conn.getAdmin()) { - admin.execProcedure(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE, - LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, props); - } newTimestamps = readRegionServerLastLogRollResult(); logList = getLogFilesForNewBackup(previousTimestampMins, newTimestamps, conf, savedStartCode); diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java index 15159ed73e46..183cc2054f1a 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java @@ -49,6 +49,8 @@ import org.apache.hadoop.hbase.backup.RestoreRequest; import org.apache.hadoop.hbase.backup.impl.BackupManifest; import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage; +import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; +import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.RegionInfo; @@ -65,6 +67,7 @@ import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.base.Splitter; +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; import org.apache.hbase.thirdparty.com.google.common.collect.Iterators; @@ -770,4 +773,52 @@ public static String findMostRecentBackupId(String[] backupIds) { return BackupRestoreConstants.BACKUPID_PREFIX + recentTimestamp; } + /** + * roll WAL writer for all region servers and record the newest log roll result + */ + public static void logRoll(Connection conn, String backupRootDir, Configuration conf) + throws IOException { + boolean legacy = conf.getBoolean("hbase.backup.logroll.legacy.used", false); + if (legacy) { + logRollV1(conn, backupRootDir); + } else { + logRollV2(conn, backupRootDir); + } + } + + private static void logRollV1(Connection conn, String backupRootDir) throws IOException { + try (Admin admin = conn.getAdmin()) { + admin.execProcedure(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE, + LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, + ImmutableMap.of("backupRoot", backupRootDir)); + } + } + + private static void logRollV2(Connection conn, String backupRootDir) throws IOException { + BackupSystemTable backupSystemTable = new BackupSystemTable(conn); + HashMap lastLogRollResult = + backupSystemTable.readRegionServerLastLogRollResult(backupRootDir); + try (Admin admin = conn.getAdmin()) { + Map newLogRollResult = admin.rollAllWALWriters(); + + for (Map.Entry entry : newLogRollResult.entrySet()) { + ServerName serverName = entry.getKey(); + long newHighestWALFilenum = entry.getValue(); + + String address = serverName.getAddress().toString(); + Long lastHighestWALFilenum = lastLogRollResult.get(address); + if (lastHighestWALFilenum != null && lastHighestWALFilenum > newHighestWALFilenum) { + LOG.warn("Won't update last roll log result for server {}: current = {}, new = {}", + serverName, lastHighestWALFilenum, newHighestWALFilenum); + } else { + backupSystemTable.writeRegionServerLastLogRollResult(address, newHighestWALFilenum, + backupRootDir); + if (LOG.isDebugEnabled()) { + LOG.debug("updated last roll log result for {} from {} to {}", serverName, + lastHighestWALFilenum, newHighestWALFilenum); + } + } + } + } + } } diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java index b5f58508441b..a14fce59faf2 100644 --- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -45,7 +44,6 @@ import org.apache.hadoop.hbase.backup.impl.FullTableBackupClient; import org.apache.hadoop.hbase.backup.impl.IncrementalBackupManager; import org.apache.hadoop.hbase.backup.impl.IncrementalTableBackupClient; -import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager; import org.apache.hadoop.hbase.backup.util.BackupUtils; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; @@ -239,10 +237,7 @@ public void execute() throws IOException { // the snapshot. LOG.info("Execute roll log procedure for full backup ..."); - Map props = new HashMap<>(); - props.put("backupRoot", backupInfo.getBackupRootDir()); - admin.execProcedure(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE, - LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, props); + BackupUtils.logRoll(conn, backupInfo.getBackupRootDir(), conf); failStageIf(Stage.stage_2); newTimestamps = backupManager.readRegionServerLastLogRollResult(); diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupMerge.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupMerge.java index 38204f68e31a..b91976325447 100644 --- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupMerge.java +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupMerge.java @@ -23,6 +23,7 @@ import java.io.File; import java.util.List; import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl; import org.apache.hadoop.hbase.backup.util.BackupUtils; @@ -70,17 +71,17 @@ public void TestIncBackupMergeRestore() throws Exception { // #2 - insert some data to table1 Table t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS); - LOG.debug("writing " + ADD_ROWS + " rows to " + table1); + LOG.debug("writing {} rows to {}", ADD_ROWS, table1); - Assert.assertEquals(TEST_UTIL.countRows(t1), NB_ROWS_IN_BATCH + ADD_ROWS); + Assert.assertEquals(HBaseTestingUtil.countRows(t1), NB_ROWS_IN_BATCH + ADD_ROWS); t1.close(); - LOG.debug("written " + ADD_ROWS + " rows to " + table1); + LOG.debug("written {} rows to {}", ADD_ROWS, table1); Table t2 = insertIntoTable(conn, table2, famName, 1, ADD_ROWS); - Assert.assertEquals(TEST_UTIL.countRows(t2), NB_ROWS_IN_BATCH + ADD_ROWS); + Assert.assertEquals(HBaseTestingUtil.countRows(t2), NB_ROWS_IN_BATCH + ADD_ROWS); t2.close(); - LOG.debug("written " + ADD_ROWS + " rows to " + table2); + LOG.debug("written {} rows to {}", ADD_ROWS, table2); // #3 - incremental backup for multiple tables tables = Lists.newArrayList(table1, table2); @@ -112,15 +113,15 @@ public void TestIncBackupMergeRestore() throws Exception { tablesRestoreIncMultiple, tablesMapIncMultiple, true)); Table hTable = conn.getTable(table1_restore); - LOG.debug("After incremental restore: " + hTable.getDescriptor()); - int countRows = TEST_UTIL.countRows(hTable, famName); - LOG.debug("f1 has " + countRows + " rows"); + LOG.debug("After incremental restore: {}", hTable.getDescriptor()); + int countRows = HBaseTestingUtil.countRows(hTable, famName); + LOG.debug("f1 has {} rows", countRows); Assert.assertEquals(NB_ROWS_IN_BATCH + 2 * ADD_ROWS, countRows); hTable.close(); hTable = conn.getTable(table2_restore); - Assert.assertEquals(TEST_UTIL.countRows(hTable), NB_ROWS_IN_BATCH + 2 * ADD_ROWS); + Assert.assertEquals(HBaseTestingUtil.countRows(hTable), NB_ROWS_IN_BATCH + 2 * ADD_ROWS); hTable.close(); admin.close(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java index 75dd2ef07b38..43a004a471cc 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java @@ -1404,6 +1404,16 @@ Future abortProcedureAsync(long procId, boolean mayInterruptIfRunning) */ void rollWALWriter(ServerName serverName) throws IOException, FailedLogCloseException; + /** + * Roll log writer for all RegionServers. Note that unlike + * {@link Admin#rollWALWriter(ServerName)}, this method is synchronous, which means it will block + * until all RegionServers have completed the log roll, or a RegionServer fails due to an + * exception that retry will not work. + * @return server and the highest wal filenum of server before performing log roll + * @throws IOException if a remote or network exception occurs + */ + Map rollAllWALWriters() throws IOException; + /** * Helper that delegates to getClusterMetrics().getMasterCoprocessorNames(). * @return an array of master coprocessors diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdminOverAsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdminOverAsyncAdmin.java index c13dfc33e3d2..c866f434e63a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdminOverAsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdminOverAsyncAdmin.java @@ -635,6 +635,11 @@ public void rollWALWriter(ServerName serverName) throws IOException, FailedLogCl get(admin.rollWALWriter(serverName)); } + @Override + public Map rollAllWALWriters() throws IOException { + return get(admin.rollAllWALWriters()); + } + @Override public CompactionState getCompactionState(TableName tableName) throws IOException { return get(admin.getCompactionState(tableName)); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java index 331aa4a254af..d808aecc815c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java @@ -1270,6 +1270,15 @@ default CompletableFuture getMasterInfoPort() { */ CompletableFuture rollWALWriter(ServerName serverName); + /** + * Roll log writer for all RegionServers. Note that unlike + * {@link Admin#rollWALWriter(ServerName)}, this method is synchronous, which means it will block + * until all RegionServers have completed the log roll, or a RegionServer fails due to an + * exception that retry will not work. + * @return server and the highest wal filenum of server before performing log roll + */ + CompletableFuture> rollAllWALWriters(); + /** * Clear compacting queues on a region server. * @param serverName The servername of the region server. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java index 69f353600036..33ac47c73d69 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java @@ -691,6 +691,11 @@ public CompletableFuture rollWALWriter(ServerName serverName) { return wrap(rawAdmin.rollWALWriter(serverName)); } + @Override + public CompletableFuture> rollAllWALWriters() { + return wrap(rawAdmin.rollAllWALWriters()); + } + @Override public CompletableFuture clearCompactionQueues(ServerName serverName, Set queues) { return wrap(rawAdmin.clearCompactionQueues(serverName, queues)); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index 7cb0e4689510..2373e936726e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -105,6 +105,7 @@ import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; +import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; import org.apache.hbase.thirdparty.com.google.protobuf.Message; import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel; @@ -149,6 +150,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.LastHighestWalFilenum; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; @@ -263,6 +265,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RollAllWALWritersRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RollAllWALWritersResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest; @@ -497,28 +501,70 @@ public void run(PRESP resp) { return future; } + /** + * short-circuit call for + * {@link RawAsyncHBaseAdmin#procedureCall(Object, MasterRpcCall, Converter, Converter, ProcedureBiConsumer)} + * by ignoring procedure result + */ private CompletableFuture procedureCall(PREQ preq, MasterRpcCall rpcCall, Converter respConverter, - ProcedureBiConsumer consumer) { + ProcedureBiConsumer consumer) { + return procedureCall(preq, rpcCall, respConverter, result -> null, consumer); + } + + /** + * short-circuit call for procedureCall(Consumer, Object, MasterRpcCall, Converter, Converter, + * ProcedureBiConsumer) by skip setting priority for request + */ + private CompletableFuture procedureCall(PREQ preq, + MasterRpcCall rpcCall, Converter respConverter, + Converter resultConverter, ProcedureBiConsumer consumer) { return procedureCall(b -> { - }, preq, rpcCall, respConverter, consumer); + }, preq, rpcCall, respConverter, resultConverter, consumer); } + /** + * short-circuit call for procedureCall(TableName, Object, MasterRpcCall, Converter, Converter, + * ProcedureBiConsumer) by ignoring procedure result + */ private CompletableFuture procedureCall(TableName tableName, PREQ preq, MasterRpcCall rpcCall, Converter respConverter, - ProcedureBiConsumer consumer) { - return procedureCall(b -> b.priority(tableName), preq, rpcCall, respConverter, consumer); + ProcedureBiConsumer consumer) { + return procedureCall(tableName, preq, rpcCall, respConverter, result -> null, consumer); + } + + /** + * short-circuit call for procedureCall(Consumer, Object, MasterRpcCall, Converter, Converter, + * ProcedureBiConsumer) by skip setting priority for request + */ + private CompletableFuture procedureCall(TableName tableName, PREQ preq, + MasterRpcCall rpcCall, Converter respConverter, + Converter resultConverter, ProcedureBiConsumer consumer) { + return procedureCall(b -> b.priority(tableName), preq, rpcCall, respConverter, resultConverter, + consumer); } - private CompletableFuture procedureCall( + /** + * @param type of request + * @param type of response + * @param type of procedure call result + * @param prioritySetter prioritySetter set priority by table for request + * @param preq procedure call request + * @param rpcCall procedure rpc call + * @param respConverter extract proc id from procedure call response + * @param resultConverter extract result from procedure call result + * @param consumer action performs on result + * @return procedure call result, null if procedure is void + */ + private CompletableFuture procedureCall( Consumer> prioritySetter, PREQ preq, MasterRpcCall rpcCall, Converter respConverter, - ProcedureBiConsumer consumer) { - MasterRequestCallerBuilder builder = this. newMasterCaller().action((controller, - stub) -> this. call(controller, stub, preq, rpcCall, respConverter)); + Converter resultConverter, ProcedureBiConsumer consumer) { + MasterRequestCallerBuilder builder = this. newMasterCaller() + .action((controller, stub) -> this.call(controller, stub, preq, rpcCall, respConverter)); prioritySetter.accept(builder); CompletableFuture procFuture = builder.call(); - CompletableFuture future = waitProcedureResult(procFuture); + CompletableFuture future = waitProcedureResult(procFuture, resultConverter); addListener(future, consumer); return future; } @@ -1935,7 +1981,7 @@ public CompletableFuture appendReplicationPeerTableCFs(String id, return failedFuture(new ReplicationException("tableCfs is null")); } - CompletableFuture future = new CompletableFuture(); + CompletableFuture future = new CompletableFuture<>(); addListener(getReplicationPeerConfig(id), (peerConfig, error) -> { if (!completeExceptionally(future, error)) { ReplicationPeerConfig newPeerConfig = @@ -1957,7 +2003,7 @@ public CompletableFuture removeReplicationPeerTableCFs(String id, return failedFuture(new ReplicationException("tableCfs is null")); } - CompletableFuture future = new CompletableFuture(); + CompletableFuture future = new CompletableFuture<>(); addListener(getReplicationPeerConfig(id), (peerConfig, error) -> { if (!completeExceptionally(future, error)) { ReplicationPeerConfig newPeerConfig = null; @@ -2056,7 +2102,7 @@ public CompletableFuture snapshot(SnapshotDescription snapshotDesc) { private void waitSnapshotFinish(SnapshotDescription snapshot, CompletableFuture future, SnapshotResponse resp) { if (resp.hasProcId()) { - getProcedureResult(resp.getProcId(), future, 0); + getProcedureResult(resp.getProcId(), src -> null, future, 0); addListener(future, new SnapshotProcedureBiConsumer(snapshot.getTableName())); } else { long expectedTimeout = resp.getExpectedTimeout(); @@ -2272,7 +2318,7 @@ private CompletableFuture internalRestoreSnapshot(String snapshotName, Tab .action((controller, stub) -> this. call(controller, stub, builder.build(), (s, c, req, done) -> s.restoreSnapshot(c, req, done), (resp) -> resp.getProcId())) - .call()); + .call(), result -> null); } @Override @@ -2684,14 +2730,14 @@ private void verifySplitKeys(byte[][] splitKeys) { } } - private static abstract class ProcedureBiConsumer implements BiConsumer { + private static abstract class ProcedureBiConsumer implements BiConsumer { abstract void onFinished(); abstract void onError(Throwable error); @Override - public void accept(Void v, Throwable error) { + public void accept(T value, Throwable error) { if (error != null) { onError(error); return; @@ -2700,7 +2746,7 @@ public void accept(Void v, Throwable error) { } } - private static abstract class TableProcedureBiConsumer extends ProcedureBiConsumer { + private static abstract class TableProcedureBiConsumer extends ProcedureBiConsumer { protected final TableName tableName; TableProcedureBiConsumer(TableName tableName) { @@ -2725,7 +2771,7 @@ void onError(Throwable error) { } } - private static abstract class NamespaceProcedureBiConsumer extends ProcedureBiConsumer { + private static abstract class NamespaceProcedureBiConsumer extends ProcedureBiConsumer { protected final String namespaceName; NamespaceProcedureBiConsumer(String namespaceName) { @@ -2740,12 +2786,12 @@ String getDescription() { @Override void onFinished() { - LOG.info(getDescription() + " completed"); + LOG.info("{} completed", getDescription()); } @Override void onError(Throwable error) { - LOG.info(getDescription() + " failed with " + error.getMessage()); + LOG.info("{} failed with {}", getDescription(), error.getMessage()); } } @@ -2984,7 +3030,7 @@ String getOperationType() { } } - private static class ReplicationProcedureBiConsumer extends ProcedureBiConsumer { + private static class ReplicationProcedureBiConsumer extends ProcedureBiConsumer { private final String peerId; private final Supplier getOperation; @@ -2999,28 +3045,44 @@ String getDescription() { @Override void onFinished() { - LOG.info(getDescription() + " completed"); + LOG.info("{} completed", getDescription()); } @Override void onError(Throwable error) { - LOG.info(getDescription() + " failed with " + error.getMessage()); + LOG.info("{} failed with {}", getDescription(), error.getMessage()); } } - private CompletableFuture waitProcedureResult(CompletableFuture procFuture) { - CompletableFuture future = new CompletableFuture<>(); + private static final class RollAllWALWritersBiConsumer + extends ProcedureBiConsumer> { + + @Override + void onFinished() { + LOG.info("Rolling all WAL writers completed"); + } + + @Override + void onError(Throwable error) { + LOG.warn("Rolling all WAL writers failed with {}", error.getMessage()); + } + } + + private CompletableFuture waitProcedureResult(CompletableFuture procFuture, + Converter converter) { + CompletableFuture future = new CompletableFuture<>(); addListener(procFuture, (procId, error) -> { if (error != null) { future.completeExceptionally(error); return; } - getProcedureResult(procId, future, 0); + getProcedureResult(procId, converter, future, 0); }); return future; } - private void getProcedureResult(long procId, CompletableFuture future, int retries) { + private void getProcedureResult(long procId, Converter converter, + CompletableFuture future, int retries) { addListener( this. newMasterCaller() .action((controller, stub) -> this. call(controller, stub, if (error != null) { LOG.warn("failed to get the procedure result procId={}", procId, ConnectionUtils.translateException(error)); - retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1), + retryTimer.newTimeout(t -> getProcedureResult(procId, converter, future, retries + 1), ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS); return; } if (response.getState() == GetProcedureResultResponse.State.RUNNING) { - retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1), + retryTimer.newTimeout(t -> getProcedureResult(procId, converter, future, retries + 1), ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS); return; } @@ -3045,7 +3107,11 @@ GetProcedureResultResponse> call(controller, stub, IOException ioe = ForeignExceptionUtil.toIOException(response.getException()); future.completeExceptionally(ioe); } else { - future.complete(null); + try { + future.complete(converter.convert(response.getResult())); + } catch (IOException e) { + future.completeExceptionally(e); + } } }); } @@ -3188,6 +3254,20 @@ Void> adminCall(controller, stub, RequestConverter.buildRollWALWriterRequest(), .serverName(serverName).call(); } + @Override + public CompletableFuture> rollAllWALWriters() { + return this + .> procedureCall( + RequestConverter.buildRollAllWALWritersRequest(ng.getNonceGroup(), ng.newNonce()), + (s, c, req, done) -> s.rollAllWALWriters(c, req, done), resp -> resp.getProcId(), + result -> LastHighestWalFilenum.parseFrom(result.toByteArray()).getFileNumMap() + .entrySet().stream().collect(Collectors + .toUnmodifiableMap(e -> ServerName.valueOf(e.getKey()), Map.Entry::getValue)), + new RollAllWALWritersBiConsumer()); + } + @Override public CompletableFuture clearCompactionQueues(ServerName serverName, Set queues) { return this. newAdminCaller() diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java index 3bbfac500ce5..37fdb1ba6fe7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java @@ -139,6 +139,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RegionSpecifierAndState; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RollAllWALWritersRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest; @@ -860,6 +861,11 @@ public static RollWALWriterRequest buildRollWALWriterRequest() { return RollWALWriterRequest.getDefaultInstance(); } + public static RollAllWALWritersRequest buildRollAllWALWritersRequest(long nonceGroup, + long nonce) { + return RollAllWALWritersRequest.newBuilder().setNonceGroup(nonceGroup).setNonce(nonce).build(); + } + /** * Create a new GetServerInfoRequest * @return a GetServerInfoRequest diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java index 4f8a7320fb40..37292d5feefc 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java @@ -65,7 +65,7 @@ public static void addListener(CompletableFuture future, try { // See this post on stack overflow(shorten since the url is too long), // https://s.apache.org/completionexception - // For a chain of CompleableFuture, only the first child CompletableFuture can get the + // For a chain of CompletableFuture, only the first child CompletableFuture can get the // original exception, others will get a CompletionException, which wraps the original // exception. So here we unwrap it before passing it to the callback action. action.accept(resp, unwrapCompletionException(error)); diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java index e6a9d8fb2bdf..6e68ce5f1900 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java @@ -262,7 +262,7 @@ public interface RemoteProcedure { * Called when RS tells the remote procedure is succeeded through the * {@code reportProcedureDone} method. */ - void remoteOperationCompleted(TEnv env); + void remoteOperationCompleted(TEnv env, byte[] remoteResultData); /** * Called when RS tells the remote procedure is failed through the {@code reportProcedureDone} diff --git a/hbase-protocol-shaded/src/main/protobuf/HBase.proto b/hbase-protocol-shaded/src/main/protobuf/HBase.proto index 0fd3d667d4d0..c66ee7eb9791 100644 --- a/hbase-protocol-shaded/src/main/protobuf/HBase.proto +++ b/hbase-protocol-shaded/src/main/protobuf/HBase.proto @@ -289,3 +289,7 @@ message RotateFileData { required int64 timestamp = 1; required bytes data = 2; } + +message LastHighestWalFilenum { + map file_num = 1; +} diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto index a8adaa27453f..768a1d7544ea 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto @@ -799,6 +799,15 @@ message ModifyColumnStoreFileTrackerResponse { message FlushMasterStoreRequest {} message FlushMasterStoreResponse {} +message RollAllWALWritersRequest { + optional uint64 nonce_group = 1 [default = 0]; + optional uint64 nonce = 2 [default = 0]; +} + +message RollAllWALWritersResponse { + optional uint64 proc_id = 1; +} + service MasterService { /** Used by the client to get the number of regions that have received the updated schema */ rpc GetSchemaAlterStatus(GetSchemaAlterStatusRequest) @@ -1270,6 +1279,9 @@ service MasterService { rpc FlushTable(FlushTableRequest) returns(FlushTableResponse); + + rpc rollAllWALWriters(RollAllWALWritersRequest) + returns(RollAllWALWritersResponse); } // HBCK Service definitions. diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto index e3b43afd66aa..554d7ec9c410 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto @@ -839,3 +839,21 @@ message ReloadQuotasProcedureStateData { required ServerName target_server = 1; optional ForeignExceptionMessage error = 2; } + +enum LogRollProcedureState { + LOG_ROLL_ROLL_LOG_ON_RS = 1; + LOG_ROLL_COLLECT_RS_HIGHEST_WAL_FILENUM = 2; + LOG_ROLL_UNREGISTER_SERVER_LISTENER = 3; +} + +message LogRollRemoteProcedureStateData { + required ServerName target_server = 1; +} + +message RSLogRollParameter { +} + +message LogRollRemoteProcedureResult { + optional ServerName server_name = 1; + optional uint64 last_highest_wal_filenum = 2; +} diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/RegionServerStatus.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/RegionServerStatus.proto index e68ba8e72869..3d2d8c6ff5fd 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/master/RegionServerStatus.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/master/RegionServerStatus.proto @@ -160,6 +160,7 @@ message RemoteProcedureResult { optional ForeignExceptionMessage error = 3; // Master active time as fencing token optional int64 initiating_master_active_time = 4; + optional bytes proc_result_data = 5; } message ReportProcedureDoneRequest { repeated RemoteProcedureResult result = 1; diff --git a/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto b/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto index 230795f27479..30eb328fd3cd 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto @@ -420,5 +420,4 @@ service AdminService { rpc GetCachedFilesList(GetCachedFilesListRequest) returns(GetCachedFilesListResponse); - } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java index fce32333577d..fee132b7a4d3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java @@ -303,7 +303,13 @@ public enum EventType { * RS reload quotas.
* RS_RELOAD_QUOTAS */ - RS_RELOAD_QUOTAS(90, ExecutorType.RS_RELOAD_QUOTAS_OPERATIONS); + RS_RELOAD_QUOTAS(90, ExecutorType.RS_RELOAD_QUOTAS_OPERATIONS), + + /** + * RS log roll.
+ * RS_LOG_ROLL + */ + RS_LOG_ROLL(91, ExecutorType.RS_LOG_ROLL); private final int code; private final ExecutorType executor; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java index 1d689d276aa1..668cd701c0d9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java @@ -56,7 +56,8 @@ public enum ExecutorType { RS_CLAIM_REPLICATION_QUEUE(35), RS_SNAPSHOT_OPERATIONS(36), RS_FLUSH_OPERATIONS(37), - RS_RELOAD_QUOTAS_OPERATIONS(38); + RS_RELOAD_QUOTAS_OPERATIONS(38), + RS_LOG_ROLL(39); ExecutorType(int value) { } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 317f571ba8ac..cab0ef3dbdd2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -160,6 +160,7 @@ import org.apache.hadoop.hbase.master.procedure.EnableTableProcedure; import org.apache.hadoop.hbase.master.procedure.FlushTableProcedure; import org.apache.hadoop.hbase.master.procedure.InitMetaProcedure; +import org.apache.hadoop.hbase.master.procedure.LogRollProcedure; import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler; @@ -4194,11 +4195,11 @@ public SpaceQuotaSnapshotNotifier getSpaceQuotaSnapshotNotifier() { return (RemoteProcedure) procedure; } - public void remoteProcedureCompleted(long procId) { + public void remoteProcedureCompleted(long procId, byte[] remoteResultData) { LOG.debug("Remote procedure done, pid={}", procId); RemoteProcedure procedure = getRemoteProcedure(procId); if (procedure != null) { - procedure.remoteOperationCompleted(procedureExecutor.getEnvironment()); + procedure.remoteOperationCompleted(procedureExecutor.getEnvironment(), remoteResultData); } } @@ -4532,7 +4533,7 @@ public long flushTable(TableName tableName, List columnFamilies, long no @Override protected void run() throws IOException { getMaster().getMasterCoprocessorHost().preTableFlush(tableName); - LOG.info(getClientIdAuditPrefix() + " flush " + tableName); + LOG.info("{} flush {}", getClientIdAuditPrefix(), tableName); submitProcedure( new FlushTableProcedure(procedureExecutor.getEnvironment(), tableName, columnFamilies)); getMaster().getMasterCoprocessorHost().postTableFlush(tableName); @@ -4544,4 +4545,21 @@ protected String getDescription() { } }); } + + @Override + public long rollAllWALWriters(long nonceGroup, long nonce) throws IOException { + return MasterProcedureUtil + .submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) { + @Override + protected void run() { + LOG.info("{} roll all wal writers", getClientIdAuditPrefix()); + submitProcedure(new LogRollProcedure()); + } + + @Override + protected String getDescription() { + return "RollAllWALWriters"; + } + }); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index fc246d38d513..de911b54ee9a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -321,6 +321,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RegionSpecifierAndState; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RollAllWALWritersRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RollAllWALWritersResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest; @@ -1372,7 +1374,7 @@ public IsSnapshotDoneResponse isSnapshotDone(RpcController controller, @Override public GetProcedureResultResponse getProcedureResult(RpcController controller, GetProcedureResultRequest request) throws ServiceException { - LOG.debug("Checking to see if procedure is done pid=" + request.getProcId()); + LOG.debug("Checking to see if procedure is done pid={}", request.getProcId()); try { server.checkInitialized(); GetProcedureResultResponse.Builder builder = GetProcedureResultResponse.newBuilder(); @@ -2575,7 +2577,9 @@ public ReportProcedureDoneResponse reportProcedureDone(RpcController controller, } request.getResultList().forEach(result -> { if (result.getStatus() == RemoteProcedureResult.Status.SUCCESS) { - server.remoteProcedureCompleted(result.getProcId()); + byte[] remoteResultData = + result.hasProcResultData() ? result.getProcResultData().toByteArray() : null; + server.remoteProcedureCompleted(result.getProcId(), remoteResultData); } else { server.remoteProcedureFailed(result.getProcId(), RemoteProcedureException.fromProto(result.getError())); @@ -3662,4 +3666,15 @@ public FlushTableResponse flushTable(RpcController controller, FlushTableRequest throw new ServiceException(ioe); } } + + @Override + public RollAllWALWritersResponse rollAllWALWriters(RpcController rpcController, + RollAllWALWritersRequest request) throws ServiceException { + try { + long procId = server.rollAllWALWriters(request.getNonceGroup(), request.getNonce()); + return RollAllWALWritersResponse.newBuilder().setProcId(procId).build(); + } catch (IOException ioe) { + throw new ServiceException(ioe); + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java index e9c98d624460..0573b1a75628 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java @@ -515,4 +515,10 @@ long flushTable(final TableName tableName, final List columnFamilies, * @return procedure Id */ long truncateRegion(RegionInfo regionInfo, long nonceGroup, long nonce) throws IOException; + + /** + * Roll WAL writer for all RegionServers + * @return procedure id + */ + long rollAllWALWriters(long nonceGroup, long nonce) throws IOException; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java index 55cfc28bb53a..b99f0448e8f0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java @@ -236,6 +236,14 @@ public boolean unregisterListener(final ServerListener listener) { return this.listeners.remove(listener); } + /** + * Removes all of the ServerListeners of this collection that satisfy the given predicate. + * @param filter a predicate which returns true for ServerListener to be removed + */ + public boolean unregisterListenerIf(final Predicate filter) { + return this.listeners.removeIf(filter); + } + /** * Let the server manager know a new regionserver has come online * @param request the startup request diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionRemoteProcedureBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionRemoteProcedureBase.java index a828b5b668fc..cb3b91ca0e20 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionRemoteProcedureBase.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionRemoteProcedureBase.java @@ -103,7 +103,7 @@ public Optional remoteCallBuild(Maste newRemoteOperation(MasterProcedureEnv env); @Override - public void remoteOperationCompleted(MasterProcedureEnv env) { + public void remoteOperationCompleted(MasterProcedureEnv env, byte[] remoteResultData) { // should not be called since we use reportRegionStateTransition to report the result throw new UnsupportedOperationException(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java index e0712f1d2aa3..4cf685f50a0a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java @@ -166,7 +166,7 @@ protected boolean abort(final MasterProcedureEnv env) { } @Override - public void remoteOperationCompleted(MasterProcedureEnv env) { + public void remoteOperationCompleted(MasterProcedureEnv env, byte[] remoteResultData) { // should not be called for region operation until we modified the open/close region procedure throw new UnsupportedOperationException(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushRegionProcedure.java index 7c67f0e3ee90..af482aeff281 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushRegionProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushRegionProcedure.java @@ -149,7 +149,7 @@ public void remoteCallFailed(MasterProcedureEnv env, ServerName serverName, IOEx } @Override - public void remoteOperationCompleted(MasterProcedureEnv env) { + public void remoteOperationCompleted(MasterProcedureEnv env, byte[] remoteResultData) { complete(env, null); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/LogRollProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/LogRollProcedure.java new file mode 100644 index 000000000000..a61b2c4afa55 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/LogRollProcedure.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; +import java.util.List; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.master.ServerListener; +import org.apache.hadoop.hbase.master.ServerManager; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.LastHighestWalFilenum; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.LogRollProcedureState; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.LogRollRemoteProcedureResult; + +/** + * The procedure to perform WAL rolling on all of RegionServers. + */ +@InterfaceAudience.Private +public class LogRollProcedure + extends StateMachineProcedure + implements GlobalProcedureInterface { + + private static final Logger LOG = LoggerFactory.getLogger(LogRollProcedure.class); + + public LogRollProcedure() { + } + + @Override + protected Flow executeFromState(MasterProcedureEnv env, LogRollProcedureState state) + throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { + LOG.info("{} execute state={}", this, state); + + final ServerManager serverManager = env.getMasterServices().getServerManager(); + + try { + switch (state) { + case LOG_ROLL_ROLL_LOG_ON_RS: + // avoid potential new region server missing + serverManager.registerListener(new NewServerWALRoller(env)); + + final List subProcedures = + serverManager.getOnlineServersList().stream().map(LogRollRemoteProcedure::new).toList(); + addChildProcedure(subProcedures.toArray(new LogRollRemoteProcedure[0])); + setNextState(LogRollProcedureState.LOG_ROLL_COLLECT_RS_HIGHEST_WAL_FILENUM); + return Flow.HAS_MORE_STATE; + case LOG_ROLL_COLLECT_RS_HIGHEST_WAL_FILENUM: + // get children procedure + List children = + env.getMasterServices().getMasterProcedureExecutor().getProcedures().stream() + .filter(p -> p instanceof LogRollRemoteProcedure) + .filter(p -> p.getParentProcId() == getProcId()).map(p -> (LogRollRemoteProcedure) p) + .toList(); + LastHighestWalFilenum.Builder builder = LastHighestWalFilenum.newBuilder(); + for (Procedure child : children) { + LogRollRemoteProcedureResult result = + LogRollRemoteProcedureResult.parseFrom(child.getResult()); + builder.putFileNum(ProtobufUtil.toServerName(result.getServerName()).toString(), + result.getLastHighestWalFilenum()); + } + setResult(builder.build().toByteArray()); + setNextState(LogRollProcedureState.LOG_ROLL_UNREGISTER_SERVER_LISTENER); + return Flow.HAS_MORE_STATE; + case LOG_ROLL_UNREGISTER_SERVER_LISTENER: + serverManager.unregisterListenerIf(l -> l instanceof NewServerWALRoller); + return Flow.NO_MORE_STATE; + } + } catch (Exception e) { + setFailure("log-roll", e); + } + return Flow.NO_MORE_STATE; + } + + @Override + public String getGlobalId() { + return getClass().getSimpleName(); + } + + private static final class NewServerWALRoller implements ServerListener { + + private final MasterProcedureEnv env; + + public NewServerWALRoller(MasterProcedureEnv env) { + this.env = env; + } + + @Override + public void serverAdded(ServerName server) { + env.getMasterServices().getMasterProcedureExecutor() + .submitProcedure(new LogRollRemoteProcedure(server)); + } + } + + @Override + protected void rollbackState(MasterProcedureEnv env, LogRollProcedureState state) { + // nothing to rollback + } + + @Override + protected LogRollProcedureState getState(int stateId) { + return LogRollProcedureState.forNumber(stateId); + } + + @Override + protected int getStateId(LogRollProcedureState state) { + return state.getNumber(); + } + + @Override + protected LogRollProcedureState getInitialState() { + return LogRollProcedureState.LOG_ROLL_ROLL_LOG_ON_RS; + } + + @Override + protected boolean abort(MasterProcedureEnv env) { + return false; + } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.serializeStateData(serializer); + + if (getResult() != null && getResult().length > 0) { + serializer.serialize(LastHighestWalFilenum.parseFrom(getResult())); + } else { + serializer.serialize(LastHighestWalFilenum.getDefaultInstance()); + } + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.deserializeStateData(serializer); + + if (getResult() == null) { + LastHighestWalFilenum lastHighestWalFilenum = + serializer.deserialize(LastHighestWalFilenum.class); + if (lastHighestWalFilenum != null) { + if ( + lastHighestWalFilenum.getFileNumMap().isEmpty() + && getCurrentState() == LogRollProcedureState.LOG_ROLL_UNREGISTER_SERVER_LISTENER + ) { + LOG.warn("pid = {}, current state is the last state, but rsHighestWalFilenumMap is " + + "empty, this should not happen. Are all region servers down ?", getProcId()); + } else { + setResult(lastHighestWalFilenum.toByteArray()); + } + } + } + } + + @Override + protected void toStringClassDetails(StringBuilder sb) { + sb.append(getClass().getSimpleName()); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/LogRollRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/LogRollRemoteProcedure.java new file mode 100644 index 000000000000..df8e02ed6010 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/LogRollRemoteProcedure.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; +import java.util.Optional; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.ServerOperation; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation; +import org.apache.hadoop.hbase.regionserver.LogRollCallable; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.LogRollRemoteProcedureStateData; + +/** + * The remote procedure to perform WAL rolling on the specific RegionServer without retrying. + */ +@InterfaceAudience.Private +public class LogRollRemoteProcedure extends ServerRemoteProcedure + implements ServerProcedureInterface { + + public LogRollRemoteProcedure() { + } + + public LogRollRemoteProcedure(ServerName targetServer) { + this.targetServer = targetServer; + } + + @Override + protected void rollback(MasterProcedureEnv env) { + throw new UnsupportedOperationException(); + } + + @Override + protected boolean abort(MasterProcedureEnv env) { + return false; + } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + serializer.serialize(LogRollRemoteProcedureStateData.newBuilder() + .setTargetServer(ProtobufUtil.toServerName(targetServer)).build()); + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + LogRollRemoteProcedureStateData data = + serializer.deserialize(LogRollRemoteProcedureStateData.class); + this.targetServer = ProtobufUtil.toServerName(data.getTargetServer()); + } + + @Override + public Optional remoteCallBuild(MasterProcedureEnv env, ServerName serverName) { + return Optional.of(new ServerOperation(this, getProcId(), LogRollCallable.class, + LogRollRemoteProcedureStateData.getDefaultInstance().toByteArray(), + env.getMasterServices().getMasterActiveTime())); + } + + @Override + public ServerName getServerName() { + return targetServer; + } + + @Override + public boolean hasMetaTableRegion() { + return false; + } + + @Override + public ServerOperationType getServerOperationType() { + return ServerOperationType.LOG_ROLL; + } + + @Override + protected boolean complete(MasterProcedureEnv env, Throwable error) { + // do not retry. just returns. + if (error != null) { + LOG.warn("Failed to roll wal for {}", targetServer, error); + return false; + } else { + return true; + } + } + + @Override + public synchronized void remoteOperationCompleted(MasterProcedureEnv env, + byte[] remoteResultData) { + setResult(remoteResultData); + super.remoteOperationCompleted(env, remoteResultData); + } + + @Override + protected void toStringClassDetails(StringBuilder sb) { + sb.append(getClass().getSimpleName()).append(" targetServer=").append(targetServer); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java index e73b23a3f965..b7ff6db67dbb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java @@ -62,6 +62,11 @@ public enum ServerOperationType { * Re-read the hbase:quotas table and update {@link QuotaCache}. */ RELOAD_QUOTAS, + + /** + * send roll log request to region server and handle the response + */ + LOG_ROLL } /** Returns Name of this server instance. */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java index 57912f419039..55920bd47b38 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java @@ -44,6 +44,7 @@ public boolean requireExclusiveLock(Procedure proc) { case CLAIM_REPLICATION_QUEUE_REMOTE: case VERIFY_SNAPSHOT: case RELOAD_QUOTAS: + case LOG_ROLL: return false; default: break; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java index 0c89b6396417..563961d765e5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java @@ -123,7 +123,8 @@ public synchronized void remoteCallFailed(MasterProcedureEnv env, ServerName ser } @Override - public synchronized void remoteOperationCompleted(MasterProcedureEnv env) { + public synchronized void remoteOperationCompleted(MasterProcedureEnv env, + byte[] remoteResultData) { state = MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_REPORT_SUCCEED; remoteOperationDone(env, null); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotRegionProcedure.java index 05621767e7f8..f4df40b168f0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotRegionProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotRegionProcedure.java @@ -108,7 +108,7 @@ public void remoteCallFailed(MasterProcedureEnv env, ServerName serverName, IOEx } @Override - public void remoteOperationCompleted(MasterProcedureEnv env) { + public void remoteOperationCompleted(MasterProcedureEnv env, byte[] remoteResultData) { complete(env, null); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/BaseRSProcedureCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/BaseRSProcedureCallable.java index 68aac1ef6e2d..7ea98d00cc7b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/BaseRSProcedureCallable.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/BaseRSProcedureCallable.java @@ -28,12 +28,11 @@ public abstract class BaseRSProcedureCallable implements RSProcedureCallable { private Exception initError; @Override - public final Void call() throws Exception { + public final byte[] call() throws Exception { if (initError != null) { throw initError; } - doCall(); - return null; + return doCall(); } @Override @@ -46,7 +45,7 @@ public final void init(byte[] parameter, HRegionServer rs) { } } - protected abstract void doCall() throws Exception; + protected abstract byte[] doCall() throws Exception; protected abstract void initParameter(byte[] parameter) throws Exception; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/RSProcedureCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/RSProcedureCallable.java index 635d2b6f87a5..7ed9ff7664b4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/RSProcedureCallable.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/RSProcedureCallable.java @@ -26,7 +26,7 @@ * A general interface for a sub procedure runs at RS side. */ @InterfaceAudience.Private -public interface RSProcedureCallable extends Callable { +public interface RSProcedureCallable extends Callable { /** * Initialize the callable diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRegionCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRegionCallable.java index 3dd932a1736d..e39317290bbb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRegionCallable.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRegionCallable.java @@ -43,7 +43,7 @@ public class FlushRegionCallable extends BaseRSProcedureCallable { private List columnFamilies; @Override - protected void doCall() throws Exception { + protected byte[] doCall() throws Exception { HRegion region = rs.getRegion(regionInfo.getEncodedName()); if (region == null) { throw new NotServingRegionException("region=" + regionInfo.getRegionNameAsString()); @@ -64,6 +64,7 @@ protected void doCall() throws Exception { LOG.debug("Closing region operation on {}", region); region.closeRegionOperation(); } + return null; } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 2bddf7f9d275..46fcfd03a693 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1965,6 +1965,9 @@ executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_SNAPSHOT_OP executorService.startExecutorService( executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_RELOAD_QUOTAS_OPERATIONS) .setCorePoolSize(rsRefreshQuotasThreads)); + final int logRollThreads = conf.getInt("hbase.regionserver.executor.log.roll.threads", 1); + executorService.startExecutorService(executorService.new ExecutorConfig() + .setExecutorType(ExecutorType.RS_LOG_ROLL).setCorePoolSize(logRollThreads)); Threads.setDaemonThreadRunning(this.walRoller, getName() + ".logRoller", uncaughtExceptionHandler); @@ -2199,7 +2202,7 @@ public void stop(final String msg) { */ public void stop(final String msg, final boolean force, final User user) { if (!this.stopped) { - LOG.info("***** STOPPING region server '" + this + "' *****"); + LOG.info("***** STOPPING region server '{}' *****", this); if (this.rsHost != null) { // when forced via abort don't allow CPs to override try { @@ -3547,9 +3550,9 @@ void executeProcedure(long procId, long initiatingMasterActiveTime, .submit(new RSProcedureHandler(this, procId, initiatingMasterActiveTime, callable)); } - public void remoteProcedureComplete(long procId, long initiatingMasterActiveTime, - Throwable error) { - procedureResultReporter.complete(procId, initiatingMasterActiveTime, error); + public void remoteProcedureComplete(long procId, long initiatingMasterActiveTime, Throwable error, + byte[] procResultData) { + procedureResultReporter.complete(procId, initiatingMasterActiveTime, error, procResultData); } void reportProcedureDone(ReportProcedureDoneRequest request) throws IOException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRollCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRollCallable.java new file mode 100644 index 000000000000..11dc28c2a682 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRollCallable.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import org.apache.hadoop.hbase.executor.EventType; +import org.apache.hadoop.hbase.procedure2.BaseRSProcedureCallable; +import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.wal.AbstractWALRoller; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.LogRollRemoteProcedureResult; + +@InterfaceAudience.Private +public class LogRollCallable extends BaseRSProcedureCallable { + + private static final Logger LOG = LoggerFactory.getLogger(LogRollCallable.class); + + private int maxRollRetry; + + @Override + protected byte[] doCall() throws Exception { + for (int nAttempt = 0; nAttempt < maxRollRetry; nAttempt++) { + try { + Pair filenumPairBefore = getFilenumPair(); + + rs.getWalRoller().requestRollAll(); + rs.getWalRoller().waitUntilWalRollFinished(); + + Pair filenumPairAfter = getFilenumPair(); + LOG.info( + "Before rolling log, highest filenum = {} default WAL filenum = {}, After " + + "rolling log, highest filenum = {} default WAL filenum = {}", + filenumPairBefore.getFirst(), filenumPairBefore.getSecond(), filenumPairAfter.getFirst(), + filenumPairAfter.getSecond()); + return LogRollRemoteProcedureResult.newBuilder() + .setServerName(ProtobufUtil.toServerName(rs.getServerName())) + .setLastHighestWalFilenum(filenumPairBefore.getFirst()).build().toByteArray(); + } catch (Exception e) { + LOG.warn("Failed rolling log on attempt={}", nAttempt, e); + if (nAttempt == maxRollRetry - 1) { + throw e; + } + } + } + return null; + } + + private Pair getFilenumPair() throws IOException { + long highestFilenum = rs.getWALs().stream() + .mapToLong(wal -> ((AbstractFSWAL) wal).getFilenum()).max().orElse(-1L); + long defaultWALFilenum = ((AbstractFSWAL) rs.getWAL(null)).getFilenum(); + return Pair.newPair(highestFilenum, defaultWALFilenum); + } + + @Override + protected void initParameter(byte[] parameter) throws Exception { + this.maxRollRetry = rs.getConfiguration().getInt(AbstractWALRoller.WAL_ROLL_RETRIES, 1); + } + + @Override + public EventType getEventType() { + return EventType.RS_LOG_ROLL; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index fa0c2fd3ff15..ab2503e65d03 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -3954,7 +3954,7 @@ private void executeProcedures(RemoteProcedureRequest request) { LOG.warn("Failed to instantiating remote procedure {}, pid={}", request.getProcClass(), request.getProcId(), e); server.remoteProcedureComplete(request.getProcId(), request.getInitiatingMasterActiveTime(), - e); + e, null); return; } callable.init(request.getProcData().toByteArray(), server); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReloadQuotasCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReloadQuotasCallable.java index e134dfda7ac8..de23db37856a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReloadQuotasCallable.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReloadQuotasCallable.java @@ -29,9 +29,10 @@ public class ReloadQuotasCallable extends BaseRSProcedureCallable { private static final Logger LOG = LoggerFactory.getLogger(ReloadQuotasCallable.class); @Override - protected void doCall() throws Exception { + protected byte[] doCall() throws Exception { LOG.info("Reloading quotas"); rs.getRegionServerRpcQuotaManager().reload(); + return null; } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RemoteProcedureResultReporter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RemoteProcedureResultReporter.java index 21016fe59dd0..7fcf363a919c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RemoteProcedureResultReporter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RemoteProcedureResultReporter.java @@ -28,6 +28,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RemoteProcedureResult; @@ -51,7 +52,8 @@ public RemoteProcedureResultReporter(HRegionServer server) { this.server = server; } - public void complete(long procId, long initiatingMasterActiveTime, Throwable error) { + public void complete(long procId, long initiatingMasterActiveTime, Throwable error, + byte[] procReturnValue) { RemoteProcedureResult.Builder builder = RemoteProcedureResult.newBuilder().setProcId(procId) .setInitiatingMasterActiveTime(initiatingMasterActiveTime); if (error != null) { @@ -62,6 +64,9 @@ public void complete(long procId, long initiatingMasterActiveTime, Throwable err LOG.debug("Successfully complete execution of pid={}", procId); builder.setStatus(RemoteProcedureResult.Status.SUCCESS); } + if (procReturnValue != null) { + builder.setProcResultData(ByteString.copyFrom(procReturnValue)); + } results.add(builder.build()); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotRegionCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotRegionCallable.java index 0693aee87508..7158671efb1b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotRegionCallable.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotRegionCallable.java @@ -41,7 +41,7 @@ public class SnapshotRegionCallable extends BaseRSProcedureCallable { private ForeignExceptionDispatcher monitor; @Override - protected void doCall() throws Exception { + protected byte[] doCall() throws Exception { HRegion region = rs.getRegion(regionInfo.getEncodedName()); if (region == null) { throw new NotServingRegionException( @@ -78,6 +78,7 @@ protected void doCall() throws Exception { LOG.debug("Closing snapshot operation on {}", region); region.closeRegionOperation(Region.Operation.SNAPSHOT); } + return null; } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotVerifyCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotVerifyCallable.java index db7908d81be8..76a3c1cf84e9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotVerifyCallable.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotVerifyCallable.java @@ -32,8 +32,9 @@ public class SnapshotVerifyCallable extends BaseRSProcedureCallable { private RegionInfo region; @Override - protected void doCall() throws Exception { + protected byte[] doCall() throws Exception { rs.getRsSnapshotVerifier().verifyRegion(snapshot, region); + return null; } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitWALCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitWALCallable.java index 151c865db794..e6ae50f6e9ab 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitWALCallable.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitWALCallable.java @@ -79,7 +79,7 @@ public static class ErrorWALSplitException extends HBaseIOException { } @Override - protected void doCall() throws Exception { + protected byte[] doCall() throws Exception { // grab a lock splitWALLock = splitWALLocks.acquireLock(walPath); try { @@ -97,6 +97,7 @@ protected void doCall() throws Exception { } finally { splitWALLock.unlock(); } + return null; } public String getWalPath() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RSProcedureHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RSProcedureHandler.java index 6eacc6b78e6a..3e150144f2c0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RSProcedureHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RSProcedureHandler.java @@ -51,14 +51,16 @@ public RSProcedureHandler(HRegionServer rs, long procId, long initiatingMasterAc @Override public void process() { Throwable error = null; + byte[] procResultData = null; try { MDC.put("pid", Long.toString(procId)); - callable.call(); + procResultData = callable.call(); } catch (Throwable t) { - LOG.error("pid=" + this.procId, t); + LOG.error("pid={}", this.procId, t); error = t; } finally { - ((HRegionServer) server).remoteProcedureComplete(procId, initiatingMasterActiveTime, error); + ((HRegionServer) server).remoteProcedureComplete(procId, initiatingMasterActiveTime, error, + procResultData); } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ClaimReplicationQueueCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ClaimReplicationQueueCallable.java index 2b7e14f9f7aa..73fa29766186 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ClaimReplicationQueueCallable.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ClaimReplicationQueueCallable.java @@ -39,9 +39,10 @@ public EventType getEventType() { } @Override - protected void doCall() throws Exception { + protected byte[] doCall() throws Exception { PeerProcedureHandler handler = rs.getReplicationSourceService().getPeerProcedureHandler(); handler.claimReplicationQueue(queueId); + return null; } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java index 094a61dcdd1f..5d4454c14484 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java @@ -43,7 +43,7 @@ public class RefreshPeerCallable extends BaseRSProcedureCallable { private int stage; @Override - protected void doCall() throws Exception { + protected byte[] doCall() throws Exception { LOG.info("Received a peer change event, peerId=" + peerId + ", type=" + type); PeerProcedureHandler handler = rs.getReplicationSourceService().getPeerProcedureHandler(); switch (type) { @@ -68,6 +68,7 @@ protected void doCall() throws Exception { default: throw new IllegalArgumentException("Unknown peer modification type: " + type); } + return null; } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java index 427fe80b0c36..ed368e18981d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java @@ -69,7 +69,7 @@ public class ReplaySyncReplicationWALCallable extends BaseRSProcedureCallable { private final KeyLocker peersLock = new KeyLocker<>(); @Override - protected void doCall() throws Exception { + protected byte[] doCall() throws Exception { LOG.info("Received a replay sync replication wals {} event, peerId={}", wals, peerId); if (rs.getReplicationSinkService() != null) { Lock peerLock = peersLock.acquireLock(wals.get(0)); @@ -81,6 +81,7 @@ protected void doCall() throws Exception { peerLock.unlock(); } } + return null; } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SwitchRpcThrottleRemoteCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SwitchRpcThrottleRemoteCallable.java index d09c821b9edc..fd35464e686c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SwitchRpcThrottleRemoteCallable.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SwitchRpcThrottleRemoteCallable.java @@ -34,8 +34,9 @@ public class SwitchRpcThrottleRemoteCallable extends BaseRSProcedureCallable { private boolean rpcThrottleEnabled; @Override - protected void doCall() throws Exception { + protected byte[] doCall() throws Exception { rs.getRegionServerRpcQuotaManager().switchRpcThrottle(rpcThrottleEnabled); + return null; } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java index c900333af9eb..5e6457211344 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java @@ -69,7 +69,7 @@ public abstract class AbstractWALRoller extends Thread impl * Configure for the max count of log rolling retry. The real retry count is also limited by the * timeout of log rolling via {@link #WAL_ROLL_WAIT_TIMEOUT} */ - protected static final String WAL_ROLL_RETRIES = "hbase.regionserver.logroll.retries"; + public static final String WAL_ROLL_RETRIES = "hbase.regionserver.logroll.retries"; protected final ConcurrentMap wals = new ConcurrentHashMap<>(); protected final T abortable; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java index e78ca7d0cdb7..daaa2e5c2b99 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java @@ -568,4 +568,9 @@ public long flushTable(TableName tableName, List columnFamilies, long no long nonce) throws IOException { return 0; } + + @Override + public long rollAllWALWriters(long nonceGroup, long nonce) throws IOException { + return 0; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestLogRollProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestLogRollProcedure.java new file mode 100644 index 000000000000..1b587097ddad --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestLogRollProcedure.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.procedure; + +import static org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.DISPATCH_DELAY_CONF_KEY; +import static org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.DISPATCH_MAX_QUEUE_SIZE_CONF_KEY; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.SingleProcessHBaseCluster; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +@Category(MediumTests.class) +public class TestLogRollProcedure { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestLogRollProcedure.class); + + @Rule + public TestName name = new TestName(); + + private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); + + private Configuration conf; + + @Before + public void setUp() throws Exception { + conf = TEST_UTIL.getConfiguration(); + conf.set(DISPATCH_DELAY_CONF_KEY, "2000"); + conf.set(DISPATCH_MAX_QUEUE_SIZE_CONF_KEY, "128"); + TEST_UTIL.startMiniCluster(2); + } + + @After + public void tearDown() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testSimpleLogRoll() throws IOException { + HRegionServer rs = TEST_UTIL.getHBaseCluster().getRegionServer(0); + long fileNumBefore = ((AbstractFSWAL) rs.getWAL(null)).getFilenum(); + + TEST_UTIL.getAdmin().rollAllWALWriters(); + + long fileNumAfter = ((AbstractFSWAL) rs.getWAL(null)).getFilenum(); + assertTrue(fileNumAfter > fileNumBefore); + } + + @Test + public void testMasterRestarts() throws IOException { + SingleProcessHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); + HRegionServer rs = cluster.getRegionServer(0); + long fileNumBefore = ((AbstractFSWAL) rs.getWAL(null)).getFilenum(); + + LogRollProcedure procedure = new LogRollProcedure(); + long procId = cluster.getMaster().getMasterProcedureExecutor().submitProcedure(procedure); + + TEST_UTIL.waitFor(60000, () -> cluster.getMaster().getMasterProcedureExecutor().getProcedures() + .stream().anyMatch(p -> p instanceof LogRollRemoteProcedure)); + ServerName serverName = cluster.getMaster().getServerName(); + cluster.killMaster(serverName); + cluster.waitForMasterToStop(serverName, 30000); + cluster.startMaster(); + cluster.waitForActiveAndReadyMaster(); + + ProcedureExecutor exec = cluster.getMaster().getMasterProcedureExecutor(); + TEST_UTIL.waitFor(30000, () -> exec.isRunning() && exec.isFinished(procId)); + + long fileNumAfter = ((AbstractFSWAL) rs.getWAL(null)).getFilenum(); + + assertTrue(fileNumAfter > fileNumBefore); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerRemoteProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerRemoteProcedure.java index 1500a3c00cd3..f828f5ce1ba6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerRemoteProcedure.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerRemoteProcedure.java @@ -188,7 +188,8 @@ protected void deserializeStateData(ProcedureStateSerializer serializer) throws } @Override - public synchronized void remoteOperationCompleted(MasterProcedureEnv env) { + public synchronized void remoteOperationCompleted(MasterProcedureEnv env, + byte[] remoteResultData) { complete(env, null); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestRegisterPeerWorkerWhenRestarting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestRegisterPeerWorkerWhenRestarting.java index 1c4abd15eaf0..c0a37c20e88b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestRegisterPeerWorkerWhenRestarting.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestRegisterPeerWorkerWhenRestarting.java @@ -57,14 +57,14 @@ public HMasterForTest(Configuration conf) throws IOException { } @Override - public void remoteProcedureCompleted(long procId) { + public void remoteProcedureCompleted(long procId, byte[] data) { if ( FAIL && getMasterProcedureExecutor() .getProcedure(procId) instanceof SyncReplicationReplayWALRemoteProcedure ) { throw new RuntimeException("Inject error"); } - super.remoteProcedureCompleted(procId); + super.remoteProcedureCompleted(procId, data); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdmin.java index 35c868413e19..4d592b49d0d3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdmin.java @@ -526,6 +526,11 @@ public void rollWALWriter(ServerName serverName) throws IOException, FailedLogCl admin.rollWALWriter(serverName); } + @Override + public Map rollAllWALWriters() throws IOException { + return admin.rollAllWALWriters(); + } + public CompactionState getCompactionState(TableName tableName) throws IOException { return admin.getCompactionState(tableName); } diff --git a/hbase-shell/src/main/ruby/hbase/admin.rb b/hbase-shell/src/main/ruby/hbase/admin.rb index 5ceaf2a08c72..93cc312338c9 100644 --- a/hbase-shell/src/main/ruby/hbase/admin.rb +++ b/hbase-shell/src/main/ruby/hbase/admin.rb @@ -179,6 +179,12 @@ def wal_roll(server_name) # TODO: remove older hlog_roll version alias hlog_roll wal_roll + #---------------------------------------------------------------------------------------------- + # Requests all region servers to roll wal writer + def wal_roll_all + @admin.rollAllWALWriters + end + #---------------------------------------------------------------------------------------------- # Requests a table or region split def split(table_or_region_name, split_point = nil) diff --git a/hbase-shell/src/main/ruby/shell.rb b/hbase-shell/src/main/ruby/shell.rb index 46b38dd96b89..6be3854b8a57 100644 --- a/hbase-shell/src/main/ruby/shell.rb +++ b/hbase-shell/src/main/ruby/shell.rb @@ -467,6 +467,7 @@ def self.exception_handler(hide_traceback) unassign zk_dump wal_roll + wal_roll_all hbck_chore_run catalogjanitor_run catalogjanitor_switch diff --git a/hbase-shell/src/main/ruby/shell/commands/wal_roll_all.rb b/hbase-shell/src/main/ruby/shell/commands/wal_roll_all.rb new file mode 100644 index 000000000000..13d764495653 --- /dev/null +++ b/hbase-shell/src/main/ruby/shell/commands/wal_roll_all.rb @@ -0,0 +1,37 @@ +# +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +module Shell + module Commands + class WalRollAll < Command + def help + <<-EOF +Request all region servers to roll wal writer. Note that this method is synchronous, +which means it will block until all RegionServers have completed the log roll, +or a RegionServer fails due to an exception that retry will not work. Here is how +you would run the command in the hbase shell: + hbase> wal_roll_all +EOF + end + + def command + admin.wal_roll_all + end + end + end +end diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java index 0eff84bba7c8..a0d73dcca21c 100644 --- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java @@ -826,7 +826,11 @@ public String getLocks() { @Override public void rollWALWriter(ServerName serverName) { throw new NotImplementedException("rollWALWriter not supported in ThriftAdmin"); + } + @Override + public Map rollAllWALWriters() { + throw new NotImplementedException("rollAllWALWriters not supported in ThriftAdmin"); } @Override