@@ -39,7 +39,7 @@
 import org.apache.hadoop.hbase.filter.RowFilter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Reducer;
@@ -50,7 +50,6 @@
 
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 
-
 /**
  * A job with a a map and reduce phase to count cells in a table.
  * The counter lists the following stats for a given table:
@@ -59,8 +58,11 @@
  * 2. Total number of CFs across all rows
  * 3. Total qualifiers across all rows
  * 4. Total occurrence of each CF
- * 5. Total occurrence of each qualifier
+ * 5. Total occurrence of each qualifier
  * 6. Total number of versions of each qualifier.
+ * 7. Total size of serialized cells of each CF.
+ * 8. Total size of serialized cells of each qualifier.
+ * 9. Total size of serialized cells across all rows.
  * </pre>
  *
  * The cellcounter can take optional parameters to use a user
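Items 7–9 are all sums of `Cell.getSerializedSize()`, just bucketed at different granularities (per CF, per qualifier, whole table). For intuition about what that metric measures, here is a minimal client-side sketch that tallies serialized bytes per column family over a plain scan. The table name and connection setup are placeholder assumptions for the sketch; it illustrates the metric, not the MapReduce job itself:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class FamilySizeScan {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // "t1" is a placeholder table name.
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(TableName.valueOf("t1"));
        ResultScanner scanner = table.getScanner(new Scan())) {
      Map<String, Long> sizePerFamily = new HashMap<>();
      for (Result r : scanner) {
        for (Cell cell : r.listCells()) {
          String cf = Bytes.toStringBinary(CellUtil.cloneFamily(cell));
          // The same per-cell size the patched mapper feeds into the *_Size stats.
          sizePerFamily.merge(cf, (long) cell.getSerializedSize(), Long::sum);
        }
      }
      sizePerFamily.forEach((cf, size) -> System.out.println(cf + "_Size\t" + size));
    }
  }
}
```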
@@ -86,13 +88,14 @@ public class CellCounter extends Configured implements Tool {
    * Mapper that runs the count.
    */
   static class CellCounterMapper
-    extends TableMapper<Text, IntWritable> {
+    extends TableMapper<Text, LongWritable> {
     /**
      * Counter enumeration to count the actual rows.
      */
     public static enum Counters {
       ROWS,
-      CELLS
+      CELLS,
+      SIZE
     }
 
     private Configuration conf;
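Widening the map output value from `IntWritable` to `LongWritable` is what makes the new `SIZE` stat viable: a 32-bit sum of serialized byte counts wraps around long before a large table is exhausted. A standalone sanity check, with hypothetical workload numbers:

```java
public class OverflowDemo {
  public static void main(String[] args) {
    // Hypothetical workload: 30M cells averaging 100 serialized bytes each.
    int cells = 30_000_000;
    int avgCellSize = 100;
    int intTotal = cells * avgCellSize;           // wraps past Integer.MAX_VALUE
    long longTotal = (long) cells * avgCellSize;  // what LongWritable can carry
    System.out.println("int sum:  " + intTotal);  // -1294967296
    System.out.println("long sum: " + longTotal); // 3000000000
  }
}
```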
@@ -143,34 +146,41 @@ public void map(ImmutableBytesWritable row, Result values,
           currentFamily = null;
           currentQualifier = null;
           context.getCounter(Counters.ROWS).increment(1);
-          context.write(new Text("Total ROWS"), new IntWritable(1));
+          context.write(new Text("Total ROWS"), new LongWritable(1));
         }
         if (!values.isEmpty()) {
           int cellCount = 0;
           for (Cell value : values.listCells()) {
             cellCount++;
+            long size = value.getSerializedSize();
             if (currentFamily == null || !CellUtil.matchingFamily(value, currentFamily)) {
               currentFamily = CellUtil.cloneFamily(value);
               currentFamilyName = Bytes.toStringBinary(currentFamily);
               currentQualifier = null;
               context.getCounter("CF", currentFamilyName).increment(1);
               if (1 == context.getCounter("CF", currentFamilyName).getValue()) {
-                context.write(new Text("Total Families Across all Rows"), new IntWritable(1));
-                context.write(new Text(currentFamily), new IntWritable(1));
+                context.write(new Text("Total Families Across all Rows"), new LongWritable(1));
+                context.write(new Text(currentFamily), new LongWritable(1));
               }
+              context.getCounter(Counters.SIZE).increment(size);
+              context.write(new Text("Total SIZE"), new LongWritable(size));
+              context.getCounter("CF", currentFamilyName + "_Size").increment(size);
+              context.write(new Text(currentFamilyName + "_Size"), new LongWritable(size));
             }
-            if (currentQualifier == null || !CellUtil.matchingQualifier(value, currentQualifier)) {
+            if (currentQualifier == null || !CellUtil.matchingQualifier(value, currentQualifier)){
               currentQualifier = CellUtil.cloneQualifier(value);
               currentQualifierName = currentFamilyName + separator +
                   Bytes.toStringBinary(currentQualifier);
               currentRowQualifierName = currentRowKey + separator + currentQualifierName;
 
               context.write(new Text("Total Qualifiers across all Rows"),
-                  new IntWritable(1));
-              context.write(new Text(currentQualifierName), new IntWritable(1));
+                  new LongWritable(1));
+              context.write(new Text(currentQualifierName), new LongWritable(1));
+              context.getCounter("Q", currentQualifierName + "_Size").increment(size);
+              context.write(new Text(currentQualifierName + "_Size"), new LongWritable(size));
             }
             // Increment versions
-            context.write(new Text(currentRowQualifierName + "_Versions"), new IntWritable(1));
+            context.write(new Text(currentRowQualifierName + "_Versions"), new LongWritable(1));
           }
           context.getCounter(Counters.CELLS).increment(cellCount);
         }
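Note how each stat in the mapper is recorded through two channels: a Hadoop counter (`Counters.SIZE`, plus the dynamic `"CF"` and `"Q"` groups) that surfaces in the job status, and a `(Text, LongWritable)` record that the reducer folds into the report file. A stripped-down sketch of that dual-accounting pattern; the class and key names here are illustrative, not part of the patch:

```java
import java.io.IOException;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

public class SizeTallyMapper extends TableMapper<Text, LongWritable> {
  public enum Counters { SIZE }

  @Override
  protected void map(ImmutableBytesWritable row, Result values, Context context)
      throws IOException, InterruptedException {
    for (Cell cell : values.listCells()) {
      long size = cell.getSerializedSize();
      // Channel 1: job-level counter, visible in the MapReduce UI and client logs.
      context.getCounter(Counters.SIZE).increment(size);
      // Channel 2: keyed record, summed by the reducer into the report output.
      context.write(new Text("Total SIZE"), new LongWritable(size));
    }
  }
}
```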
@@ -181,20 +191,20 @@ public void map(ImmutableBytesWritable row, Result values,
     }
   }
 
-  static class IntSumReducer<Key> extends Reducer<Key, IntWritable,
-      Key, IntWritable> {
+  static class LongSumReducer<Key> extends Reducer<Key, LongWritable, Key, LongWritable> {
 
-    private IntWritable result = new IntWritable();
-    public void reduce(Key key, Iterable<IntWritable> values,
-      Context context)
-    throws IOException, InterruptedException {
-      int sum = 0;
-      for (IntWritable val : values) {
+    private LongWritable result = new LongWritable();
+
+    public void reduce(Key key, Iterable<LongWritable> values, Context context)
+        throws IOException, InterruptedException {
+      long sum = 0;
+      for (LongWritable val : values) {
         sum += val.get();
       }
       result.set(sum);
       context.write(key, result);
     }
+
   }
 
   /**
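`LongSumReducer` keeps its input and output types identical, and summing longs is associative and commutative, so the same class can safely double as the combiner (registered in `createSubmittableJob` below): pre-summing any subset of a key's values on the map side cannot change the final total. A plain-Java sketch of that property:

```java
import java.util.List;

public class SumCheck {
  // Mirrors what LongSumReducer computes for one key.
  static long reduce(List<Long> values) {
    long sum = 0;
    for (long v : values) {
      sum += v;
    }
    return sum;
  }

  public static void main(String[] args) {
    List<Long> sizes = List.of(120L, 88L, 64L); // hypothetical sizes for one key
    long reducerOnly = reduce(sizes);
    long viaCombiner = reduce(List.of(reduce(sizes.subList(0, 2)), sizes.get(2)));
    System.out.println(reducerOnly + " == " + viaCombiner); // 272 == 272
  }
}
```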
@@ -217,13 +227,13 @@ public static Job createSubmittableJob(Configuration conf, String[] args)
     TableMapReduceUtil.initTableMapperJob(tableName, scan,
         CellCounterMapper.class, ImmutableBytesWritable.class, Result.class, job);
     job.setMapOutputKeyClass(Text.class);
-    job.setMapOutputValueClass(IntWritable.class);
+    job.setMapOutputValueClass(LongWritable.class);
     job.setOutputFormatClass(TextOutputFormat.class);
     job.setOutputKeyClass(Text.class);
-    job.setOutputValueClass(IntWritable.class);
+    job.setOutputValueClass(LongWritable.class);
     FileOutputFormat.setOutputPath(job, outputDir);
-    job.setReducerClass(IntSumReducer.class);
-    job.setCombinerClass(IntSumReducer.class);
+    job.setReducerClass(LongSumReducer.class);
+    job.setCombinerClass(LongSumReducer.class);
     return job;
   }
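With the wiring above, the tool is invoked exactly as before; only the reported values are now 64-bit. A sketch of launching it programmatically, equivalent to the command-line form `hbase org.apache.hadoop.hbase.mapreduce.CellCounter <tablename> <outputDir> [reportSeparator] [rowFilter]`; the table name, output path, and separator below are placeholder values:

```java
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.CellCounter;
import org.apache.hadoop.util.ToolRunner;

public class RunCellCounter {
  public static void main(String[] args) throws Exception {
    // Placeholder arguments: table "t1", output dir, ";" as the report separator.
    int exit = ToolRunner.run(HBaseConfiguration.create(), new CellCounter(),
        new String[] { "t1", "/tmp/cellcounter-out", ";" });
    System.exit(exit);
  }
}
```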
