Merged
@@ -17,11 +17,13 @@
*/
package org.apache.hadoop.hbase.mapreduce;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
@@ -33,6 +35,7 @@
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -65,22 +68,41 @@ public class RowCounter extends AbstractHBaseTool {
private final static String OPT_END_TIME = "endtime";
private final static String OPT_RANGE = "range";
private final static String OPT_EXPECTED_COUNT = "expectedCount";
private final static String OPT_COUNT_DELETE_MARKERS = "countDeleteMarkers";

private String tableName;
private List<MultiRowRangeFilter.RowRange> rowRangeList;
private long startTime;
private long endTime;
private long expectedCount;
private boolean countDeleteMarkers;
private List<String> columns = new ArrayList<>();

private Job job;
Contributor

This does not look necessary. We have been validating counters with the following logic in the existing tests; please update the tests to take a similar approach:

  /**
   * Run the RowCounter map reduce job and verify the row count.
   * @param args          the command line arguments to be used for rowcounter job.
   * @param expectedCount the expected row count (result of map reduce job).
   * @throws Exception in case of any unexpected error.
   */
  private void runCreateSubmittableJobWithArgs(String[] args, int expectedCount) throws Exception {
    Job job = RowCounter.createSubmittableJob(TEST_UTIL.getConfiguration(), args);
    long start = EnvironmentEdgeManager.currentTime();
    job.waitForCompletion(true);
    long duration = EnvironmentEdgeManager.currentTime() - start;
    LOG.debug("row count duration (ms): " + duration);
    assertTrue(job.isSuccessful());
    Counter counter = job.getCounters().findCounter(RowCounter.RowCounterMapper.Counters.ROWS);
    assertEquals(expectedCount, counter.getValue());
  }

Contributor Author

@NihalJain, the method runCreateSubmittableJobWithArgs internally calls RowCounter.createSubmittableJob(TEST_UTIL.getConfiguration(), args). However, the method createSubmittableJob is marked for deprecation - code link. So ideally, I believe (please correct me if I am wrong), we should not be making a change to that method. To use that method, we have to change the scan behaviour based on the flag.

Contributor

Hey @shubham-roy, I mean we could write a helper in the tests similar to the example method runCreateSubmittableJobWithArgs above and make the assertions there. IMO we should get rid of the deprecated API as a separate task rather than mixing implementations and doing the same thing in different ways in different places.

Contributor Author

> IMO we should get rid of the deprecated API as a separate task rather than mixing implementations and doing the same thing in different ways in different places.

@NihalJain, don't you think that access to the job object via the getter method (which I exposed) could be a good starting point for getting rid of the deprecated method createSubmittableJob? I already used it in a way that could easily be extended to other use cases as well. LMK what you think.

Contributor @NihalJain, Nov 15, 2024

Sure, but I would prefer to do that as a separate cleanup task, for separation of concerns.

Contributor Author

> Sure, but I would prefer to do that as a separate cleanup task, for separation of concerns.

@NihalJain, I agree, and I am not touching any of the other tests either. I just used what I needed for my testing, in an extensible way. Fixing the remaining tests can be taken up as a separate cleanup task.

Contributor @NihalJain, Nov 19, 2024

I will leave this up to others, as I am still not convinced. +0 from me.

Contributor Author

@virajjasani, can you please have a look at this thread and let us know your thoughts?


/**
* Mapper that runs the count.
*/
static class RowCounterMapper extends TableMapper<ImmutableBytesWritable, Result> {

- /** Counter enumeration to count the actual rows. */
+ /** Counter enumeration to count the actual rows, cells and delete markers. */
public static enum Counters {
- ROWS
+ ROWS,
+ DELETE,
+ DELETE_COLUMN,
+ DELETE_FAMILY,
+ DELETE_FAMILY_VERSION,
+ ROWS_WITH_DELETE_MARKER
}

private boolean countDeleteMarkers;

@Override
protected void
setup(Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, Result>.Context context)
throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
countDeleteMarkers = conf.getBoolean(OPT_COUNT_DELETE_MARKERS, false);
}

/**
@@ -95,6 +117,37 @@ public static enum Counters {
public void map(ImmutableBytesWritable row, Result values, Context context) throws IOException {
// Count every row containing data, whether it's in qualifiers or values
context.getCounter(Counters.ROWS).increment(1);

if (countDeleteMarkers) {
boolean rowContainsDeleteMarker = false;
for (Cell cell : values.rawCells()) {
Cell.Type type = cell.getType();
switch (type) {
case Delete:
rowContainsDeleteMarker = true;
context.getCounter(Counters.DELETE).increment(1);
break;
case DeleteColumn:
rowContainsDeleteMarker = true;
context.getCounter(Counters.DELETE_COLUMN).increment(1);
break;
case DeleteFamily:
rowContainsDeleteMarker = true;
context.getCounter(Counters.DELETE_FAMILY).increment(1);
break;
case DeleteFamilyVersion:
rowContainsDeleteMarker = true;
context.getCounter(Counters.DELETE_FAMILY_VERSION).increment(1);
break;
default:
break;
}
}

if (rowContainsDeleteMarker) {
context.getCounter(Counters.ROWS_WITH_DELETE_MARKER).increment(1);
}
}
}
}
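
The counting rule in `map` above can be modelled outside Hadoop. The following is a minimal, self-contained sketch, not the HBase implementation: `CellKind`, `Tally`, and `tallyRow` are hypothetical stand-ins for `Cell.Type`, the MapReduce counters, and a single `map` call, and the sketch assumes the raw scan hands the mapper every cell of a row, delete markers included.

```java
import java.util.Arrays;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for Cell.Type; only the kinds the mapper distinguishes. */
enum CellKind { PUT, DELETE, DELETE_COLUMN, DELETE_FAMILY, DELETE_FAMILY_VERSION }

/** Hypothetical stand-in for the job counters. */
final class Tally {
  long rows;
  long rowsWithDeleteMarker;
  final Map<CellKind, Long> markers = new EnumMap<>(CellKind.class);

  /** Mirrors one map() call: count the row, then each delete marker it contains. */
  void tallyRow(List<CellKind> cells, boolean countDeleteMarkers) {
    rows++;
    if (!countDeleteMarkers) {
      return; // flag off: only the ROWS counter moves
    }
    boolean sawMarker = false;
    for (CellKind kind : cells) {
      if (kind != CellKind.PUT) {
        sawMarker = true;
        markers.merge(kind, 1L, Long::sum);
      }
    }
    if (sawMarker) {
      rowsWithDeleteMarker++; // counted once per row, however many markers it holds
    }
  }

  long count(CellKind kind) {
    return markers.getOrDefault(kind, 0L);
  }
}

final class TallyDemo {
  public static void main(String[] args) {
    Tally tally = new Tally();
    tally.tallyRow(Arrays.asList(CellKind.PUT, CellKind.DELETE, CellKind.DELETE), true);
    tally.tallyRow(Arrays.asList(CellKind.PUT, CellKind.PUT), true);
    tally.tallyRow(Arrays.asList(CellKind.DELETE_FAMILY), true);
    System.out.println(tally.rows + " rows, " + tally.rowsWithDeleteMarker + " with markers, "
      + tally.count(CellKind.DELETE) + " Delete, " + tally.count(CellKind.DELETE_FAMILY)
      + " DeleteFamily");
  }
}
```

The point the sketch isolates is that the per-type counters count cells while ROWS_WITH_DELETE_MARKER counts rows, so a row holding two Delete markers adds two to DELETE but only one to ROWS_WITH_DELETE_MARKER.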

@@ -105,11 +158,14 @@ public void map(ImmutableBytesWritable row, Result values, Context context) thro
* @throws IOException When setting up the job fails.
*/
public Job createSubmittableJob(Configuration conf) throws IOException {
conf.setBoolean(OPT_COUNT_DELETE_MARKERS, this.countDeleteMarkers);
Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
job.setJarByClass(RowCounter.class);
Scan scan = new Scan();
// raw scan will be needed to account for delete markers when --countDeleteMarkers flag is set
scan.setRaw(this.countDeleteMarkers);
scan.setCacheBlocks(false);
- setScanFilter(scan, rowRangeList);
+ setScanFilter(scan, rowRangeList, this.countDeleteMarkers);

for (String columnName : this.columns) {
String family = StringUtils.substringBefore(columnName, ":");
@@ -147,13 +203,15 @@ public static Job createSubmittableJob(Configuration conf, String[] args) throws
List<MultiRowRangeFilter.RowRange> rowRangeList = null;
long startTime = 0;
long endTime = 0;
boolean countDeleteMarkers = false;

StringBuilder sb = new StringBuilder();

final String rangeSwitch = "--range=";
final String startTimeArgKey = "--starttime=";
final String endTimeArgKey = "--endtime=";
final String expectedCountArg = "--expected-count=";
final String countDeleteMarkersArg = "--countDeleteMarkers";

// First argument is table name, starting from second
for (int i = 1; i < args.length; i++) {
@@ -179,10 +237,15 @@ public static Job createSubmittableJob(Configuration conf, String[] args) throws
Long.parseLong(args[i].substring(expectedCountArg.length())));
continue;
}
if (args[i].startsWith(countDeleteMarkersArg)) {
countDeleteMarkers = true;
continue;
}
// if no switch, assume column names
sb.append(args[i]);
sb.append(" ");
}
conf.setBoolean(OPT_COUNT_DELETE_MARKERS, countDeleteMarkers);
if (endTime < startTime) {
printUsage("--endtime=" + endTime + " needs to be greater than --starttime=" + startTime);
return null;
@@ -192,7 +255,9 @@ public static Job createSubmittableJob(Configuration conf, String[] args) throws
job.setJarByClass(RowCounter.class);
Scan scan = new Scan();
scan.setCacheBlocks(false);
- setScanFilter(scan, rowRangeList);
+ // raw scan will be needed to account for delete markers when --countDeleteMarkers flag is set
+ scan.setRaw(countDeleteMarkers);
+ setScanFilter(scan, rowRangeList, countDeleteMarkers);
if (sb.length() > 0) {
for (String columnName : sb.toString().trim().split(" ")) {
String family = StringUtils.substringBefore(columnName, ":");
@@ -250,9 +315,11 @@ private static List<MultiRowRangeFilter.RowRange> parseRowRangeParameter(String
* Otherwise, method sets filter which is instance of {@link FirstKeyOnlyFilter}. If rowRangeList
* contains exactly one element, startRow and stopRow are set to the scan.
*/
- private static void setScanFilter(Scan scan, List<MultiRowRangeFilter.RowRange> rowRangeList) {
+ private static void setScanFilter(Scan scan, List<MultiRowRangeFilter.RowRange> rowRangeList,
+ boolean countDeleteMarkers) {
final int size = rowRangeList == null ? 0 : rowRangeList.size();
- if (size <= 1) {
+ // all cells will be needed if --countDeleteMarkers flag is set, hence, skipping filter
+ if (size <= 1 && !countDeleteMarkers) {
scan.setFilter(new FirstKeyOnlyFilter());
}
if (size == 1) {
@@ -295,10 +362,15 @@ protected void addOptions() {
.desc("[startKey],[endKey][;[startKey],[endKey]...]]").longOpt(OPT_RANGE).build();
Option expectedOption = Option.builder(null).valueSeparator('=').hasArg(true)
.desc("expected number of rows to be count.").longOpt(OPT_EXPECTED_COUNT).build();
Option countDeleteMarkersOption = Option.builder(null).hasArg(false)
.desc("counts the number of Delete Markers of all types, i.e. "
+ "(DELETE, DELETE_COLUMN, DELETE_FAMILY, DELETE_FAMILY_VERSION)")
.longOpt(OPT_COUNT_DELETE_MARKERS).build();
addOption(startTimeOption);
addOption(endTimeOption);
addOption(rangeOption);
addOption(expectedOption);
addOption(countDeleteMarkersOption);
}

@Override
@@ -316,6 +388,7 @@ protected void processOptions(CommandLine cmd) throws IllegalArgumentException {
this.startTime = cmd.getOptionValue(OPT_START_TIME) == null
? 0
: Long.parseLong(cmd.getOptionValue(OPT_START_TIME));
this.countDeleteMarkers = cmd.hasOption(OPT_COUNT_DELETE_MARKERS);

for (int i = 1; i < cmd.getArgList().size(); i++) {
String argument = cmd.getArgList().get(i);
@@ -347,7 +420,7 @@ protected void processOldArgs(List<String> args) {

@Override
protected int doWork() throws Exception {
- Job job = createSubmittableJob(getConf());
+ job = createSubmittableJob(getConf());
if (job == null) {
return -1;
}
@@ -388,4 +461,10 @@ protected CommandLineParser newParser() {
return new RowCounterCommandLineParser();
}

@RestrictedApi(explanation = "Only visible for testing", link = "",
allowedOnPath = ".*/src/test/.*")
Job getMapReduceJob() {
return job;
}

}
@@ -26,9 +26,11 @@
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -37,6 +39,7 @@
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.LauncherSecurityManager;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -524,6 +527,137 @@ public void testInvalidTable() throws Exception {
}
}

/**
* Step 1: Add 10 rows(row1, row2, row3, row4, row5, row6, row7, row8, row9, row10) to a table.
* Each row contains 1 column family and 4 columns and values for two different timestamps - 5 &
* 10.
* <p>
* Step 2: Delete the latest version of column A for row1. --> 1 X Delete
* <p>
* Step 3: Delete the cell for timestamp 5 of column B for row1. --> 1 X Delete
* <p>
* Step 4: Delete a column family for row2 and row4. --> 2 X DeleteFamily
* <p>
* Step 5: Delete all versions of a specific column for row3, row5 and row6. --> 3 X DeleteColumn
* <p>
* Step 6: Delete all columns for timestamp 5 for row 7. --> 1 X DeleteFamilyVersion
* <p>
* Case 1: Run row counter without countDeleteMarkers and validate counter values.
* <p>
* Case 2: Run row counter with countDeleteMarkers flag and validate counter values.
* <p>
* Case 3: Run row counter with countDeleteMarkers flag for a row range and validate counter
* values.
*/
@Test
public void testRowCounterWithCountDeleteMarkersOption() throws Exception {
// Test Setup

final TableName tableName =
TableName.valueOf(TABLE_NAME + "_" + "withCountDeleteMarkersOption");
// Row keys are represented in this way because of HBASE-15287
final byte[][] rowKeys = { Bytes.toBytesBinary("\\x00row1"), Bytes.toBytesBinary("\\x00row2"),
Bytes.toBytesBinary("\\x00row3"), Bytes.toBytesBinary("\\x00row4"),
Bytes.toBytesBinary("\\x00row5"), Bytes.toBytesBinary("\\x00row6"),
Bytes.toBytesBinary("\\x00row7"), Bytes.toBytesBinary("\\x00row8"),
Bytes.toBytesBinary("\\x00row9"), Bytes.toBytesBinary("\\x00row10") };
final byte[] columnFamily = Bytes.toBytes("cf");
final byte[][] columns =
{ Bytes.toBytes("A"), Bytes.toBytes("B"), Bytes.toBytes("C"), Bytes.toBytes("D") };
final byte[][] values = { Bytes.toBytes("a"), Bytes.toBytes("b") };

try (Table table = TEST_UTIL.createTable(tableName, columnFamily)) {
// Step 1: Insert rows with columns
for (byte[] rowKey : rowKeys) {
Put put = new Put(rowKey);
for (byte[] col : columns) {
long timestamp = 5L;
for (byte[] value : values) {
put.addColumn(columnFamily, col, timestamp, value);
timestamp += 5L;
}
}
table.put(put);
}
TEST_UTIL.getAdmin().flush(tableName);

// Steps 2-6
Delete deleteA = new Delete(rowKeys[0]).addColumn(columnFamily, columns[0]);
Delete deleteB = new Delete(rowKeys[0]).addColumn(columnFamily, columns[1], 5L);
Delete deleteC = new Delete(rowKeys[1]).addFamily(columnFamily);
Delete deleteD = new Delete(rowKeys[2]).addColumns(columnFamily, columns[0]);
Delete deleteE = new Delete(rowKeys[3]).addFamily(columnFamily);
Delete deleteF = new Delete(rowKeys[4]).addColumns(columnFamily, columns[0]);
Delete deleteG = new Delete(rowKeys[5]).addColumns(columnFamily, columns[0]);
Delete deleteH = new Delete(rowKeys[6]).addFamilyVersion(columnFamily, 5L);

table.delete(deleteA);
table.delete(deleteB);
table.delete(deleteC);
table.delete(deleteD);
table.delete(deleteE);
table.delete(deleteF);
table.delete(deleteG);
table.delete(deleteH);
TEST_UTIL.getAdmin().flush(tableName);
}

RowCounter rowCounterWithoutCountDeleteMarkers = new RowCounter();
RowCounter rowCounterWithCountDeleteMarkers = new RowCounter();
RowCounter rowCounterForRangeWithCountDeleteMarkers = new RowCounter();
rowCounterWithoutCountDeleteMarkers.setConf(new Configuration(TEST_UTIL.getConfiguration()));
rowCounterWithCountDeleteMarkers.setConf(new Configuration(TEST_UTIL.getConfiguration()));
rowCounterForRangeWithCountDeleteMarkers
.setConf(new Configuration(TEST_UTIL.getConfiguration()));

// Invocation

rowCounterWithoutCountDeleteMarkers.run(new String[] { tableName.getNameAsString() });
rowCounterWithCountDeleteMarkers
.run(new String[] { tableName.getNameAsString(), "--countDeleteMarkers" });
rowCounterForRangeWithCountDeleteMarkers.run(new String[] { tableName.getNameAsString(),
"--countDeleteMarkers", "--range=\\x00row8,\\x00row9" });

// Validation

// Case 1:
validateCounterCounts(rowCounterWithoutCountDeleteMarkers.getMapReduceJob().getCounters(), 8, 0,
0, 0, 0, 0);

// Case 2:
validateCounterCounts(rowCounterWithCountDeleteMarkers.getMapReduceJob().getCounters(), 10, 7,
2, 3, 2, 1);

// Case 3:
validateCounterCounts(rowCounterForRangeWithCountDeleteMarkers.getMapReduceJob().getCounters(),
1, 0, 0, 0, 0, 0);
}
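
The marker counts expected in Case 2 follow from the delete plan in the Javadoc. As a cross-check, the self-contained sketch below recomputes them. `Marker`, `PlannedDelete`, and `ExpectedCounts` are hypothetical names, the plan is transcribed by hand from Steps 2-6, and the sketch assumes each `Delete` in the test writes exactly one marker of the type noted in the Javadoc.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical marker kinds, named after the counters asserted in the test. */
enum Marker { DELETE, DELETE_COLUMN, DELETE_FAMILY, DELETE_FAMILY_VERSION }

/** One delete from the test plan: the row it targets and the marker it is assumed to write. */
final class PlannedDelete {
  final String row;
  final Marker marker;

  PlannedDelete(String row, Marker marker) {
    this.row = row;
    this.marker = marker;
  }
}

final class ExpectedCounts {
  /** Steps 2-6 of the Javadoc, transcribed by hand. */
  static List<PlannedDelete> testPlan() {
    List<PlannedDelete> plan = new ArrayList<>();
    plan.add(new PlannedDelete("row1", Marker.DELETE));                // Step 2
    plan.add(new PlannedDelete("row1", Marker.DELETE));                // Step 3
    plan.add(new PlannedDelete("row2", Marker.DELETE_FAMILY));         // Step 4
    plan.add(new PlannedDelete("row4", Marker.DELETE_FAMILY));         // Step 4
    plan.add(new PlannedDelete("row3", Marker.DELETE_COLUMN));         // Step 5
    plan.add(new PlannedDelete("row5", Marker.DELETE_COLUMN));         // Step 5
    plan.add(new PlannedDelete("row6", Marker.DELETE_COLUMN));         // Step 5
    plan.add(new PlannedDelete("row7", Marker.DELETE_FAMILY_VERSION)); // Step 6
    return plan;
  }

  /** Number of markers of each type in the plan. */
  static Map<Marker, Long> perType(List<PlannedDelete> plan) {
    Map<Marker, Long> counts = new EnumMap<>(Marker.class);
    for (PlannedDelete d : plan) {
      counts.merge(d.marker, 1L, Long::sum);
    }
    return counts;
  }

  /** Number of distinct rows that carry at least one marker. */
  static long rowsWithMarkers(List<PlannedDelete> plan) {
    Set<String> rows = new HashSet<>();
    for (PlannedDelete d : plan) {
      rows.add(d.row);
    }
    return rows.size();
  }

  public static void main(String[] args) {
    List<PlannedDelete> plan = testPlan();
    System.out.println("rows with markers: " + rowsWithMarkers(plan));
    System.out.println("markers per type: " + perType(plan));
  }
}
```

Under those assumptions the plan yields 7 rows with markers and 2, 3, 2, and 1 markers of the four types, matching the arguments passed to `validateCounterCounts` in Case 2. The row count of 10 there, against 8 in Case 1, is consistent with the raw scan still returning row2 and row4, whose column family was deleted.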

private void validateCounterCounts(Counters counters, long rowCount,
long rowsWithDeleteMarkersCount, long deleteCount, long deleteColumnCount,
long deleteFamilyCount, long deleteFamilyVersionCount) {

long actualRowCount =
counters.findCounter(RowCounter.RowCounterMapper.Counters.ROWS).getValue();
long actualRowsWithDeleteMarkersCount =
counters.findCounter(RowCounter.RowCounterMapper.Counters.ROWS_WITH_DELETE_MARKER).getValue();
long actualDeleteCount =
counters.findCounter(RowCounter.RowCounterMapper.Counters.DELETE).getValue();
long actualDeleteColumnCount =
counters.findCounter(RowCounter.RowCounterMapper.Counters.DELETE_COLUMN).getValue();
long actualDeleteFamilyCount =
counters.findCounter(RowCounter.RowCounterMapper.Counters.DELETE_FAMILY).getValue();
long actualDeleteFamilyVersionCount =
counters.findCounter(RowCounter.RowCounterMapper.Counters.DELETE_FAMILY_VERSION).getValue();

assertEquals(rowCount, actualRowCount);
assertEquals(rowsWithDeleteMarkersCount, actualRowsWithDeleteMarkersCount);
assertEquals(deleteCount, actualDeleteCount);
assertEquals(deleteColumnCount, actualDeleteColumnCount);
assertEquals(deleteFamilyCount, actualDeleteFamilyCount);
assertEquals(deleteFamilyVersionCount, actualDeleteFamilyVersionCount);
}

private void assertUsageContent(String usage) {
assertTrue(usage
.contains("usage: hbase rowcounter " + "<tablename> [options] [<column1> <column2>...]"));