Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public class ImportTsv extends Configured implements Tool {
// If true, bad lines are logged to stderr. Default: false.
public final static String LOG_BAD_LINES_CONF_KEY = "importtsv.log.bad.lines";
public final static String SKIP_LINES_CONF_KEY = "importtsv.skip.bad.lines";
public final static String SKIP_EMPTY_COLUMNS = "importtsv.skip.empty.columns";
public final static String COLUMNS_CONF_KEY = "importtsv.columns";
public final static String SEPARATOR_CONF_KEY = "importtsv.separator";
public final static String ATTRIBUTE_SEPERATOR_CONF_KEY = "attributes.seperator";
Expand Down Expand Up @@ -685,6 +686,7 @@ private static void usage(final String errorMsg) {
" table. If table does not exist, it is created but deleted in the end.\n" +
" -D" + SKIP_LINES_CONF_KEY + "=false - fail if encountering an invalid line\n" +
" -D" + LOG_BAD_LINES_CONF_KEY + "=true - logs invalid lines to stderr\n" +
" -D" + SKIP_EMPTY_COLUMNS + "=false - If true then skip empty columns in bulk import\n" +
" '-D" + SEPARATOR_CONF_KEY + "=|' - eg separate on pipes instead of tabs\n" +
" -D" + TIMESTAMP_CONF_KEY + "=currentTimeAsLong - use the specified timestamp for the import\n" +
" -D" + MAPPER_CONF_KEY + "=my.Mapper - A user-defined Mapper to use instead of " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ public class TsvImporterMapper

/** Should skip bad lines */
private boolean skipBadLines;
/** Should skip empty columns*/
private boolean skipEmptyColumns;
private Counter badLineCount;
private boolean logBadLines;

Expand Down Expand Up @@ -133,6 +135,8 @@ protected void doSetup(Context context) {
// configuration.
ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, 0);

skipEmptyColumns = context.getConfiguration().getBoolean(
ImportTsv.SKIP_EMPTY_COLUMNS, false);
skipBadLines = context.getConfiguration().getBoolean(
ImportTsv.SKIP_LINES_CONF_KEY, true);
badLineCount = context.getCounter("ImportTsv", "Bad Lines");
Expand Down Expand Up @@ -178,7 +182,8 @@ public void map(LongWritable offset, Text value,
for (int i = 0; i < parsed.getColumnCount(); i++) {
if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex()
|| i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex()
|| i == parser.getCellTTLColumnIndex()) {
|| i == parser.getCellTTLColumnIndex() || (skipEmptyColumns
&& parsed.getColumnLength(i) == 0)) {
continue;
}
populatePut(lineBytes, parsed, put, i);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,19 @@ public void testTsvImporterTextMapperWithInvalidData() throws Exception {
doMROnTableTest(util, tn, FAMILY, data, args, 1, 4);
util.deleteTable(tn);
}

@Test
public void testSkipEmptyColumns() throws Exception {
Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B");
args.put(ImportTsv.SEPARATOR_CONF_KEY, ",");
args.put(ImportTsv.SKIP_EMPTY_COLUMNS, "true");
// 2 Rows of data as input. Both rows are valid and only 3 columns are no-empty among 4
String data = "KEY,1234,VALUE1,VALUE2\nKEY,1235,,VALUE2\n";
doMROnTableTest(util, tn, FAMILY, data, args, 1, 3);
util.deleteTable(tn);
}

private Tool doMROnTableTest(String data, int valueMultiplier) throws Exception {
return doMROnTableTest(util, tn, FAMILY, data, args, valueMultiplier,-1);
Expand Down