Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@ private AsyncFSOutputHelper() {
* Create {@link FanOutOneBlockAsyncDFSOutput} for {@link DistributedFileSystem}, and a simple
* implementation for other {@link FileSystem} which wraps around a {@link FSDataOutputStream}.
*/
public static AsyncFSOutput createOutput(FileSystem fs, Path f, boolean overwrite,
public static AsyncFSOutput createOutput(FileSystem fs, Path f, Path oldPath, boolean overwrite,
boolean createParent, short replication, long blockSize, EventLoopGroup eventLoopGroup,
Class<? extends Channel> channelClass)
throws IOException, CommonFSUtils.StreamLacksCapabilityException {
if (fs instanceof DistributedFileSystem) {
return FanOutOneBlockAsyncDFSOutputHelper.createOutput((DistributedFileSystem) fs, f,
return FanOutOneBlockAsyncDFSOutputHelper.createOutput((DistributedFileSystem) fs, f, oldPath,
overwrite, createParent, replication, blockSize, eventLoopGroup, channelClass);
}
final FSDataOutputStream out;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,9 @@
package org.apache.hadoop.hbase.io.asyncfs;

import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.HEART_BEAT_SEQNO;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.READ_TIMEOUT;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.completeFile;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.endFileLease;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.getStatus;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.WRITER_IDLE;

Expand All @@ -44,6 +42,7 @@
import org.apache.hadoop.crypto.Encryptor;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.CancelOnClose;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hdfs.DFSClient;
Expand Down Expand Up @@ -346,7 +345,8 @@ private void setupReceiver(int timeoutMs) {
this.alloc = alloc;
this.buf = alloc.directBuffer(sendBufSizePRedictor.initialSize());
this.state = State.STREAMING;
setupReceiver(conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT));
setupReceiver(conf.getInt(AbstractFSWAL.WAL_SYNC_TIMEOUT_MS,
AbstractFSWAL.DEFAULT_WAL_SYNC_TIMEOUT_MS));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.createEncryptor;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage.PIPELINE_SETUP_CREATE;
Expand All @@ -48,6 +47,7 @@
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hdfs.DFSClient;
Expand Down Expand Up @@ -123,8 +123,10 @@ private FanOutOneBlockAsyncDFSOutputHelper() {
}

public static final String ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = "hbase.fs.async.create.retries";

public static final String ASYNC_DFS_OUTPUT_SKIP_FAILED_DN = "hbase.fs.async.skipFailedDn";
public static final int DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = 10;
public static final boolean DEFAULT_ASYNC_DFS_OUTPUT_SKIP_FAILED_DN = false;

// use pooled allocator for performance.
private static final ByteBufAllocator ALLOC = PooledByteBufAllocator.DEFAULT;

Expand Down Expand Up @@ -682,7 +684,8 @@ private static List<Future<Channel>> connectToDataNodes(Configuration conf, DFSC
DatanodeInfo[] datanodeInfos = locatedBlock.getLocations();
boolean connectToDnViaHostname =
conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
int timeoutMs = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT);
int timeoutMs = conf.getInt(AbstractFSWAL.WAL_SYNC_TIMEOUT_MS,
AbstractFSWAL.DEFAULT_WAL_SYNC_TIMEOUT_MS);
ExtendedBlock blockCopy = new ExtendedBlock(locatedBlock.getBlock());
blockCopy.setNumBytes(locatedBlock.getBlockSize());
ClientOperationHeaderProto header = ClientOperationHeaderProto.newBuilder()
Expand Down Expand Up @@ -741,7 +744,7 @@ public NameNodeException(Throwable cause) {
}

private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src,
boolean overwrite, boolean createParent, short replication, long blockSize,
Path oldPath, boolean overwrite, boolean createParent, short replication, long blockSize,
EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) throws IOException {
Configuration conf = dfs.getConf();
FSUtils fsUtils = FSUtils.getInstance(dfs, conf);
Expand All @@ -750,7 +753,28 @@ private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem d
ClientProtocol namenode = client.getNamenode();
int createMaxRetries = conf.getInt(ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES,
DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES);
boolean skipFailedDn = conf.getBoolean(ASYNC_DFS_OUTPUT_SKIP_FAILED_DN,
DEFAULT_ASYNC_DFS_OUTPUT_SKIP_FAILED_DN);
DatanodeInfo[] excludesNodes = EMPTY_DN_ARRAY;
if (skipFailedDn && oldPath != null) {
String oldPathStr = oldPath.toUri().getPath();
long len = namenode.getFileInfo(oldPathStr).getLen();
for(LocatedBlock block : namenode.getBlockLocations(oldPathStr, Math.max(0, len - 1), len)
.getLocatedBlocks()) {
for(DatanodeInfo dn : block.getLocations()) {
excludesNodes = ArrayUtils.add(excludesNodes, dn);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here it always adding nodes into exclude list but never check and remove even after the DN recovers, right? So it's possible that one day all DN nodes are excluded and the OutputStream will fail due to could only be replicated to 0 nodes?

Copy link
Contributor Author

@chenxu14 chenxu14 May 10, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thank you for your review @carp84, there seems to be some difference opinions with HBASE-22301, i will fix the checkstyle first.

So it's possible that one day all DN nodes are excluded

excludesNodes are not a global variable, each FanOutOneBlockAsyncDFSOutput will use different instance, when new FanOutOneBlockAsyncDFSOutput created, it's Initial excludesNodes will be an empty array(code in FanOutOneBlockAsyncDFSOutputHelper#createOutput)

}
}
if (LOG.isDebugEnabled()) {
StringBuilder sb =
new StringBuilder("create new output because old wal sync failed, old path is: ");
sb.append(oldPathStr).append(", newPath excludesNodes are :");
for(DatanodeInfo info : excludesNodes) {
sb.append(info.getInfoAddr()).append(";");
}
LOG.debug(sb.toString());
}
}
for (int retry = 0;; retry++) {
HdfsFileStatus stat;
try {
Expand Down Expand Up @@ -838,14 +862,14 @@ public void operationComplete(Future<Channel> future) throws Exception {
* inside an {@link EventLoop}.
*/
public static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, Path f,
boolean overwrite, boolean createParent, short replication, long blockSize,
Path oldPath, boolean overwrite, boolean createParent, short replication, long blockSize,
EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) throws IOException {
return new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() {

@Override
public FanOutOneBlockAsyncDFSOutput doCall(Path p)
throws IOException, UnresolvedLinkException {
return createOutput(dfs, p.toUri().getPath(), overwrite, createParent, replication,
return createOutput(dfs, p.toUri().getPath(), oldPath, overwrite, createParent, replication,
blockSize, eventLoopGroup, channelClass);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.hadoop.hbase.regionserver.wal.WALClosedException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
Expand All @@ -57,7 +58,8 @@ public class LogRoller extends HasThread implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(LogRoller.class);
private final ReentrantLock rollLock = new ReentrantLock();
private final AtomicBoolean rollLog = new AtomicBoolean(false);
private final ConcurrentHashMap<WAL, Boolean> walNeedsRoll = new ConcurrentHashMap<>();
private final ConcurrentHashMap<WAL, Pair<Boolean,Boolean>> walNeedsRoll =
new ConcurrentHashMap<>(); // wal -> <rollRequesting, syncFailed>
private final Server server;
protected final RegionServerServices services;
private volatile long lastrolltime = System.currentTimeMillis();
Expand All @@ -70,11 +72,17 @@ public class LogRoller extends HasThread implements Closeable {
private volatile boolean running = true;

public void addWAL(final WAL wal) {
if (null == walNeedsRoll.putIfAbsent(wal, Boolean.FALSE)) {
if (null == walNeedsRoll.putIfAbsent(wal,
new Pair<Boolean,Boolean>(Boolean.FALSE, Boolean.FALSE))) {
wal.registerWALActionsListener(new WALActionsListener() {
@Override
public void logRollRequested(WALActionsListener.RollRequestReason reason) {
walNeedsRoll.put(wal, Boolean.TRUE);
Pair<Boolean,Boolean> walInfo = new Pair<>(Boolean.TRUE, Boolean.FALSE);
if (reason.equals(WALActionsListener.RollRequestReason.SLOW_SYNC) ||
reason.equals(WALActionsListener.RollRequestReason.ERROR)) {
walInfo.setSecond(Boolean.TRUE);
}
walNeedsRoll.put(wal, walInfo);
// TODO logs will contend with each other here, replace with e.g. DelayedQueue
synchronized(rollLog) {
rollLog.set(true);
Expand All @@ -87,7 +95,7 @@ public void logRollRequested(WALActionsListener.RollRequestReason reason) {

public void requestRollAll() {
for (WAL wal : walNeedsRoll.keySet()) {
walNeedsRoll.put(wal, Boolean.TRUE);
walNeedsRoll.put(wal, new Pair<Boolean, Boolean>(Boolean.TRUE, Boolean.FALSE));
}
synchronized(rollLog) {
rollLog.set(true);
Expand Down Expand Up @@ -122,9 +130,9 @@ public void interrupt() {
*/
void checkLowReplication(long now) {
try {
for (Entry<WAL, Boolean> entry : walNeedsRoll.entrySet()) {
for (Entry<WAL, Pair<Boolean, Boolean>> entry : walNeedsRoll.entrySet()) {
WAL wal = entry.getKey();
boolean needRollAlready = entry.getValue();
boolean needRollAlready = entry.getValue().getFirst();
if (needRollAlready || !(wal instanceof AbstractFSWAL)) {
continue;
}
Expand Down Expand Up @@ -180,16 +188,18 @@ public void run() {
rollLock.lock(); // FindBugs UL_UNRELEASED_LOCK_EXCEPTION_PATH
try {
this.lastrolltime = now;
for (Iterator<Entry<WAL, Boolean>> iter = walNeedsRoll.entrySet().iterator(); iter
.hasNext();) {
Entry<WAL, Boolean> entry = iter.next();
for (Iterator<Entry<WAL, Pair<Boolean,Boolean>>> iter
= walNeedsRoll.entrySet().iterator(); iter.hasNext();) {
Entry<WAL, Pair<Boolean,Boolean>> entry = iter.next();
final WAL wal = entry.getKey();
Pair<Boolean, Boolean> walInfo = entry.getValue();
boolean syncFailed = walInfo.getSecond().booleanValue();
// Force the roll if the logroll.period is elapsed or if a roll was requested.
// The returned value is an array of actual region names.
try {
final byte[][] regionsToFlush =
wal.rollWriter(periodic || entry.getValue().booleanValue());
walNeedsRoll.put(wal, Boolean.FALSE);
final byte[][] regionsToFlush = wal.rollWriter(periodic ||
walInfo.getFirst().booleanValue() || syncFailed, syncFailed);
walNeedsRoll.put(wal, new Pair<>(Boolean.FALSE, Boolean.FALSE));
if (regionsToFlush != null) {
for (byte[] r : regionsToFlush) {
scheduleFlush(r);
Expand Down Expand Up @@ -247,8 +257,8 @@ private void scheduleFlush(final byte [] encodedRegionName) {
* @return true if all WAL roll finished
*/
public boolean walRollFinished() {
for (boolean needRoll : walNeedsRoll.values()) {
if (needRoll) {
for (Pair<Boolean,Boolean> walInfo : walNeedsRoll.values()) {
if (walInfo.getFirst().booleanValue()) {
return false;
}
}
Expand All @@ -271,7 +281,7 @@ public void close() {
}

@VisibleForTesting
Map<WAL, Boolean> getWalNeedsRoll() {
Map<WAL, Pair<Boolean,Boolean>> getWalNeedsRoll() {
return this.walNeedsRoll;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
"hbase.regionserver.wal.slowsync.roll.interval.ms";
protected static final int DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS = 60 * 1000; // in ms, 1 minute

protected static final String WAL_SYNC_TIMEOUT_MS = "hbase.regionserver.wal.sync.timeout";
protected static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min
public static final String WAL_SYNC_TIMEOUT_MS = "hbase.regionserver.wal.sync.timeout";
public static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min

/**
* file system instance
Expand Down Expand Up @@ -535,7 +535,7 @@ public long getEarliestMemStoreSeqNum(byte[] encodedRegionName, byte[] familyNam

@Override
public byte[][] rollWriter() throws FailedLogCloseException, IOException {
return rollWriter(false);
return rollWriter(false, false);
}

/**
Expand Down Expand Up @@ -814,22 +814,23 @@ private IOException convertInterruptedExceptionToIOException(final InterruptedEx
}

@Override
public byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOException {
public byte[][] rollWriter(boolean force, boolean syncFailed)
throws FailedLogCloseException, IOException {
rollWriterLock.lock();
try {
if (this.closed) {
throw new WALClosedException("WAL has been closed");
}
// Return if nothing to flush.
if (!force && this.writer != null && this.numEntries.get() <= 0) {
if (!force && !syncFailed && (this.writer != null && this.numEntries.get() <= 0)) {
return null;
}
byte[][] regionsToFlush = null;
try (TraceScope scope = TraceUtil.createTrace("FSHLog.rollWriter")) {
Path oldPath = getOldPath();
Path newPath = getNewPath();
// Any exception from here on is catastrophic, non-recoverable so we currently abort.
W nextWriter = this.createWriterInstance(newPath);
W nextWriter = this.createWriterInstance(newPath, syncFailed ? oldPath : null);
tellListenersAboutPreLogRoll(oldPath, newPath);
// NewPath could be equal to oldPath if replaceWriter fails.
newPath = replaceWriter(oldPath, newPath, nextWriter);
Expand Down Expand Up @@ -1125,7 +1126,7 @@ public abstract long append(RegionInfo info, WALKeyImpl key, WALEdit edits, bool

protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException;

protected abstract W createWriterInstance(Path path)
protected abstract W createWriterInstance(Path path, Path oldPath)
throws IOException, CommonFSUtils.StreamLacksCapabilityException;

protected abstract void doReplaceWriter(Path oldPath, Path newPath, W nextWriter)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,16 +154,16 @@ private boolean initializeCompressionContext(Configuration conf, Path path) thro
return doCompress;
}

public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable,
long blocksize) throws IOException, StreamLacksCapabilityException {
public void init(FileSystem fs, Path path, Path oldPath, Configuration conf,
boolean overwritable, long blocksize) throws IOException, StreamLacksCapabilityException {
this.conf = conf;
boolean doCompress = initializeCompressionContext(conf, path);
this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE);
int bufferSize = FSUtils.getDefaultBufferSize(fs);
short replication = (short) conf.getInt("hbase.regionserver.hlog.replication",
FSUtils.getDefaultReplication(fs, path));

initOutput(fs, path, overwritable, bufferSize, replication, blocksize);
initOutput(fs, path, oldPath, overwritable, bufferSize, replication, blocksize);

boolean doTagCompress = doCompress
&& conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true);
Expand Down Expand Up @@ -239,8 +239,9 @@ protected void writeWALTrailer() {
}
}

protected abstract void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
short replication, long blockSize) throws IOException, StreamLacksCapabilityException;
protected abstract void initOutput(FileSystem fs, Path path, Path oldPath,
boolean overwritable, int bufferSize, short replication, long blockSize)
throws IOException, StreamLacksCapabilityException;

/**
* return the file length after written.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -660,14 +660,15 @@ public void sync(long txid) throws IOException {
}
}

protected final AsyncWriter createAsyncWriter(FileSystem fs, Path path) throws IOException {
return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, false, this.blocksize,
protected final AsyncWriter createAsyncWriter(FileSystem fs, Path path, Path oldPath)
throws IOException {
return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, oldPath, false, this.blocksize,
eventLoopGroup, channelClass);
}

@Override
protected AsyncWriter createWriterInstance(Path path) throws IOException {
return createAsyncWriter(fs, path);
protected AsyncWriter createWriterInstance(Path path, Path oldPath) throws IOException {
return createAsyncWriter(fs, path, oldPath);
}

private void waitForSafePoint() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,9 @@ public AsyncFSOutput getOutput() {
}

@Override
protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
protected void initOutput(FileSystem fs, Path path, Path oldPath, boolean overwritable, int bufferSize,
short replication, long blockSize) throws IOException, StreamLacksCapabilityException {
this.output = AsyncFSOutputHelper.createOutput(fs, path, overwritable, false, replication,
this.output = AsyncFSOutputHelper.createOutput(fs, path, oldPath, overwritable, false, replication,
blockSize, eventLoopGroup, channelClass);
this.asyncOutputWrapper = new OutputStreamWrapper(output);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ protected AsyncWriter createCombinedAsyncWriter(AsyncWriter localWriter,
}

@Override
protected AsyncWriter createWriterInstance(Path path) throws IOException {
AsyncWriter localWriter = super.createWriterInstance(path);
protected AsyncWriter createWriterInstance(Path path, Path oldPath) throws IOException {
AsyncWriter localWriter = super.createWriterInstance(path, oldPath);
// retry forever if we can not create the remote writer to prevent aborting the RS due to log
// rolling error, unless the skipRemoteWal is set to true.
// TODO: since for now we only have one thread doing log rolling, this may block the rolling for
Expand All @@ -81,7 +81,7 @@ protected AsyncWriter createWriterInstance(Path path) throws IOException {
}
AsyncWriter remoteWriter;
try {
remoteWriter = createAsyncWriter(remoteFs, remoteWAL);
remoteWriter = createAsyncWriter(remoteFs, remoteWAL, null);
} catch (IOException e) {
LOG.warn("create remote writer {} failed, retry = {}", remoteWAL, retry, e);
try {
Expand Down
Loading