Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
Expand Down Expand Up @@ -68,7 +69,7 @@ private static List<TooSlowLog.SlowLogPayload> filterLogs(
if (tableName != null && slowLogPayload.getRegionName().startsWith(tableName)) {
totalFilterMatches++;
}
if (slowLogPayload.getClientAddress().equals(clientAddress)) {
if (isClientAddressMatched(slowLogPayload, clientAddress)) {
totalFilterMatches++;
}
if (slowLogPayload.getUserName().equals(userName)) {
Expand All @@ -92,6 +93,17 @@ private static List<TooSlowLog.SlowLogPayload> filterLogs(
return filteredSlowLogPayloads;
}

private static boolean isClientAddressMatched(TooSlowLog.SlowLogPayload slowLogPayload,
String clientAddress) {
String clientAddressInPayload = slowLogPayload.getClientAddress();
int portPos = clientAddressInPayload.lastIndexOf(Addressing.HOSTNAME_PORT_SEPARATOR);
if (portPos < 1) {
return clientAddressInPayload.equals(clientAddress);
}
return clientAddressInPayload.equals(clientAddress)
|| clientAddressInPayload.substring(0, portPos).equals(clientAddress);
}

public static List<TooSlowLog.SlowLogPayload> getFilteredLogs(
AdminProtos.SlowLogResponseRequest request, List<TooSlowLog.SlowLogPayload> logPayloadList) {
int totalFilters = getTotalFiltersCount(request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,60 @@ public void testSlowLogFilters() throws Exception {
HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(requestSlowLog).size() == 15));
}

@Test
public void testSlowLogFilterWithClientAddress() throws Exception {
Configuration conf = applySlowLogRecorderConf(10);
Constructor<NamedQueueRecorder> constructor =
NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
constructor.setAccessible(true);
namedQueueRecorder = constructor.newInstance(conf);
AdminProtos.SlowLogResponseRequest request =
AdminProtos.SlowLogResponseRequest.newBuilder().build();
Assert.assertEquals(getSlowLogPayloads(request).size(), 0);

String[] clientAddressArray = new String[] { "[127:1:1:1:1:1:1:1]:1", "[127:1:1:1:1:1:1:1]:2",
"[127:1:1:1:1:1:1:1]:3", "127.0.0.1:1", "127.0.0.1:2" };
boolean isSlowLog;
boolean isLargeLog;
for (int i = 0; i < 10; i++) {
if (i % 2 == 0) {
isSlowLog = true;
isLargeLog = false;
} else {
isSlowLog = false;
isLargeLog = true;
}
RpcLogDetails rpcLogDetails = getRpcLogDetails("userName_" + (i + 1),
clientAddressArray[i % 5], "class_" + (i + 1), isSlowLog, isLargeLog);
namedQueueRecorder.addRecord(rpcLogDetails);
}

AdminProtos.SlowLogResponseRequest largeLogRequestIPv6WithPort =
AdminProtos.SlowLogResponseRequest.newBuilder()
.setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG)
.setClientAddress("[127:1:1:1:1:1:1:1]:2").build();
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> getSlowLogPayloads(largeLogRequestIPv6WithPort).size() == 1));
AdminProtos.SlowLogResponseRequest largeLogRequestIPv6WithoutPort =
AdminProtos.SlowLogResponseRequest.newBuilder()
.setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG)
.setClientAddress("[127:1:1:1:1:1:1:1]").build();
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> getSlowLogPayloads(largeLogRequestIPv6WithoutPort).size() == 3));
AdminProtos.SlowLogResponseRequest largeLogRequestIPv4WithPort =
AdminProtos.SlowLogResponseRequest.newBuilder()
.setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG)
.setClientAddress("127.0.0.1:1").build();
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> getSlowLogPayloads(largeLogRequestIPv4WithPort).size() == 1));
AdminProtos.SlowLogResponseRequest largeLogRequestIPv4WithoutPort =
AdminProtos.SlowLogResponseRequest.newBuilder()
.setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG)
.setClientAddress("127.0.0.1").build();
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> getSlowLogPayloads(largeLogRequestIPv4WithoutPort).size() == 2));
}

@Test
public void testConcurrentSlowLogEvents() throws Exception {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ def help
=> get largelog responses only related to meta
region
hbase> get_largelog_responses '*', {'TABLE_NAME' => 't1'} => get largelog responses only related to t1 table
hbase> get_largelog_responses '*', {'CLIENT_IP' => '192.162.1.40'}
=> get largelog responses only related to the given
client IP address
hbase> get_largelog_responses '*', {'CLIENT_IP' => '192.162.1.40:60225'}
=> get largelog responses only related to the given
client IP address and port
hbase> get_largelog_responses '*', {'CLIENT_IP' => '192.162.1.40:60225', 'LIMIT' => 100}
=> get largelog responses with given client
IP address and get 100 records limit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ def help
=> get slowlog responses only related to meta
region
hbase> get_slowlog_responses '*', {'TABLE_NAME' => 't1'} => get slowlog responses only related to t1 table
hbase> get_slowlog_responses '*', {'CLIENT_IP' => '192.162.1.40'}
=> get slowlog responses only related to the given
client IP address
hbase> get_slowlog_responses '*', {'CLIENT_IP' => '192.162.1.40:60225'}
=> get slowlog responses only related to the given
client IP address and port
hbase> get_slowlog_responses '*', {'CLIENT_IP' => '192.162.1.40:60225', 'LIMIT' => 100}
=> get slowlog responses with given client
IP address and get 100 records limit
Expand Down