From 710c3eb116dbe5a95c772039a2d12da6102ff0ed Mon Sep 17 00:00:00 2001 From: Jaehui-Lee Date: Tue, 21 Oct 2025 17:16:32 +0900 Subject: [PATCH] Bidirectional bulkload replication causes excessive network traffic --- .../hbase/shaded/protobuf/ProtobufUtil.java | 8 +--- .../src/main/protobuf/server/region/WAL.proto | 1 - .../hadoop/hbase/regionserver/HRegion.java | 8 +++- .../hbase/regionserver/RSRpcServices.java | 5 -- .../hbase/regionserver/wal/WALUtil.java | 46 +++++++++++-------- .../regionserver/ReplicationSink.java | 5 +- .../apache/hadoop/hbase/wal/WALKeyImpl.java | 6 +-- 7 files changed, 41 insertions(+), 38 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java index 60175137ad2c..474dc5a493ff 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java @@ -2676,20 +2676,16 @@ public static WALProtos.BulkLoadDescriptor toBulkLoadDescriptor(TableName tableN ByteString encodedRegionName, Map> storeFiles, Map storeFilesSize, long bulkloadSeqId) { return toBulkLoadDescriptor(tableName, encodedRegionName, storeFiles, storeFilesSize, - bulkloadSeqId, null, true); + bulkloadSeqId, true); } public static WALProtos.BulkLoadDescriptor toBulkLoadDescriptor(TableName tableName, ByteString encodedRegionName, Map> storeFiles, - Map storeFilesSize, long bulkloadSeqId, List clusterIds, - boolean replicate) { + Map storeFilesSize, long bulkloadSeqId, boolean replicate) { BulkLoadDescriptor.Builder desc = BulkLoadDescriptor.newBuilder().setTableName(ProtobufUtil.toProtoTableName(tableName)) .setEncodedRegionName(encodedRegionName).setBulkloadSeqNum(bulkloadSeqId) .setReplicate(replicate); - if (clusterIds != null) { - desc.addAllClusterIds(clusterIds); - } for (Map.Entry> entry : storeFiles.entrySet()) { WALProtos.StoreDescriptor.Builder builder = diff --git a/hbase-protocol-shaded/src/main/protobuf/server/region/WAL.proto b/hbase-protocol-shaded/src/main/protobuf/server/region/WAL.proto index ba12dcf3edfc..9b5929ee9ca4 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/region/WAL.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/region/WAL.proto @@ -152,7 +152,6 @@ message BulkLoadDescriptor { required bytes encoded_region_name = 2; repeated StoreDescriptor stores = 3; required int64 bulkload_seq_num = 4; - repeated string cluster_ids = 5; optional bool replicate = 6 [default = true]; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 9b7daee0f668..dc6df82771ad 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -22,6 +22,7 @@ import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.REGION_NAMES_KEY; import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.ROW_LOCK_READ_LOCK_KEY; import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent; +import static org.apache.hadoop.hbase.wal.WALKey.EMPTY_UUIDS; import com.google.errorprone.annotations.RestrictedApi; import edu.umd.cs.findbugs.annotations.Nullable; @@ -3056,7 +3057,7 @@ private void attachRegionReplicationToFlushOpSeqIdMVCCEntry(WriteEntry flushOpSe assert !flushOpSeqIdMVCCEntry.getCompletionAction().isPresent(); WALEdit flushMarkerWALEdit = WALEdit.createFlushWALEdit(getRegionInfo(), desc); WALKeyImpl walKey = - WALUtil.createWALKey(getRegionInfo(), mvcc, this.getReplicationScope(), null); + WALUtil.createWALKey(getRegionInfo(), EMPTY_UUIDS, mvcc, this.getReplicationScope(), null); walKey.setWriteEntry(flushOpSeqIdMVCCEntry); /** * Here the {@link ServerCall} is null for {@link RegionReplicationSink#add} because the @@ -7557,8 +7558,11 @@ public Map> bulkLoadHFiles(Collection> f WALProtos.BulkLoadDescriptor loadDescriptor = ProtobufUtil.toBulkLoadDescriptor(this.getRegionInfo().getTable(), UnsafeByteOperations.unsafeWrap(this.getRegionInfo().getEncodedNameAsBytes()), - storeFiles, storeFilesSizes, seqId, clusterIds, replicate); + storeFiles, storeFilesSizes, seqId, replicate); WALUtil.writeBulkLoadMarkerAndSync(this.wal, this.getReplicationScope(), getRegionInfo(), + clusterIds == null + ? EMPTY_UUIDS + : clusterIds.stream().map(UUID::fromString).collect(Collectors.toList()), loadDescriptor, mvcc, regionReplicationSink.orElse(null)); } catch (IOException ioe) { if (this.rsServices != null) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index fdfea375e096..dbff02ee51a2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -2338,11 +2338,6 @@ public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller, final BulkLoadHFileRequest request) throws ServiceException { long start = EnvironmentEdgeManager.currentTime(); List clusterIds = new ArrayList<>(request.getClusterIdsList()); - if (clusterIds.contains(this.server.getClusterId())) { - return BulkLoadHFileResponse.newBuilder().setLoaded(true).build(); - } else { - clusterIds.add(this.server.getClusterId()); - } try { checkOpen(); requestCount.increment(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java index d9c9a10a163b..4c16af38b434 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java @@ -18,12 +18,15 @@ package org.apache.hadoop.hbase.regionserver.wal; import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_GLOBAL; +import static org.apache.hadoop.hbase.wal.WALKey.EMPTY_UUIDS; import java.io.IOException; import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.NavigableMap; import java.util.TreeMap; +import java.util.UUID; import java.util.function.Function; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -78,8 +81,8 @@ private WALUtil() { public static WALKeyImpl writeCompactionMarker(WAL wal, NavigableMap replicationScope, RegionInfo hri, final CompactionDescriptor c, MultiVersionConcurrencyControl mvcc, RegionReplicationSink sink) throws IOException { - WALKeyImpl walKey = - writeMarker(wal, replicationScope, hri, WALEdit.createCompaction(hri, c), mvcc, null, sink); + WALKeyImpl walKey = writeMarker(wal, replicationScope, hri, EMPTY_UUIDS, + WALEdit.createCompaction(hri, c), mvcc, null, sink); if (LOG.isTraceEnabled()) { LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c)); } @@ -94,7 +97,7 @@ public static WALKeyImpl writeCompactionMarker(WAL wal, public static WALKeyImpl writeFlushMarker(WAL wal, NavigableMap replicationScope, RegionInfo hri, final FlushDescriptor f, boolean sync, MultiVersionConcurrencyControl mvcc, RegionReplicationSink sink) throws IOException { - WALKeyImpl walKey = doFullMarkerAppendTransaction(wal, replicationScope, hri, + WALKeyImpl walKey = doFullMarkerAppendTransaction(wal, replicationScope, hri, EMPTY_UUIDS, WALEdit.createFlushWALEdit(hri, f), mvcc, null, sync, sink); if (LOG.isTraceEnabled()) { LOG.trace("Appended flush marker " + TextFormat.shortDebugString(f)); @@ -109,7 +112,7 @@ public static WALKeyImpl writeFlushMarker(WAL wal, NavigableMap public static WALKeyImpl writeRegionEventMarker(WAL wal, NavigableMap replicationScope, RegionInfo hri, RegionEventDescriptor r, MultiVersionConcurrencyControl mvcc, RegionReplicationSink sink) throws IOException { - WALKeyImpl walKey = writeMarker(wal, replicationScope, hri, + WALKeyImpl walKey = writeMarker(wal, replicationScope, hri, EMPTY_UUIDS, WALEdit.createRegionEventWALEdit(hri, r), mvcc, null, sink); if (LOG.isTraceEnabled()) { LOG.trace("Appended region event marker " + TextFormat.shortDebugString(r)); @@ -130,9 +133,10 @@ public static WALKeyImpl writeRegionEventMarker(WAL wal, */ public static WALKeyImpl writeBulkLoadMarkerAndSync(final WAL wal, final NavigableMap replicationScope, final RegionInfo hri, - final WALProtos.BulkLoadDescriptor desc, final MultiVersionConcurrencyControl mvcc, - final RegionReplicationSink sink) throws IOException { - WALKeyImpl walKey = writeMarker(wal, replicationScope, hri, + final List clusterIds, final WALProtos.BulkLoadDescriptor desc, + final MultiVersionConcurrencyControl mvcc, final RegionReplicationSink sink) + throws IOException { + WALKeyImpl walKey = writeMarker(wal, replicationScope, hri, clusterIds, WALEdit.createBulkLoadEvent(hri, desc), mvcc, null, sink); if (LOG.isTraceEnabled()) { LOG.trace("Appended Bulk Load marker " + TextFormat.shortDebugString(desc)); @@ -141,12 +145,13 @@ public static WALKeyImpl writeBulkLoadMarkerAndSync(final WAL wal, } private static WALKeyImpl writeMarker(final WAL wal, - final NavigableMap replicationScope, final RegionInfo hri, final WALEdit edit, - final MultiVersionConcurrencyControl mvcc, final Map extendedAttributes, - final RegionReplicationSink sink) throws IOException { + final NavigableMap replicationScope, final RegionInfo hri, + final List clusterIds, final WALEdit edit, final MultiVersionConcurrencyControl mvcc, + final Map extendedAttributes, final RegionReplicationSink sink) + throws IOException { // If sync == true in below, then timeout is not used; safe to pass UNSPECIFIED_TIMEOUT - return doFullMarkerAppendTransaction(wal, replicationScope, hri, edit, mvcc, extendedAttributes, - true, sink); + return doFullMarkerAppendTransaction(wal, replicationScope, hri, clusterIds, edit, mvcc, + extendedAttributes, true, sink); } /** @@ -158,11 +163,12 @@ private static WALKeyImpl writeMarker(final WAL wal, * @return WALKeyImpl that was added to the WAL. */ private static WALKeyImpl doFullMarkerAppendTransaction(final WAL wal, - final NavigableMap replicationScope, final RegionInfo hri, final WALEdit edit, - final MultiVersionConcurrencyControl mvcc, final Map extendedAttributes, - final boolean sync, final RegionReplicationSink sink) throws IOException { + final NavigableMap replicationScope, final RegionInfo hri, + final List clusterIds, final WALEdit edit, final MultiVersionConcurrencyControl mvcc, + final Map extendedAttributes, final boolean sync, + final RegionReplicationSink sink) throws IOException { // TODO: Pass in current time to use? - WALKeyImpl walKey = createWALKey(hri, mvcc, replicationScope, extendedAttributes); + WALKeyImpl walKey = createWALKey(hri, clusterIds, mvcc, replicationScope, extendedAttributes); long trx = MultiVersionConcurrencyControl.NONE; try { trx = wal.appendMarker(hri, walKey, edit); @@ -191,11 +197,11 @@ private static WALKeyImpl doFullMarkerAppendTransaction(final WAL wal, return walKey; } - public static WALKeyImpl createWALKey(final RegionInfo hri, MultiVersionConcurrencyControl mvcc, - final NavigableMap replicationScope, + public static WALKeyImpl createWALKey(final RegionInfo hri, List clusterIds, + MultiVersionConcurrencyControl mvcc, final NavigableMap replicationScope, final Map extendedAttributes) { return new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(), - EnvironmentEdgeManager.currentTime(), mvcc, replicationScope, extendedAttributes); + EnvironmentEdgeManager.currentTime(), clusterIds, mvcc, replicationScope, extendedAttributes); } /** @@ -246,7 +252,7 @@ public static void writeReplicationMarkerAndSync(WAL wal, MultiVersionConcurrenc RegionInfo regionInfo, byte[] rowKey, long timestamp) throws IOException { NavigableMap replicationScope = new TreeMap<>(Bytes.BYTES_COMPARATOR); replicationScope.put(WALEdit.METAFAMILY, REPLICATION_SCOPE_GLOBAL); - writeMarker(wal, replicationScope, regionInfo, + writeMarker(wal, replicationScope, regionInfo, EMPTY_UUIDS, WALEdit.createReplicationMarkerEdit(rowKey, timestamp), mvcc, null, null); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java index 508ace390565..d420785e5025 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java @@ -245,8 +245,11 @@ public void replicateEntries(List entries, final ExtendedCellScanner c } // Map of table name Vs list of pair of family and list of // hfile paths from its namespace + + List clusterIds = entry.getKey().getClusterIdsList().stream() + .map(k -> toUUID(k).toString()).collect(Collectors.toList()); Map>>> bulkLoadHFileMap = - bulkLoadsPerClusters.computeIfAbsent(bld.getClusterIdsList(), k -> new HashMap<>()); + bulkLoadsPerClusters.computeIfAbsent(clusterIds, k -> new HashMap<>()); buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld); } } else if (CellUtil.matchingQualifier(cell, WALEdit.REPLICATION_MARKER)) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java index cab96fe0dd1b..d83863683da4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java @@ -161,9 +161,9 @@ public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, fin } public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, final long now, - MultiVersionConcurrencyControl mvcc, final NavigableMap replicationScope, - Map extendedAttributes) { - init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, EMPTY_UUIDS, HConstants.NO_NONCE, + List clusterIds, MultiVersionConcurrencyControl mvcc, + final NavigableMap replicationScope, Map extendedAttributes) { + init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, clusterIds, HConstants.NO_NONCE, HConstants.NO_NONCE, mvcc, replicationScope, extendedAttributes); }