From 2707ce7f135169bf268477359b9f13b5f2f32e64 Mon Sep 17 00:00:00 2001 From: huiruan <876107431@qq.com> Date: Wed, 16 Aug 2023 23:41:36 +0800 Subject: [PATCH 1/6] HBASE-26867 Introduce a FlushProcedure --- .../org/apache/hadoop/hbase/client/Admin.java | 22 ++ .../hadoop/hbase/client/AsyncAdmin.java | 8 + .../hadoop/hbase/client/AsyncHBaseAdmin.java | 5 + .../client/ConnectionImplementation.java | 6 + .../hadoop/hbase/client/HBaseAdmin.java | 73 +++++- .../hbase/client/RawAsyncHBaseAdmin.java | 62 ++++- .../client/ShortCircuitMasterConnection.java | 7 + .../shaded/protobuf/RequestConverter.java | 13 + .../org/apache/hadoop/hbase/util/Strings.java | 5 + .../src/main/protobuf/Master.proto | 15 ++ .../src/main/protobuf/MasterProcedure.proto | 20 ++ .../hadoop/hbase/executor/EventType.java | 8 +- .../hadoop/hbase/executor/ExecutorType.java | 3 +- .../apache/hadoop/hbase/master/HMaster.java | 31 +++ .../hbase/master/MasterRpcServices.java | 19 ++ .../hadoop/hbase/master/MasterServices.java | 11 + .../procedure/FlushRegionProcedure.java | 238 ++++++++++++++++++ .../master/procedure/FlushTableProcedure.java | 199 +++++++++++++++ .../procedure/TableProcedureInterface.java | 1 + .../hbase/master/procedure/TableQueue.java | 1 + .../flush/FlushTableSubprocedure.java | 20 +- .../MasterFlushTableProcedureManager.java | 4 + ...egionServerFlushTableProcedureManager.java | 20 +- .../regionserver/FlushRegionCallable.java | 83 ++++++ .../hbase/regionserver/HRegionServer.java | 4 + .../hbase/master/MockNoopMasterServices.java | 6 + .../procedure/TestFlushTableProcedure.java | 69 +++++ .../TestFlushTableProcedureBase.java | 97 +++++++ ...TestFlushTableProcedureMasterRestarts.java | 76 ++++++ ...edureWithDoNotSupportFlushTableMaster.java | 82 ++++++ .../TestFlushWithThroughputController.java | 10 +- .../hbase/thrift2/client/ThriftAdmin.java | 10 + 32 files changed, 1195 insertions(+), 33 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushRegionProcedure.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushTableProcedure.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRegionCallable.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedure.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedureBase.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedureMasterRestarts.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedureWithDoNotSupportFlushTableMaster.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java index e00f0c1ca176..536941495512 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java @@ -886,6 +886,28 @@ boolean closeRegionWithEncodedRegionName(String encodedRegionName, String server */ void flush(TableName tableName, byte[] columnFamily) throws IOException; + /** + * Flush the specified column family stores on all regions of the passed table. This runs as a + * synchronous operation. 
+   * @param tableName table to flush
+   * @param columnFamilies column families within a table
+   * @throws IOException if a remote or network exception occurs
+   */
+  void flush(TableName tableName, List<byte[]> columnFamilies) throws IOException;
+
+  /**
+   * Flush a table but do not block and wait for it to finish. You can use Future.get(long,
+   * TimeUnit) to wait on the operation to complete. It may throw ExecutionException if there was
+   * an error while executing the operation or TimeoutException in case the wait timeout was not
+   * long enough to allow the operation to complete.
+   * @param tableName table to flush
+   * @param columnFamilies column families within a table
+   * @return the result of the async flush. You can use Future.get(long, TimeUnit) to wait on the
+   *         operation to complete.
+   * @throws IOException if a remote or network exception occurs
+   */
+  Future<Void> flushAsync(TableName tableName, List<byte[]> columnFamilies) throws IOException;
+
   /**
    * Flush an individual region. Synchronous operation.
    * @param regionName region to flush
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
index 913350b1d172..9509a888cc57 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
@@ -354,6 +354,14 @@ CompletableFuture<Void> modifyColumnFamilyStoreFileTracker(TableName tableName,
    */
   CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily);
 
+  /**
+   * Flush the specified column family stores on all regions of the passed table. This runs as a
+   * synchronous operation.
+   * @param tableName table to flush
+   * @param columnFamilies column families within a table
+   */
+  CompletableFuture<Void> flush(TableName tableName, List<byte[]> columnFamilies);
+
   /**
    * Flush an individual region.
* @param regionName region to flush diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java index ce604d90b2ec..442fb9115563 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java @@ -269,6 +269,11 @@ public CompletableFuture flush(TableName tableName, byte[] columnFamily) { return wrap(rawAdmin.flush(tableName, columnFamily)); } + @Override + public CompletableFuture flush(TableName tableName, List columnFamilies) { + return wrap(rawAdmin.flush(tableName, columnFamilies)); + } + @Override public CompletableFuture flushRegion(byte[] regionName) { return wrap(rawAdmin.flushRegion(regionName)); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index 7d8e7038e869..80f7a7959f12 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -1847,6 +1847,12 @@ public MasterProtos.GetTableNamesResponse getTableNames(RpcController controller return stub.getTableNames(controller, request); } + @Override + public MasterProtos.FlushTableResponse flushTable(RpcController controller, + MasterProtos.FlushTableRequest request) throws ServiceException { + return stub.flushTable(controller, request); + } + @Override public MasterProtos.ListTableNamesByStateResponse listTableNamesByState( RpcController controller, MasterProtos.ListTableNamesByStateRequest request) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index 27952e39e23d..63ab1651d994 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.util.FutureUtils.get; + import com.google.protobuf.Descriptors; import com.google.protobuf.Message; import com.google.protobuf.RpcController; @@ -68,6 +70,7 @@ import org.apache.hadoop.hbase.TableExistsException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotDisabledException; +import org.apache.hadoop.hbase.TableNotEnabledException; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.UnknownRegionException; import org.apache.hadoop.hbase.ZooKeeperConnectionException; @@ -100,6 +103,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ForeignExceptionUtil; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.Strings; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.util.StringUtils; import org.apache.yetus.audience.InterfaceAudience; @@ -164,6 +168,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableRequest; +import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest;
@@ -1243,19 +1249,72 @@ public List getOnlineRegions(final ServerName sn) throws IOExceptio
 
   @Override
   public void flush(final TableName tableName) throws IOException {
-    flush(tableName, null);
+    flush(tableName, Collections.emptyList());
   }
 
   @Override
   public void flush(final TableName tableName, byte[] columnFamily) throws IOException {
-    checkTableExists(tableName);
-    if (isTableDisabled(tableName)) {
-      LOG.info("Table is disabled: " + tableName.getNameAsString());
-      return;
+    flush(tableName, Collections.singletonList(columnFamily));
+  }
+
+  @Override
+  public void flush(TableName tableName, List<byte[]> columnFamilyList) throws IOException {
+    // check that the table exists and is enabled
+    if (!isTableEnabled(tableName)) {
+      throw new TableNotEnabledException(tableName.getNameAsString());
     }
+
+    List<byte[]> columnFamilies = columnFamilyList.stream()
+      .filter(cf -> cf != null && cf.length > 0).distinct().collect(Collectors.toList());
+
+    try {
+      get(flushAsync(tableName, columnFamilies), getSyncWaitTimeout(), TimeUnit.MILLISECONDS);
+    } catch (DoNotRetryIOException e) {
+      // This is for keeping compatibility with the old implementation.
+      // Usually the exception means that the method is not present on the server, that the
+      // client and server versions do not match, or that the FlushTableProcedure is disabled.
+      // If that happens, we need to fall back to the old flush implementation.
+      legacyFlush(tableName, columnFamilies);
+    }
+  }
+
+  @Override
+  public Future<Void> flushAsync(TableName tableName, List<byte[]> columnFamilies)
+    throws IOException {
+    FlushTableResponse resp = executeCallable(
+      new MasterCallable<FlushTableResponse>(getConnection(), getRpcControllerFactory()) {
+        final long nonceGroup = ng.getNonceGroup();
+        final long nonce = ng.newNonce();
+
+        @Override
+        protected FlushTableResponse rpcCall() throws Exception {
+          FlushTableRequest request =
+            RequestConverter.buildFlushTableRequest(tableName, columnFamilies, nonceGroup, nonce);
+          return master.flushTable(getRpcController(), request);
+        }
+      });
+    return new FlushTableFuture(this, tableName, resp);
+  }
+
+  private static class FlushTableFuture extends TableFuture<Void> {
+
+    public FlushTableFuture(final HBaseAdmin admin, final TableName tableName,
+      final FlushTableResponse resp) {
+      super(admin, tableName, (resp != null && resp.hasProcId()) ?
resp.getProcId() : null); + } + + @Override + public String getOperationType() { + return "FLUSH"; + } + } + + private void legacyFlush(TableName tableName, List columnFamilies) throws IOException { Map props = new HashMap<>(); - if (columnFamily != null) { - props.put(HConstants.FAMILY_KEY_STR, Bytes.toString(columnFamily)); + if (!columnFamilies.isEmpty()) { + props.put(HConstants.FAMILY_KEY_STR, Strings.JOINER + .join(columnFamilies.stream().map(Bytes::toString).collect(Collectors.toList()))); } execProcedure("flush-table-proc", tableName.getNameAsString(), props); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index d413905c04af..9231c423ce60 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -54,6 +54,7 @@ import org.apache.hadoop.hbase.ClusterMetrics; import org.apache.hadoop.hbase.ClusterMetrics.Option; import org.apache.hadoop.hbase.ClusterMetricsBuilder; +import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.MetaTableAccessor; @@ -95,6 +96,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ForeignExceptionUtil; +import org.apache.hadoop.hbase.util.Strings; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -176,6 +178,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest; @@ -949,12 +953,50 @@ public CompletableFuture> getRegions(TableName tableName) { @Override public CompletableFuture flush(TableName tableName) { - return flush(tableName, null); + return flush(tableName, Collections.emptyList()); } @Override public CompletableFuture flush(TableName tableName, byte[] columnFamily) { + return flush(tableName, Collections.singletonList(columnFamily)); + } + + @Override + public CompletableFuture flush(TableName tableName, List columnFamilyList) { + // This is for keeping compatibility with old implementation. + // If the server version is lower than the client version, it's possible that the + // flushTable method is not present in the server side, if so, we need to fall back + // to the old implementation. + List columnFamilies = columnFamilyList.stream() + .filter(cf -> cf != null && cf.length > 0).distinct().collect(Collectors.toList()); + FlushTableRequest request = RequestConverter.buildFlushTableRequest(tableName, columnFamilies, + ng.getNonceGroup(), ng.newNonce()); + CompletableFuture procFuture = this. 
procedureCall( + tableName, request, (s, c, req, done) -> s.flushTable(c, req, done), + (resp) -> resp.getProcId(), new FlushTableProcedureBiConsumer(tableName)); CompletableFuture future = new CompletableFuture<>(); + addListener(procFuture, (ret, error) -> { + if (error != null) { + if (error instanceof TableNotFoundException || error instanceof TableNotEnabledException) { + future.completeExceptionally(error); + } else if (error instanceof DoNotRetryIOException) { + // usually this is caused by the method is not present on the server or + // the hbase hadoop version does not match the running hadoop version. + // if that happens, we need fall back to the old flush implementation. + LOG.info("Unrecoverable error in master side. Fallback to FlushTableProcedure V1", error); + legacyFlush(future, tableName, columnFamilies); + } else { + future.completeExceptionally(error); + } + } else { + future.complete(ret); + } + }); + return future; + } + + private void legacyFlush(CompletableFuture future, TableName tableName, + List columnFamilies) { addListener(tableExists(tableName), (exists, err) -> { if (err != null) { future.completeExceptionally(err); @@ -968,8 +1010,9 @@ public CompletableFuture flush(TableName tableName, byte[] columnFamily) { future.completeExceptionally(new TableNotEnabledException(tableName)); } else { Map props = new HashMap<>(); - if (columnFamily != null) { - props.put(HConstants.FAMILY_KEY_STR, Bytes.toString(columnFamily)); + if (columnFamilies != null && !columnFamilies.isEmpty()) { + props.put(HConstants.FAMILY_KEY_STR, Strings.JOINER + .join(columnFamilies.stream().map(Bytes::toString).collect(Collectors.toList()))); } addListener( execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(), props), @@ -984,7 +1027,6 @@ public CompletableFuture flush(TableName tableName, byte[] columnFamily) { }); } }); - return future; } @Override @@ -2736,6 +2778,18 @@ String getOperationType() { } } + private static class FlushTableProcedureBiConsumer extends TableProcedureBiConsumer { + + FlushTableProcedureBiConsumer(TableName tableName) { + super(tableName); + } + + @Override + String getOperationType() { + return "FLUSH"; + } + } + private static class CreateNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer { CreateNamespaceProcedureBiConsumer(String namespaceName) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java index a4b361b20915..ea8747420419 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest; @@ -495,6 +496,12 @@ public GetNamespaceDescriptorResponse getNamespaceDescriptor(RpcController contr return 
stub.getNamespaceDescriptor(controller, request); } + @Override + public MasterProtos.FlushTableResponse flushTable(RpcController controller, + MasterProtos.FlushTableRequest request) throws ServiceException { + return stub.flushTable(controller, request); + } + @Override public ListNamespacesResponse listNamespaces(RpcController controller, ListNamespacesRequest request) throws ServiceException { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java index 9bd29f37ff96..d6956f4e9df3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java @@ -116,6 +116,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetSchemaAlterStatusRequest; @@ -1907,4 +1908,16 @@ public static ClearSlowLogResponseRequest buildClearSlowLogResponseRequest() { return ClearSlowLogResponseRequest.newBuilder().build(); } + public static FlushTableRequest buildFlushTableRequest(final TableName tableName, + final List columnFamilies, final long nonceGroup, final long nonce) { + FlushTableRequest.Builder builder = FlushTableRequest.newBuilder(); + builder.setTableName(ProtobufUtil.toProtoTableName(tableName)); + if (!columnFamilies.isEmpty()) { + for (byte[] columnFamily : columnFamilies) { + builder.addColumnFamily(UnsafeByteOperations.unsafeWrap(columnFamily)); + } + } + return builder.setNonceGroup(nonceGroup).setNonce(nonce).build(); + } + } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Strings.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Strings.java index cdf5bf63fb59..d859290566b5 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Strings.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Strings.java @@ -20,6 +20,9 @@ import org.apache.commons.lang3.StringUtils; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hbase.thirdparty.com.google.common.base.Joiner; +import org.apache.hbase.thirdparty.com.google.common.base.Splitter; + /** * Utility for Strings. 
*/ @@ -27,6 +30,8 @@ public final class Strings { public static final String DEFAULT_SEPARATOR = "="; public static final String DEFAULT_KEYVALUE_SEPARATOR = ", "; + public static final Joiner JOINER = Joiner.on(","); + public static final Splitter SPLITTER = Splitter.on(","); private Strings() { } diff --git a/hbase-protocol-shaded/src/main/protobuf/Master.proto b/hbase-protocol-shaded/src/main/protobuf/Master.proto index be59566d73af..6ca065dfc942 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Master.proto +++ b/hbase-protocol-shaded/src/main/protobuf/Master.proto @@ -199,6 +199,17 @@ message ModifyTableResponse { optional uint64 proc_id = 1; } +message FlushTableRequest { + required TableName table_name = 1; + repeated bytes column_family = 2; + optional uint64 nonce_group = 3 [default = 0]; + optional uint64 nonce = 4 [default = 0]; +} + +message FlushTableResponse { + optional uint64 proc_id = 1; +} + /* Namespace-level protobufs */ message CreateNamespaceRequest { @@ -872,6 +883,10 @@ service MasterService { rpc ModifyTable(ModifyTableRequest) returns(ModifyTableResponse); + /** Flush a table's memstore */ + rpc FlushTable(FlushTableRequest) + returns(FlushTableResponse); + /** Creates a new table asynchronously */ rpc CreateTable(CreateTableRequest) returns(CreateTableResponse); diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto index 25243596f105..90d0341df648 100644 --- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto +++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto @@ -191,6 +191,26 @@ message RestoreParentToChildRegionsPair { required string child2_region_name = 3; } +enum FlushTableState { + FLUSH_TABLE_PREPARE = 1; + FLUSH_TABLE_FLUSH_REGIONS = 2; +} + +message FlushTableProcedureStateData { + required TableName table_name = 1; + repeated bytes column_family = 2; +} + +message FlushRegionProcedureStateData { + required RegionInfo region = 1; + repeated bytes column_family = 2; +} + +message FlushRegionParameter { + required RegionInfo region = 1; + repeated bytes column_family = 2; +} + enum SnapshotState { SNAPSHOT_PREPARE = 1; SNAPSHOT_PRE_OPERATION = 2; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java index 68a61157b8ad..2a6d8e467965 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java @@ -285,7 +285,13 @@ public enum EventType { * RS verify snapshot.
* RS_VERIFY_SNAPSHOT
    */
-  RS_VERIFY_SNAPSHOT(88, ExecutorType.RS_SNAPSHOT_OPERATIONS);
+  RS_VERIFY_SNAPSHOT(88, ExecutorType.RS_SNAPSHOT_OPERATIONS),
+
+  /**
+   * RS flush regions.<br>
+ * RS_FLUSH_OPERATIONS + */ + RS_FLUSH_REGIONS(89, ExecutorType.RS_FLUSH_OPERATIONS); private final int code; private final ExecutorType executor; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java index 320842985d4e..feb703a4e583 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java @@ -53,7 +53,8 @@ public enum ExecutorType { RS_SWITCH_RPC_THROTTLE(33), RS_IN_MEMORY_COMPACTION(34), RS_CLAIM_REPLICATION_QUEUE(35), - RS_SNAPSHOT_OPERATIONS(36); + RS_SNAPSHOT_OPERATIONS(36), + RS_FLUSH_OPERATIONS(37); ExecutorType(int value) { } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index e56258208740..d867c377d0c0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -151,6 +151,7 @@ import org.apache.hadoop.hbase.master.procedure.DeleteTableProcedure; import org.apache.hadoop.hbase.master.procedure.DisableTableProcedure; import org.apache.hadoop.hbase.master.procedure.EnableTableProcedure; +import org.apache.hadoop.hbase.master.procedure.FlushTableProcedure; import org.apache.hadoop.hbase.master.procedure.InitMetaProcedure; import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; @@ -4252,4 +4253,34 @@ private void initializeCoprocessorHost(Configuration conf) { // initialize master side coprocessors before we start handling requests this.cpHost = new MasterCoprocessorHost(this, conf); } + + @Override + public long flushTable(TableName tableName, List columnFamilies, long nonceGroup, + long nonce) throws IOException { + checkInitialized(); + + if ( + !getConfiguration().getBoolean(MasterFlushTableProcedureManager.FLUSH_PROCEDURE_ENABLED, + MasterFlushTableProcedureManager.FLUSH_PROCEDURE_ENABLED_DEFAULT) + ) { + throw new DoNotRetryIOException("FlushTableProcedureV2 is DISABLED"); + } + + return MasterProcedureUtil + .submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) { + @Override + protected void run() throws IOException { + getMaster().getMasterCoprocessorHost().preTableFlush(tableName); + LOG.info(getClientIdAuditPrefix() + " flush " + tableName); + submitProcedure( + new FlushTableProcedure(procedureExecutor.getEnvironment(), tableName, columnFamilies)); + getMaster().getMasterCoprocessorHost().postTableFlush(tableName); + } + + @Override + protected String getDescription() { + return "FlushTableProcedure"; + } + }); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index d61ad10bc937..468a477288c4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -205,6 +205,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FixMetaResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreResponse; +import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest; @@ -3100,4 +3102,21 @@ public GetLiveRegionServersResponse getLiveRegionServers(RpcController controlle .forEach(builder::addServer); return builder.build(); } + + @Override + public FlushTableResponse flushTable(RpcController controller, FlushTableRequest req) + throws ServiceException { + TableName tableName = ProtobufUtil.toTableName(req.getTableName()); + List columnFamilies = req.getColumnFamilyCount() > 0 + ? req.getColumnFamilyList().stream().filter(cf -> !cf.isEmpty()).map(ByteString::toByteArray) + .collect(Collectors.toList()) + : null; + try { + long procId = + master.flushTable(tableName, columnFamilies, req.getNonceGroup(), req.getNonce()); + return FlushTableResponse.newBuilder().setProcId(procId).build(); + } catch (IOException ioe) { + throw new ServiceException(ioe); + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java index 279a3e1b48a9..d465a4a67279 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java @@ -437,4 +437,15 @@ boolean normalizeRegions(final NormalizeTableFilterParams ntfp, final boolean is * Flush master local region */ void flushMasterStore() throws IOException; + + /** + * Flush an existing table + * @param tableName The table name + * @param columnFamilies The column families to flush + * @param nonceGroup the nonce group + * @param nonce the nonce + * @return the flush procedure id + */ + long flushTable(final TableName tableName, final List columnFamilies, + final long nonceGroup, final long nonce) throws IOException; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushRegionProcedure.java new file mode 100644 index 000000000000..67f0442b618a --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushRegionProcedure.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.master.RegionState.State; +import org.apache.hadoop.hbase.master.assignment.RegionStateNode; +import org.apache.hadoop.hbase.master.assignment.RegionStates; +import org.apache.hadoop.hbase.master.assignment.ServerState; +import org.apache.hadoop.hbase.procedure2.FailedRemoteDispatchException; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureEvent; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureUtil; +import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureException; +import org.apache.hadoop.hbase.regionserver.FlushRegionCallable; +import org.apache.hadoop.hbase.util.RetryCounter; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; +import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.FlushRegionParameter; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.FlushRegionProcedureStateData; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; + +@InterfaceAudience.Private +public class FlushRegionProcedure extends Procedure + implements TableProcedureInterface, RemoteProcedure { + private static final Logger LOG = LoggerFactory.getLogger(FlushRegionProcedure.class); + + private RegionInfo region; + private List columnFamilies; + private ProcedureEvent event; + private boolean dispatched; + private boolean succ; + private RetryCounter retryCounter; + + public FlushRegionProcedure() { + } + + public FlushRegionProcedure(RegionInfo region) { + this(region, null); + } + + public FlushRegionProcedure(RegionInfo region, List columnFamilies) { + this.region = region; + this.columnFamilies = columnFamilies; + } + + @Override + protected Procedure[] execute(MasterProcedureEnv env) + throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { + if (dispatched) { + if (succ) { + return null; + } + dispatched = false; + } + + RegionStates regionStates = env.getAssignmentManager().getRegionStates(); + RegionStateNode regionNode = regionStates.getRegionStateNode(region); + regionNode.lock(); + try { + if (!regionNode.isInState(State.OPEN) || regionNode.isInTransition()) { + LOG.info("State of region {} is not OPEN or in transition. 
Skip {} ...", region, this);
+        return null;
+      }
+      ServerName targetServer = regionNode.getRegionLocation();
+      if (targetServer == null) {
+        setTimeoutForSuspend(env,
+          String.format("target server of region %s is null", region.getRegionNameAsString()));
+        throw new ProcedureSuspendedException();
+      }
+      ServerState serverState = regionStates.getServerNode(targetServer).getState();
+      if (serverState != ServerState.ONLINE) {
+        setTimeoutForSuspend(env, String.format("target server of region %s %s is in state %s",
+          region.getRegionNameAsString(), targetServer, serverState));
+        throw new ProcedureSuspendedException();
+      }
+      try {
+        env.getRemoteDispatcher().addOperationToNode(targetServer, this);
+        dispatched = true;
+        event = new ProcedureEvent<>(this);
+        event.suspendIfNotReady(this);
+        throw new ProcedureSuspendedException();
+      } catch (FailedRemoteDispatchException e) {
+        setTimeoutForSuspend(env, "Failed to send request to " + targetServer);
+        throw new ProcedureSuspendedException();
+      }
+    } finally {
+      regionNode.unlock();
+    }
+  }
+
+  @Override
+  protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
+    setState(ProcedureProtos.ProcedureState.RUNNABLE);
+    env.getProcedureScheduler().addFront(this);
+    return false;
+  }
+
+  @Override
+  protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected boolean abort(MasterProcedureEnv env) {
+    return false;
+  }
+
+  @Override
+  public void remoteCallFailed(MasterProcedureEnv env, ServerName serverName, IOException e) {
+    complete(env, e);
+  }
+
+  @Override
+  public void remoteOperationCompleted(MasterProcedureEnv env) {
+    complete(env, null);
+  }
+
+  @Override
+  public void remoteOperationFailed(MasterProcedureEnv env, RemoteProcedureException error) {
+    complete(env, error);
+  }
+
+  private void complete(MasterProcedureEnv env, Throwable error) {
+    if (isFinished()) {
+      LOG.info("This procedure {} is already finished, skip the remaining steps", getProcId());
+      return;
+    }
+    if (event == null) {
+      LOG.warn("procedure event for {} is null, maybe the procedure was created during recovery",
+        getProcId());
+      return;
+    }
+    if (error == null) {
+      succ = true;
+    }
+    event.wake(env.getProcedureScheduler());
+    event = null;
+  }
+
+  private void setTimeoutForSuspend(MasterProcedureEnv env, String reason) {
+    if (retryCounter == null) {
+      retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
+    }
+    long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
+    LOG.warn("{} cannot run currently because {}, wait {} ms to retry", this, reason, backoff);
+    setTimeout(Math.toIntExact(backoff));
+    setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
+    skipPersistence();
+  }
+
+  @Override
+  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
+    FlushRegionProcedureStateData.Builder builder = FlushRegionProcedureStateData.newBuilder();
+    builder.setRegion(ProtobufUtil.toRegionInfo(region));
+    if (columnFamilies != null) {
+      for (byte[] columnFamily : columnFamilies) {
+        if (columnFamily != null && columnFamily.length > 0) {
+          builder.addColumnFamily(UnsafeByteOperations.unsafeWrap(columnFamily));
+        }
+      }
+    }
+    serializer.serialize(builder.build());
+  }
+
+  @Override
+  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
+    FlushRegionProcedureStateData data =
+      serializer.deserialize(FlushRegionProcedureStateData.class);
+    this.region
= ProtobufUtil.toRegionInfo(data.getRegion()); + if (data.getColumnFamilyCount() > 0) { + this.columnFamilies = data.getColumnFamilyList().stream().filter(cf -> !cf.isEmpty()) + .map(ByteString::toByteArray).collect(Collectors.toList()); + } + } + + @Override + public Optional remoteCallBuild(MasterProcedureEnv env, ServerName serverName) { + FlushRegionParameter.Builder builder = FlushRegionParameter.newBuilder(); + builder.setRegion(ProtobufUtil.toRegionInfo(region)); + if (columnFamilies != null) { + for (byte[] columnFamily : columnFamilies) { + if (columnFamily != null && columnFamily.length > 0) { + builder.addColumnFamily(UnsafeByteOperations.unsafeWrap(columnFamily)); + } + } + } + return Optional.of(new RSProcedureDispatcher.ServerOperation(this, getProcId(), + FlushRegionCallable.class, builder.build().toByteArray())); + } + + @Override + public TableOperationType getTableOperationType() { + return TableOperationType.FLUSH; + } + + @Override + protected boolean waitInitialized(MasterProcedureEnv env) { + return env.waitInitialized(this); + } + + @Override + public TableName getTableName() { + return region.getTable(); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushTableProcedure.java new file mode 100644 index 000000000000..892d4d13b5ee --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushTableProcedure.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseIOException; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionReplicaUtil; +import org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Strings; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; +import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.FlushTableProcedureStateData; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.FlushTableState; + +@InterfaceAudience.Private +public class FlushTableProcedure extends AbstractStateMachineTableProcedure { + private static final Logger LOG = LoggerFactory.getLogger(FlushTableProcedure.class); + + private TableName tableName; + + private List columnFamilies; + + public FlushTableProcedure() { + super(); + } + + public FlushTableProcedure(MasterProcedureEnv env, TableName tableName) { + this(env, tableName, null); + } + + public FlushTableProcedure(MasterProcedureEnv env, TableName tableName, + List columnFamilies) { + super(env); + this.tableName = tableName; + this.columnFamilies = columnFamilies; + } + + @Override + protected LockState acquireLock(MasterProcedureEnv env) { + // Here we don't acquire table lock because the flush operation and other operations (like + // split or merge) are not mutually exclusive. Region will flush memstore when being closed. + // It's safe even if we don't have lock. However, currently we are limited by the scheduling + // mechanism of the procedure scheduler and have to acquire table shared lock here. See + // HBASE-27905 for details. 
+ if (env.getProcedureScheduler().waitTableSharedLock(this, getTableName())) { + return LockState.LOCK_EVENT_WAIT; + } + return LockState.LOCK_ACQUIRED; + } + + @Override + protected void releaseLock(MasterProcedureEnv env) { + env.getProcedureScheduler().wakeTableSharedLock(this, getTableName()); + } + + @Override + protected Flow executeFromState(MasterProcedureEnv env, FlushTableState state) + throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { + LOG.info("{} execute state={}", this, state); + + try { + switch (state) { + case FLUSH_TABLE_PREPARE: + preflightChecks(env, true); + setNextState(FlushTableState.FLUSH_TABLE_FLUSH_REGIONS); + return Flow.HAS_MORE_STATE; + case FLUSH_TABLE_FLUSH_REGIONS: + addChildProcedure(createFlushRegionProcedures(env)); + return Flow.NO_MORE_STATE; + default: + throw new UnsupportedOperationException("unhandled state=" + state); + } + } catch (Exception e) { + if (e instanceof DoNotRetryIOException) { + // for example, TableNotFoundException or TableNotEnabledException + setFailure("master-flush-table", e); + LOG.warn("Unrecoverable error trying to flush " + getTableName() + " state=" + state, e); + } else { + LOG.warn("Retriable error trying to flush " + getTableName() + " state=" + state, e); + } + } + return Flow.HAS_MORE_STATE; + } + + @Override + protected void rollbackState(MasterProcedureEnv env, FlushTableState state) + throws IOException, InterruptedException { + // nothing to rollback + } + + @Override + protected FlushTableState getState(int stateId) { + return FlushTableState.forNumber(stateId); + } + + @Override + protected int getStateId(FlushTableState state) { + return state.getNumber(); + } + + @Override + protected FlushTableState getInitialState() { + return FlushTableState.FLUSH_TABLE_PREPARE; + } + + @Override + public TableName getTableName() { + return tableName; + } + + @Override + public TableOperationType getTableOperationType() { + return TableOperationType.FLUSH; + } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.serializeStateData(serializer); + FlushTableProcedureStateData.Builder builder = FlushTableProcedureStateData.newBuilder(); + builder.setTableName(ProtobufUtil.toProtoTableName(tableName)); + if (columnFamilies != null) { + for (byte[] columnFamily : columnFamilies) { + if (columnFamily != null && columnFamily.length > 0) { + builder.addColumnFamily(UnsafeByteOperations.unsafeWrap(columnFamily)); + } + } + } + serializer.serialize(builder.build()); + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.deserializeStateData(serializer); + FlushTableProcedureStateData data = serializer.deserialize(FlushTableProcedureStateData.class); + this.tableName = ProtobufUtil.toTableName(data.getTableName()); + if (data.getColumnFamilyCount() > 0) { + this.columnFamilies = data.getColumnFamilyList().stream().filter(cf -> !cf.isEmpty()) + .map(ByteString::toByteArray).collect(Collectors.toList()); + } + } + + private FlushRegionProcedure[] createFlushRegionProcedures(MasterProcedureEnv env) { + return env.getAssignmentManager().getTableRegions(getTableName(), true).stream() + .filter(r -> RegionReplicaUtil.isDefaultReplica(r)) + .map(r -> new FlushRegionProcedure(r, columnFamilies)).toArray(FlushRegionProcedure[]::new); + } + + @Override + public void toStringClassDetails(StringBuilder builder) { + builder.append(getClass().getName()).append(", 
id=").append(getProcId()).append(", table=") + .append(tableName); + if (columnFamilies != null) { + builder.append(", columnFamilies=[") + .append(Strings.JOINER + .join(columnFamilies.stream().map(Bytes::toString).collect(Collectors.toList()))) + .append("]"); + } + } + + @Override + protected void afterReplay(MasterProcedureEnv env) { + if ( + !env.getMasterConfiguration().getBoolean( + MasterFlushTableProcedureManager.FLUSH_PROCEDURE_ENABLED, + MasterFlushTableProcedureManager.FLUSH_PROCEDURE_ENABLED_DEFAULT) + ) { + setFailure("master-flush-table", new HBaseIOException("FlushTableProcedureV2 is DISABLED")); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java index 4b3ae5ae1387..b5aaada9c455 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java @@ -33,6 +33,7 @@ public enum TableOperationType { EDIT, ENABLE, READ, + FLUSH, SNAPSHOT, REGION_SNAPSHOT, REGION_EDIT, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableQueue.java index 0f580969c4f9..d3eae050b56d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableQueue.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableQueue.java @@ -56,6 +56,7 @@ private static boolean requireTableExclusiveLock(TableProcedureInterface proc) { // we allow concurrent edit on the NS table return !proc.getTableName().equals(TableName.NAMESPACE_TABLE_NAME); case READ: + case FLUSH: case SNAPSHOT: return false; // region operations are using the shared-lock on the table diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java index b521a85e7a0c..bced7fefcf33 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java @@ -17,9 +17,9 @@ */ package org.apache.hadoop.hbase.procedure.flush; -import java.util.Collections; import java.util.List; import java.util.concurrent.Callable; +import java.util.stream.Collectors; import org.apache.hadoop.hbase.errorhandling.ForeignException; import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; import org.apache.hadoop.hbase.procedure.ProcedureMember; @@ -41,16 +41,16 @@ public class FlushTableSubprocedure extends Subprocedure { private static final Logger LOG = LoggerFactory.getLogger(FlushTableSubprocedure.class); private final String table; - private final String family; + private final List families; private final List regions; private final FlushTableSubprocedurePool taskManager; public FlushTableSubprocedure(ProcedureMember member, ForeignExceptionDispatcher errorListener, - long wakeFrequency, long timeout, List regions, String table, String family, + long wakeFrequency, long timeout, List regions, String table, List families, FlushTableSubprocedurePool taskManager) { super(member, table, errorListener, wakeFrequency, timeout); this.table = table; - this.family = family; + this.families = families; 
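+    // 'families' holds the column family names to flush; a null or empty list means
+    // all stores of each region are flushed.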
this.regions = regions; this.taskManager = taskManager; } @@ -70,7 +70,7 @@ public Void call() throws Exception { region.startRegionOperation(); try { LOG.debug("Flush region " + region.toString() + " started..."); - if (families == null) { + if (families == null || families.isEmpty()) { region.flush(true); } else { region.flushcache(families, false, FlushLifeCycleTracker.DUMMY); @@ -97,15 +97,15 @@ private void flushRegions() throws ForeignException { throw new IllegalStateException( "Attempting to flush " + table + " but we currently have outstanding tasks"); } - List families = null; - if (family != null) { - LOG.debug("About to flush family {} on all regions for table {}", family, table); - families = Collections.singletonList(Bytes.toBytes(family)); + List familiesToFlush = null; + if (families != null && !families.isEmpty()) { + LOG.debug("About to flush family {} on all regions for table {}", families, table); + familiesToFlush = families.stream().map(Bytes::toBytes).collect(Collectors.toList()); } // Add all hfiles already existing in region. for (HRegion region : regions) { // submit one task per region for parallelize by region. - taskManager.submitTask(new RegionFlushTask(region, families)); + taskManager.submitTask(new RegionFlushTask(region, familiesToFlush)); monitor.rethrowException(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java index 620bb5f8df7a..a4ef8cf0a1f8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java @@ -58,6 +58,10 @@ public class MasterFlushTableProcedureManager extends MasterProcedureManager { public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc"; + public static final String FLUSH_PROCEDURE_ENABLED = "hbase.flush.procedure.enabled"; + + public static final boolean FLUSH_PROCEDURE_ENABLED_DEFAULT = true; + private static final String FLUSH_TIMEOUT_MILLIS_KEY = "hbase.flush.master.timeoutMillis"; private static final int FLUSH_TIMEOUT_MILLIS_DEFAULT = 60000; private static final String FLUSH_WAKE_MILLIS_KEY = "hbase.flush.master.wakeMillis"; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java index 9c4473163c79..ba5cd3cbad66 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.util.Strings; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.yetus.audience.InterfaceAudience; @@ -50,6 +51,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import 
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; @@ -127,11 +129,11 @@ public void stop(boolean force) throws IOException { * If in a running state, creates the specified subprocedure to flush table regions. Because this * gets the local list of regions to flush and not the set the master had, there is a possibility * of a race where regions may be missed. - * @param table table to flush - * @param family column family within a table + * @param table table to flush + * @param families column families within a table * @return Subprocedure to submit to the ProcedureMember. */ - public Subprocedure buildSubprocedure(String table, String family) { + public Subprocedure buildSubprocedure(String table, List families) { // don't run the subprocedure if the parent is stop(ping) if (rss.isStopping() || rss.isStopped()) { throw new IllegalStateException("Can't start flush region subprocedure on RS: " @@ -160,7 +162,7 @@ public Subprocedure buildSubprocedure(String table, String family) { FlushTableSubprocedurePool taskManager = new FlushTableSubprocedurePool(rss.getServerName().toString(), conf, rss); return new FlushTableSubprocedure(member, exnDispatcher, wakeMillis, timeoutMillis, - involvedRegions, table, family, taskManager); + involvedRegions, table, families, taskManager); } /** @@ -176,19 +178,19 @@ public class FlushTableSubprocedureBuilder implements SubprocedureFactory { @Override public Subprocedure buildSubprocedure(String name, byte[] data) { - String family = null; - // Currently we do not put other data except family, so it is ok to - // judge by length that if family was specified + List families = null; + // Currently we do not put other data except families, so it is ok to + // judge by length that if families were specified if (data.length > 0) { try { HBaseProtos.NameStringPair nsp = HBaseProtos.NameStringPair.parseFrom(data); - family = nsp.getValue(); + families = ImmutableList.copyOf(Strings.SPLITTER.split(nsp.getValue())); } catch (Exception e) { LOG.error("fail to get family by parsing from data", e); } } // The name of the procedure instance from the master is the table name. - return RegionServerFlushTableProcedureManager.this.buildSubprocedure(name, family); + return RegionServerFlushTableProcedureManager.this.buildSubprocedure(name, families); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRegionCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRegionCallable.java new file mode 100644 index 000000000000..3dd932a1736d --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRegionCallable.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.executor.EventType; +import org.apache.hadoop.hbase.procedure2.BaseRSProcedureCallable; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.FlushRegionParameter; + +@InterfaceAudience.Private +public class FlushRegionCallable extends BaseRSProcedureCallable { + + private static final Logger LOG = LoggerFactory.getLogger(FlushRegionCallable.class); + + private RegionInfo regionInfo; + + private List columnFamilies; + + @Override + protected void doCall() throws Exception { + HRegion region = rs.getRegion(regionInfo.getEncodedName()); + if (region == null) { + throw new NotServingRegionException("region=" + regionInfo.getRegionNameAsString()); + } + LOG.debug("Starting region operation on {}", region); + region.startRegionOperation(); + try { + HRegion.FlushResult res; + if (columnFamilies == null) { + res = region.flush(true); + } else { + res = region.flushcache(columnFamilies, false, FlushLifeCycleTracker.DUMMY); + } + if (res.getResult() == HRegion.FlushResult.Result.CANNOT_FLUSH) { + throw new IOException("Unable to complete flush " + regionInfo); + } + } finally { + LOG.debug("Closing region operation on {}", region); + region.closeRegionOperation(); + } + } + + @Override + protected void initParameter(byte[] parameter) throws Exception { + FlushRegionParameter param = FlushRegionParameter.parseFrom(parameter); + this.regionInfo = ProtobufUtil.toRegionInfo(param.getRegion()); + if (param.getColumnFamilyCount() > 0) { + this.columnFamilies = param.getColumnFamilyList().stream().filter(cf -> !cf.isEmpty()) + .map(ByteString::toByteArray).collect(Collectors.toList()); + } + } + + @Override + public EventType getEventType() { + return EventType.RS_FLUSH_REGIONS; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 74b8d1bef927..237de50ea5e3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -2212,6 +2212,10 @@ executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_CLAIM_REPLI executorService.startExecutorService( executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_SNAPSHOT_OPERATIONS) .setCorePoolSize(rsSnapshotOperationThreads)); + final int rsFlushOperationThreads = + conf.getInt("hbase.regionserver.executor.flush.operations.threads", 3); + executorService.startExecutorService(executorService.new ExecutorConfig() + .setExecutorType(ExecutorType.RS_FLUSH_OPERATIONS).setCorePoolSize(rsFlushOperationThreads)); Threads.setDaemonThreadRunning(this.walRoller, getName() + ".logRoller", uncaughtExceptionHandler); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java index 062e40aeeabf..d78caff9a393 100644 --- 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
index 062e40aeeabf..d78caff9a393 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
@@ -501,4 +501,10 @@ public boolean replicationPeerModificationSwitch(boolean on) throws IOException
   public boolean isReplicationPeerModificationEnabled() {
     return false;
   }
+
+  @Override
+  public long flushTable(TableName tableName, List<byte[]> columnFamilies, long nonceGroup,
+    long nonce) throws IOException {
+    return 0;
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedure.java
new file mode 100644
index 000000000000..cd48370647df
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedure.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.procedure;
+
+import java.io.IOException;
+import java.util.Arrays;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.TableNotEnabledException;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MasterTests.class, MediumTests.class })
+public class TestFlushTableProcedure extends TestFlushTableProcedureBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestFlushTableProcedure.class);
+
+  @Test
+  public void testSimpleFlush() throws IOException {
+    assertTableMemStoreNotEmpty();
+    TEST_UTIL.getAdmin().flush(TABLE_NAME);
+    assertTableMemStoreEmpty();
+  }
+
+  @Test
+  public void testFlushTableExceptionally() throws IOException {
+    Admin admin = TEST_UTIL.getAdmin();
+    admin.disableTable(TABLE_NAME);
+    Assert.assertThrows(TableNotEnabledException.class, () -> admin.flush(TABLE_NAME));
+    admin.deleteTable(TABLE_NAME);
+    Assert.assertThrows(TableNotFoundException.class, () -> admin.flush(TABLE_NAME));
+  }
+
+  @Test
+  public void testSingleColumnFamilyFlush() throws IOException {
+    assertColumnFamilyMemStoreNotEmpty(FAMILY1);
+    TEST_UTIL.getAdmin().flush(TABLE_NAME, Arrays.asList(FAMILY1));
+    assertColumnFamilyMemStoreEmpty(FAMILY1);
+  }
+
+  @Test
+  public void testMultiColumnFamilyFlush() throws IOException {
+    assertTableMemStoreNotEmpty();
+    TEST_UTIL.getAdmin().flush(TABLE_NAME, Arrays.asList(FAMILY1, FAMILY2, FAMILY3));
+    assertTableMemStoreEmpty();
+  }
+}
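These tests exercise the new client API end to end. A caller-side sketch of the synchronous path (the table and family names are placeholders; Admin comes from an open Connection):

    // Flush two specific column families and block until all regions have flushed.
    try (Connection conn = ConnectionFactory.createConnection(conf);
         Admin admin = conn.getAdmin()) {
      admin.flush(TableName.valueOf("TestFlushTable"),
        Arrays.asList(Bytes.toBytes("cf1"), Bytes.toBytes("cf2")));
    }
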
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedureBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedureBase.java
new file mode 100644
index 000000000000..f8d4852d41fb
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedureBase.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.procedure;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
+import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.RegionSplitter;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+
+public class TestFlushTableProcedureBase {
+
+  protected static HBaseTestingUtility TEST_UTIL;
+
+  protected TableName TABLE_NAME;
+  protected byte[] FAMILY1;
+  protected byte[] FAMILY2;
+  protected byte[] FAMILY3;
+
+  @Before
+  public void setup() throws Exception {
+    TEST_UTIL = new HBaseTestingUtility();
+    addConfiguration(TEST_UTIL.getConfiguration());
+    TEST_UTIL.startMiniCluster(3);
+    TABLE_NAME = TableName.valueOf(Bytes.toBytes("TestFlushTable"));
+    FAMILY1 = Bytes.toBytes("cf1");
+    FAMILY2 = Bytes.toBytes("cf2");
+    FAMILY3 = Bytes.toBytes("cf3");
+    final byte[][] splitKeys = new RegionSplitter.HexStringSplit().split(10);
+    Table table =
+      TEST_UTIL.createTable(TABLE_NAME, new byte[][] { FAMILY1, FAMILY2, FAMILY3 }, splitKeys);
+    TEST_UTIL.loadTable(table, FAMILY1, false);
+    TEST_UTIL.loadTable(table, FAMILY2, false);
+    TEST_UTIL.loadTable(table, FAMILY3, false);
+  }
+
+  protected void addConfiguration(Configuration config) {
+    // delay dispatch so that we can do something, for example kill a target server
+    config.setInt(RemoteProcedureDispatcher.DISPATCH_DELAY_CONF_KEY, 10000);
+    config.setInt(RemoteProcedureDispatcher.DISPATCH_MAX_QUEUE_SIZE_CONF_KEY, 128);
+  }
+
+  protected void assertTableMemStoreNotEmpty() {
+    long totalSize = TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME).stream()
+      .mapToLong(HRegion::getMemStoreDataSize).sum();
+    Assert.assertTrue(totalSize > 0);
+  }
+
+  protected void assertTableMemStoreEmpty() {
+    long totalSize = TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME).stream()
+      .mapToLong(HRegion::getMemStoreDataSize).sum();
+    Assert.assertEquals(0, totalSize);
+  }
+
+  protected void assertColumnFamilyMemStoreNotEmpty(byte[] columnFamily) {
+    long totalSize = TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME).stream()
+      .mapToLong(r -> r.getStore(columnFamily).getMemStoreSize().getDataSize()).sum();
+    Assert.assertTrue(totalSize > 0);
+  }
+
+  protected void assertColumnFamilyMemStoreEmpty(byte[] columnFamily) {
+    long totalSize = TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME).stream()
+      .mapToLong(r -> r.getStore(columnFamily).getMemStoreSize().getDataSize()).sum();
+    Assert.assertEquals(0, totalSize);
+  }
+
+  @After
+  public void teardown() throws Exception {
+    if (TEST_UTIL.getHBaseCluster().getMaster() != null) {
+      ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(
+        TEST_UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor(), false);
+    }
+    TEST_UTIL.shutdownMiniCluster();
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedureMasterRestarts.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedureMasterRestarts.java
new file mode 100644
index 000000000000..c0c038982a9a
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedureMasterRestarts.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.procedure;
+
+import java.io.IOException;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MasterTests.class, MediumTests.class })
+public class TestFlushTableProcedureMasterRestarts extends TestFlushTableProcedureBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestFlushTableProcedureMasterRestarts.class);
+
+  @Test
+  public void testMasterRestarts() throws IOException {
+    assertTableMemStoreNotEmpty();
+
+    HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
+    ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
+    MasterProcedureEnv env = procExec.getEnvironment();
+    FlushTableProcedure proc = new FlushTableProcedure(env, TABLE_NAME);
+    long procId = procExec.submitProcedure(proc);
+    TEST_UTIL.waitFor(5000, 1000, () -> proc.getState().getNumber() > 1);
+
+    TEST_UTIL.getHBaseCluster().killMaster(master.getServerName());
+    TEST_UTIL.getHBaseCluster().waitForMasterToStop(master.getServerName(), 30000);
+    TEST_UTIL.getHBaseCluster().startMaster();
+    TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster();
+
+    master = TEST_UTIL.getHBaseCluster().getMaster();
+    procExec = master.getMasterProcedureExecutor();
+    ProcedureTestingUtility.waitProcedure(procExec, procId);
+    assertTableMemStoreEmpty();
+  }
+
+  @Test
+  public void testSkipRIT() throws IOException {
+    HRegion region = TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME).get(0);
+
+    TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
+      .getRegionStateNode(region.getRegionInfo())
+      .setState(RegionState.State.CLOSING, RegionState.State.OPEN);
+
+    FlushRegionProcedure proc = new FlushRegionProcedure(region.getRegionInfo());
+    TEST_UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor().submitProcedure(proc);
+
+    // the RIT region is skipped rather than dispatched, so this finishes well before the 10s dispatch delay set in the base class
+    TEST_UTIL.waitFor(5000, () -> proc.isFinished());
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedureWithDoNotSupportFlushTableMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedureWithDoNotSupportFlushTableMaster.java
new file mode 100644
index 000000000000..66ccf362fef7
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedureWithDoNotSupportFlushTableMaster.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.procedure;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MasterTests.class, MediumTests.class })
+public class TestFlushTableProcedureWithDoNotSupportFlushTableMaster
+  extends TestFlushTableProcedureBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestFlushTableProcedureWithDoNotSupportFlushTableMaster.class);
+
+  @Override
+  protected void addConfiguration(Configuration config) {
+    super.addConfiguration(config);
+    config.set(HConstants.MASTER_IMPL, DoNotSupportFlushTableMaster.class.getName());
+  }
+
+  @Test
+  public void testFlushFallback() throws IOException {
+    assertTableMemStoreNotEmpty();
+    TEST_UTIL.getAdmin().flush(TABLE_NAME);
+    assertTableMemStoreEmpty();
+  }
+
+  @Test
+  public void testSingleColumnFamilyFlushFallback() throws IOException {
+    assertColumnFamilyMemStoreNotEmpty(FAMILY1);
+    TEST_UTIL.getAdmin().flush(TABLE_NAME, FAMILY1);
+    assertColumnFamilyMemStoreEmpty(FAMILY1);
+  }
+
+  @Test
+  public void testMultiColumnFamilyFlushFallback() throws IOException {
+    assertTableMemStoreNotEmpty();
+    TEST_UTIL.getAdmin().flush(TABLE_NAME, Arrays.asList(FAMILY1, FAMILY2, FAMILY3));
+    assertTableMemStoreEmpty();
+  }
+
+  public static final class DoNotSupportFlushTableMaster extends HMaster {
+
+    public DoNotSupportFlushTableMaster(Configuration conf) throws IOException {
+      super(conf);
+    }
+
+    @Override
+    public long flushTable(TableName tableName, List<byte[]> columnFamilies, long nonceGroup,
+      long nonce) throws IOException {
+      throw new DoNotRetryIOException("UnsupportedOperation: flushTable");
+    }
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestFlushWithThroughputController.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestFlushWithThroughputController.java
index bfba16dad1ac..bb274878cee2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestFlushWithThroughputController.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestFlushWithThroughputController.java
@@ -19,6 +19,7 @@
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.List;
@@ -125,7 +126,14 @@ private Pair<Long, Long> generateAndFlushData(Table table) throws IOException
         table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value));
       }
       long startTime = System.nanoTime();
-      hbtu.getAdmin().flush(tableName);
+      hbtu.getHBaseCluster().getRegions(tableName).stream().findFirst().ifPresent(r -> {
+        try {
+          r.flush(true);
+        } catch (IOException e) {
+          LOG.error("Failed flush region {}", r, e);
+          fail("Failed flush region " + r.getRegionInfo().getRegionNameAsString());
+        }
+      });
       duration += System.nanoTime() - startTime;
     }
     HStore store = getStoreWithName(tableName);
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java
index 4ba7ad3101fd..05fc3c932dc5 100644
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java
@@ -1343,6 +1343,11 @@ public Future<Void> disableTableAsync(TableName tableName) {
     throw new NotImplementedException("disableTableAsync not supported in ThriftAdmin");
   }
 
+  @Override
+  public Future<Void> flushAsync(TableName tableName, List<byte[]> columnFamilies) {
+    throw new NotImplementedException("flushAsync not supported in ThriftAdmin");
+  }
+
   @Override
   public Pair<Integer, Integer> getAlterStatus(TableName tableName) {
     throw new NotImplementedException("getAlterStatus not supported in ThriftAdmin");
@@ -1489,6 +1494,11 @@ public boolean replicationPeerModificationSwitch(boolean on, boolean drainProced
       "replicationPeerModificationSwitch not supported in ThriftAdmin");
   }
 
+  @Override
+  public void flush(TableName tableName, List<byte[]> columnFamilies) {
+    throw new NotImplementedException("flush not supported in ThriftAdmin");
+  }
+
   @Override
   public boolean isReplicationPeerModificationEnabled() throws IOException {
     throw new NotImplementedException(
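With the follow-up patch below, the synchronous flush becomes a thin wrapper over flushAsync, so callers that choose the asynchronous form own the wait. A caller-side sketch of the Future contract described in the Admin javadoc (the 30-second bound and variable names are illustrative):

    Future<Void> f = admin.flushAsync(tableName, columnFamilies);
    try {
      f.get(30, TimeUnit.SECONDS);
    } catch (ExecutionException e) {
      throw new IOException("flush failed", e.getCause()); // server-side failure
    } catch (TimeoutException e) {
      // the wait expired; the flush keeps running server-side
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
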
From c0f559c9774fa133ff16f951e262bc46540ae4d7 Mon Sep 17 00:00:00 2001
From: huiruan <876107431@qq.com>
Date: Wed, 23 Aug 2023 20:30:47 +0800
Subject: [PATCH 2/6] add fallback logic in flushAsync

---
 .../org/apache/hadoop/hbase/client/Admin.java |  4 +-
 .../hadoop/hbase/client/HBaseAdmin.java       | 88 ++++++++++++-------
 2 files changed, 60 insertions(+), 32 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
index 536941495512..3fc844b1aac9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
@@ -893,7 +893,9 @@ boolean closeRegionWithEncodedRegionName(String encodedRegionName, String server
    * @param columnFamilies column families within a table
    * @throws IOException if a remote or network exception occurs
    */
-  void flush(TableName tableName, List<byte[]> columnFamilies) throws IOException;
+  default void flush(TableName tableName, List<byte[]> columnFamilies) throws IOException {
+    get(flushAsync(tableName, columnFamilies), getSyncWaitTimeout(), TimeUnit.MILLISECONDS);
+  }
 
   /**
    * Flush a table but does not block and wait for it to finish. You can use Future.get(long,
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
index 63ab1651d994..6dfeafc3467b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import static org.apache.hadoop.hbase.util.FutureUtils.get;
-
 import com.google.protobuf.Descriptors;
 import com.google.protobuf.Message;
 import com.google.protobuf.RpcController;
@@ -1258,45 +1256,55 @@ public void flush(final TableName tableName, byte[] columnFamily) throws IOExcep
   }
 
   @Override
-  public void flush(TableName tableName, List<byte[]> columnFamilyList) throws IOException {
+  public Future<Void> flushAsync(TableName tableName, List<byte[]> columnFamilies)
+    throws IOException {
     // check if the table exists and enabled
     if (!isTableEnabled(tableName)) {
       throw new TableNotEnabledException(tableName.getNameAsString());
     }
-
-    List<byte[]> columnFamilies = columnFamilyList.stream()
+    // remove duplicate column families
+    List<byte[]> columnFamilyList = columnFamilies.stream()
       .filter(cf -> cf != null && cf.length > 0).distinct().collect(Collectors.toList());
     try {
-      get(flushAsync(tableName, columnFamilies), getSyncWaitTimeout(), TimeUnit.MILLISECONDS);
+      FlushTableResponse resp = executeCallable(
+        new MasterCallable<FlushTableResponse>(getConnection(), getRpcControllerFactory()) {
+          final long nonceGroup = ng.getNonceGroup();
+          final long nonce = ng.newNonce();
+
+          @Override
+          protected FlushTableResponse rpcCall() throws Exception {
+            FlushTableRequest request = RequestConverter.buildFlushTableRequest(tableName,
+              columnFamilyList, nonceGroup, nonce);
+            return master.flushTable(getRpcController(), request);
+          }
+        });
+      return new FlushTableFuture(this, tableName, resp);
     } catch (DoNotRetryIOException e) {
       // This is for keeping compatibility with old implementation.
       // usually the exception caused by the method is not present on the server or
       // the hbase hadoop version does not match the running hadoop version or
       // the FlushTableProcedure is disabled, if that happens, we need fall back
       // to the old flush implementation.
-      legacyFlush(tableName, columnFamilies);
+      Map<String, String> props = new HashMap<>();
+      if (!columnFamilies.isEmpty()) {
+        props.put(HConstants.FAMILY_KEY_STR, Strings.JOINER
+          .join(columnFamilies.stream().map(Bytes::toString).collect(Collectors.toList())));
+      }
+      ProcedureDescription desc = ProtobufUtil.buildProcedureDescription("flush-table-proc",
+        tableName.getNameAsString(), props);
+      ExecProcedureRequest request = ExecProcedureRequest.newBuilder().setProcedure(desc).build();
+      ExecProcedureResponse resp = executeCallable(
+        new MasterCallable<ExecProcedureResponse>(getConnection(), getRpcControllerFactory()) {
+          @Override
+          protected ExecProcedureResponse rpcCall() throws Exception {
+            return master.execProcedure(getRpcController(), request);
+          }
+        });
+      return new LegacyFlushFuture(this, tableName, props);
     }
   }
 
-  @Override
-  public Future<Void> flushAsync(TableName tableName, List<byte[]> columnFamilies)
-    throws IOException {
-    FlushTableResponse resp = executeCallable(
-      new MasterCallable<FlushTableResponse>(getConnection(), getRpcControllerFactory()) {
-        final long nonceGroup = ng.getNonceGroup();
-        final long nonce = ng.newNonce();
-
-        @Override
-        protected FlushTableResponse rpcCall() throws Exception {
-          FlushTableRequest request =
-            RequestConverter.buildFlushTableRequest(tableName, columnFamilies, nonceGroup, nonce);
-          return master.flushTable(getRpcController(), request);
-        }
-      });
-    return new FlushTableFuture(this, tableName, resp);
-  }
-
   private static class FlushTableFuture extends TableFuture<Void> {
 
     public FlushTableFuture(final HBaseAdmin admin, final TableName tableName,
@@ -1310,13 +1318,31 @@ public String getOperationType() {
     }
   }
 
-  private void legacyFlush(TableName tableName, List<byte[]> columnFamilies) throws IOException {
-    Map<String, String> props = new HashMap<>();
-    if (!columnFamilies.isEmpty()) {
-      props.put(HConstants.FAMILY_KEY_STR, Strings.JOINER
-        .join(columnFamilies.stream().map(Bytes::toString).collect(Collectors.toList())));
+  private static class LegacyFlushFuture extends TableFuture<Void> {
+
+    private final Map<String, String> props;
+
+    public LegacyFlushFuture(HBaseAdmin admin, TableName tableName, Map<String, String> props) {
+      super(admin, tableName, null);
+      this.props = props;
+    }
+
+    @Override
+    public String getOperationType() {
+      return "FLUSH";
+    }
+
+    @Override
+    protected Void waitOperationResult(long deadlineTs) throws IOException, TimeoutException {
+      waitForState(deadlineTs, new TableWaitForStateCallable() {
+        @Override
+        public boolean checkState(int tries) throws IOException {
+          return getAdmin().isProcedureFinished("flush-table-proc",
+            getTableName().getNameAsString(), props);
+        }
+      });
+      return null;
     }
-    execProcedure("flush-table-proc", tableName.getNameAsString(), props);
   }
 
   @Override
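One nuance in the normalization above: Stream.distinct() on a List<byte[]> compares array references, because byte[] inherits identity-based equals(). A small sketch of the observable behavior (values are illustrative):

    byte[] cf1 = Bytes.toBytes("cf1");
    List<byte[]> normalized = Arrays
      .asList(cf1, null, cf1, new byte[0], Bytes.toBytes("cf1")).stream()
      .filter(cf -> cf != null && cf.length > 0).distinct()
      .collect(Collectors.toList());
    // nulls/empties are dropped and the repeated cf1 reference collapses, but the
    // second, equal-content "cf1" array survives: normalized has 2 entries, not 1.
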
From 2c3357ce0691edb7d55e9c827562b78cb80bb9de Mon Sep 17 00:00:00 2001
From: huiruan <876107431@qq.com>
Date: Thu, 24 Aug 2023 01:09:29 +0800
Subject: [PATCH 3/6] remove unused local variable

---
 .../java/org/apache/hadoop/hbase/client/HBaseAdmin.java | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
index 6dfeafc3467b..1c2a35a08303 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
@@ -1291,13 +1291,14 @@ protected FlushTableResponse rpcCall() throws Exception {
         props.put(HConstants.FAMILY_KEY_STR, Strings.JOINER
           .join(columnFamilies.stream().map(Bytes::toString).collect(Collectors.toList())));
       }
-      ProcedureDescription desc = ProtobufUtil.buildProcedureDescription("flush-table-proc",
-        tableName.getNameAsString(), props);
-      ExecProcedureRequest request = ExecProcedureRequest.newBuilder().setProcedure(desc).build();
-      ExecProcedureResponse resp = executeCallable(
+      executeCallable(
         new MasterCallable<ExecProcedureResponse>(getConnection(), getRpcControllerFactory()) {
           @Override
           protected ExecProcedureResponse rpcCall() throws Exception {
+            ExecProcedureRequest request = ExecProcedureRequest.newBuilder()
+              .setProcedure(ProtobufUtil.buildProcedureDescription("flush-table-proc",
+                tableName.getNameAsString(), props))
+              .build();
             return master.execProcedure(getRpcController(), request);
           }
         });
From 2376b2f4d397d2d79f5c182f4e1c6012b9aba6db Mon Sep 17 00:00:00 2001
From: huiruan <876107431@qq.com>
Date: Thu, 24 Aug 2023 12:01:29 +0800
Subject: [PATCH 4/6] add a blank line to trigger a new build

---
 .../src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java | 1 +
 1 file changed, 1 insertion(+)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
index 1c2a35a08303..a379e72570f8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
@@ -1291,6 +1291,7 @@ protected FlushTableResponse rpcCall() throws Exception {
         props.put(HConstants.FAMILY_KEY_STR, Strings.JOINER
           .join(columnFamilies.stream().map(Bytes::toString).collect(Collectors.toList())));
       }
+
       executeCallable(
         new MasterCallable<ExecProcedureResponse>(getConnection(), getRpcControllerFactory()) {
           @Override
From a8e4c2fb5734c7cd0587c962a31782560f88d196 Mon Sep 17 00:00:00 2001
From: huiruan <876107431@qq.com>
Date: Fri, 25 Aug 2023 14:18:07 +0800
Subject: [PATCH 5/6] rerun spotless:apply

---
 .../main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
index a379e72570f8..b4e9390c13e1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
@@ -1291,7 +1291,7 @@ protected FlushTableResponse rpcCall() throws Exception {
         props.put(HConstants.FAMILY_KEY_STR, Strings.JOINER
           .join(columnFamilies.stream().map(Bytes::toString).collect(Collectors.toList())));
       }
-
+
       executeCallable(
From b722b2b861b199c69feb64c00614ee8687859f1f Mon Sep 17 00:00:00 2001
From: huiruan <876107431@qq.com>
Date: Sun, 27 Aug 2023 23:32:07 +0800
Subject: [PATCH 6/6] rebase on branch-2

---
 .../hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java
index f0d612930820..64dcd6c7067c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java
@@ -36,8 +36,8 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;