From adc9fcce82a626cb909ab7152186b3a7dd2a9bb9 Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Wed, 18 Sep 2024 23:29:12 +0800 Subject: [PATCH] HBASE-28850 Only return from ReplicationSink.replicationEntries while all background tasks are finished --- .../regionserver/ReplicationSink.java | 21 +++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java index ff8adfceec0b..508ace390565 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java @@ -492,16 +492,33 @@ private void batch(TableName tableName, Collection> allRows, int batch } futures.addAll(batchRows.stream().map(table::batchAll).collect(Collectors.toList())); } + // Here we will always wait until all futures are finished, even if there are failures when + // getting from a future in the middle. This is because this method may be called in an rpc call, + // so the batch operations may reference some off-heap cells (through CellScanner). If we return + // earlier here, the rpc call may finish and the off-heap cells may be released before + // some of the batch operations finish, which can corrupt data or even crash the region + // server. See HBASE-28584 and HBASE-28850 for more details. 
+ IOException error = null; for (Future future : futures) { try { FutureUtils.get(future); } catch (RetriesExhaustedException e) { + IOException ioe; if (e.getCause() instanceof TableNotFoundException) { - throw new TableNotFoundException("'" + tableName + "'"); + ioe = new TableNotFoundException("'" + tableName + "'"); + } else { + ioe = e; + } + if (error == null) { + error = ioe; + } else { + error.addSuppressed(ioe); } - throw e; } } + if (error != null) { + throw error; + } } private AsyncClusterConnection getConnection() throws IOException {