From 01c8201167b8534c403199ecaf0728aff7083ed6 Mon Sep 17 00:00:00 2001 From: haxiaolin Date: Wed, 9 Mar 2022 22:21:06 +0800 Subject: [PATCH] HBASE-26832 Check epoch in AsyncFSWAL#syncCompleted, to avoid repeated release of flushed wal entries --- .../hbase/regionserver/wal/AsyncFSWAL.java | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java index dca64b9f77ea..c62af594ea37 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java @@ -343,12 +343,25 @@ private void syncFailed(long epochWhenSync, Throwable error) { } } - private void syncCompleted(AsyncWriter writer, long processedTxid, long startTimeNs) { + private void syncCompleted(long epochWhenSync, AsyncWriter writer, long processedTxid, + long startTimeNs) { highestSyncedTxid.set(processedTxid); + boolean epochChanged = false; + consumeLock.lock(); + try { + int currentEpochAndState = epochAndState; + if (epoch(currentEpochAndState) != epochWhenSync) { + epochChanged = true; + } + } finally { + consumeLock.unlock(); + } for (Iterator iter = unackedAppends.iterator(); iter.hasNext();) { FSWALEntry entry = iter.next(); if (entry.getTxid() <= processedTxid) { - entry.release(); + if (!epochChanged) { + entry.release(); + } iter.remove(); } else { break; @@ -399,7 +412,7 @@ private void sync(AsyncWriter writer) { if (error != null) { syncFailed(epoch, error); } else { - syncCompleted(writer, currentHighestProcessedAppendTxid, startTimeNs); + syncCompleted(epoch, writer, currentHighestProcessedAppendTxid, startTimeNs); } }, consumeExecutor); }