Skip to content

Commit 833c43d

Browse files
saintstack authored and Jenkins committed
HBASE-23221 Polish the WAL interface after HBASE-23181 (apache#774)
Removes the closeRegion flag added by HBASE-23181 and instead relies on reading meta WALEdit content. Modified how the qualifier is written when the meta WALEdit is for a RegionEventDescriptor: the 'type' is now added to the qualifier so we can figure out the type without having to deserialize the protobuf value content, e.g. HBASE::REGION_EVENT::REGION_CLOSE. Added doc on WALEdit and tried to formalize the 'meta' WALEdit type and how it works. Needs complete redo in part as suggested by HBASE-8457. Meantime, some doc and cleanup. Also changed the LogRoller constructor to remove a redundant param. Because of the constructor change, also needed to change TestFailedAppendAndSync, TestWALLockup, TestAsyncFSWAL & WALPerformanceEvaluation.java. Signed-off-by: Duo Zhang <[email protected]> Signed-off-by: Lijin Bin <[email protected]> (cherry picked from commit 91678bc) Change-Id: I49d2f9134c432ea06bc42bd5234464277c5412de
1 parent de377a1 commit 833c43d

File tree

28 files changed

+311
-234
lines changed

28 files changed

+311
-234
lines changed

hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -756,7 +756,7 @@ public static boolean matchingRow(final Cell left, final Cell right) {
756756

757757
/**
758758
* @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0. Instead use
759-
* {@link #matchingRows(Cell, byte[]))}
759+
* {@link #matchingRows(Cell, byte[])}
760760
*/
761761
@Deprecated
762762
public static boolean matchingRow(final Cell left, final byte[] buf) {
@@ -894,8 +894,15 @@ public static boolean matchingQualifier(final Cell left, final byte[] buf, final
894894
}
895895

896896
public static boolean matchingColumn(final Cell left, final byte[] fam, final byte[] qual) {
897-
if (!matchingFamily(left, fam)) return false;
898-
return matchingQualifier(left, qual);
897+
return matchingFamily(left, fam) && matchingQualifier(left, qual);
898+
}
899+
900+
/**
901+
* @return True if matching column family and the qualifier starts with <code>qual</code>
902+
*/
903+
public static boolean matchingColumnFamilyAndQualifierPrefix(final Cell left, final byte[] fam,
904+
final byte[] qual) {
905+
return matchingFamily(left, fam) && PrivateCellUtil.qualifierStartsWith(left, qual);
899906
}
900907

901908
/**

hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,11 +199,10 @@ public void map(WALKey key, WALEdit value, Context context)
199199
Delete del = null;
200200
Cell lastCell = null;
201201
for (Cell cell : value.getCells()) {
202-
// filtering WAL meta entries
202+
// Filtering WAL meta marker entries.
203203
if (WALEdit.isMetaEditFamily(cell)) {
204204
continue;
205205
}
206-
207206
// Allow a subclass filter out this cell.
208207
if (filter(context, cell)) {
209208
// A WALEdit may contain multiple operations (HBASE-3584) and/or

hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3485,7 +3485,7 @@ public List<ReplicationPeerDescription> listReplicationPeers(String regex)
34853485
if (cpHost != null) {
34863486
cpHost.preListReplicationPeers(regex);
34873487
}
3488-
LOG.info(getClientIdAuditPrefix() + " list replication peers, regex=" + regex);
3488+
LOG.debug("{} list replication peers, regex={}", getClientIdAuditPrefix(), regex);
34893489
Pattern pattern = regex == null ? null : Pattern.compile(regex);
34903490
List<ReplicationPeerDescription> peers =
34913491
this.replicationPeerManager.listPeers(pattern);

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1725,7 +1725,7 @@ public Pair<byte[], Collection<HStoreFile>> call() throws IOException {
17251725

17261726
status.setStatus("Writing region close event to WAL");
17271727
// Always write close marker to wal even for read only table. This is not a big problem as we
1728-
// do not write any data into the region.
1728+
// do not write any data into the region; it is just a meta edit in the WAL file.
17291729
if (!abort && wal != null && getRegionServerServices() != null &&
17301730
RegionReplicaUtil.isDefaultReplica(getRegionInfo())) {
17311731
writeRegionCloseMarker(wal);
@@ -2691,7 +2691,8 @@ private void logFatLineOnFlush(Collection<HStore> storesToFlush, long sequenceId
26912691
}
26922692
}
26932693
MemStoreSize mss = this.memStoreSizing.getMemStoreSize();
2694-
LOG.info("Flushing " + storesToFlush.size() + "/" + stores.size() + " column families," +
2694+
LOG.info("Flushing " + this.getRegionInfo().getEncodedName() + " " +
2695+
storesToFlush.size() + "/" + stores.size() + " column families," +
26952696
" dataSize=" + StringUtils.byteDesc(mss.getDataSize()) +
26962697
" heapSize=" + StringUtils.byteDesc(mss.getHeapSize()) +
26972698
((perCfExtras != null && perCfExtras.length() > 0)? perCfExtras.toString(): "") +
@@ -4818,7 +4819,7 @@ private long replayRecoveredEdits(final Path edits,
48184819
for (Cell cell: val.getCells()) {
48194820
// Check this edit is for me. Also, guard against writing the special
48204821
// METACOLUMN info such as HBASE::CACHEFLUSH entries
4821-
if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
4822+
if (WALEdit.isMetaEditFamily(cell)) {
48224823
// if region names don't match, skipp replaying compaction marker
48234824
if (!checkRowWithinBoundary) {
48244825
//this is a special edit, we should handle it

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1928,7 +1928,7 @@ private void startServices() throws IOException {
19281928
healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
19291929
}
19301930

1931-
this.walRoller = new LogRoller(this, this);
1931+
this.walRoller = new LogRoller(this);
19321932
this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf);
19331933
this.procedureResultReporter = new RemoteProcedureResultReporter(this);
19341934

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/**
1+
/*
22
*
33
* Licensed to the Apache Software Foundation (ASF) under one
44
* or more contributor license agreements. See the NOTICE file
@@ -28,7 +28,6 @@
2828
import java.util.concurrent.ConcurrentHashMap;
2929
import java.util.concurrent.ConcurrentMap;
3030
import org.apache.hadoop.hbase.HConstants;
31-
import org.apache.hadoop.hbase.Server;
3231
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
3332
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
3433
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
@@ -56,7 +55,6 @@
5655
public class LogRoller extends HasThread implements Closeable {
5756
private static final Logger LOG = LoggerFactory.getLogger(LogRoller.class);
5857
private final ConcurrentMap<WAL, Boolean> walNeedsRoll = new ConcurrentHashMap<>();
59-
private final Server server;
6058
protected final RegionServerServices services;
6159
private volatile long lastRollTime = System.currentTimeMillis();
6260
// Period to roll log.
@@ -99,16 +97,14 @@ public void requestRollAll() {
9997
}
10098
}
10199

102-
/** @param server */
103-
public LogRoller(final Server server, final RegionServerServices services) {
100+
public LogRoller(RegionServerServices services) {
104101
super("LogRoller");
105-
this.server = server;
106102
this.services = services;
107-
this.rollPeriod = this.server.getConfiguration().
103+
this.rollPeriod = this.services.getConfiguration().
108104
getLong("hbase.regionserver.logroll.period", 3600000);
109-
this.threadWakeFrequency = this.server.getConfiguration().
105+
this.threadWakeFrequency = this.services.getConfiguration().
110106
getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
111-
this.checkLowReplicationInterval = this.server.getConfiguration().getLong(
107+
this.checkLowReplicationInterval = this.services.getConfiguration().getLong(
112108
"hbase.regionserver.hlog.check.lowreplication.interval", 30 * 1000);
113109
}
114110

@@ -144,7 +140,7 @@ private void abort(String reason, Throwable cause) {
144140
LOG.warn("Failed to shutdown wal", e);
145141
}
146142
}
147-
server.abort(reason, cause);
143+
this.services.abort(reason, cause);
148144
}
149145

150146
@Override
@@ -156,7 +152,7 @@ public void run() {
156152
periodic = (now - this.lastRollTime) > this.rollPeriod;
157153
if (periodic) {
158154
// Time for periodic roll, fall through
159-
LOG.debug("Wal roll period {} ms elapsed", this.rollPeriod);
155+
LOG.debug("WAL roll period {} ms elapsed", this.rollPeriod);
160156
} else {
161157
synchronized (this) {
162158
if (walNeedsRoll.values().stream().anyMatch(Boolean::booleanValue)) {
@@ -183,9 +179,9 @@ public void run() {
183179
WAL wal = entry.getKey();
184180
// reset the flag in front to avoid missing roll request before we return from rollWriter.
185181
walNeedsRoll.put(wal, Boolean.FALSE);
186-
// Force the roll if the logroll.period is elapsed or if a roll was requested.
187-
// The returned value is an array of actual region names.
188-
byte[][] regionsToFlush = wal.rollWriter(periodic || entry.getValue().booleanValue());
182+
// Force the roll if the logroll.period is elapsed or if a roll was requested.
183+
// The returned value is an array of actual region names.
184+
byte[][] regionsToFlush = wal.rollWriter(periodic || entry.getValue().booleanValue());
189185
if (regionsToFlush != null) {
190186
for (byte[] r : regionsToFlush) {
191187
scheduleFlush(Bytes.toString(r));

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java

Lines changed: 18 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -242,8 +242,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
242242
private static final class WalProps {
243243

244244
/**
245-
* Map the encoded region name to the highest sequence id. Contain all the regions it has
246-
* entries of
245+
* Map the encoded region name to the highest sequence id.
246+
* <p/>Contains all the regions it has an entry for.
247247
*/
248248
public final Map<byte[], Long> encodedName2HighestSequenceId;
249249

@@ -585,9 +585,9 @@ public int getNumLogFiles() {
585585
}
586586

587587
/**
588-
* If the number of un-archived WAL files is greater than maximum allowed, check the first
589-
* (oldest) WAL file, and returns those regions which should be flushed so that it can be
590-
* archived.
588+
* If the number of un-archived WAL files ('live' WALs) is greater than maximum allowed,
589+
* check the first (oldest) WAL, and return those regions which should be flushed so that
590+
* it can be let-go/'archived'.
591591
* @return regions (encodedRegionNames) to flush in order to archive oldest WAL file.
592592
*/
593593
byte[][] findRegionsToForceFlush() throws IOException {
@@ -860,10 +860,6 @@ public void close() throws IOException {
860860
/**
861861
* updates the sequence number of a specific store. depending on the flag: replaces current seq
862862
* number if the given seq id is bigger, or even if it is lower than existing one
863-
* @param encodedRegionName
864-
* @param familyName
865-
* @param sequenceid
866-
* @param onlyIfGreater
867863
*/
868864
@Override
869865
public void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequenceid,
@@ -973,7 +969,7 @@ protected final void postSync(final long timeInNanos, final int handlerSyncs) {
973969
}
974970

975971
protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKeyImpl key,
976-
WALEdit edits, boolean inMemstore, boolean closeRegion, RingBuffer<RingBufferTruck> ringBuffer)
972+
WALEdit edits, boolean inMemstore, RingBuffer<RingBufferTruck> ringBuffer)
977973
throws IOException {
978974
if (this.closed) {
979975
throw new IOException(
@@ -987,7 +983,7 @@ protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKe
987983
ServerCall<?> rpcCall = RpcServer.getCurrentCall().filter(c -> c instanceof ServerCall)
988984
.filter(c -> c.getCellScanner() != null).map(c -> (ServerCall) c).orElse(null);
989985
try (TraceScope scope = TraceUtil.createTrace(implClassName + ".append")) {
990-
FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, closeRegion, rpcCall);
986+
FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall);
991987
entry.stampRegionSequenceId(we);
992988
ringBuffer.get(txid).load(entry);
993989
} finally {
@@ -1025,13 +1021,13 @@ public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
10251021

10261022
@Override
10271023
public long appendData(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException {
1028-
return append(info, key, edits, true, false);
1024+
return append(info, key, edits, true);
10291025
}
10301026

10311027
@Override
1032-
public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean closeRegion)
1028+
public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits)
10331029
throws IOException {
1034-
return append(info, key, edits, false, closeRegion);
1030+
return append(info, key, edits, false);
10351031
}
10361032

10371033
/**
@@ -1055,17 +1051,17 @@ public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean
10551051
* @param key Modified by this call; we add to it this edits region edit/sequence id.
10561052
* @param edits Edits to append. MAY CONTAIN NO EDITS for case where we want to get an edit
10571053
* sequence id that is after all currently appended edits.
1058-
* @param inMemstore Always true except for case where we are writing a region event marker, for
1059-
* example, a compaction completion record into the WAL; in this case the entry is just
1060-
* so we can finish an unfinished compaction -- it is not an edit for memstore.
1061-
* @param closeRegion Whether this is a region close marker, i.e, the last wal edit for this
1062-
* region on this region server. The WAL implementation should remove all the related
1063-
* stuff, for example, the sequence id accounting.
1054+
* @param inMemstore Always true except for case where we are writing a region event meta
1055+
* marker edit, for example, a compaction completion record into the WAL or noting a
1056+
* Region Open event. In these cases the entry is just so we can finish an unfinished
1057+
* compaction after a crash when the new Server reads the WAL on recovery, etc. These
1058+
* transition event 'Markers' do not go via the memstore. When memstore is false,
1059+
* we presume a Marker event edit.
10641060
* @return Returns a 'transaction id' and <code>key</code> will have the region edit/sequence id
10651061
* in it.
10661062
*/
1067-
protected abstract long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore,
1068-
boolean closeRegion) throws IOException;
1063+
protected abstract long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore)
1064+
throws IOException;
10691065

10701066
protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException;
10711067

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -558,9 +558,9 @@ private boolean shouldScheduleConsumer() {
558558
}
559559

560560
@Override
561-
protected long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore,
562-
boolean closeRegion) throws IOException {
563-
long txid = stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, closeRegion,
561+
protected long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore)
562+
throws IOException {
563+
long txid = stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore,
564564
waitingConsumePayloads);
565565
if (shouldScheduleConsumer()) {
566566
consumeExecutor.execute(consumer);

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -431,8 +431,8 @@ protected void doShutdown() throws IOException {
431431

432432
@Override
433433
protected long append(final RegionInfo hri, final WALKeyImpl key, final WALEdit edits,
434-
final boolean inMemstore, boolean closeRegion) throws IOException {
435-
return stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, closeRegion,
434+
final boolean inMemstore) throws IOException {
435+
return stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore,
436436
disruptor.getRingBuffer());
437437
}
438438

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
/**
4141
* A WAL Entry for {@link AbstractFSWAL} implementation. Immutable.
4242
* A subclass of {@link Entry} that carries extra info across the ring buffer such as
43-
* region sequence id (we want to use this later, just before we write the WAL to ensure region
43+
* region sequenceid (we want to use this later, just before we write the WAL to ensure region
4444
* edits maintain order). The extra info added here is not 'serialized' as part of the WALEdit
4545
* hence marked 'transient' to underline this fact. It also adds mechanism so we can wait on
4646
* the assign of the region sequence id. See #stampRegionSequenceId().
@@ -50,25 +50,40 @@ class FSWALEntry extends Entry {
5050
// The below data members are denoted 'transient' just to highlight these are not persisted;
5151
// they are only in memory and held here while passing over the ring buffer.
5252
private final transient long txid;
53+
54+
/**
55+
* If false, means this is a meta edit written by the hbase system itself. It was not in
56+
* memstore. HBase uses these edit types to note in the log operational transitions such
57+
* as compactions, flushes, or region open/closes.
58+
*/
5359
private final transient boolean inMemstore;
60+
61+
/**
62+
* Set if this is a meta edit and it is of close region type.
63+
*/
5464
private final transient boolean closeRegion;
65+
5566
private final transient RegionInfo regionInfo;
5667
private final transient Set<byte[]> familyNames;
5768
private final transient ServerCall<?> rpcCall;
5869

70+
/**
71+
* @param inMemstore If true, then this is a data edit, one that came from client. If false, it
72+
* is a meta edit made by the hbase system itself and is for the WAL only.
73+
*/
5974
FSWALEntry(final long txid, final WALKeyImpl key, final WALEdit edit, final RegionInfo regionInfo,
60-
final boolean inMemstore, boolean closeRegion, ServerCall<?> rpcCall) {
75+
final boolean inMemstore, ServerCall<?> rpcCall) {
6176
super(key, edit);
6277
this.inMemstore = inMemstore;
63-
this.closeRegion = closeRegion;
78+
this.closeRegion = !inMemstore && edit.isRegionCloseMarker();
6479
this.regionInfo = regionInfo;
6580
this.txid = txid;
6681
if (inMemstore) {
6782
// construct familyNames here to reduce the work of log sinker.
6883
Set<byte[]> families = edit.getFamilies();
6984
this.familyNames = families != null ? families : collectFamilies(edit.getCells());
7085
} else {
71-
this.familyNames = Collections.<byte[]> emptySet();
86+
this.familyNames = Collections.emptySet();
7287
}
7388
this.rpcCall = rpcCall;
7489
if (rpcCall != null) {
@@ -83,7 +98,7 @@ static Set<byte[]> collectFamilies(List<Cell> cells) {
8398
} else {
8499
Set<byte[]> set = new TreeSet<>(Bytes.BYTES_COMPARATOR);
85100
for (Cell cell: cells) {
86-
if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
101+
if (!WALEdit.isMetaEditFamily(cell)) {
87102
set.add(CellUtil.cloneFamily(cell));
88103
}
89104
}
@@ -94,7 +109,7 @@ static Set<byte[]> collectFamilies(List<Cell> cells) {
94109
@Override
95110
public String toString() {
96111
return "sequence=" + this.txid + ", " + super.toString();
97-
};
112+
}
98113

99114
boolean isInMemStore() {
100115
return this.inMemstore;

0 commit comments

Comments
 (0)