@@ -886,6 +886,30 @@ boolean closeRegionWithEncodedRegionName(String encodedRegionName, String server
*/
void flush(TableName tableName, byte[] columnFamily) throws IOException;

/**
* Flush the specified column family stores on all regions of the passed table. This runs as a
* synchronous operation.
* @param tableName table to flush
* @param columnFamilies column families within a table
* @throws IOException if a remote or network exception occurs
*/
default void flush(TableName tableName, List<byte[]> columnFamilies) throws IOException {
get(flushAsync(tableName, columnFamilies), getSyncWaitTimeout(), TimeUnit.MILLISECONDS);
}

/**
* Flush a table without blocking and waiting for the flush to finish. You can use Future.get(long,
* TimeUnit) to wait on the operation to complete. It may throw ExecutionException if there was an
* error while executing the operation, or TimeoutException if the wait timeout was not long
* enough to allow the operation to complete.
* @param tableName table to flush
* @param columnFamilies column families within a table
* @return the result of the async flush. You can use Future.get(long, TimeUnit) to wait on the
* operation to complete.
* @throws IOException if a remote or network exception occurs
*/
Future<Void> flushAsync(TableName tableName, List<byte[]> columnFamilies) throws IOException;
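For orientation, a minimal usage sketch of the two new Admin methods (table and family names are placeholders and the 30-second wait is arbitrary; assumes a Configuration conf and the usual org.apache.hadoop.hbase client imports):

try (Connection conn = ConnectionFactory.createConnection(conf);
  Admin admin = conn.getAdmin()) {
  TableName table = TableName.valueOf("demo_table");
  List<byte[]> families = Arrays.asList(Bytes.toBytes("cf1"), Bytes.toBytes("cf2"));
  // synchronous variant: blocks until the flush completes
  admin.flush(table, families);
  // asynchronous variant: returns a Future the caller can wait on explicitly
  Future<Void> pending = admin.flushAsync(table, families);
  pending.get(30, TimeUnit.SECONDS);
}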

/**
* Flush an individual region. Synchronous operation.
* @param regionName region to flush
@@ -354,6 +354,14 @@ CompletableFuture<Void> modifyColumnFamilyStoreFileTracker(TableName tableName,
*/
CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily);

/**
* Flush the specified column family stores on all regions of the passed table.
* @param tableName table to flush
* @param columnFamilies column families within a table
*/
CompletableFuture<Void> flush(TableName tableName, List<byte[]> columnFamilies);
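A corresponding sketch for the AsyncAdmin variant (same placeholder names; asyncAdmin obtained from an AsyncConnection):

asyncAdmin
  .flush(TableName.valueOf("demo_table"),
    Arrays.asList(Bytes.toBytes("cf1"), Bytes.toBytes("cf2")))
  .whenComplete((ignored, err) -> {
    // err may be a TableNotFoundException or TableNotEnabledException,
    // matching the checks in the implementations below
    if (err != null) {
      err.printStackTrace();
    }
  });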

/**
* Flush an individual region.
* @param regionName region to flush
@@ -269,6 +269,11 @@ public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) {
return wrap(rawAdmin.flush(tableName, columnFamily));
}

@Override
public CompletableFuture<Void> flush(TableName tableName, List<byte[]> columnFamilies) {
return wrap(rawAdmin.flush(tableName, columnFamilies));
}

@Override
public CompletableFuture<Void> flushRegion(byte[] regionName) {
return wrap(rawAdmin.flushRegion(regionName));
@@ -1847,6 +1847,12 @@ public MasterProtos.GetTableNamesResponse getTableNames(RpcController controller
return stub.getTableNames(controller, request);
}

@Override
public MasterProtos.FlushTableResponse flushTable(RpcController controller,
MasterProtos.FlushTableRequest request) throws ServiceException {
return stub.flushTable(controller, request);
}

@Override
public MasterProtos.ListTableNamesByStateResponse listTableNamesByState(
RpcController controller, MasterProtos.ListTableNamesByStateRequest request)
@@ -68,6 +68,7 @@
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.UnknownRegionException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
@@ -100,6 +101,7 @@
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Strings;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
@@ -164,6 +166,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest;
@@ -1243,21 +1247,104 @@ public List<HRegionInfo> getOnlineRegions(final ServerName sn) throws IOException {

@Override
public void flush(final TableName tableName) throws IOException {
- flush(tableName, null);
+ flush(tableName, Collections.emptyList());
}

@Override
public void flush(final TableName tableName, byte[] columnFamily) throws IOException {
- checkTableExists(tableName);
- if (isTableDisabled(tableName)) {
-   LOG.info("Table is disabled: " + tableName.getNameAsString());
-   return;
- }
- Map<String, String> props = new HashMap<>();
- if (columnFamily != null) {
-   props.put(HConstants.FAMILY_KEY_STR, Bytes.toString(columnFamily));
- }
- execProcedure("flush-table-proc", tableName.getNameAsString(), props);
+ flush(tableName, Collections.singletonList(columnFamily));
}

@Override
public Future<Void> flushAsync(TableName tableName, List<byte[]> columnFamilies)
throws IOException {
// check that the table exists and is enabled
if (!isTableEnabled(tableName)) {
throw new TableNotEnabledException(tableName.getNameAsString());
}
// drop null or empty entries and remove duplicate column families
List<byte[]> columnFamilyList = columnFamilies.stream()
.filter(cf -> cf != null && cf.length > 0).distinct().collect(Collectors.toList());

try {
FlushTableResponse resp = executeCallable(
new MasterCallable<FlushTableResponse>(getConnection(), getRpcControllerFactory()) {
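// Capture the nonce pair once, so master-side retries of this call are
// recognized as the same procedure submission.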
final long nonceGroup = ng.getNonceGroup();
final long nonce = ng.newNonce();

@Override
protected FlushTableResponse rpcCall() throws Exception {
FlushTableRequest request = RequestConverter.buildFlushTableRequest(tableName,
columnFamilyList, nonceGroup, nonce);
return master.flushTable(getRpcController(), request);
}
});
return new FlushTableFuture(this, tableName, resp);
} catch (DoNotRetryIOException e) {
// This is for keeping compatibility with the old implementation.
// Usually the exception means the flushTable method is not present on the
// server (e.g. the server is running an older version), or the
// FlushTableProcedure is disabled; if that happens, we need to fall back
// to the old flush implementation.
Map<String, String> props = new HashMap<>();
if (!columnFamilies.isEmpty()) {
props.put(HConstants.FAMILY_KEY_STR, Strings.JOINER
.join(columnFamilies.stream().map(Bytes::toString).collect(Collectors.toList())));
}

executeCallable(
new MasterCallable<ExecProcedureResponse>(getConnection(), getRpcControllerFactory()) {
@Override
protected ExecProcedureResponse rpcCall() throws Exception {
ExecProcedureRequest request = ExecProcedureRequest.newBuilder()
.setProcedure(ProtobufUtil.buildProcedureDescription("flush-table-proc",
tableName.getNameAsString(), props))
.build();
return master.execProcedure(getRpcController(), request);
}
});
return new LegacyFlushFuture(this, tableName, props);
}
}

private static class FlushTableFuture extends TableFuture<Void> {

public FlushTableFuture(final HBaseAdmin admin, final TableName tableName,
final FlushTableResponse resp) {
super(admin, tableName, (resp != null && resp.hasProcId()) ? resp.getProcId() : null);
}

@Override
public String getOperationType() {
return "FLUSH";
}
}

private static class LegacyFlushFuture extends TableFuture<Void> {

private final Map<String, String> props;

public LegacyFlushFuture(HBaseAdmin admin, TableName tableName, Map<String, String> props) {
super(admin, tableName, null);
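// procId is null here: the legacy flush has no procedure id, so completion
// is detected by polling isProcedureFinished in waitOperationResult below.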
this.props = props;
}

@Override
public String getOperationType() {
return "FLUSH";
}

@Override
protected Void waitOperationResult(long deadlineTs) throws IOException, TimeoutException {
waitForState(deadlineTs, new TableWaitForStateCallable() {
@Override
public boolean checkState(int tries) throws IOException {
return getAdmin().isProcedureFinished("flush-table-proc",
getTableName().getNameAsString(), props);
}
});
return null;
}
}

@Override
@@ -54,6 +54,7 @@
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
import org.apache.hadoop.hbase.ClusterMetricsBuilder;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MetaTableAccessor;
@@ -95,6 +96,7 @@
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
import org.apache.hadoop.hbase.util.Strings;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -176,6 +178,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
@@ -949,12 +953,50 @@ public CompletableFuture<List<RegionInfo>> getRegions(TableName tableName) {

@Override
public CompletableFuture<Void> flush(TableName tableName) {
- return flush(tableName, null);
+ return flush(tableName, Collections.emptyList());
}

@Override
public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) {
return flush(tableName, Collections.singletonList(columnFamily));
}

@Override
public CompletableFuture<Void> flush(TableName tableName, List<byte[]> columnFamilyList) {
// This is for keeping compatibility with the old implementation.
// If the server version is lower than the client version, the flushTable method
// may not be present on the server side; if so, we need to fall back
// to the old implementation.
List<byte[]> columnFamilies = columnFamilyList.stream()
.filter(cf -> cf != null && cf.length > 0).distinct().collect(Collectors.toList());
FlushTableRequest request = RequestConverter.buildFlushTableRequest(tableName, columnFamilies,
ng.getNonceGroup(), ng.newNonce());
CompletableFuture<Void> procFuture = this.<FlushTableRequest, FlushTableResponse> procedureCall(
tableName, request, (s, c, req, done) -> s.flushTable(c, req, done),
(resp) -> resp.getProcId(), new FlushTableProcedureBiConsumer(tableName));
CompletableFuture<Void> future = new CompletableFuture<>();
addListener(procFuture, (ret, error) -> {
if (error != null) {
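// TableNotFoundException and TableNotEnabledException extend
// DoNotRetryIOException but must not trigger the legacy fallback, so they
// are rethrown before the DoNotRetryIOException check below.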
if (error instanceof TableNotFoundException || error instanceof TableNotEnabledException) {
future.completeExceptionally(error);
} else if (error instanceof DoNotRetryIOException) {
// Usually this is caused by the flushTable method not being present on the
// server (e.g. the server is running an older version).
// If that happens, we need to fall back to the old flush implementation.
LOG.info("Unrecoverable error in master side. Fallback to FlushTableProcedure V1", error);
legacyFlush(future, tableName, columnFamilies);
} else {
future.completeExceptionally(error);
}
} else {
future.complete(ret);
}
});
return future;
}

private void legacyFlush(CompletableFuture<Void> future, TableName tableName,
List<byte[]> columnFamilies) {
addListener(tableExists(tableName), (exists, err) -> {
if (err != null) {
future.completeExceptionally(err);
@@ -968,8 +1010,9 @@ public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) {
future.completeExceptionally(new TableNotEnabledException(tableName));
} else {
Map<String, String> props = new HashMap<>();
- if (columnFamily != null) {
-   props.put(HConstants.FAMILY_KEY_STR, Bytes.toString(columnFamily));
+ if (columnFamilies != null && !columnFamilies.isEmpty()) {
+   props.put(HConstants.FAMILY_KEY_STR, Strings.JOINER
+     .join(columnFamilies.stream().map(Bytes::toString).collect(Collectors.toList())));
}
addListener(
execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(), props),
@@ -984,7 +1027,6 @@ public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) {
});
}
});
- return future;
}

@Override
@@ -2736,6 +2778,18 @@ String getOperationType() {
}
}

private static class FlushTableProcedureBiConsumer extends TableProcedureBiConsumer {

FlushTableProcedureBiConsumer(TableName tableName) {
super(tableName);
}

@Override
String getOperationType() {
return "FLUSH";
}
}

private static class CreateNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {

CreateNamespaceProcedureBiConsumer(String namespaceName) {
@@ -33,6 +33,7 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest;
@@ -495,6 +496,12 @@ public GetNamespaceDescriptorResponse getNamespaceDescriptor(RpcController controller,
return stub.getNamespaceDescriptor(controller, request);
}

@Override
public MasterProtos.FlushTableResponse flushTable(RpcController controller,
MasterProtos.FlushTableRequest request) throws ServiceException {
return stub.flushTable(controller, request);
}

@Override
public ListNamespacesResponse listNamespaces(RpcController controller,
ListNamespacesRequest request) throws ServiceException {
@@ -116,6 +116,7 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetSchemaAlterStatusRequest;
@@ -1907,4 +1908,16 @@ public static ClearSlowLogResponseRequest buildClearSlowLogResponseRequest() {
return ClearSlowLogResponseRequest.newBuilder().build();
}

public static FlushTableRequest buildFlushTableRequest(final TableName tableName,
final List<byte[]> columnFamilies, final long nonceGroup, final long nonce) {
FlushTableRequest.Builder builder = FlushTableRequest.newBuilder();
builder.setTableName(ProtobufUtil.toProtoTableName(tableName));
if (!columnFamilies.isEmpty()) {
for (byte[] columnFamily : columnFamilies) {
builder.addColumnFamily(UnsafeByteOperations.unsafeWrap(columnFamily));
}
}
return builder.setNonceGroup(nonceGroup).setNonce(nonce).build();
}
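A hedged sketch of a call site for this helper (placeholder names; the nonce pair normally comes from the connection's NonceGenerator, as in the admin implementations above):

FlushTableRequest req = RequestConverter.buildFlushTableRequest(
  TableName.valueOf("demo_table"),
  Arrays.asList(Bytes.toBytes("cf1"), Bytes.toBytes("cf2")),
  ng.getNonceGroup(), ng.newNonce());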

}
@@ -20,13 +20,18 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
import org.apache.hbase.thirdparty.com.google.common.base.Splitter;

/**
* Utility for Strings.
*/
@InterfaceAudience.Private
public final class Strings {
public static final String DEFAULT_SEPARATOR = "=";
public static final String DEFAULT_KEYVALUE_SEPARATOR = ", ";
public static final Joiner JOINER = Joiner.on(",");
public static final Splitter SPLITTER = Splitter.on(",");

private Strings() {
}
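A small round-trip sketch for the new JOINER/SPLITTER pair, which is how the column family list is packed into the single FAMILY_KEY_STR property for the legacy flush-table-proc (family names are placeholders):

String encoded = Strings.JOINER.join(Arrays.asList("cf1", "cf2")); // "cf1,cf2"
for (String family : Strings.SPLITTER.split(encoded)) {
  System.out.println(family); // prints cf1, then cf2
}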