Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ protected abstract <T extends BaseRecord> BufferedReader getReader(
* @param path Path of the record to write.
* @return Writer for the record.
*/
protected abstract <T extends BaseRecord> BufferedWriter getWriter(
@VisibleForTesting
public abstract <T extends BaseRecord> BufferedWriter getWriter(
String path);

/**
Expand Down Expand Up @@ -348,25 +349,18 @@ public <T extends BaseRecord> boolean putAll(
for (Entry<String, T> entry : toWrite.entrySet()) {
String recordPath = entry.getKey();
String recordPathTemp = recordPath + "." + now() + TMP_MARK;
BufferedWriter writer = getWriter(recordPathTemp);
try {
boolean recordWrittenSuccessfully = true;
try (BufferedWriter writer = getWriter(recordPathTemp)) {
T record = entry.getValue();
String line = serializeString(record);
writer.write(line);
} catch (IOException e) {
LOG.error("Cannot write {}", recordPathTemp, e);
recordWrittenSuccessfully = false;
success = false;
} finally {
if (writer != null) {
try {
writer.close();
} catch (IOException e) {
LOG.error("Cannot close the writer for {}", recordPathTemp, e);
}
}
}
// Commit
if (!rename(recordPathTemp, recordPath)) {
if (recordWrittenSuccessfully && !rename(recordPathTemp, recordPath)) {
LOG.error("Failed committing record into {}", recordPath);
success = false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.List;

import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
import org.slf4j.Logger;
Expand Down Expand Up @@ -125,7 +126,8 @@ protected <T extends BaseRecord> BufferedReader getReader(String filename) {
}

@Override
protected <T extends BaseRecord> BufferedWriter getWriter(String filename) {
@VisibleForTesting
public <T extends BaseRecord> BufferedWriter getWriter(String filename) {
BufferedWriter writer = null;
try {
LOG.debug("Writing file: {}", filename);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,14 @@
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
Expand Down Expand Up @@ -82,17 +83,8 @@ protected boolean mkdir(String path) {
@Override
protected boolean rename(String src, String dst) {
try {
if (fs instanceof DistributedFileSystem) {
DistributedFileSystem dfs = (DistributedFileSystem)fs;
dfs.rename(new Path(src), new Path(dst), Options.Rename.OVERWRITE);
return true;
} else {
// Replace should be atomic but not available
if (fs.exists(new Path(dst))) {
fs.delete(new Path(dst), true);
}
return fs.rename(new Path(src), new Path(dst));
}
FileUtil.rename(fs, new Path(src), new Path(dst), Options.Rename.OVERWRITE);
return true;
} catch (Exception e) {
LOG.error("Cannot rename {} to {}", src, dst, e);
return false;
Expand Down Expand Up @@ -148,7 +140,8 @@ protected <T extends BaseRecord> BufferedReader getReader(String pathName) {
}

@Override
protected <T extends BaseRecord> BufferedWriter getWriter(String pathName) {
@VisibleForTesting
public <T extends BaseRecord> BufferedWriter getWriter(String pathName) {
BufferedWriter writer = null;
Path path = new Path(pathName);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,25 @@ public <T extends BaseRecord> void testInsert(
assertEquals(11, records2.size());
}

public <T extends BaseRecord> void testInsertWithErrorDuringWrite(

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To test my above comment, you would need two iterations of the loop where the first is a failure and the second is a success. The test passes despite the issue because there is only a single write.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this test doesn't capture the error I'd made. I tried changing the test to do a failure and then a success but I can't get mockito to work right.
Since I'm working with a spy I need to use the doThrow().when(...) syntax. However, to have a change of behavior I believe mock wants me to use when(...).doThrow().doCallRealMethod().

@mkuchenbecker If it is okay, I'll leave the test as it is. It exposes the original bug in the code and validate that my patch address it.

StateStoreDriver driver, Class<T> recordClass)
throws IllegalArgumentException, IllegalAccessException, IOException {

assertTrue(driver.removeAll(recordClass));
QueryResult<T> queryResult0 = driver.get(recordClass);
List<T> records0 = queryResult0.getRecords();
assertTrue(records0.isEmpty());

// Insert single
BaseRecord record = generateFakeRecord(recordClass);
driver.put(record, true, false);

// Verify that no record was inserted.
QueryResult<T> queryResult1 = driver.get(recordClass);
List<T> records1 = queryResult1.getRecords();
assertEquals(0, records1.size());
}

public <T extends BaseRecord> void testFetchErrors(StateStoreDriver driver,
Class<T> clazz) throws IllegalAccessException, IOException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,26 @@
*/
package org.apache.hadoop.hdfs.server.federation.store.driver;

import java.io.BufferedWriter;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils;
import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileBaseImpl;
import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileSystemImpl;
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.stubbing.Answer;

import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;


/**
* Test the FileSystem (e.g., HDFS) implementation of the State Store driver.
Expand Down Expand Up @@ -91,4 +101,18 @@ public void testMetrics()
throws IllegalArgumentException, IllegalAccessException, IOException {
testMetrics(getStateStoreDriver());
}

@Test
public void testInsertWithErrorDuringWrite()
    throws IllegalArgumentException, IllegalAccessException, IOException {
  // Spy on the real driver so that getWriter() hands back a writer that
  // always throws on write(String), simulating a failed low-level write.
  StateStoreFileBaseImpl driver =
      spy((StateStoreFileBaseImpl) getStateStoreDriver());
  doAnswer((Answer<BufferedWriter>) invocation -> {
    BufferedWriter realWriter = (BufferedWriter) invocation.callRealMethod();
    BufferedWriter failingWriter = spy(realWriter);
    doThrow(IOException.class).when(failingWriter).write(any(String.class));
    return failingWriter;
  }).when(driver).getWriter(any());

  // The shared check asserts that no record gets committed.
  testInsertWithErrorDuringWrite(driver, MembershipState.class);
}
}