Skip to content

Commit 4bb7409

Browse files
authored
HBASE-27981 Add connection and request attributes to slow log (#5412)
Signed-off-by: Bryan Beaudreault <[email protected]>
1 parent e8cbc3f commit 4bb7409

File tree

8 files changed

+250
-12
lines changed

8 files changed

+250
-12
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/client/OnlineLogRecord.java

Lines changed: 50 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package org.apache.hadoop.hbase.client;
1919

20+
import java.util.Map;
2021
import java.util.Optional;
2122
import org.apache.commons.lang3.builder.EqualsBuilder;
2223
import org.apache.commons.lang3.builder.HashCodeBuilder;
@@ -29,6 +30,8 @@
2930
import org.apache.hbase.thirdparty.com.google.gson.JsonObject;
3031
import org.apache.hbase.thirdparty.com.google.gson.JsonSerializer;
3132

33+
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
34+
3235
/**
3336
* Slow/Large Log payload for hbase-client, to be used by Admin API get_slow_responses and
3437
* get_large_responses
@@ -53,6 +56,18 @@ final public class OnlineLogRecord extends LogEntry {
5356
if (slowLogPayload.getMultiServiceCalls() == 0) {
5457
jsonObj.remove("multiServiceCalls");
5558
}
59+
if (slowLogPayload.getRequestAttributes().isEmpty()) {
60+
jsonObj.remove("requestAttributes");
61+
} else {
62+
jsonObj.add("requestAttributes", gson
63+
.toJsonTree(ProtobufUtil.deserializeAttributes(slowLogPayload.getRequestAttributes())));
64+
}
65+
if (slowLogPayload.getConnectionAttributes().isEmpty()) {
66+
jsonObj.remove("connectionAttributes");
67+
} else {
68+
jsonObj.add("connectionAttributes", gson.toJsonTree(
69+
ProtobufUtil.deserializeAttributes(slowLogPayload.getConnectionAttributes())));
70+
}
5671
if (slowLogPayload.getScan().isPresent()) {
5772
jsonObj.add("scan", gson.toJsonTree(slowLogPayload.getScan().get().toMap()));
5873
} else {
@@ -79,6 +94,8 @@ final public class OnlineLogRecord extends LogEntry {
7994
private final int multiMutationsCount;
8095
private final int multiServiceCalls;
8196
private final Optional<Scan> scan;
97+
private final Map<String, byte[]> requestAttributes;
98+
private final Map<String, byte[]> connectionAttributes;
8299

83100
public long getStartTime() {
84101
return startTime;
@@ -152,11 +169,20 @@ public Optional<Scan> getScan() {
152169
return scan;
153170
}
154171

172+
public Map<String, byte[]> getRequestAttributes() {
173+
return requestAttributes;
174+
}
175+
176+
public Map<String, byte[]> getConnectionAttributes() {
177+
return connectionAttributes;
178+
}
179+
155180
OnlineLogRecord(final long startTime, final int processingTime, final int queueTime,
156181
final long responseSize, final long blockBytesScanned, final String clientAddress,
157182
final String serverClass, final String methodName, final String callDetails, final String param,
158183
final String regionName, final String userName, final int multiGetsCount,
159-
final int multiMutationsCount, final int multiServiceCalls, final Scan scan) {
184+
final int multiMutationsCount, final int multiServiceCalls, final Scan scan,
185+
final Map<String, byte[]> requestAttributes, final Map<String, byte[]> connectionAttributes) {
160186
this.startTime = startTime;
161187
this.processingTime = processingTime;
162188
this.queueTime = queueTime;
@@ -173,6 +199,8 @@ public Optional<Scan> getScan() {
173199
this.multiMutationsCount = multiMutationsCount;
174200
this.multiServiceCalls = multiServiceCalls;
175201
this.scan = Optional.ofNullable(scan);
202+
this.requestAttributes = requestAttributes;
203+
this.connectionAttributes = connectionAttributes;
176204
}
177205

178206
public static class OnlineLogRecordBuilder {
@@ -192,6 +220,8 @@ public static class OnlineLogRecordBuilder {
192220
private int multiMutationsCount;
193221
private int multiServiceCalls;
194222
private Scan scan = null;
223+
private Map<String, byte[]> requestAttributes;
224+
private Map<String, byte[]> connectionAttributes;
195225

196226
public OnlineLogRecordBuilder setStartTime(long startTime) {
197227
this.startTime = startTime;
@@ -276,10 +306,22 @@ public OnlineLogRecordBuilder setScan(Scan scan) {
276306
return this;
277307
}
278308

309+
public OnlineLogRecordBuilder setRequestAttributes(Map<String, byte[]> requestAttributes) {
310+
this.requestAttributes = requestAttributes;
311+
return this;
312+
}
313+
314+
public OnlineLogRecordBuilder
315+
setConnectionAttributes(Map<String, byte[]> connectionAttributes) {
316+
this.connectionAttributes = connectionAttributes;
317+
return this;
318+
}
319+
279320
public OnlineLogRecord build() {
280321
return new OnlineLogRecord(startTime, processingTime, queueTime, responseSize,
281322
blockBytesScanned, clientAddress, serverClass, methodName, callDetails, param, regionName,
282-
userName, multiGetsCount, multiMutationsCount, multiServiceCalls, scan);
323+
userName, multiGetsCount, multiMutationsCount, multiServiceCalls, scan, requestAttributes,
324+
connectionAttributes);
283325
}
284326
}
285327

@@ -304,7 +346,8 @@ public boolean equals(Object o) {
304346
.append(serverClass, that.serverClass).append(methodName, that.methodName)
305347
.append(callDetails, that.callDetails).append(param, that.param)
306348
.append(regionName, that.regionName).append(userName, that.userName).append(scan, that.scan)
307-
.isEquals();
349+
.append(requestAttributes, that.requestAttributes)
350+
.append(connectionAttributes, that.connectionAttributes).isEquals();
308351
}
309352

310353
@Override
@@ -313,7 +356,7 @@ public int hashCode() {
313356
.append(responseSize).append(blockBytesScanned).append(clientAddress).append(serverClass)
314357
.append(methodName).append(callDetails).append(param).append(regionName).append(userName)
315358
.append(multiGetsCount).append(multiMutationsCount).append(multiServiceCalls).append(scan)
316-
.toHashCode();
359+
.append(requestAttributes).append(connectionAttributes).toHashCode();
317360
}
318361

319362
@Override
@@ -330,7 +373,9 @@ public String toString() {
330373
.append("methodName", methodName).append("callDetails", callDetails).append("param", param)
331374
.append("regionName", regionName).append("userName", userName)
332375
.append("multiGetsCount", multiGetsCount).append("multiMutationsCount", multiMutationsCount)
333-
.append("multiServiceCalls", multiServiceCalls).append("scan", scan).toString();
376+
.append("multiServiceCalls", multiServiceCalls).append("scan", scan)
377+
.append("requestAttributes", requestAttributes)
378+
.append("connectionAttributes", connectionAttributes).toString();
334379
}
335380

336381
}

hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2163,6 +2163,25 @@ public static SlowLogParams getSlowLogParams(Message message, boolean slowLogSca
21632163
return new SlowLogParams(params);
21642164
}
21652165

2166+
/**
2167+
* Convert a list of NameBytesPair to a more readable CSV
2168+
*/
2169+
public static String convertAttributesToCsv(List<NameBytesPair> attributes) {
2170+
if (attributes.isEmpty()) {
2171+
return HConstants.EMPTY_STRING;
2172+
}
2173+
return deserializeAttributes(convertNameBytesPairsToMap(attributes)).entrySet().stream()
2174+
.map(entry -> entry.getKey() + " = " + entry.getValue()).collect(Collectors.joining(", "));
2175+
}
2176+
2177+
/**
2178+
* Convert a map of byte array attributes to a more readable map of binary string representations
2179+
*/
2180+
public static Map<String, String> deserializeAttributes(Map<String, byte[]> attributes) {
2181+
return attributes.entrySet().stream().collect(
2182+
Collectors.toMap(Map.Entry::getKey, entry -> Bytes.toStringBinary(entry.getValue())));
2183+
}
2184+
21662185
/**
21672186
* Print out some subset of a MutationProto rather than all of it and its data
21682187
* @param proto Protobuf to print out
@@ -3348,7 +3367,10 @@ private static LogEntry getSlowLogRecord(final TooSlowLog.SlowLogPayload slowLog
33483367
.setResponseSize(slowLogPayload.getResponseSize())
33493368
.setBlockBytesScanned(slowLogPayload.getBlockBytesScanned())
33503369
.setServerClass(slowLogPayload.getServerClass()).setStartTime(slowLogPayload.getStartTime())
3351-
.setUserName(slowLogPayload.getUserName());
3370+
.setUserName(slowLogPayload.getUserName())
3371+
.setRequestAttributes(convertNameBytesPairsToMap(slowLogPayload.getRequestAttributeList()))
3372+
.setConnectionAttributes(
3373+
convertNameBytesPairsToMap(slowLogPayload.getConnectionAttributeList()));
33523374
if (slowLogPayload.hasScan()) {
33533375
try {
33543376
onlineLogRecord.setScan(ProtobufUtil.toScan(slowLogPayload.getScan()));
@@ -3359,6 +3381,12 @@ private static LogEntry getSlowLogRecord(final TooSlowLog.SlowLogPayload slowLog
33593381
return onlineLogRecord.build();
33603382
}
33613383

3384+
private static Map<String, byte[]>
3385+
convertNameBytesPairsToMap(List<NameBytesPair> nameBytesPairs) {
3386+
return nameBytesPairs.stream().collect(Collectors.toMap(NameBytesPair::getName,
3387+
nameBytesPair -> nameBytesPair.getValue().toByteArray()));
3388+
}
3389+
33623390
/**
33633391
* Convert AdminProtos#SlowLogResponses to list of {@link OnlineLogRecord}
33643392
* @param logEntry slowlog response protobuf instance

hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestOnlineLogRecord.java

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717
*/
1818
package org.apache.hadoop.hbase.client;
1919

20+
import java.util.Collections;
21+
import java.util.Map;
22+
import java.util.Set;
2023
import org.apache.hadoop.hbase.HBaseClassTestRule;
2124
import org.apache.hadoop.hbase.testclassification.ClientTests;
2225
import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -26,6 +29,9 @@
2629
import org.junit.Test;
2730
import org.junit.experimental.categories.Category;
2831

32+
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
33+
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
34+
2935
@Category({ ClientTests.class, SmallTests.class })
3036
public class TestOnlineLogRecord {
3137

@@ -47,10 +53,56 @@ public void itSerializesScan() {
4753
+ " \"maxResultSize\": -1,\n" + " \"families\": {},\n" + " \"caching\": -1,\n"
4854
+ " \"maxVersions\": 1,\n" + " \"timeRange\": [\n" + " 0,\n"
4955
+ " 9223372036854775807\n" + " ]\n" + " }\n" + "}";
50-
OnlineLogRecord o =
51-
new OnlineLogRecord(1, 2, 3, 4, 5, null, null, null, null, null, null, null, 6, 7, 0, scan);
56+
OnlineLogRecord o = new OnlineLogRecord(1, 2, 3, 4, 5, null, null, null, null, null, null, null,
57+
6, 7, 0, scan, Collections.emptyMap(), Collections.emptyMap());
5258
String actualOutput = o.toJsonPrettyPrint();
5359
System.out.println(actualOutput);
5460
Assert.assertEquals(actualOutput, expectedOutput);
5561
}
62+
63+
@Test
64+
public void itSerializesRequestAttributes() {
65+
Map<String, byte[]> requestAttributes = ImmutableMap.<String, byte[]> builder()
66+
.put("r", Bytes.toBytes("1")).put("2", Bytes.toBytes(0.0)).build();
67+
Set<String> expectedOutputs =
68+
ImmutableSet.<String> builder().add("requestAttributes").add("\"r\": \"1\"")
69+
.add("\"2\": \"\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\"").build();
70+
OnlineLogRecord o = new OnlineLogRecord(1, 2, 3, 4, 5, null, null, null, null, null, null, null,
71+
6, 7, 0, null, requestAttributes, Collections.emptyMap());
72+
String actualOutput = o.toJsonPrettyPrint();
73+
System.out.println(actualOutput);
74+
expectedOutputs.forEach(expected -> Assert.assertTrue(actualOutput.contains(expected)));
75+
}
76+
77+
@Test
78+
public void itOmitsEmptyRequestAttributes() {
79+
OnlineLogRecord o = new OnlineLogRecord(1, 2, 3, 4, 5, null, null, null, null, null, null, null,
80+
6, 7, 0, null, Collections.emptyMap(), Collections.emptyMap());
81+
String actualOutput = o.toJsonPrettyPrint();
82+
System.out.println(actualOutput);
83+
Assert.assertFalse(actualOutput.contains("requestAttributes"));
84+
}
85+
86+
@Test
87+
public void itSerializesConnectionAttributes() {
88+
Map<String, byte[]> connectionAttributes = ImmutableMap.<String, byte[]> builder()
89+
.put("c", Bytes.toBytes("1")).put("2", Bytes.toBytes(0.0)).build();
90+
Set<String> expectedOutputs =
91+
ImmutableSet.<String> builder().add("connectionAttributes").add("\"c\": \"1\"")
92+
.add("\"2\": \"\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\"").build();
93+
OnlineLogRecord o = new OnlineLogRecord(1, 2, 3, 4, 5, null, null, null, null, null, null, null,
94+
6, 7, 0, null, Collections.emptyMap(), connectionAttributes);
95+
String actualOutput = o.toJsonPrettyPrint();
96+
System.out.println(actualOutput);
97+
expectedOutputs.forEach(expected -> Assert.assertTrue(actualOutput.contains(expected)));
98+
}
99+
100+
@Test
101+
public void itOmitsEmptyConnectionAttributes() {
102+
OnlineLogRecord o = new OnlineLogRecord(1, 2, 3, 4, 5, null, null, null, null, null, null, null,
103+
6, 7, 0, null, Collections.emptyMap(), Collections.emptyMap());
104+
String actualOutput = o.toJsonPrettyPrint();
105+
System.out.println(actualOutput);
106+
Assert.assertFalse(actualOutput.contains("connectionAttributes"));
107+
}
56108
}

hbase-protocol-shaded/src/main/protobuf/TooSlowLog.proto

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ option java_outer_classname = "TooSlowLog";
2727
option java_generate_equals_and_hash = true;
2828
option optimize_for = SPEED;
2929

30+
import "HBase.proto";
3031
import "Client.proto";
3132

3233
message SlowLogPayload {
@@ -49,6 +50,9 @@ message SlowLogPayload {
4950
optional int64 block_bytes_scanned = 16;
5051
optional Scan scan = 17;
5152

53+
repeated NameBytesPair connection_attribute = 18;
54+
repeated NameBytesPair request_attribute = 19;
55+
5256
// SLOW_LOG is RPC call slow in nature whereas LARGE_LOG is RPC call quite large.
5357
// Majority of times, slow logs are also large logs and hence, ALL is combination of
5458
// both

hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RpcLogDetails.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package org.apache.hadoop.hbase.namequeues;
1919

20+
import java.util.Map;
2021
import org.apache.commons.lang3.builder.ToStringBuilder;
2122
import org.apache.hadoop.hbase.ipc.RpcCall;
2223
import org.apache.yetus.audience.InterfaceAudience;
@@ -39,6 +40,8 @@ public class RpcLogDetails extends NamedQueuePayload {
3940
private final String className;
4041
private final boolean isSlowLog;
4142
private final boolean isLargeLog;
43+
private final Map<String, byte[]> connectionAttributes;
44+
private final Map<String, byte[]> requestAttributes;
4245

4346
public RpcLogDetails(RpcCall rpcCall, Message param, String clientAddress, long responseSize,
4447
long blockBytesScanned, String className, boolean isSlowLog, boolean isLargeLog) {
@@ -51,6 +54,12 @@ public RpcLogDetails(RpcCall rpcCall, Message param, String clientAddress, long
5154
this.className = className;
5255
this.isSlowLog = isSlowLog;
5356
this.isLargeLog = isLargeLog;
57+
58+
// it's important to call getConnectionAttributes and getRequestAttributes here
59+
// because otherwise the buffers may get released before the log details are processed which
60+
// would result in corrupted attributes
61+
this.connectionAttributes = rpcCall.getConnectionAttributes();
62+
this.requestAttributes = rpcCall.getRequestAttributes();
5463
}
5564

5665
public RpcCall getRpcCall() {
@@ -85,11 +94,20 @@ public Message getParam() {
8594
return param;
8695
}
8796

97+
public Map<String, byte[]> getConnectionAttributes() {
98+
return connectionAttributes;
99+
}
100+
101+
public Map<String, byte[]> getRequestAttributes() {
102+
return requestAttributes;
103+
}
104+
88105
@Override
89106
public String toString() {
90107
return new ToStringBuilder(this).append("rpcCall", rpcCall).append("param", param)
91108
.append("clientAddress", clientAddress).append("responseSize", responseSize)
92109
.append("className", className).append("isSlowLog", isSlowLog)
93-
.append("isLargeLog", isLargeLog).toString();
110+
.append("isLargeLog", isLargeLog).append("connectionAttributes", connectionAttributes)
111+
.append("requestAttributes", requestAttributes).toString();
94112
}
95113
}

hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@
1818
package org.apache.hadoop.hbase.namequeues.impl;
1919

2020
import java.util.Arrays;
21+
import java.util.Collection;
2122
import java.util.Collections;
2223
import java.util.List;
24+
import java.util.Map;
2325
import java.util.Queue;
2426
import java.util.stream.Collectors;
2527
import org.apache.commons.lang3.StringUtils;
@@ -42,12 +44,14 @@
4244

4345
import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue;
4446
import org.apache.hbase.thirdparty.com.google.common.collect.Queues;
47+
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
4548
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
4649
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
4750

4851
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
4952
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
5053
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
54+
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
5155
import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog;
5256

5357
/**
@@ -164,7 +168,9 @@ public void consumeEventFromDisruptor(NamedQueuePayload namedQueuePayload) {
164168
.setProcessingTime(processingTime).setQueueTime(qTime)
165169
.setRegionName(slowLogParams != null ? slowLogParams.getRegionName() : StringUtils.EMPTY)
166170
.setResponseSize(responseSize).setBlockBytesScanned(blockBytesScanned)
167-
.setServerClass(className).setStartTime(startTime).setType(type).setUserName(userName);
171+
.setServerClass(className).setStartTime(startTime).setType(type).setUserName(userName)
172+
.addAllRequestAttribute(buildNameBytesPairs(rpcLogDetails.getRequestAttributes()))
173+
.addAllConnectionAttribute(buildNameBytesPairs(rpcLogDetails.getConnectionAttributes()));
168174
if (slowLogParams != null && slowLogParams.getScan() != null) {
169175
slowLogPayloadBuilder.setScan(slowLogParams.getScan());
170176
}
@@ -177,6 +183,16 @@ public void consumeEventFromDisruptor(NamedQueuePayload namedQueuePayload) {
177183
}
178184
}
179185

186+
private static Collection<HBaseProtos.NameBytesPair>
187+
buildNameBytesPairs(Map<String, byte[]> attributes) {
188+
if (attributes == null) {
189+
return Collections.emptySet();
190+
}
191+
return attributes.entrySet().stream().map(attr -> HBaseProtos.NameBytesPair.newBuilder()
192+
.setName(attr.getKey()).setValue(ByteString.copyFrom(attr.getValue())).build())
193+
.collect(Collectors.toSet());
194+
}
195+
180196
@Override
181197
public boolean clearNamedQueue() {
182198
if (!isOnlineLogProviderEnabled) {

0 commit comments

Comments
 (0)