Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
Expand Down Expand Up @@ -156,6 +157,8 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
protected static byte[] RECORD_ID_QUALIFIER = Bytes.toBytes("r");
protected static byte[] TARGET_URI_QUALIFIER = Bytes.toBytes("u");

private static final int VERIFICATION_READ_RETRIES = 10;

public static enum Counts {
REFERENCED, UNREFERENCED, CORRUPT
}
Expand Down Expand Up @@ -516,8 +519,6 @@ int run(Path warcFileInput, Path outputDir)
job.setOutputValueClass(BytesWritable.class);
TableMapReduceUtil.addDependencyJars(job);

LOG.info("Submitting job." +
" This will take time proportional to the number of input files, please be patient.");
boolean success = job.waitForCompletion(true);
if (!success) {
LOG.error("Failure during job " + job.getJobID());
Expand Down Expand Up @@ -551,18 +552,20 @@ public static class LoaderMapper

Configuration conf;
Connection conn;
Table table;
BufferedMutator mutator;

@Override
protected void setup(Context context) throws IOException, InterruptedException {
conn = ConnectionFactory.createConnection(context.getConfiguration());
table = conn.getTable(getTablename(conn.getConfiguration()));
conf = context.getConfiguration();
conn = ConnectionFactory.createConnection(conf);
mutator = conn.getBufferedMutator(getTablename(conf));
mutator.setWriteBufferPeriodicFlush(10 * 1000); // default is 1 sec, increase to 10
}

@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
try {
table.close();
mutator.close();
} catch (Exception e) {
LOG.warn("Exception closing Table", e);
}
Expand All @@ -582,7 +585,7 @@ protected void map(LongWritable key, WARCWritable value, Context output)
if (warcHeader.getRecordType().equals("response") && targetURI != null) {
String contentType = warcHeader.getField("WARC-Identified-Payload-Type");
if (contentType != null) {
LOG.debug("Processing record id=" + recordID + ", targetURI=\"" + targetURI + "\"");
LOG.info("Processing uri=\"" + targetURI + "\", id=" + recordID);
long now = EnvironmentEdgeManager.currentTime();

// Make row key
Expand Down Expand Up @@ -623,7 +626,7 @@ protected void map(LongWritable key, WARCWritable value, Context output)
if (ipAddr != null) {
put.addColumn(INFO_FAMILY_NAME, IP_ADDRESS_QUALIFIER, now, Bytes.toBytes(ipAddr));
}
table.put(put);
mutator.mutate(put);

// Write records out for later verification, one per HBase field except for the
// content record, which will be verified by CRC64.
Expand Down Expand Up @@ -651,39 +654,43 @@ protected void map(LongWritable key, WARCWritable value, Context output)
private byte[] rowKeyFromTargetURI(String targetUri)
throws URISyntaxException, IllegalArgumentException {
URI uri = new URI(targetUri);
StringBuffer sb = new StringBuffer();
// Ignore the scheme
// Reverse the components of the hostname
String reversedHost;
if (uri.getHost() != null) {
StringBuffer sb = new StringBuffer();
String[] hostComponents = uri.getHost().split("\\.");
for (int i = hostComponents.length - 1; i >= 0; i--) {
sb.append(hostComponents[i]);
if (i != 0) {
sb.append('.');
}
}
reversedHost = sb.toString();
} else {
throw new IllegalArgumentException("URI is missing host component");
}
// Port
if (uri.getPort() != -1) {
StringBuffer sb = new StringBuffer();
sb.append(reversedHost);
if (uri.getPort() >= 0) {
sb.append(':');
sb.append(uri.getPort());
}
if (uri.getRawPath() != null) {
sb.append(uri.getRawPath());
if (uri.getPath() != null) {
sb.append('/');
sb.append(uri.getPath());
}
if (uri.getRawQuery() != null) {
if (uri.getQuery() != null) {
sb.append('?');
sb.append(uri.getRawQuery());
sb.append(uri.getQuery());
}
if (uri.getRawFragment() != null) {
if (uri.getFragment() != null) {
sb.append('#');
sb.append(uri.getRawFragment());
sb.append(uri.getFragment());
}
// Constrain the key size to the maximum allowed row key length
if (sb.length() > HConstants.MAX_ROW_LENGTH) {
sb.setLength(HConstants.MAX_ROW_LENGTH);
if (sb.length() > HConstants.MAX_ROW_LENGTH) {
throw new IllegalArgumentException("Key would be too large (length=" + sb.length() +
", limit=" + HConstants.MAX_ROW_LENGTH);
}
return Bytes.toBytes(sb.toString());
}
Expand Down Expand Up @@ -767,73 +774,92 @@ protected void map(HBaseKeyWritable key, BytesWritable value, Context output)
byte[] qualifier = Bytes.copy(key.getQualifierArray(), key.getQualifierOffset(),
key.getQualifierLength());
long ts = key.getTimestamp();
int retries = VERIFICATION_READ_RETRIES;

if (Bytes.equals(INFO_FAMILY_NAME, family) &&
Bytes.equals(CRC_QUALIFIER, qualifier)) {
while (true) {

long expectedCRC64 = Bytes.toLong(value.getBytes(), 0, value.getLength());
if (Bytes.equals(INFO_FAMILY_NAME, family) &&
Bytes.equals(CRC_QUALIFIER, qualifier)) {

Result result =
table.get(new Get(row)
long expectedCRC64 = Bytes.toLong(value.getBytes(), 0, value.getLength());
Result result = table.get(new Get(row)
.addColumn(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER)
.addColumn(INFO_FAMILY_NAME, CRC_QUALIFIER)
.setTimestamp(ts));

byte[] content = result.getValue(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER);
if (content == null) {
LOG.info("Row " + Bytes.toStringBinary(row) + ": missing content");
output.getCounter(Counts.UNREFERENCED).increment(1);
return;
} else {
CRC64 crc = new CRC64();
crc.update(content);
if (crc.getValue() != expectedCRC64) {
LOG.info("Row " + Bytes.toStringBinary(row) + ": corrupt content");
byte[] content = result.getValue(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER);
if (content == null) {
if (retries-- > 0) {
continue;
}
LOG.error("Row " + Bytes.toStringBinary(row) + ": missing content");
output.getCounter(Counts.UNREFERENCED).increment(1);
return;
} else {
CRC64 crc = new CRC64();
crc.update(content);
if (crc.getValue() != expectedCRC64) {
if (retries-- > 0) {
continue;
}
LOG.error("Row " + Bytes.toStringBinary(row) + ": corrupt content");
output.getCounter(Counts.CORRUPT).increment(1);
return;
}
}
byte[] crc = result.getValue(INFO_FAMILY_NAME, CRC_QUALIFIER);
if (crc == null) {
if (retries-- > 0) {
continue;
}
LOG.error("Row " + Bytes.toStringBinary(row) + ": missing i:c");
output.getCounter(Counts.UNREFERENCED).increment(1);
return;
}
if (Bytes.toLong(crc) != expectedCRC64) {
if (retries-- > 0) {
continue;
}
LOG.error("Row " + Bytes.toStringBinary(row) + ": i:c mismatch");
output.getCounter(Counts.CORRUPT).increment(1);
return;
}
}
byte[] crc = result.getValue(INFO_FAMILY_NAME, CRC_QUALIFIER);
if (crc == null) {
LOG.info("Row " + Bytes.toStringBinary(row) + ": missing i:c");
output.getCounter(Counts.UNREFERENCED).increment(1);
return;
}
if (Bytes.toLong(crc) != expectedCRC64) {
LOG.info("Row " + Bytes.toStringBinary(row) + ": i:c mismatch");
output.getCounter(Counts.CORRUPT).increment(1);
return;
}

} else {
} else {

Result result =
table.get(new Get(row)
Result result = table.get(new Get(row)
.addColumn(family, qualifier)
.setTimestamp(ts));
byte[] bytes = result.getValue(family, qualifier);
if (bytes == null) {
if (retries-- > 0) {
continue;
}
LOG.error("Row " + Bytes.toStringBinary(row) + ": missing " +
Bytes.toStringBinary(family) + ":" + Bytes.toStringBinary(qualifier));
output.getCounter(Counts.UNREFERENCED).increment(1);
return;
}
if (!Bytes.equals(bytes, 0, bytes.length, value.getBytes(), 0, value.getLength())) {
if (retries-- > 0) {
continue;
}
LOG.error("Row " + Bytes.toStringBinary(row) + ": " +
Bytes.toStringBinary(family) + ":" + Bytes.toStringBinary(qualifier) +
" mismatch");
output.getCounter(Counts.CORRUPT).increment(1);
return;
}

byte[] bytes = result.getValue(family, qualifier);
if (bytes == null) {
LOG.info("Row " + Bytes.toStringBinary(row) + ": missing " +
Bytes.toStringBinary(family) + ":" + Bytes.toStringBinary(qualifier));
output.getCounter(Counts.UNREFERENCED).increment(1);
return;
}
if (!Bytes.equals(bytes, 0, bytes.length, value.getBytes(), 0, value.getLength())) {
LOG.info("Row " + Bytes.toStringBinary(row) + ": " +
Bytes.toStringBinary(family) + ":" + Bytes.toStringBinary(qualifier) +
" mismatch");
output.getCounter(Counts.CORRUPT).increment(1);
return;
}

}
// If we fell through to here all verification checks have succeeded, potentially after
// retries, and we must exit the while loop.
output.getCounter(Counts.REFERENCED).increment(1);
break;

output.getCounter(Counts.REFERENCED).increment(1);
}
}

}

}

}