Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Optional;
import java.util.SortedMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
Expand All @@ -53,6 +54,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.hbase.client.RegionLocator.LOCATOR_META_REPLICAS_MODE;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Put static import on top


/**
* The asynchronous meta table accessor. Used to read/write region and assignment information store
* in <code>hbase:meta</code>.
Expand Down Expand Up @@ -366,10 +369,40 @@ private static Scan getMetaScan(AsyncTable<?> metaTable, int rowUpperLimit) {
Scan scan = new Scan();
int scannerCaching = metaTable.getConfiguration().getInt(HConstants.HBASE_META_SCANNER_CACHING,
HConstants.DEFAULT_HBASE_META_SCANNER_CACHING);
if (metaTable.getConfiguration().getBoolean(HConstants.USE_META_REPLICAS,
HConstants.DEFAULT_USE_META_REPLICAS)) {
scan.setConsistency(Consistency.TIMELINE);

// Get the region locator's meta replica mode.
CatalogReplicaMode metaReplicaMode = CatalogReplicaMode.fromString(metaTable.getConfiguration()
.get(LOCATOR_META_REPLICAS_MODE, CatalogReplicaMode.NONE.toString()));

switch (metaReplicaMode) {
case LOAD_BALANCE:
int numOfReplicas = 1;
try {
numOfReplicas = metaTable.getDescriptor().get().getRegionReplication();
} catch (Exception e) {
LOG.warn("Failed to get region replication for meta table");
}
if (numOfReplicas > 1) {
int replicaId = ThreadLocalRandom.current().nextInt(numOfReplicas);

// When the replicaId is 0, do not set to Consistency.TIMELINE
if (replicaId > 0) {
scan.setReplicaId(replicaId);
scan.setConsistency(Consistency.TIMELINE);
}
}
break;
case NONE:
// If user does not configure LOCATOR_META_REPLICAS_MODE, let's check the legacy config.
if (metaTable.getConfiguration().getBoolean(HConstants.USE_META_REPLICAS, HConstants.DEFAULT_USE_META_REPLICAS)) {
scan.setConsistency(Consistency.TIMELINE);
}
break;

default:
// Do nothing
}

if (rowUpperLimit <= scannerCaching) {
scan.setLimit(rowUpperLimit);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
package org.apache.hadoop.hbase;

import org.apache.yetus.audience.InterfaceAudience;

Expand All @@ -33,7 +33,7 @@
* </ol>
*/
@InterfaceAudience.Private
enum CatalogReplicaMode {
public enum CatalogReplicaMode {
NONE {
@Override
public String toString() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.NavigableMap;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -76,6 +77,7 @@
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.hbase.client.RegionLocator.LOCATOR_META_REPLICAS_MODE;

/**
* <p>
Expand Down Expand Up @@ -357,7 +359,7 @@ public static Result scanByRegionEncodedName(Connection connection,
String regionEncodedName) throws IOException {
RowFilter rowFilter = new RowFilter(CompareOperator.EQUAL,
new SubstringComparator(regionEncodedName));
Scan scan = getMetaScan(connection.getConfiguration(), 1);
Scan scan = getMetaScan(connection, connection.getConfiguration(), 1);
scan.setFilter(rowFilter);
try (Table table = getMetaHTable(connection);
ResultScanner resultScanner = table.getScanner(scan)) {
Expand Down Expand Up @@ -567,19 +569,55 @@ public static Scan getScanForTableName(Configuration conf, TableName tableName)
// Stop key appends the smallest possible char to the table name
byte[] stopKey = getTableStopRowForMeta(tableName, QueryType.REGION);

Scan scan = getMetaScan(conf, -1);
Scan scan = getMetaScan(null, conf, -1);
scan.setStartRow(startKey);
scan.setStopRow(stopKey);
return scan;
}

private static Scan getMetaScan(Configuration conf, int rowUpperLimit) {
private static Scan getMetaScan(Connection conn, Configuration conf, int rowUpperLimit) {
Scan scan = new Scan();
int scannerCaching = conf.getInt(HConstants.HBASE_META_SCANNER_CACHING,
HConstants.DEFAULT_HBASE_META_SCANNER_CACHING);
if (conf.getBoolean(HConstants.USE_META_REPLICAS, HConstants.DEFAULT_USE_META_REPLICAS)) {
scan.setConsistency(Consistency.TIMELINE);

// Get the region locator's meta replica mode.
CatalogReplicaMode metaReplicaMode = CatalogReplicaMode.fromString(
conf.get(LOCATOR_META_REPLICAS_MODE, CatalogReplicaMode.NONE.toString()));

switch (metaReplicaMode) {
case LOAD_BALANCE:
int numOfReplicas = 1;
if (conn != null) {
try {
try (Table metaTable = getMetaHTable(conn)) {
numOfReplicas = metaTable.getDescriptor().getRegionReplication();
Copy link
Contributor

@Apache9 Apache9 Jan 30, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, this is a bit painful... We used to expect getMetaScan to contain only in-memory operations, but here we will have network I/O... And every time we call this method it will generate an RPC call...

Looking at the code above, the async code has the same problem, and there we even introduce a blocking RPC call. I hadn't realized this when reviewing the PR for master, but it is clearly incorrect: we should not make blocking calls in async code. We should try to find a better way...

Copy link
Contributor Author

@huaxiangsun huaxiangsun Jan 31, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice find! Let me back out the master changes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I reverted the commit on master. You are right, the network call is bad. How about changing getMetaScan from a static method to a non-static one, so it can use a cached meta replica count? We would need to handle the case where the meta replica count changes dynamically.

}
} catch (IOException ioe) {
LOG.warn("Failed to get region replication for meta table");
}
}

if (numOfReplicas > 1) {
int replicaId = ThreadLocalRandom.current().nextInt(numOfReplicas);

// When the replicaId is 0, do not set to Consistency.TIMELINE
if (replicaId > 0) {
scan.setReplicaId(replicaId);
scan.setConsistency(Consistency.TIMELINE);
}
}
break;
case NONE:
// If user does not configure LOCATOR_META_REPLICAS_MODE, let's check the legacy config.
if (conf.getBoolean(HConstants.USE_META_REPLICAS, HConstants.DEFAULT_USE_META_REPLICAS)) {
scan.setConsistency(Consistency.TIMELINE);
}
break;

default:
// Do nothing
}

if (rowUpperLimit > 0) {
scan.setLimit(rowUpperLimit);
scan.setReadType(Scan.ReadType.PREAD);
Expand Down Expand Up @@ -771,7 +809,7 @@ private static void scanMeta(Connection connection, @Nullable final byte[] start
@Nullable final byte[] stopRow, QueryType type, @Nullable Filter filter, int maxRows,
final Visitor visitor) throws IOException {
int rowUpperLimit = maxRows > 0 ? maxRows : Integer.MAX_VALUE;
Scan scan = getMetaScan(connection.getConfiguration(), rowUpperLimit);
Scan scan = getMetaScan(connection, connection.getConfiguration(), rowUpperLimit);

for (byte[] family : type.getFamilies()) {
scan.addFamily(family);
Expand Down Expand Up @@ -821,7 +859,7 @@ private static void scanMeta(Connection connection, @Nullable final byte[] start
private static RegionInfo getClosestRegionInfo(Connection connection,
@NonNull final TableName tableName, @NonNull final byte[] row) throws IOException {
byte[] searchRow = RegionInfo.createRegionName(tableName, row, HConstants.NINES, false);
Scan scan = getMetaScan(connection.getConfiguration(), 1);
Scan scan = getMetaScan(connection, connection.getConfiguration(), 1);
scan.setReversed(true);
scan.withStartRow(searchRow);
try (ResultScanner resultScanner = getMetaHTable(connection).getScanner(scan)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.hadoop.hbase.CatalogReplicaMode;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.AuthUtil;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.CatalogReplicaMode;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.IntStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CatalogReplicaMode;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionLocation;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,12 @@ private void primaryMayIncreaseReplicaNoChange(final long[] before, final long[]
}
}

/**
 * Asserts that every counter in {@code after} is strictly greater than the counter at the same
 * index in {@code before}.
 * <p>
 * The iteration is driven by the length of {@code after}; {@code before} must therefore be at
 * least as long, otherwise indexing into it fails with an ArrayIndexOutOfBoundsException.
 * NOTE(review): judging by the name, index 0 is presumably the primary meta replica and the
 * remaining indexes the secondary replicas -- confirm against the callers.
 *
 * @param before counter snapshot taken before the operation under test
 * @param after counter snapshot taken after the operation under test
 */
private void PrimaryIncreaseReplicaIncrease(final long[] before, final long[] after) {
  int idx = 0;
  while (idx < after.length) {
    // Strict growth is required for every slot; equal values fail the assertion.
    final boolean grew = after[idx] > before[idx];
    assertTrue(grew);
    idx++;
  }
}

private void getMetaReplicaReadRequests(final Region[] metaRegions, final long[] counters) {
int i = 0;
for (Region r : metaRegions) {
Expand All @@ -535,6 +541,7 @@ public void testHBaseMetaReplicaGets() throws Exception {
final Region[] metaRegions = getAllRegions(TableName.META_TABLE_NAME, numOfMetaReplica);
long[] readReqsForMetaReplicas = new long[numOfMetaReplica];
long[] readReqsForMetaReplicasAfterGet = new long[numOfMetaReplica];
long[] readReqsForMetaReplicasAfterGetAllLocations = new long[numOfMetaReplica];
long[] readReqsForMetaReplicasAfterMove = new long[numOfMetaReplica];
long[] readReqsForMetaReplicasAfterSecondMove = new long[numOfMetaReplica];
long[] readReqsForMetaReplicasAfterThirdGet = new long[numOfMetaReplica];
Expand Down Expand Up @@ -583,6 +590,16 @@ public void testHBaseMetaReplicaGets() throws Exception {
// For rest of meta replicas, there are more reads against them.
primaryNoChangeReplicaIncrease(readReqsForMetaReplicas, readReqsForMetaReplicasAfterGet);

RegionLocator locator = tableForGet.getRegionLocator();

for (int j = 0; j < numOfMetaReplica * 3; j++) {
locator.getAllRegionLocations();
}

getMetaReplicaReadRequests(metaRegions, readReqsForMetaReplicasAfterGetAllLocations);
PrimaryIncreaseReplicaIncrease(readReqsForMetaReplicasAfterGet,
readReqsForMetaReplicasAfterGetAllLocations);

// move one of regions so it meta cache may be invalid.
HTU.moveRegionAndWait(userRegion.getRegionInfo(), destRs.getServerName());

Expand All @@ -592,7 +609,7 @@ public void testHBaseMetaReplicaGets() throws Exception {

// There are read requests increase for primary meta replica.
// For rest of meta replicas, there is no change as regionMove will tell the new location
primaryIncreaseReplicaNoChange(readReqsForMetaReplicasAfterGet,
primaryIncreaseReplicaNoChange(readReqsForMetaReplicasAfterGetAllLocations,
readReqsForMetaReplicasAfterMove);
// Move region again.
HTU.moveRegionAndWait(userRegion.getRegionInfo(), srcRs.getServerName());
Expand Down