@@ -17,8 +17,10 @@
*/
package org.apache.hadoop.hbase.regionserver.storefiletracker;

import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.zip.CRC32;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -29,9 +31,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams;
import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;

import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileList;

/**
@@ -42,18 +41,27 @@
* other file.
* <p/>
* So in this way, we could avoid listing when we want to load the store file list file.
* <p/>
* To prevent loading a partial file, we use the first 4 bytes as the file length, and also append
* a 4-byte CRC32 checksum at the end. This is because the protobuf message parser can sometimes
* return without error on partial bytes if you stop at certain special points, but the returned
* message will have incorrect field values. We should do our best to prevent this from happening,
* because loading an incorrect store file list file usually leads to data loss.
*/
@InterfaceAudience.Private
class StoreFileListFile {

private static final Logger LOG = LoggerFactory.getLogger(StoreFileListFile.class);

private static final String TRACK_FILE_DIR = ".filelist";
static final String TRACK_FILE_DIR = ".filelist";

private static final String TRACK_FILE = "f1";

private static final String TRACK_FILE_ROTATE = "f2";

// 16 MB, which is big enough for a tracker file
private static final int MAX_FILE_SIZE = 16 * 1024 * 1024;

private final StoreContext ctx;

private final Path trackFileDir;
@@ -74,16 +82,26 @@ class StoreFileListFile {

private StoreFileList load(Path path) throws IOException {
FileSystem fs = ctx.getRegionFileSystem().getFileSystem();
byte[] bytes;
byte[] data;
int expectedChecksum;
try (FSDataInputStream in = fs.open(path)) {
bytes = ByteStreams.toByteArray(in);
int length = in.readInt();
if (length <= 0 || length > MAX_FILE_SIZE) {
throw new IOException("Invalid file length " + length +
", must be positive and not greater than max allowed size " + MAX_FILE_SIZE);
}
data = new byte[length];
in.readFully(data);
expectedChecksum = in.readInt();
}
// Read all the bytes and then parse it, so we will only throw InvalidProtocolBufferException
// here. This is very important for upper layer to determine whether this is the normal case,
// where the file does not exist or is incomplete. If there is another type of exception, the
// upper layer should throw it out instead of just ignoring it, otherwise it will lead to data
// loss.
return StoreFileList.parseFrom(bytes);
CRC32 crc32 = new CRC32();
crc32.update(data);
int calculatedChecksum = (int) crc32.getValue();
if (expectedChecksum != calculatedChecksum) {
throw new IOException(
"Checksum mismatch, expected " + expectedChecksum + ", actual " + calculatedChecksum);
}
return StoreFileList.parseFrom(data);
}

private int select(StoreFileList[] lists) {
@@ -101,9 +119,9 @@ StoreFileList load() throws IOException {
for (int i = 0; i < 2; i++) {
try {
lists[i] = load(trackFiles[i]);
} catch (FileNotFoundException | InvalidProtocolBufferException e) {
} catch (FileNotFoundException | EOFException e) {
// this is the normal case, so log at info level and do not log the stacktrace
LOG.info("Failed to load track file {}: {}", trackFiles[i], e);
LOG.info("Failed to load track file {}: {}", trackFiles[i], e.toString());
}
}
int winnerIndex = select(lists);
@@ -124,10 +142,17 @@ void update(StoreFileList.Builder builder) throws IOException {
// we need to call load first to load the prevTimestamp and also the next file
load();
}
FileSystem fs = ctx.getRegionFileSystem().getFileSystem();
long timestamp = Math.max(prevTimestamp + 1, EnvironmentEdgeManager.currentTime());
byte[] actualData = builder.setTimestamp(timestamp).build().toByteArray();
CRC32 crc32 = new CRC32();
crc32.update(actualData);
int checksum = (int) crc32.getValue();
// 4-byte length at the beginning, plus a 4-byte checksum at the end
FileSystem fs = ctx.getRegionFileSystem().getFileSystem();
try (FSDataOutputStream out = fs.create(trackFiles[nextTrackFile], true)) {
builder.setTimestamp(timestamp).build().writeTo(out);
out.writeInt(actualData.length);
out.write(actualData);
out.writeInt(checksum);
}
// record timestamp
prevTimestamp = timestamp;
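
For reference, here is a minimal, self-contained sketch of the framing scheme this patch introduces, written against plain `java.io` streams instead of HDFS (the class name and helper methods are illustrative, not part of the patch): a 4-byte big-endian length, the protobuf payload, then a 4-byte CRC32 trailer. A truncated file surfaces as an `EOFException` from `readFully` or `readInt`, which is exactly why the load path above can treat `EOFException` like `FileNotFoundException`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

public class TrackerFileFraming {

  // Same cap as the patch: 16 MB is big enough for a tracker file.
  static final int MAX_FILE_SIZE = 16 * 1024 * 1024;

  // Frame a payload as [4-byte length][payload][4-byte CRC32 of payload].
  static byte[] frame(byte[] payload) throws IOException {
    CRC32 crc32 = new CRC32();
    crc32.update(payload);
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bos)) {
      out.writeInt(payload.length);
      out.write(payload);
      out.writeInt((int) crc32.getValue());
    }
    return bos.toByteArray();
  }

  // Unframe and verify. Throws IOException on a bad length or checksum, and
  // EOFException (an IOException subclass) on a truncated file.
  static byte[] unframe(byte[] file) throws IOException {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(file))) {
      int length = in.readInt();
      if (length <= 0 || length > MAX_FILE_SIZE) {
        throw new IOException("Invalid file length " + length);
      }
      byte[] payload = new byte[length];
      in.readFully(payload); // EOFException here if the file was truncated
      int expected = in.readInt();
      CRC32 crc32 = new CRC32();
      crc32.update(payload);
      if (expected != (int) crc32.getValue()) {
        throw new IOException("Checksum mismatch");
      }
      return payload;
    }
  }

  public static void main(String[] args) throws IOException {
    byte[] payload = "example store file list".getBytes(StandardCharsets.UTF_8);
    byte[] file = frame(payload);
    // Round-trips cleanly; corrupt or truncate `file` and unframe() throws.
    System.out.println(new String(unframe(file), StandardCharsets.UTF_8));
  }
}
```
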
@@ -40,7 +40,7 @@
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.regionserver.storefiletracker.TestStoreFileTracker;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerForTest;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -96,7 +96,7 @@ public void testCreateWithTrackImpl() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName());
ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
TableDescriptor htd = MasterProcedureTestingUtility.createHTD(tableName, F1);
String trackerName = TestStoreFileTracker.class.getName();
String trackerName = StoreFileTrackerForTest.class.getName();
htd = TableDescriptorBuilder.newBuilder(htd).setValue(TRACKER_IMPL, trackerName).build();
RegionInfo[] regions = ModifyRegionUtils.createRegionInfos(htd, null);
long procId = ProcedureTestingUtility.submitAndWait(procExec,
@@ -43,7 +43,7 @@
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.regionserver.storefiletracker.TestStoreFileTracker;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerForTest;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -86,13 +86,13 @@ public static void afterClass() throws Exception {

@Before
public void setup(){
TestStoreFileTracker.clear();
StoreFileTrackerForTest.clear();
}

private TableName createTable(byte[] splitKey) throws IOException {
TableDescriptor td = TableDescriptorBuilder.newBuilder(name.getTableName())
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY_NAME))
.setValue(TRACKER_IMPL, TestStoreFileTracker.class.getName()).build();
.setValue(TRACKER_IMPL, StoreFileTrackerForTest.class.getName()).build();
if (splitKey != null) {
TEST_UTIL.getAdmin().createTable(td, new byte[][] { splitKey });
} else {
@@ -241,7 +241,8 @@ private void validateDaughterRegionsFiles(HRegion region, String orignalFileName

private void verifyFilesAreTracked(Path regionDir, FileSystem fs) throws Exception {
for (FileStatus f : fs.listStatus(new Path(regionDir, FAMILY_NAME_STR))) {
assertTrue(TestStoreFileTracker.tracked(regionDir.getName(), FAMILY_NAME_STR, f.getPath()));
assertTrue(
StoreFileTrackerForTest.tracked(regionDir.getName(), FAMILY_NAME_STR, f.getPath()));
}
}

@@ -32,14 +32,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestStoreFileTracker extends DefaultStoreFileTracker {
public class StoreFileTrackerForTest extends DefaultStoreFileTracker {

private static final Logger LOG = LoggerFactory.getLogger(TestStoreFileTracker.class);
private static final Logger LOG = LoggerFactory.getLogger(StoreFileTrackerForTest.class);
private static ConcurrentMap<String, BlockingQueue<StoreFileInfo>> trackedFiles =
new ConcurrentHashMap<>();
private String storeId;

public TestStoreFileTracker(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) {
public StoreFileTrackerForTest(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) {
super(conf, isPrimaryReplica, ctx);
if (ctx != null && ctx.getRegionFileSystem() != null) {
this.storeId = ctx.getRegionInfo().getEncodedName() + "-" + ctx.getFamily().getNameAsString();
@@ -0,0 +1,165 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.storefiletracker;

import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.StoreContext;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams;

import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileList;

@Category({ RegionServerTests.class, SmallTests.class })
public class TestStoreFileListFile {

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestStoreFileListFile.class);

private static final Logger LOG = LoggerFactory.getLogger(TestStoreFileListFile.class);

private static final HBaseCommonTestingUtil UTIL = new HBaseCommonTestingUtil();

private Path testDir;

private StoreFileListFile storeFileListFile;

@Rule
public TestName name = new TestName();

@Before
public void setUp() throws IOException {
testDir = UTIL.getDataTestDir(name.getMethodName());
HRegionFileSystem hfs = mock(HRegionFileSystem.class);
when(hfs.getFileSystem()).thenReturn(FileSystem.get(UTIL.getConfiguration()));
StoreContext ctx = StoreContext.getBuilder().withFamilyStoreDirectoryPath(testDir)
.withRegionFileSystem(hfs).build();
storeFileListFile = new StoreFileListFile(ctx);
}

@AfterClass
public static void tearDown() {
UTIL.cleanupTestDir();
}

@Test
public void testEmptyLoad() throws IOException {
assertNull(storeFileListFile.load());
}

private FileStatus getOnlyTrackerFile(FileSystem fs) throws IOException {
return fs.listStatus(new Path(testDir, StoreFileListFile.TRACK_FILE_DIR))[0];
}

private byte[] readAll(FileSystem fs, Path file) throws IOException {
try (FSDataInputStream in = fs.open(file)) {
return ByteStreams.toByteArray(in);
}
}

private void write(FileSystem fs, Path file, byte[] buf, int off, int len) throws IOException {
try (FSDataOutputStream out = fs.create(file, true)) {
out.write(buf, off, len);
}
}

@Test
public void testLoadPartial() throws IOException {
StoreFileList.Builder builder = StoreFileList.newBuilder();
storeFileListFile.update(builder);
FileSystem fs = FileSystem.get(UTIL.getConfiguration());
FileStatus trackerFileStatus = getOnlyTrackerFile(fs);
// truncate it so we do not have enough data
LOG.info("Truncate file {} with size {} to {}", trackerFileStatus.getPath(),
trackerFileStatus.getLen(), trackerFileStatus.getLen() / 2);
byte[] content = readAll(fs, trackerFileStatus.getPath());
write(fs, trackerFileStatus.getPath(), content, 0, content.length / 2);
assertNull(storeFileListFile.load());
}

private void writeInt(byte[] buf, int off, int value) {
byte[] b = Bytes.toBytes(value);
for (int i = 0; i < 4; i++) {
buf[off + i] = b[i];
}
}

@Test
public void testZeroFileLength() throws IOException {
StoreFileList.Builder builder = StoreFileList.newBuilder();
storeFileListFile.update(builder);
FileSystem fs = FileSystem.get(UTIL.getConfiguration());
FileStatus trackerFileStatus = getOnlyTrackerFile(fs);
// write a zero length
byte[] content = readAll(fs, trackerFileStatus.getPath());
writeInt(content, 0, 0);
write(fs, trackerFileStatus.getPath(), content, 0, content.length);
assertThrows(IOException.class, () -> storeFileListFile.load());
}

@Test
public void testBigFileLength() throws IOException {
StoreFileList.Builder builder = StoreFileList.newBuilder();
storeFileListFile.update(builder);
FileSystem fs = FileSystem.get(UTIL.getConfiguration());
FileStatus trackerFileStatus = getOnlyTrackerFile(fs);
// write a large length
byte[] content = readAll(fs, trackerFileStatus.getPath());
writeInt(content, 0, 128 * 1024 * 1024);
write(fs, trackerFileStatus.getPath(), content, 0, content.length);
assertThrows(IOException.class, () -> storeFileListFile.load());
}

@Test
public void testChecksumMismatch() throws IOException {
StoreFileList.Builder builder = StoreFileList.newBuilder();
storeFileListFile.update(builder);
FileSystem fs = FileSystem.get(UTIL.getConfiguration());
FileStatus trackerFileStatus = getOnlyTrackerFile(fs);
// flip one byte
byte[] content = readAll(fs, trackerFileStatus.getPath());
content[5] = (byte) ~content[5];
write(fs, trackerFileStatus.getPath(), content, 0, content.length);
assertThrows(IOException.class, () -> storeFileListFile.load());
}
}
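
The corruption tests above lean on a basic property of CRC32: flipping a single byte always changes the checksum, since CRC32 detects all error bursts up to 32 bits. A standalone illustration of what `testChecksumMismatch` relies on (the class name is made up for the demo):

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

public class Crc32FlipDemo {
  public static void main(String[] args) {
    byte[] data = "store file list payload".getBytes(StandardCharsets.UTF_8);
    CRC32 crc = new CRC32();
    crc.update(data);
    long before = crc.getValue();

    data[5] = (byte) ~data[5]; // flip one byte, as testChecksumMismatch does
    crc.reset();
    crc.update(data);
    long after = crc.getValue();

    // A single-byte flip is always detected, so the checksums differ.
    System.out.println(before != after); // prints: true
  }
}
```
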