@@ -25,7 +25,6 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.TableName;
@@ -36,7 +35,6 @@
import org.apache.hadoop.hbase.backup.BackupRequest;
import org.apache.hadoop.hbase.backup.BackupRestoreFactory;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
@@ -158,10 +156,7 @@ public void execute() throws IOException {
// snapshots for the same reason as the log rolls.
List<BulkLoad> bulkLoadsToDelete = backupManager.readBulkloadRows(tableList);

Map<String, String> props = new HashMap<>();
props.put("backupRoot", backupInfo.getBackupRootDir());
admin.execProcedure(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE,
LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, props);
BackupUtils.logRoll(conn, backupInfo.getBackupRootDir(), conf);

newTimestamps = backupManager.readRegionServerLastLogRollResult();

@@ -19,7 +19,6 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
@@ -29,9 +28,7 @@
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.util.CommonFSUtils;
@@ -84,13 +81,8 @@ public Map<String, Long> getIncrBackupLogFileMap() throws IOException {
}

LOG.info("Execute roll log procedure for incremental backup ...");
HashMap<String, String> props = new HashMap<>();
props.put("backupRoot", backupInfo.getBackupRootDir());
BackupUtils.logRoll(conn, backupInfo.getBackupRootDir(), conf);

try (Admin admin = conn.getAdmin()) {
admin.execProcedure(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE,
LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, props);
}
newTimestamps = readRegionServerLastLogRollResult();

logList = getLogFilesForNewBackup(previousTimestampMins, newTimestamps, conf, savedStartCode);
@@ -49,6 +49,8 @@
import org.apache.hadoop.hbase.backup.RestoreRequest;
import org.apache.hadoop.hbase.backup.impl.BackupManifest;
import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
@@ -65,6 +67,7 @@
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;

@@ -770,4 +773,52 @@ public static String findMostRecentBackupId(String[] backupIds) {
return BackupRestoreConstants.BACKUPID_PREFIX + recentTimestamp;
}

  /**
   * Roll the WAL writer for all region servers and record the newest log roll result.
   */
  public static void logRoll(Connection conn, String backupRootDir, Configuration conf)
    throws IOException {
    boolean legacy = conf.getBoolean("hbase.backup.logroll.legacy.used", false);
    if (legacy) {
      logRollV1(conn, backupRootDir);
    } else {
      logRollV2(conn, backupRootDir);
    }
  }

  private static void logRollV1(Connection conn, String backupRootDir) throws IOException {
    try (Admin admin = conn.getAdmin()) {
      admin.execProcedure(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE,
        LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME,
        ImmutableMap.of("backupRoot", backupRootDir));
    }
  }

  private static void logRollV2(Connection conn, String backupRootDir) throws IOException {

Contributor:
I don't think that this method handles cleaning up any servers that no longer exist on the cluster, which means we'll hang onto oldWALs from those hosts indefinitely due to the BackupLogCleaner

Please correct me if I'm misunderstanding, but I think we also want to make sure that we're removing entries from the system tables for hosts that are no longer part of the cluster

Contributor:
Do we need something like

BackupSystemTable

public void deleteRegionServerLastLogRollResult(String server, String backupRoot) throws IOException {
    LOG.trace("delete region server last roll log result to backup system table");

    try (Table table = connection.getTable(tableName)) {
      Delete delete = new Delete(rowkey(RS_LOG_TS_PREFIX, backupRoot, NULL, server));
      table.delete(delete);
    }
  }

BackupUtils

private static void logRollV2(Connection conn, String backupRootDir) throws IOException {
    BackupSystemTable backupSystemTable = new BackupSystemTable(conn);
    HashMap<String, Long> lastLogRollResult =
      backupSystemTable.readRegionServerLastLogRollResult(backupRootDir);
    try (Admin admin = conn.getAdmin()) {
      Map<ServerName, Long> newLogRollResult = admin.rollAllWALWriters();

      for (Map.Entry<ServerName, Long> entry : newLogRollResult.entrySet()) {
        ServerName serverName = entry.getKey();
        long newHighestWALFilenum = entry.getValue();

        String address = serverName.getAddress().toString();
        Long lastHighestWALFilenum = lastLogRollResult.get(address);
        if (lastHighestWALFilenum != null && lastHighestWALFilenum > newHighestWALFilenum) {
          LOG.warn("Won't update last roll log result for server {}: current = {}, new = {}",
            serverName, lastHighestWALFilenum, newHighestWALFilenum);
        } else {
          backupSystemTable.writeRegionServerLastLogRollResult(address, newHighestWALFilenum,
            backupRootDir);
          if (LOG.isDebugEnabled()) {
            LOG.debug("updated last roll log result for {} from {} to {}", serverName,
              lastHighestWALFilenum, newHighestWALFilenum);
          }
        }
      }

      // New Code Here
      for (String server: lastLogRollResult.keySet()) {
        if (!newLogRollResult.containsKey(ServerName.parseServerName(server))) {
          backupSystemTable.deleteRegionServerLastLogRollResult(server, backupRootDir);
        }
      }
    }
  }

Contributor Author:
> I don't think that this method handles cleaning up any servers that no longer exist on the cluster, which means we'll hang onto oldWALs from those hosts indefinitely due to the BackupLogCleaner
>
> Please correct me if I'm misunderstanding, but I think we also want to make sure that we're removing entries from the system tables for hosts that are no longer part of the cluster

Thanks for reviewing.

I think the functionality of logRollV2 should be the same as that of logRollV1, and I don't think any additional functionality should be introduced or existing functionality should be reduced.

As for the problem you mentioned of cleaning up the dead-server log roll results, I have a few questions; would you mind explaining a bit more to help me understand?

  1. Will keeping old WALs from dead servers indefinitely cause any problems?
  2. If we clean them up, is there any chance of data loss?
  3. Does the zk-based log roll procedure (i.e. logRollV1) need to clean them up too?

Thanks.

Contributor (@hgromer, Sep 19, 2025):
> and I don't think any additional functionality should be introduced or existing functionality should be reduced.

I don't necessarily agree here, though I do agree v1 isn't doing this cleanup either. To clarify, this PR doesn't introduce any issues, but thanks to your changes I think we have a good way to solve the issue I've presented. This is a beta feature, and I feel that we can iterate on the behavior of the system, especially if it means we're improving the system's efficiency by reducing storage overhead.

  1. Yes; the oldWALs can take up a non-trivial amount of space and should be cleaned up when they are no longer necessary. We're seeing cases where we are storing terabytes of unused, deletable data, which is expensive.
  2. I'd like to talk this out and make sure my logic makes sense. There are two types of backups. For full backups, it makes sense that we don't lose any data: we roll the WAL files and then take a snapshot of all the HFiles on the cluster, so those WAL files are backed up. For incremental backups, we roll the WAL files and then back up all WAL files from [<old_start_code>, newTimestamps). In both cases, I do not think there's any possibility of data loss. As long as we delete entries from the system table after the backup completes, we should be okay.
  3. It'd be nice, but I think it'd be a lot harder because logRollV1 never updates the backup system table with the newer timestamps, as far as I can tell. Given this feature is still in beta, I'm happy to move forward with the v2 functionality and mark v1 as deprecated.

Contributor Author:
Thanks for the kind reply, @hgromer.

After checking the code, I also feel that deleting the dead-server log roll results is not likely to cause data loss, and considering that it can save a lot of unnecessary storage space, I support deleting them too. If you don't mind, could you please file another issue and open a new PR to follow up? I can help review it.

Thanks.

Contributor:
Sounds good to me, thank you. I'll create a Jira and put up a PR.

    BackupSystemTable backupSystemTable = new BackupSystemTable(conn);
    HashMap<String, Long> lastLogRollResult =
      backupSystemTable.readRegionServerLastLogRollResult(backupRootDir);
    try (Admin admin = conn.getAdmin()) {
      Map<ServerName, Long> newLogRollResult = admin.rollAllWALWriters();

      for (Map.Entry<ServerName, Long> entry : newLogRollResult.entrySet()) {
        ServerName serverName = entry.getKey();
        long newHighestWALFilenum = entry.getValue();

        String address = serverName.getAddress().toString();
        Long lastHighestWALFilenum = lastLogRollResult.get(address);
        if (lastHighestWALFilenum != null && lastHighestWALFilenum > newHighestWALFilenum) {
          LOG.warn("Won't update last roll log result for server {}: current = {}, new = {}",
            serverName, lastHighestWALFilenum, newHighestWALFilenum);
        } else {
          backupSystemTable.writeRegionServerLastLogRollResult(address, newHighestWALFilenum,
            backupRootDir);
          if (LOG.isDebugEnabled()) {
            LOG.debug("updated last roll log result for {} from {} to {}", serverName,
              lastHighestWALFilenum, newHighestWALFilenum);
          }
        }
      }
    }
  }
}
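
From a caller's point of view, the new helper is a single call switched by configuration. The following is a minimal sketch, not part of this PR: the standalone main class and the HDFS backup root path are made up for illustration, and only `BackupUtils.logRoll` and the `hbase.backup.logroll.legacy.used` flag come from the change above.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class LogRollSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // false (the default) takes the new logRollV2 path built on Admin#rollAllWALWriters();
    // true falls back to the legacy procedure-based logRollV1 path.
    conf.setBoolean("hbase.backup.logroll.legacy.used", false);
    try (Connection conn = ConnectionFactory.createConnection(conf)) {
      // Illustrative backup root; in the backup clients this comes from
      // BackupInfo#getBackupRootDir().
      String backupRootDir = "hdfs://namenode:8020/hbase-backup";
      // Rolls WALs on every region server; the v2 path also records each server's
      // last log roll result in the backup system table.
      BackupUtils.logRoll(conn, backupRootDir, conf);
    }
  }
}
```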
@@ -20,7 +20,6 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -45,7 +44,6 @@
import org.apache.hadoop.hbase.backup.impl.FullTableBackupClient;
import org.apache.hadoop.hbase.backup.impl.IncrementalBackupManager;
import org.apache.hadoop.hbase.backup.impl.IncrementalTableBackupClient;
import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
@@ -239,10 +237,7 @@ public void execute() throws IOException {
// the snapshot.
LOG.info("Execute roll log procedure for full backup ...");

Map<String, String> props = new HashMap<>();
props.put("backupRoot", backupInfo.getBackupRootDir());
admin.execProcedure(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE,
LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, props);
BackupUtils.logRoll(conn, backupInfo.getBackupRootDir(), conf);
failStageIf(Stage.stage_2);
newTimestamps = backupManager.readRegionServerLastLogRollResult();

@@ -23,6 +23,7 @@
import java.io.File;
import java.util.List;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
@@ -70,17 +71,17 @@ public void TestIncBackupMergeRestore() throws Exception {

// #2 - insert some data to table1
Table t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS);
LOG.debug("writing " + ADD_ROWS + " rows to " + table1);
LOG.debug("writing {} rows to {}", ADD_ROWS, table1);

Assert.assertEquals(TEST_UTIL.countRows(t1), NB_ROWS_IN_BATCH + ADD_ROWS);
Assert.assertEquals(HBaseTestingUtil.countRows(t1), NB_ROWS_IN_BATCH + ADD_ROWS);
t1.close();
LOG.debug("written " + ADD_ROWS + " rows to " + table1);
LOG.debug("written {} rows to {}", ADD_ROWS, table1);

Table t2 = insertIntoTable(conn, table2, famName, 1, ADD_ROWS);

Assert.assertEquals(TEST_UTIL.countRows(t2), NB_ROWS_IN_BATCH + ADD_ROWS);
Assert.assertEquals(HBaseTestingUtil.countRows(t2), NB_ROWS_IN_BATCH + ADD_ROWS);
t2.close();
LOG.debug("written " + ADD_ROWS + " rows to " + table2);
LOG.debug("written {} rows to {}", ADD_ROWS, table2);

// #3 - incremental backup for multiple tables
tables = Lists.newArrayList(table1, table2);
@@ -112,15 +113,15 @@ public void TestIncBackupMergeRestore() throws Exception {
tablesRestoreIncMultiple, tablesMapIncMultiple, true));

Table hTable = conn.getTable(table1_restore);
LOG.debug("After incremental restore: " + hTable.getDescriptor());
int countRows = TEST_UTIL.countRows(hTable, famName);
LOG.debug("f1 has " + countRows + " rows");
LOG.debug("After incremental restore: {}", hTable.getDescriptor());
int countRows = HBaseTestingUtil.countRows(hTable, famName);
LOG.debug("f1 has {} rows", countRows);
Assert.assertEquals(NB_ROWS_IN_BATCH + 2 * ADD_ROWS, countRows);

hTable.close();

hTable = conn.getTable(table2_restore);
Assert.assertEquals(TEST_UTIL.countRows(hTable), NB_ROWS_IN_BATCH + 2 * ADD_ROWS);
Assert.assertEquals(HBaseTestingUtil.countRows(hTable), NB_ROWS_IN_BATCH + 2 * ADD_ROWS);
hTable.close();

admin.close();
@@ -1404,6 +1404,16 @@ Future<Boolean> abortProcedureAsync(long procId, boolean mayInterruptIfRunning)
*/
void rollWALWriter(ServerName serverName) throws IOException, FailedLogCloseException;

/**
* Roll the log writer for all RegionServers. Note that unlike
* {@link Admin#rollWALWriter(ServerName)}, this method is synchronous: it blocks until all
* RegionServers have completed the log roll, or until a RegionServer fails with an exception
* that retrying cannot resolve.
* @return a map from each server to the highest WAL filenum of that server before the log roll
* @throws IOException if a remote or network exception occurs
*/
Map<ServerName, Long> rollAllWALWriters() throws IOException;

/**
* Helper that delegates to getClusterMetrics().getMasterCoprocessorNames().
* @return an array of master coprocessors
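A usage sketch of the new synchronous API, not taken from the PR: the connection boilerplate and the printing are illustrative, and only `Admin#rollAllWALWriters()` and its return value come from the interface change above.

```java
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class RollAllWALWritersSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(conf);
      Admin admin = conn.getAdmin()) {
      // Blocks until every region server has completed its log roll
      // (or one fails with a non-retryable exception).
      Map<ServerName, Long> highestFilenumsBeforeRoll = admin.rollAllWALWriters();
      highestFilenumsBeforeRoll.forEach((server, filenum) ->
        System.out.println(server + " -> highest WAL filenum before roll: " + filenum));
    }
  }
}
```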
@@ -635,6 +635,11 @@ public void rollWALWriter(ServerName serverName) throws IOException, FailedLogCl
get(admin.rollWALWriter(serverName));
}

@Override
public Map<ServerName, Long> rollAllWALWriters() throws IOException {
return get(admin.rollAllWALWriters());
}

@Override
public CompactionState getCompactionState(TableName tableName) throws IOException {
return get(admin.getCompactionState(tableName));
@@ -1270,6 +1270,15 @@ default CompletableFuture<Integer> getMasterInfoPort() {
*/
CompletableFuture<Void> rollWALWriter(ServerName serverName);

/**
* Roll the log writer for all RegionServers. Note that unlike
* {@link Admin#rollWALWriter(ServerName)}, the returned future completes only after all
* RegionServers have completed the log roll, or after a RegionServer fails with an exception
* that retrying cannot resolve.
* @return a map from each server to the highest WAL filenum of that server before the log roll
*/
CompletableFuture<Map<ServerName, Long>> rollAllWALWriters();

/**
* Clear compacting queues on a region server.
* @param serverName The servername of the region server.
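The AsyncAdmin counterpart hands back a future instead of blocking the caller. A hedged sketch under the same assumptions (connection setup and printing are illustrative):

```java
import java.util.Map;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.AsyncAdmin;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class AsyncRollAllWALWritersSketch {
  public static void main(String[] args) throws Exception {
    try (AsyncConnection conn =
      ConnectionFactory.createAsyncConnection(HBaseConfiguration.create()).get()) {
      AsyncAdmin admin = conn.getAdmin();
      // The future completes once all region servers have rolled their WAL writers.
      Map<ServerName, Long> result = admin.rollAllWALWriters().join();
      result.forEach((server, filenum) -> System.out.println(server + " -> " + filenum));
    }
  }
}
```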
@@ -691,6 +691,11 @@ public CompletableFuture<Void> rollWALWriter(ServerName serverName) {
return wrap(rawAdmin.rollWALWriter(serverName));
}

@Override
public CompletableFuture<Map<ServerName, Long>> rollAllWALWriters() {
return wrap(rawAdmin.rollAllWALWriters());
}

@Override
public CompletableFuture<Void> clearCompactionQueues(ServerName serverName, Set<String> queues) {
return wrap(rawAdmin.clearCompactionQueues(serverName, queues));