diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
index d1645f8462c0..48ffcb4cd566 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
@@ -43,12 +43,12 @@ private AsyncFSOutputHelper() {
    * Create {@link FanOutOneBlockAsyncDFSOutput} for {@link DistributedFileSystem}, and a simple
    * implementation for other {@link FileSystem} which wraps around a {@link FSDataOutputStream}.
    */
-  public static AsyncFSOutput createOutput(FileSystem fs, Path f, boolean overwrite,
+  public static AsyncFSOutput createOutput(FileSystem fs, Path f, Path oldPath, boolean overwrite,
       boolean createParent, short replication, long blockSize, EventLoopGroup eventLoopGroup,
       Class<? extends Channel> channelClass)
       throws IOException, CommonFSUtils.StreamLacksCapabilityException {
     if (fs instanceof DistributedFileSystem) {
-      return FanOutOneBlockAsyncDFSOutputHelper.createOutput((DistributedFileSystem) fs, f,
+      return FanOutOneBlockAsyncDFSOutputHelper.createOutput((DistributedFileSystem) fs, f, oldPath,
         overwrite, createParent, replication, blockSize, eventLoopGroup, channelClass);
     }
     final FSDataOutputStream out;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
index ea9a0d8920aa..75d88e89621c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
@@ -18,11 +18,9 @@
 package org.apache.hadoop.hbase.io.asyncfs;
 
 import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.HEART_BEAT_SEQNO;
-import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.READ_TIMEOUT;
 import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.completeFile;
 import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.endFileLease;
 import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.getStatus;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
 import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
 import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.WRITER_IDLE;
@@ -44,6 +42,7 @@
 import org.apache.hadoop.crypto.Encryptor;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.CancelOnClose;
+import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hdfs.DFSClient;
@@ -346,7 +345,8 @@ private void setupReceiver(int timeoutMs) {
     this.alloc = alloc;
     this.buf = alloc.directBuffer(sendBufSizePRedictor.initialSize());
     this.state = State.STREAMING;
-    setupReceiver(conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT));
+    setupReceiver(conf.getInt(AbstractFSWAL.WAL_SYNC_TIMEOUT_MS,
+      AbstractFSWAL.DEFAULT_WAL_SYNC_TIMEOUT_MS));
   }
 
   @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
index 0e5cf81c9f27..5672446c3681 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
@@ -21,7 +21,6 @@
 import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
 import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.createEncryptor;
 import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
 import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage.PIPELINE_SETUP_CREATE;
@@ -48,6 +47,7 @@
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hbase.client.ConnectionUtils;
+import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hdfs.DFSClient;
@@ -123,8 +123,10 @@ private FanOutOneBlockAsyncDFSOutputHelper() {
   }
 
   public static final String ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = "hbase.fs.async.create.retries";
-
+  public static final String ASYNC_DFS_OUTPUT_SKIP_FAILED_DN = "hbase.fs.async.skipFailedDn";
   public static final int DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = 10;
+  public static final boolean DEFAULT_ASYNC_DFS_OUTPUT_SKIP_FAILED_DN = false;
+
   // use pooled allocator for performance.
   private static final ByteBufAllocator ALLOC = PooledByteBufAllocator.DEFAULT;
@@ -682,7 +684,8 @@ private static List<Future<Channel>> connectToDataNodes(Configuration conf, DFSC
     DatanodeInfo[] datanodeInfos = locatedBlock.getLocations();
     boolean connectToDnViaHostname =
       conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
-    int timeoutMs = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT);
+    int timeoutMs = conf.getInt(AbstractFSWAL.WAL_SYNC_TIMEOUT_MS,
+      AbstractFSWAL.DEFAULT_WAL_SYNC_TIMEOUT_MS);
     ExtendedBlock blockCopy = new ExtendedBlock(locatedBlock.getBlock());
     blockCopy.setNumBytes(locatedBlock.getBlockSize());
     ClientOperationHeaderProto header = ClientOperationHeaderProto.newBuilder()
@@ -741,7 +744,7 @@ public NameNodeException(Throwable cause) {
   }
 
   private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src,
-      boolean overwrite, boolean createParent, short replication, long blockSize,
+      Path oldPath, boolean overwrite, boolean createParent, short replication, long blockSize,
       EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) throws IOException {
     Configuration conf = dfs.getConf();
     FSUtils fsUtils = FSUtils.getInstance(dfs, conf);
@@ -750,7 +753,28 @@ private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem d
     ClientProtocol namenode = client.getNamenode();
     int createMaxRetries = conf.getInt(ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES,
       DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES);
+    boolean skipFailedDn = conf.getBoolean(ASYNC_DFS_OUTPUT_SKIP_FAILED_DN,
+      DEFAULT_ASYNC_DFS_OUTPUT_SKIP_FAILED_DN);
     DatanodeInfo[] excludesNodes = EMPTY_DN_ARRAY;
+    if (skipFailedDn && oldPath != null) {
+      String oldPathStr = oldPath.toUri().getPath();
+      long len = namenode.getFileInfo(oldPathStr).getLen();
+      for (LocatedBlock block : namenode.getBlockLocations(oldPathStr, Math.max(0, len - 1), len)
+          .getLocatedBlocks()) {
+        for (DatanodeInfo dn : block.getLocations()) {
+          excludesNodes = ArrayUtils.add(excludesNodes, dn);
+        }
+      }
+      if (LOG.isDebugEnabled()) {
+        StringBuilder sb =
+          new StringBuilder("create new output because old wal sync failed, old path is: ");
+        sb.append(oldPathStr).append(", newPath excludesNodes are :");
+        for (DatanodeInfo info : excludesNodes) {
+          sb.append(info.getInfoAddr()).append(";");
+        }
+        LOG.debug(sb.toString());
+      }
+    }
     for (int retry = 0;; retry++) {
       HdfsFileStatus stat;
       try {
@@ -838,14 +862,14 @@ public void operationComplete(Future<Channel> future) throws Exception {
    * inside an {@link EventLoop}.
    */
   public static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, Path f,
-      boolean overwrite, boolean createParent, short replication, long blockSize,
+      Path oldPath, boolean overwrite, boolean createParent, short replication, long blockSize,
       EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) throws IOException {
     return new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() {
 
       @Override
       public FanOutOneBlockAsyncDFSOutput doCall(Path p)
           throws IOException, UnresolvedLinkException {
-        return createOutput(dfs, p.toUri().getPath(), overwrite, createParent, replication,
+        return createOutput(dfs, p.toUri().getPath(), oldPath, overwrite, createParent, replication,
           blockSize, eventLoopGroup, channelClass);
       }
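Note on the hunks above: when the previous WAL's sync failed, the helper asks the namenode which datanodes host the tail block of the old file and excludes them from the replacement file's pipeline. A self-contained sketch of that lookup, assuming an already-obtained ClientProtocol proxy (the class and method names below are illustrative, not part of this patch):

import java.io.IOException;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;

// Hypothetical helper mirroring the exclusion step added above.
final class FailedDnCollector {
  // Returns the datanodes hosting the last block of the old WAL, so the caller
  // can hand them to the namenode as excluded nodes for the replacement file.
  static DatanodeInfo[] collectExcludedDns(ClientProtocol namenode, String oldPathStr)
      throws IOException {
    DatanodeInfo[] excluded = new DatanodeInfo[0];
    // Assumes the old WAL still exists; the patch only runs this when oldPath != null.
    long len = namenode.getFileInfo(oldPathStr).getLen();
    // Ask only for the tail of the file, i.e. the block whose pipeline just failed.
    for (LocatedBlock block : namenode
        .getBlockLocations(oldPathStr, Math.max(0, len - 1), len).getLocatedBlocks()) {
      for (DatanodeInfo dn : block.getLocations()) {
        excluded = ArrayUtils.add(excluded, dn);
      }
    }
    return excluded;
  }
}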
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
index 8adb9b468052..60041830cc26 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
@@ -35,6 +35,7 @@
 import org.apache.hadoop.hbase.regionserver.wal.WALClosedException;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.HasThread;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -57,7 +58,8 @@ public class LogRoller extends HasThread implements Closeable {
   private static final Logger LOG = LoggerFactory.getLogger(LogRoller.class);
   private final ReentrantLock rollLock = new ReentrantLock();
   private final AtomicBoolean rollLog = new AtomicBoolean(false);
-  private final ConcurrentHashMap<WAL, Boolean> walNeedsRoll = new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<WAL, Pair<Boolean, Boolean>> walNeedsRoll =
+      new ConcurrentHashMap<>(); // wal -> <needsRoll, syncFailed>
   private final Server server;
   protected final RegionServerServices services;
   private volatile long lastrolltime = System.currentTimeMillis();
@@ -70,11 +72,17 @@ public class LogRoller extends HasThread implements Closeable {
   private volatile boolean running = true;
 
   public void addWAL(final WAL wal) {
-    if (null == walNeedsRoll.putIfAbsent(wal, Boolean.FALSE)) {
+    if (null == walNeedsRoll.putIfAbsent(wal,
+        new Pair<Boolean, Boolean>(Boolean.FALSE, Boolean.FALSE))) {
       wal.registerWALActionsListener(new WALActionsListener() {
         @Override
         public void logRollRequested(WALActionsListener.RollRequestReason reason) {
-          walNeedsRoll.put(wal, Boolean.TRUE);
+          Pair<Boolean, Boolean> walInfo = new Pair<>(Boolean.TRUE, Boolean.FALSE);
+          if (reason.equals(WALActionsListener.RollRequestReason.SLOW_SYNC) ||
+              reason.equals(WALActionsListener.RollRequestReason.ERROR)) {
+            walInfo.setSecond(Boolean.TRUE);
+          }
+          walNeedsRoll.put(wal, walInfo);
           // TODO logs will contend with each other here, replace with e.g. DelayedQueue
           synchronized(rollLog) {
             rollLog.set(true);
@@ -87,7 +95,7 @@ public void logRollRequested(WALActionsListener.RollRequestReason reason) {
 
   public void requestRollAll() {
     for (WAL wal : walNeedsRoll.keySet()) {
-      walNeedsRoll.put(wal, Boolean.TRUE);
+      walNeedsRoll.put(wal, new Pair<Boolean, Boolean>(Boolean.TRUE, Boolean.FALSE));
     }
     synchronized(rollLog) {
       rollLog.set(true);
@@ -122,9 +130,9 @@ public void interrupt() {
    */
   void checkLowReplication(long now) {
     try {
-      for (Entry<WAL, Boolean> entry : walNeedsRoll.entrySet()) {
+      for (Entry<WAL, Pair<Boolean, Boolean>> entry : walNeedsRoll.entrySet()) {
         WAL wal = entry.getKey();
-        boolean needRollAlready = entry.getValue();
+        boolean needRollAlready = entry.getValue().getFirst();
         if (needRollAlready || !(wal instanceof AbstractFSWAL)) {
           continue;
         }
@@ -180,16 +188,18 @@ public void run() {
       rollLock.lock(); // FindBugs UL_UNRELEASED_LOCK_EXCEPTION_PATH
       try {
         this.lastrolltime = now;
-        for (Iterator<Entry<WAL, Boolean>> iter = walNeedsRoll.entrySet().iterator(); iter
-            .hasNext();) {
-          Entry<WAL, Boolean> entry = iter.next();
+        for (Iterator<Entry<WAL, Pair<Boolean, Boolean>>> iter
+            = walNeedsRoll.entrySet().iterator(); iter.hasNext();) {
+          Entry<WAL, Pair<Boolean, Boolean>> entry = iter.next();
           final WAL wal = entry.getKey();
+          Pair<Boolean, Boolean> walInfo = entry.getValue();
+          boolean syncFailed = walInfo.getSecond().booleanValue();
           // Force the roll if the logroll.period is elapsed or if a roll was requested.
           // The returned value is an array of actual region names.
           try {
-            final byte[][] regionsToFlush =
-              wal.rollWriter(periodic || entry.getValue().booleanValue());
-            walNeedsRoll.put(wal, Boolean.FALSE);
+            final byte[][] regionsToFlush = wal.rollWriter(periodic ||
+              walInfo.getFirst().booleanValue() || syncFailed, syncFailed);
+            walNeedsRoll.put(wal, new Pair<>(Boolean.FALSE, Boolean.FALSE));
             if (regionsToFlush != null) {
               for (byte[] r : regionsToFlush) {
                 scheduleFlush(r);
@@ -247,8 +257,8 @@ private void scheduleFlush(final byte [] encodedRegionName) {
    * @return true if all WAL roll finished
    */
   public boolean walRollFinished() {
-    for (boolean needRoll : walNeedsRoll.values()) {
-      if (needRoll) {
+    for (Pair<Boolean, Boolean> walInfo : walNeedsRoll.values()) {
+      if (walInfo.getFirst().booleanValue()) {
         return false;
       }
     }
@@ -271,7 +281,7 @@ public void close() {
   }
 
   @VisibleForTesting
-  Map<WAL, Boolean> getWalNeedsRoll() {
+  Map<WAL, Pair<Boolean, Boolean>> getWalNeedsRoll() {
     return this.walNeedsRoll;
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index ad2eec613019..3b18d0fe5cd8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -130,8 +130,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
     "hbase.regionserver.wal.slowsync.roll.interval.ms";
   protected static final int DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS = 60 * 1000; // in ms, 1 minute
 
-  protected static final String WAL_SYNC_TIMEOUT_MS = "hbase.regionserver.wal.sync.timeout";
-  protected static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min
+  public static final String WAL_SYNC_TIMEOUT_MS = "hbase.regionserver.wal.sync.timeout";
+  public static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min
 
   /**
    * file system instance
@@ -535,7 +535,7 @@ public long getEarliestMemStoreSeqNum(byte[] encodedRegionName, byte[] familyNam
 
   @Override
   public byte[][] rollWriter() throws FailedLogCloseException, IOException {
-    return rollWriter(false);
+    return rollWriter(false, false);
   }
 
   /**
@@ -814,14 +814,15 @@ private IOException convertInterruptedExceptionToIOException(final InterruptedEx
   }
 
   @Override
-  public byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOException {
+  public byte[][] rollWriter(boolean force, boolean syncFailed)
+      throws FailedLogCloseException, IOException {
     rollWriterLock.lock();
     try {
       if (this.closed) {
        throw new WALClosedException("WAL has been closed");
      }
      // Return if nothing to flush.
-      if (!force && this.writer != null && this.numEntries.get() <= 0) {
+      if (!force && !syncFailed && (this.writer != null && this.numEntries.get() <= 0)) {
         return null;
       }
       byte[][] regionsToFlush = null;
@@ -829,7 +830,7 @@ public byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOExce
         Path oldPath = getOldPath();
         Path newPath = getNewPath();
         // Any exception from here on is catastrophic, non-recoverable so we currently abort.
-        W nextWriter = this.createWriterInstance(newPath);
+        W nextWriter = this.createWriterInstance(newPath, syncFailed ? oldPath : null);
         tellListenersAboutPreLogRoll(oldPath, newPath);
         // NewPath could be equal to oldPath if replaceWriter fails.
         newPath = replaceWriter(oldPath, newPath, nextWriter);
@@ -1125,7 +1126,7 @@ public abstract long append(RegionInfo info, WALKeyImpl key, WALEdit edits, bool
 
   protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException;
 
-  protected abstract W createWriterInstance(Path path)
+  protected abstract W createWriterInstance(Path path, Path oldPath)
       throws IOException, CommonFSUtils.StreamLacksCapabilityException;
 
   protected abstract void doReplaceWriter(Path oldPath, Path newPath, W nextWriter)
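The two files above carry the control flow of the change: LogRoller widens its per-WAL flag into a <needsRoll, syncFailed> pair, and AbstractFSWAL passes the old WAL path to createWriterInstance only when the roll was caused by a sync failure. A toy model of that bookkeeping, under hypothetical names (the real code keys the map by WAL and calls wal.rollWriter):

import java.util.concurrent.ConcurrentHashMap;

// Toy model of the new walNeedsRoll bookkeeping: first = roll requested,
// second = the request came from a failed (or slow) sync.
final class RollTracker {
  static final class RollState {
    final boolean needsRoll;
    final boolean syncFailed;
    RollState(boolean needsRoll, boolean syncFailed) {
      this.needsRoll = needsRoll;
      this.syncFailed = syncFailed;
    }
  }

  private final ConcurrentHashMap<String, RollState> walNeedsRoll = new ConcurrentHashMap<>();

  void onRollRequested(String wal, boolean causedBySyncFailure) {
    walNeedsRoll.put(wal, new RollState(true, causedBySyncFailure));
  }

  // Mirrors the run() loop: force the roll if periodic, requested, or sync failed,
  // and thread syncFailed through so the writer can avoid the old pipeline's datanodes.
  void rollIfNeeded(String wal, boolean periodic) {
    RollState state = walNeedsRoll.getOrDefault(wal, new RollState(false, false));
    if (periodic || state.needsRoll || state.syncFailed) {
      // real code: wal.rollWriter(force, state.syncFailed)
      walNeedsRoll.put(wal, new RollState(false, false));
    }
  }
}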
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java
index ff2864d533fe..7378993e80d3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java
@@ -154,8 +154,8 @@ private boolean initializeCompressionContext(Configuration conf, Path path) thro
     return doCompress;
   }
 
-  public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable,
-      long blocksize) throws IOException, StreamLacksCapabilityException {
+  public void init(FileSystem fs, Path path, Path oldPath, Configuration conf,
+      boolean overwritable, long blocksize) throws IOException, StreamLacksCapabilityException {
     this.conf = conf;
     boolean doCompress = initializeCompressionContext(conf, path);
     this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE);
@@ -163,7 +163,7 @@ public void init(FileSystem fs, Path path, Configuration conf, boolean overwrita
     short replication = (short) conf.getInt("hbase.regionserver.hlog.replication",
       FSUtils.getDefaultReplication(fs, path));
 
-    initOutput(fs, path, overwritable, bufferSize, replication, blocksize);
+    initOutput(fs, path, oldPath, overwritable, bufferSize, replication, blocksize);
 
     boolean doTagCompress = doCompress
         && conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true);
@@ -239,8 +239,9 @@ protected void writeWALTrailer() {
     }
   }
 
-  protected abstract void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
-      short replication, long blockSize) throws IOException, StreamLacksCapabilityException;
+  protected abstract void initOutput(FileSystem fs, Path path, Path oldPath,
+      boolean overwritable, int bufferSize, short replication, long blockSize)
+      throws IOException, StreamLacksCapabilityException;
 
   /**
    * return the file length after written.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
index 209ace684257..593a3fb8d9e8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -660,14 +660,15 @@ public void sync(long txid) throws IOException {
     }
   }
 
-  protected final AsyncWriter createAsyncWriter(FileSystem fs, Path path) throws IOException {
-    return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, false, this.blocksize,
+  protected final AsyncWriter createAsyncWriter(FileSystem fs, Path path, Path oldPath)
+      throws IOException {
+    return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, oldPath, false, this.blocksize,
       eventLoopGroup, channelClass);
   }
 
   @Override
-  protected AsyncWriter createWriterInstance(Path path) throws IOException {
-    return createAsyncWriter(fs, path);
+  protected AsyncWriter createWriterInstance(Path path, Path oldPath) throws IOException {
+    return createAsyncWriter(fs, path, oldPath);
   }
 
   private void waitForSafePoint() {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
index 37c6f004913b..aecab2c9ff4d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
@@ -164,9 +164,9 @@ public AsyncFSOutput getOutput() {
   }
 
   @Override
-  protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
+  protected void initOutput(FileSystem fs, Path path, Path oldPath, boolean overwritable, int bufferSize,
       short replication, long blockSize) throws IOException, StreamLacksCapabilityException {
-    this.output = AsyncFSOutputHelper.createOutput(fs, path, overwritable, false, replication,
+    this.output = AsyncFSOutputHelper.createOutput(fs, path, oldPath, overwritable, false, replication,
       blockSize, eventLoopGroup, channelClass);
     this.asyncOutputWrapper = new OutputStreamWrapper(output);
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java
index bf5b96dfce1e..38b7e48371eb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java
@@ -68,8 +68,8 @@ protected AsyncWriter createCombinedAsyncWriter(AsyncWriter localWriter,
   }
 
   @Override
-  protected AsyncWriter createWriterInstance(Path path) throws IOException {
-    AsyncWriter localWriter = super.createWriterInstance(path);
+  protected AsyncWriter createWriterInstance(Path path, Path oldPath) throws IOException {
+    AsyncWriter localWriter = super.createWriterInstance(path, oldPath);
     // retry forever if we can not create the remote writer to prevent aborting the RS due to log
     // rolling error, unless the skipRemoteWal is set to true.
     // TODO: since for now we only have one thread doing log rolling, this may block the rolling for
@@ -81,7 +81,7 @@ protected AsyncWriter createWriterInstance(Path path) throws IOException {
       }
       AsyncWriter remoteWriter;
       try {
-        remoteWriter = createAsyncWriter(remoteFs, remoteWAL);
+        remoteWriter = createAsyncWriter(remoteFs, remoteWAL, null);
       } catch (IOException e) {
         LOG.warn("create remote writer {} failed, retry = {}", remoteWAL, retry, e);
         try {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 44e919be0af5..0e58f819dca8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -275,8 +275,8 @@ private void preemptiveSync(final ProtobufLogWriter nextWriter) {
    * @return Writer instance
    */
   @Override
-  protected Writer createWriterInstance(final Path path) throws IOException {
-    Writer writer = FSHLogProvider.createWriter(conf, fs, path, false, this.blocksize);
+  protected Writer createWriterInstance(final Path path, final Path oldPath) throws IOException {
+    Writer writer = FSHLogProvider.createWriter(conf, fs, path, oldPath, false, this.blocksize);
     if (writer instanceof ProtobufLogWriter) {
       preemptiveSync((ProtobufLogWriter) writer);
     }
@@ -581,7 +581,6 @@ public void run() {
           //TraceScope scope = Trace.continueSpan(takeSyncFuture.getSpan());
           long start = System.nanoTime();
           Throwable lastException = null;
-          boolean wasRollRequested = false;
           try {
             TraceUtil.addTimelineAnnotation("syncing writer");
             writer.sync(useHsync);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java
index b2af4a80ad3b..3c587ecc8556 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java
@@ -19,17 +19,15 @@
 package org.apache.hadoop.hbase.regionserver.wal;
 
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-
 import java.io.IOException;
-
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALKey;
-import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 
 /**
  * Class used to push numbers about the WAL into the metrics subsystem.  This will take a
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
index 5c8e0d21f875..f2a5c3770adb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
@@ -92,8 +92,9 @@ public FSDataOutputStream getStream() {
   }
 
   @SuppressWarnings("deprecation")
   @Override
-  protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
-      short replication, long blockSize) throws IOException, StreamLacksCapabilityException {
+  protected void initOutput(FileSystem fs, Path path, Path oldPath, boolean overwritable,
+      int bufferSize, short replication, long blockSize)
+      throws IOException, StreamLacksCapabilityException {
     this.output = CommonFSUtils.createForWal(fs, path, overwritable, bufferSize, replication,
       blockSize, false);
     if (fs.getConf().getBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true)) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
index 8741c1c8c63f..3e0512c20107 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
@@ -36,15 +36,15 @@
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
@@ -462,9 +462,7 @@ public static Path getArchivedLogPath(Path path, Configuration conf) throws IOEx
    * @throws IOException
    */
   public static org.apache.hadoop.hbase.wal.WAL.Reader openReader(Path path, Configuration conf)
-      throws IOException
-
-  {
+      throws IOException {
     long retryInterval = 2000; // 2 sec
     int maxAttempts = 30;
     int attempt = 0;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
index 062b3688d3e4..be7de0a214d7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
@@ -55,8 +55,8 @@ public interface AsyncWriter extends WALProvider.AsyncWriter {
      * @throws StreamLacksCapabilityException if the given FileSystem can't provide streams that
      *         meet the needs of the given Writer implementation.
      */
-    void init(FileSystem fs, Path path, Configuration c, boolean overwritable, long blocksize)
-        throws IOException, CommonFSUtils.StreamLacksCapabilityException;
+    void init(FileSystem fs, Path path, Path oldPath, Configuration c, boolean overwritable,
+        long blocksize) throws IOException, CommonFSUtils.StreamLacksCapabilityException;
   }
 
   private EventLoopGroup eventLoopGroup;
@@ -84,17 +84,17 @@ protected void doInit(Configuration conf) throws IOException {
    * Public because of AsyncFSWAL. Should be package-private
    */
   public static AsyncWriter createAsyncWriter(Configuration conf, FileSystem fs, Path path,
-      boolean overwritable, EventLoopGroup eventLoopGroup,
+      Path oldPath, boolean overwritable, EventLoopGroup eventLoopGroup,
       Class<? extends Channel> channelClass) throws IOException {
-    return createAsyncWriter(conf, fs, path, overwritable, WALUtil.getWALBlockSize(conf, fs, path),
-      eventLoopGroup, channelClass);
+    return createAsyncWriter(conf, fs, path, oldPath, overwritable,
+      WALUtil.getWALBlockSize(conf, fs, path), eventLoopGroup, channelClass);
   }
 
   /**
    * Public because of AsyncFSWAL. Should be package-private
    */
   public static AsyncWriter createAsyncWriter(Configuration conf, FileSystem fs, Path path,
-      boolean overwritable, long blocksize, EventLoopGroup eventLoopGroup,
+      Path oldPath, boolean overwritable, long blocksize, EventLoopGroup eventLoopGroup,
       Class<? extends Channel> channelClass) throws IOException {
     // Configuration already does caching for the Class lookup.
     Class<? extends AsyncWriter> logWriterClass = conf.getClass(
@@ -102,7 +102,7 @@ public static AsyncWriter createAsyncWriter(Configuration conf, FileSystem fs, P
     try {
       AsyncWriter writer = logWriterClass.getConstructor(EventLoopGroup.class, Class.class)
         .newInstance(eventLoopGroup, channelClass);
-      writer.init(fs, path, conf, overwritable, blocksize);
+      writer.init(fs, path, oldPath, conf, overwritable, blocksize);
       return writer;
     } catch (Exception e) {
       if (e instanceof CommonFSUtils.StreamLacksCapabilityException) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
index 5f787fef5990..3445923011f7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
@@ -140,7 +140,7 @@ public byte[][] rollWriter() {
     }
 
     @Override
-    public byte[][] rollWriter(boolean force) {
+    public byte[][] rollWriter(boolean force, boolean syncFailed) {
       return rollWriter();
     }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java
index 3b91c2475cfe..e90b4ad2c53f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java
@@ -48,8 +48,8 @@ public interface Writer extends WALProvider.Writer {
    * @throws StreamLacksCapabilityException if the given FileSystem can't provide streams that
    *         meet the needs of the given Writer implementation.
    */
-    void init(FileSystem fs, Path path, Configuration c, boolean overwritable, long blocksize)
-        throws IOException, CommonFSUtils.StreamLacksCapabilityException;
+    void init(FileSystem fs, Path path, Path oldPath, Configuration c, boolean overwritable,
+        long blocksize) throws IOException, CommonFSUtils.StreamLacksCapabilityException;
   }
 
   /**
@@ -58,8 +58,8 @@ void init(FileSystem fs, Path path, Configuration c, boolean overwritable, long
    * for WAL it is false. Thus we can distinguish WAL and recovered edits by this.
    */
   public static Writer createWriter(final Configuration conf, final FileSystem fs, final Path path,
-      final boolean overwritable) throws IOException {
-    return createWriter(conf, fs, path, overwritable,
+      final Path oldPath, final boolean overwritable) throws IOException {
+    return createWriter(conf, fs, path, oldPath, overwritable,
       WALUtil.getWALBlockSize(conf, fs, path, overwritable));
   }
 
@@ -67,7 +67,7 @@ public static Writer createWriter(final Configuration conf, final FileSystem fs,
    * Public because of FSHLog. Should be package-private
    */
   public static Writer createWriter(final Configuration conf, final FileSystem fs, final Path path,
-      final boolean overwritable, long blocksize) throws IOException {
+      final Path oldPath, final boolean overwritable, long blocksize) throws IOException {
     // Configuration already does caching for the Class lookup.
     Class<? extends Writer> logWriterClass =
         conf.getClass("hbase.regionserver.hlog.writer.impl", ProtobufLogWriter.class,
@@ -76,7 +76,7 @@ public static Writer createWriter(final Configuration conf, final FileSystem fs,
     try {
       writer = logWriterClass.getDeclaredConstructor().newInstance();
       FileSystem rootFs = FileSystem.get(path.toUri(), conf);
-      writer.init(rootFs, path, conf, overwritable, blocksize);
+      writer.init(rootFs, path, oldPath, conf, overwritable, blocksize);
       return writer;
     } catch (Exception e) {
       if (e instanceof CommonFSUtils.StreamLacksCapabilityException) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
index cf367cde159d..1956c13e0bfd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
@@ -80,7 +80,8 @@ public interface WAL extends Closeable, WALFileLengthProvider {
    * can clean logs. Returns null if nothing to flush. Names are actual
    * region names as returned by {@link RegionInfo#getEncodedName()}
    */
-  byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOException;
+  byte[][] rollWriter(boolean force, boolean syncFailed)
+      throws FailedLogCloseException, IOException;
 
   /**
    * Stop accepting new writes. If we have unsynced writes still in buffer, sync them.
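With the WAL interface change above, existing call sites migrate mechanically: rollWriter(force) becomes rollWriter(force, syncFailed), where false preserves the old behavior and true is reserved for rolls triggered by sync errors. A minimal caller sketch (the wrapper class is hypothetical, not from the patch):

import java.io.IOException;
import org.apache.hadoop.hbase.wal.WAL;

// Illustrative caller: pre-patch rollWriter(true) becomes rollWriter(true, false).
final class RollCaller {
  static byte[][] forceRoll(WAL wal) throws IOException {
    // Returned encoded region names still need a memstore flush before old WALs can go.
    return wal.rollWriter(true, false);
  }
}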
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
index 8bde6d20011b..77902362ac7c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
@@ -385,7 +385,7 @@ public Reader createReader(final FileSystem fs, final Path path, CancelableProgr
    * @return A WAL writer. Close when done with it.
    */
   public Writer createWALWriter(final FileSystem fs, final Path path) throws IOException {
-    return FSHLogProvider.createWriter(conf, fs, path, false);
+    return FSHLogProvider.createWriter(conf, fs, path, null, false);
   }
 
   /**
@@ -396,7 +396,7 @@ public Writer createWALWriter(final FileSystem fs, final Path path) throws IOExc
   @VisibleForTesting
   public Writer createRecoveredEditsWriter(final FileSystem fs, final Path path)
       throws IOException {
-    return FSHLogProvider.createWriter(conf, fs, path, true);
+    return FSHLogProvider.createWriter(conf, fs, path, null, true);
   }
 
   // These static methods are currently used where it's impractical to
@@ -465,7 +465,7 @@ public static Reader createReaderIgnoreCustomClass(final FileSystem fs, final Pa
   static Writer createRecoveredEditsWriter(final FileSystem fs, final Path path,
       final Configuration configuration)
       throws IOException {
-    return FSHLogProvider.createWriter(configuration, fs, path, true);
+    return FSHLogProvider.createWriter(configuration, fs, path, null, true);
   }
 
   /**
@@ -477,7 +477,7 @@ static Writer createRecoveredEditsWriter(final FileSystem fs, final Path path,
   public static Writer createWALWriter(final FileSystem fs, final Path path,
       final Configuration configuration)
       throws IOException {
-    return FSHLogProvider.createWriter(configuration, fs, path, false);
+    return FSHLogProvider.createWriter(configuration, fs, path, null, false);
   }
 
   public final WALProvider getWALProvider() {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
index dde020d4326b..e2a22b7e605a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
@@ -393,7 +393,7 @@ public void testWALObserverRoll() throws Exception {
     assertFalse(cp.isPreWALRollCalled());
     assertFalse(cp.isPostWALRollCalled());
 
-    wal.rollWriter(true);
+    wal.rollWriter(true, false);
     assertTrue(cp.isPreWALRollCalled());
     assertTrue(cp.isPostWALRollCalled());
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
index 6be44e9158c1..1cf901e71312 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
@@ -40,6 +40,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -87,6 +88,7 @@ public class TestFanOutOneBlockAsyncDFSOutput {
   @BeforeClass
   public static void setUp() throws Exception {
     TEST_UTIL.getConfiguration().setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT_MS);
+    TEST_UTIL.getConfiguration().setInt(AbstractFSWAL.WAL_SYNC_TIMEOUT_MS, READ_TIMEOUT_MS);
     TEST_UTIL.startMiniDFSCluster(3);
     FS = TEST_UTIL.getDFSCluster().getFileSystem();
     EVENT_LOOP_GROUP = new NioEventLoopGroup();
@@ -135,8 +137,8 @@ static void writeAndVerify(FileSystem fs, Path f, AsyncFSOutput out)
   public void test() throws IOException, InterruptedException, ExecutionException {
     Path f = new Path("/" + name.getMethodName());
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
-    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
-      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
+    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f,
+      null, true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
     writeAndVerify(FS, f, out);
   }
 
@@ -144,8 +146,8 @@ public void test() throws IOException, InterruptedException, ExecutionException
  public void testRecover() throws IOException, InterruptedException, ExecutionException {
    Path f = new Path("/" + name.getMethodName());
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
-    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
-      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
+    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f,
+      null, true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
     byte[] b = new byte[10];
     ThreadLocalRandom.current().nextBytes(b);
     out.write(b, 0, b.length);
@@ -173,8 +175,8 @@ public void testRecover() throws IOException, InterruptedException, ExecutionExc
   public void testHeartbeat() throws IOException, InterruptedException, ExecutionException {
     Path f = new Path("/" + name.getMethodName());
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
-    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
-      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
+    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f,
+      null, true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
     Thread.sleep(READ_TIMEOUT_MS * 2);
     // the connection to datanode should still alive.
     writeAndVerify(FS, f, out);
@@ -188,7 +190,7 @@ public void testCreateParentFailed() throws IOException {
     Path f = new Path("/" + name.getMethodName() + "/test");
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
     try {
-      FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
+      FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, null, true, false, (short) 3,
         FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
       fail("should fail with parent does not exist");
     } catch (RemoteException e) {
@@ -212,7 +214,7 @@ public void testConnectToDatanodeFailed()
     Path f = new Path("/test");
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
     try (FanOutOneBlockAsyncDFSOutput output = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS,
-      f, true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS)) {
+      f, null, true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS)) {
       // should exclude the dead dn when retry so here we only have 2 DNs in pipeline
       assertEquals(2, output.getPipeline().length);
     } finally {
@@ -224,8 +226,8 @@ public void testConnectToDatanodeFailed()
   public void testWriteLargeChunk() throws IOException, InterruptedException, ExecutionException {
     Path f = new Path("/" + name.getMethodName());
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
-    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
-      false, (short) 3, 1024 * 1024 * 1024, eventLoop, CHANNEL_CLASS);
+    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, null,
+      true, false, (short) 3, 1024 * 1024 * 1024, eventLoop, CHANNEL_CLASS);
     byte[] b = new byte[50 * 1024 * 1024];
     ThreadLocalRandom.current().nextBytes(b);
     out.write(b);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
index 406af17b52e1..661f99d8c67b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
@@ -60,7 +60,7 @@ public void test()
       throws IOException, InterruptedException, ExecutionException,
       FSUtils.StreamLacksCapabilityException {
     Path f = new Path(TEST_UTIL.getDataTestDir(), "test");
     FileSystem fs = FileSystem.getLocal(TEST_UTIL.getConfiguration());
-    AsyncFSOutput out = AsyncFSOutputHelper.createOutput(fs, f, false, true,
+    AsyncFSOutput out = AsyncFSOutputHelper.createOutput(fs, f, null, false, true,
       fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), GROUP, CHANNEL_CLASS);
     TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(fs, f, out);
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
index cf0ffa235d1b..4190dc280ff2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
@@ -213,7 +213,7 @@ private Path getEncryptionTestFile() {
   private void test(Path file) throws IOException, InterruptedException, ExecutionException {
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
     FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, file,
-      true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
+      null, true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
     TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(FS, file, out);
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
index 14906539798c..856b87f93468 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
@@ -114,15 +114,16 @@ public DodgyFSLog(FileSystem fs, Path root, String logDir, Configuration conf)
     }
 
     @Override
-    public byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOException {
-      byte[][] regions = super.rollWriter(force);
+    public byte[][] rollWriter(boolean force, boolean syncFailed)
+        throws FailedLogCloseException, IOException {
+      byte[][] regions = super.rollWriter(force, syncFailed);
       rolls.getAndIncrement();
       return regions;
     }
 
     @Override
-    protected Writer createWriterInstance(Path path) throws IOException {
-      final Writer w = super.createWriterInstance(path);
+    protected Writer createWriterInstance(Path path, Path oldPath) throws IOException {
+      final Writer w = super.createWriterInstance(path, oldPath);
       return new Writer() {
         @Override
         public void close() throws IOException {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index a646b32d4539..d00bec040a4e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -1157,8 +1157,8 @@ public FailAppendFlushMarkerWAL(FileSystem fs, Path root, String logDir, Configu
     }
 
     @Override
-    protected Writer createWriterInstance(Path path) throws IOException {
-      final Writer w = super.createWriterInstance(path);
+    protected Writer createWriterInstance(Path newPath, Path oldPath) throws IOException {
+      final Writer w = super.createWriterInstance(newPath, oldPath);
       return new Writer() {
         @Override
         public void close() throws IOException {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
index 0e7c019de664..a19a03cd6231 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
@@ -521,7 +521,7 @@ public String explainFailure() throws Exception {
       assertEquals(MutableSegment.DEEP_OVERHEAD,
         desiredRegion.getStore(FAMILY3).getMemStoreSize().getHeapSize());
       // let WAL cleanOldLogs
-      assertNull(getWAL(desiredRegion).rollWriter(true));
+      assertNull(getWAL(desiredRegion).rollWriter(true, false));
       assertTrue(getNumRolledLogFiles(desiredRegion) < maxLogs);
     } finally {
       TEST_UTIL.shutdownMiniCluster();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
index 0e20252166be..4ad01739890a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
@@ -162,8 +162,8 @@ protected void beforeWaitOnSafePoint() {
     }
 
     @Override
-    protected Writer createWriterInstance(Path path) throws IOException {
-      final Writer w = super.createWriterInstance(path);
+    protected Writer createWriterInstance(Path newPath, Path oldPath) throws IOException {
+      final Writer w = super.createWriterInstance(newPath, oldPath);
      return new Writer() {
        @Override
        public void close() throws IOException {
@@ -353,8 +353,8 @@ protected void publishSyncOnRingBufferAndBlock(long sequence) {
     }
 
     @Override
-    protected Writer createWriterInstance(Path path) throws IOException {
-      final Writer w = super.createWriterInstance(path);
+    protected Writer createWriterInstance(Path path, Path oldPath) throws IOException {
+      final Writer w = super.createWriterInstance(path, oldPath);
       return new Writer() {
         @Override
         public void close() throws IOException {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java
index 68eebc17a45d..e7dcf4c26222 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java
@@ -297,7 +297,7 @@ public void testFindMemStoresEligibleForFlush() throws Exception {
       assertEquals(1, wal.getNumRolledLogFiles());
       // flush the second region
       flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getColumnFamilyNames());
-      wal.rollWriter(true);
+      wal.rollWriter(true, false);
       // no wal should remain now.
       assertEquals(0, wal.getNumRolledLogFiles());
       // add edits both to region 1 and region 2, and roll.
@@ -315,7 +315,7 @@ public void testFindMemStoresEligibleForFlush() throws Exception {
       // flush both regions
       flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
       flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getColumnFamilyNames());
-      wal.rollWriter(true);
+      wal.rollWriter(true, false);
       assertEquals(0, wal.getNumRolledLogFiles());
       // Add an edit to region1, and roll the wal.
       addEdits(wal, hri1, t1, 2, mvcc, scopes1);
@@ -339,12 +339,12 @@ public void testFailedToCreateWALIfParentRenamed() throws IOException,
       HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null);
     long filenum = System.currentTimeMillis();
     Path path = wal.computeFilename(filenum);
-    wal.createWriterInstance(path);
+    wal.createWriterInstance(path, null);
     Path parent = path.getParent();
     path = wal.computeFilename(filenum + 1);
     Path newPath = new Path(parent.getParent(), parent.getName() + "-splitting");
    FS.rename(parent, newPath);
-    wal.createWriterInstance(path);
+    wal.createWriterInstance(path, null);
     fail("It should fail to create the new WAL");
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java
index 4c19aa0a8244..651f7794c2cb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java
@@ -169,7 +169,7 @@ public void testLogRollOnNothingWritten() throws Exception {
     final WAL newLog = wals.getWAL(null);
     try {
       // Now roll the log before we write anything.
-      newLog.rollWriter(true);
+      newLog.rollWriter(true, false);
     } finally {
       wals.close();
     }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
index f2fd5916f776..20a0cd7d2eab 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
@@ -1230,7 +1230,7 @@ private void writerWALFile(Path file, List<FSWALEntry> entries) throws IOExcepti
       StreamLacksCapabilityException {
     fs.mkdirs(file.getParent());
     ProtobufLogWriter writer = new ProtobufLogWriter();
-    writer.init(fs, file, conf, true, WALUtil.getWALBlockSize(conf, fs, file));
+    writer.init(fs, file, null, conf, true, WALUtil.getWALBlockSize(conf, fs, file));
     for (FSWALEntry entry : entries) {
       writer.append(entry);
     }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java
index 8afae061be43..7f4aade662c1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java
@@ -20,6 +20,10 @@
 import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.client.RegionInfo;
@@ -53,7 +57,7 @@ public static void setUpBeforeClass() throws Exception {
 
   @Test
   public void testLogRollOnDatanodeDeath() throws IOException, InterruptedException {
-    dfsCluster.startDataNodes(TEST_UTIL.getConfiguration(), 3, true, null, null);
+    dfsCluster.startDataNodes(TEST_UTIL.getConfiguration(), 2, true, null, null);
     tableName = getName();
     Table table = createTestTable(tableName);
     TEST_UTIL.waitUntilAllRegionsAssigned(table.getName());
@@ -67,5 +71,13 @@ public void testLogRollOnDatanodeDeath() throws IOException, InterruptedExceptio
     TEST_UTIL.getDFSCluster().restartDataNode(dnProp);
     doPut(table, 2);
     assertEquals(numRolledLogFiles + 1, AsyncFSWALProvider.getNumRolledLogFiles(wal));
+
+    // Test HBASE-20902
+    DatanodeInfo[] newDNs = wal.getPipeline();
+    Set<DatanodeInfo> dns = new HashSet<>();
+    Collections.addAll(dns, dnInfos);
+    Collections.addAll(dns, newDNs);
+    // when syncfailed choose diff dns
+    assertEquals(dns.size(), dnInfos.length + newDNs.length);
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java
index 7626dcf971a6..fb0c7204cf1b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java
@@ -61,6 +61,7 @@ public static void tearDownAfterClass() throws Exception {
   @Override
   protected Writer createWriter(Path path) throws IOException {
     return new WriterOverAsyncWriter(AsyncFSWALProvider.createAsyncWriter(
-      TEST_UTIL.getConfiguration(), fs, path, false, EVENT_LOOP_GROUP.next(), CHANNEL_CLASS));
+      TEST_UTIL.getConfiguration(), fs, path, null, false,
+      EVENT_LOOP_GROUP.next(), CHANNEL_CLASS));
   }
 }
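The assertion added to testLogRollOnDatanodeDeath above works by set arithmetic: union the old and new pipeline datanodes, and if nothing collapses, the sizes add up and the two pipelines are disjoint. The same check in isolation, with plain strings standing in for DatanodeInfo:

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

final class DisjointPipelinesCheck {
  // True iff the two pipelines share no datanode.
  static boolean disjoint(String[] oldDns, String[] newDns) {
    Set<String> union = new HashSet<>();
    Collections.addAll(union, oldDns);
    Collections.addAll(union, newDns);
    // No overlap iff nothing collapsed when unioning the two pipelines.
    return union.size() == oldDns.length + newDns.length;
  }

  public static void main(String[] args) {
    System.out.println(disjoint(new String[] { "dn1", "dn2" }, new String[] { "dn3" })); // true
    System.out.println(disjoint(new String[] { "dn1", "dn2" }, new String[] { "dn2" })); // false
  }
}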
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java
index f73b4f159eec..a8ebdfb99730 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java
@@ -105,9 +105,9 @@ private void doTest(boolean withTrailer) throws IOException {
     FileSystem fs = UTIL.getTestFileSystem();
     Configuration conf = UTIL.getConfiguration();
     try (
-      AsyncWriter writer1 = AsyncFSWALProvider.createAsyncWriter(conf, fs, path1, false,
+      AsyncWriter writer1 = AsyncFSWALProvider.createAsyncWriter(conf, fs, path1, null, false,
         EVENT_LOOP_GROUP.next(), CHANNEL_CLASS);
-      AsyncWriter writer2 = AsyncFSWALProvider.createAsyncWriter(conf, fs, path2, false,
+      AsyncWriter writer2 = AsyncFSWALProvider.createAsyncWriter(conf, fs, path2, null, false,
         EVENT_LOOP_GROUP.next(), CHANNEL_CLASS);
       CombinedAsyncWriter writer = CombinedAsyncWriter.create(writer1, writer2)) {
       ProtobufLogTestHelper.doWrite(new WriterOverAsyncWriter(writer), withTrailer, tableName,
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java
index 3eed1372a1ef..1d10d030e45e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java
@@ -167,7 +167,7 @@ public void testRSAbortWithUnflushedEdits() throws Exception {
       LOG.info("Restarted datanodes");
 
       try {
-        log.rollWriter(true);
+        log.rollWriter(true, false);
       } catch (FailedLogCloseException flce) {
         // Expected exception. We used to expect that there would be unsynced appends but this
         // not reliable now that sync plays a roll in wall rolling. The above puts also now call
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
index 6c257e250bd4..106bcf27db75 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
@@ -398,7 +398,7 @@ public void logRollRequested(WALActionsListener.RollRequestReason reason) {
 
       // Force roll writer. The new log file will have the default replications,
       // and the LowReplication Roller will be enabled.
-      log.rollWriter(true);
+      log.rollWriter(true, false);
       batchWriteAndWait(table, log, 13, true, 10000);
       replication = log.getLogReplication();
       assertTrue("New log file should have the default replication instead of " + replication,
@@ -488,7 +488,7 @@ public void postLogRoll(Path oldFile, Path newFile) {
       writeData(table, 1005);
 
       // force a log roll to read back and verify previously written logs
-      log.rollWriter(true);
+      log.rollWriter(true, false);
       assertTrue("preLogRolledCalled has size of " + preLogRolledCalled.size(),
         preLogRolledCalled.size() >= 1);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestProtobufLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestProtobufLog.java
index d429a01fdb9a..bc484fae4274 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestProtobufLog.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestProtobufLog.java
@@ -36,6 +36,6 @@ public class TestProtobufLog extends AbstractTestProtobufLog {
 
   @Override
   protected Writer createWriter(Path path) throws IOException {
-    return FSHLogProvider.createWriter(TEST_UTIL.getConfiguration(), fs, path, false);
+    return FSHLogProvider.createWriter(TEST_UTIL.getConfiguration(), fs, path, null, false);
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DualAsyncFSWALForTest.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DualAsyncFSWALForTest.java
index 62000b4cd720..9ee576dc42ff 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DualAsyncFSWALForTest.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DualAsyncFSWALForTest.java
@@ -112,7 +112,7 @@ protected AsyncWriter createCombinedAsyncWriter(AsyncWriter localWriter,
   }
 
   @Override
-  protected AsyncWriter createWriterInstance(Path path) throws IOException {
+  protected AsyncWriter createWriterInstance(Path path, Path oldPath) throws IOException {
     if (arrive != null) {
       arrive.countDown();
       try {
@@ -123,7 +123,7 @@ protected AsyncWriter createWriterInstance(Path path) throws IOException {
     if (localBroken || remoteBroken) {
       throw new IOException("WAL broken");
     }
-    return super.createWriterInstance(path);
+    return super.createWriterInstance(path, oldPath);
   }
 
   public void setLocalBroken() {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java
index 4effe4149c57..f86b74b8baaa 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java
@@ -109,7 +109,7 @@ public void testEmptyWALRecovery() throws Exception {
       RegionInfo regionInfo =
           utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
       WAL wal = hrs.getWAL(regionInfo);
-      wal.rollWriter(true);
+      wal.rollWriter(true, false);
     }
 
     // ReplicationSource should advance past the empty wal, or else the test will fail
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java
index d01a0ac61ad2..5e37c1be68b0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java
@@ -160,7 +160,8 @@ private void setupSyncReplicationWALs() throws IOException, StreamLacksCapabilit
     for (int i = 0; i < WAL_NUMBER; i++) {
       try (ProtobufLogWriter writer = new ProtobufLogWriter()) {
         Path wal = new Path(peerRemoteWALDir, "srv1,8888." + i + ".syncrep");
-        writer.init(fs, wal, conf, true, WALUtil.getWALBlockSize(conf, fs, peerRemoteWALDir));
+        writer.init(fs, wal, null, conf, true,
+          WALUtil.getWALBlockSize(conf, fs, peerRemoteWALDir));
         List<Entry> entries = setupWALEntries(i * ROW_COUNT, (i + 1) * ROW_COUNT);
         for (Entry entry : entries) {
           writer.append(entry);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
index d062c77cb336..869d9bde47bd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
@@ -202,7 +202,7 @@ public IOTestWAL(final FileSystem fs, final Path rootDir, final String logDir,
     // creatWriterInstance is where the new pipeline is set up for doing file rolls
     // if we are skipping it, just keep returning the same writer.
     @Override
-    protected Writer createWriterInstance(final Path path) throws IOException {
+    protected Writer createWriterInstance(final Path path, final Path oldPath) throws IOException {
       // we get called from the FSHLog constructor (!); always roll in this case since
       // we don't know yet if we're supposed to generally roll and
      // we need an initial file in the case of doing appends but no rolls.
@@ -210,7 +210,7 @@ protected Writer createWriterInstance(final Path path) throws IOException {
         LOG.info("creating new writer instance.");
         final ProtobufLogWriter writer = new IOTestWriter();
         try {
-          writer.init(fs, path, conf, false, this.blocksize);
+          writer.init(fs, path, oldPath, conf, false, this.blocksize);
         } catch (CommonFSUtils.StreamLacksCapabilityException exception) {
           throw new IOException("Can't create writer instance because underlying FileSystem " +
               "doesn't support needed stream capabilities.", exception);
@@ -237,8 +237,9 @@ private static class IOTestWriter extends ProtobufLogWriter {
     private boolean doSyncs;
 
     @Override
-    public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable,
-        long blocksize) throws IOException, CommonFSUtils.StreamLacksCapabilityException {
+    public void init(FileSystem fs, Path path, Path oldPath, Configuration conf,
+        boolean overwritable, long blocksize)
+        throws IOException, CommonFSUtils.StreamLacksCapabilityException {
       Collection<String> operations = conf.getStringCollection(ALLOWED_OPERATIONS);
       if (operations.isEmpty() || operations.contains(AllowedOperations.all.name())) {
         doAppends = doSyncs = true;
@@ -250,7 +251,7 @@ public void init(FileSystem fs, Path path, Configuration conf, boolean overwrita
       }
       LOG.info("IOTestWriter initialized with appends " + (doAppends ? "enabled" : "disabled") +
           " and syncs " + (doSyncs ? "enabled" : "disabled"));
-      super.init(fs, path, conf, overwritable, blocksize);
+      super.init(fs, path, oldPath, conf, overwritable, blocksize);
     }
 
     @Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
index 8fbe09dd30ba..8bf1c4defc8c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
@@ -212,7 +212,7 @@ public void testSplit() throws IOException {
           walKey.getWriteEntry();
         }
         log.sync();
-        log.rollWriter(true);
+        log.rollWriter(true, false);
       }
     }
     wals.shutdown();