Skip to content

Commit fffdcba

Browse files
committed
Revert "HBASE-26125 Backport HBASE-25401 "Add trace support for async call in rpc client" to branch-2 (#3543)"
This reverts commit ca09643.
1 parent 620806e commit fffdcba

File tree

14 files changed

+174
-299
lines changed

14 files changed

+174
-299
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java

Lines changed: 28 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,6 @@
2121
import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;
2222
import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException;
2323

24-
import io.opentelemetry.api.trace.Span;
25-
import io.opentelemetry.api.trace.StatusCode;
26-
import io.opentelemetry.context.Scope;
2724
import java.io.IOException;
2825
import java.net.SocketAddress;
2926
import java.util.Collection;
@@ -41,7 +38,6 @@
4138
import org.apache.hadoop.hbase.net.Address;
4239
import org.apache.hadoop.hbase.security.User;
4340
import org.apache.hadoop.hbase.security.UserProvider;
44-
import org.apache.hadoop.hbase.trace.TraceUtil;
4541
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
4642
import org.apache.hadoop.hbase.util.PoolMap;
4743
import org.apache.hadoop.hbase.util.Threads;
@@ -369,7 +365,7 @@ private T getConnection(ConnectionId remoteId) throws IOException {
369365
protected abstract T createConnection(ConnectionId remoteId) throws IOException;
370366

371367
private void onCallFinished(Call call, HBaseRpcController hrc, Address addr,
372-
RpcCallback<Message> callback) {
368+
RpcCallback<Message> callback) {
373369
call.callStats.setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.getStartTime());
374370
if (metrics != null) {
375371
metrics.updateRpc(call.md, call.param, call.callStats);
@@ -392,59 +388,44 @@ private void onCallFinished(Call call, HBaseRpcController hrc, Address addr,
392388
}
393389
}
394390

395-
private Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc,
391+
Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc,
396392
final Message param, Message returnType, final User ticket,
397393
final Address addr, final RpcCallback<Message> callback) {
398-
Span span = TraceUtil.getGlobalTracer().spanBuilder("RpcClient.callMethod." + md.getFullName())
399-
.startSpan();
400-
try (Scope scope = span.makeCurrent()) {
401-
final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
402-
cs.setStartTime(EnvironmentEdgeManager.currentTime());
403-
404-
if (param instanceof ClientProtos.MultiRequest) {
405-
ClientProtos.MultiRequest req = (ClientProtos.MultiRequest) param;
406-
int numActions = 0;
407-
for (ClientProtos.RegionAction regionAction : req.getRegionActionList()) {
408-
numActions += regionAction.getActionCount();
409-
}
410-
411-
cs.setNumActionsPerServer(numActions);
394+
final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
395+
cs.setStartTime(EnvironmentEdgeManager.currentTime());
396+
397+
if (param instanceof ClientProtos.MultiRequest) {
398+
ClientProtos.MultiRequest req = (ClientProtos.MultiRequest) param;
399+
int numActions = 0;
400+
for (ClientProtos.RegionAction regionAction : req.getRegionActionList()) {
401+
numActions += regionAction.getActionCount();
412402
}
413403

414-
final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr);
415-
Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType,
404+
cs.setNumActionsPerServer(numActions);
405+
}
406+
407+
final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr);
408+
Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType,
416409
hrc.getCallTimeout(), hrc.getPriority(), new RpcCallback<Call>() {
417410
@Override
418411
public void run(Call call) {
419-
try (Scope scope = call.span.makeCurrent()) {
420-
counter.decrementAndGet();
421-
onCallFinished(call, hrc, addr, callback);
422-
} finally {
423-
if (hrc.failed()) {
424-
span.setStatus(StatusCode.ERROR);
425-
span.recordException(hrc.getFailed());
426-
} else {
427-
span.setStatus(StatusCode.OK);
428-
}
429-
span.end();
430-
}
412+
counter.decrementAndGet();
413+
onCallFinished(call, hrc, addr, callback);
431414
}
432415
}, cs);
433-
ConnectionId remoteId = new ConnectionId(ticket, md.getService().getName(), addr);
434-
int count = counter.incrementAndGet();
435-
try {
436-
if (count > maxConcurrentCallsPerServer) {
437-
throw new ServerTooBusyException(addr, count);
438-
}
439-
cs.setConcurrentCallsPerServer(count);
440-
T connection = getConnection(remoteId);
441-
connection.sendRequest(call, hrc);
442-
} catch (Exception e) {
443-
call.setException(toIOE(e));
444-
span.end();
416+
ConnectionId remoteId = new ConnectionId(ticket, md.getService().getName(), addr);
417+
int count = counter.incrementAndGet();
418+
try {
419+
if (count > maxConcurrentCallsPerServer) {
420+
throw new ServerTooBusyException(addr, count);
445421
}
446-
return call;
422+
cs.setConcurrentCallsPerServer(count);
423+
T connection = getConnection(remoteId);
424+
connection.sendRequest(call, hrc);
425+
} catch (Exception e) {
426+
call.setException(toIOE(e));
447427
}
428+
return call;
448429
}
449430

450431
private static Address createAddr(ServerName sn) {

hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import static org.apache.hadoop.hbase.ipc.IPCUtil.setCancelled;
2525
import static org.apache.hadoop.hbase.ipc.IPCUtil.write;
2626

27+
import io.opentelemetry.api.trace.Span;
28+
import io.opentelemetry.context.Context;
2729
import io.opentelemetry.context.Scope;
2830
import java.io.BufferedInputStream;
2931
import java.io.BufferedOutputStream;
@@ -55,6 +57,7 @@
5557
import org.apache.hadoop.hbase.security.SaslUtil;
5658
import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
5759
import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider;
60+
import org.apache.hadoop.hbase.trace.TraceUtil;
5861
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
5962
import org.apache.hadoop.hbase.util.ExceptionUtil;
6063
import org.apache.hadoop.io.IOUtils;
@@ -189,8 +192,8 @@ public void run() {
189192
if (call.isDone()) {
190193
continue;
191194
}
192-
try (Scope scope = call.span.makeCurrent()) {
193-
writeRequest(call);
195+
try {
196+
tracedWriteRequest(call);
194197
} catch (IOException e) {
195198
// exception here means the call has not been added to the pendingCalls yet, so we need
196199
// to fail it by our own.
@@ -591,6 +594,16 @@ private void negotiateCryptoAes(RPCProtos.CryptoCipherMeta cryptoCipherMeta)
591594
this.out = new DataOutputStream(new BufferedOutputStream(saslRpcClient.getOutputStream()));
592595
}
593596

597+
private void tracedWriteRequest(Call call) throws IOException {
598+
Span span = TraceUtil.getGlobalTracer().spanBuilder("RpcClientImpl.tracedWriteRequest")
599+
.setParent(Context.current().with(call.span)).startSpan();
600+
try (Scope scope = span.makeCurrent()) {
601+
writeRequest(call);
602+
} finally {
603+
span.end();
604+
}
605+
}
606+
594607
/**
595608
* Initiates a call by sending the parameter to the remote server. Note: this is not called from
596609
* the Connection thread, but by other threads.
@@ -798,9 +811,7 @@ public void run(boolean cancelled) throws IOException {
798811
if (callSender != null) {
799812
callSender.sendCall(call);
800813
} else {
801-
// this is in the same thread with the caller so do not need to attach the trace context
802-
// again.
803-
writeRequest(call);
814+
tracedWriteRequest(call);
804815
}
805816
}
806817
});

hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ class Call {
6161
final Span span;
6262
Timeout timeoutTask;
6363

64-
Call(int id, final Descriptors.MethodDescriptor md, Message param,
64+
protected Call(int id, final Descriptors.MethodDescriptor md, Message param,
6565
final CellScanner cells, final Message responseDefaultType, int timeout, int priority,
6666
RpcCallback<Call> callback, MetricsConnection.CallStats callStats) {
6767
this.param = param;

hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717
*/
1818
package org.apache.hadoop.hbase.ipc;
1919

20-
import io.opentelemetry.api.GlobalOpenTelemetry;
21-
import io.opentelemetry.context.Context;
2220
import java.io.IOException;
2321
import java.io.OutputStream;
2422
import java.lang.reflect.InvocationTargetException;
@@ -51,7 +49,6 @@
5149
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
5250
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
5351
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
54-
import org.apache.hadoop.hbase.shaded.protobuf.generated.TracingProtos.RPCTInfo;
5552

5653
/**
5754
* Utility to help ipc'ing.
@@ -115,10 +112,11 @@ public static int getTotalSizeWhenWrittenDelimited(Message... messages) {
115112
static RequestHeader buildRequestHeader(Call call, CellBlockMeta cellBlockMeta) {
116113
RequestHeader.Builder builder = RequestHeader.newBuilder();
117114
builder.setCallId(call.id);
118-
RPCTInfo.Builder traceBuilder = RPCTInfo.newBuilder();
119-
GlobalOpenTelemetry.getPropagators().getTextMapPropagator().inject(Context.current(),
120-
traceBuilder, (carrier, key, value) -> carrier.putHeaders(key, value));
121-
builder.setTraceInfo(traceBuilder.build());
115+
//TODO handle htrace API change, see HBASE-18895
116+
/*if (call.span != null) {
117+
builder.setTraceInfo(RPCTInfo.newBuilder().setParentId(call.span.getSpanId())
118+
.setTraceId(call.span.getTracerId()));
119+
}*/
122120
builder.setMethodName(call.md.getName());
123121
builder.setRequestParam(call.param != null);
124122
if (cellBlockMeta != null) {

hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
*/
1818
package org.apache.hadoop.hbase.ipc;
1919

20-
import io.opentelemetry.context.Scope;
2120
import java.io.IOException;
2221
import java.util.HashMap;
2322
import java.util.Map;
@@ -115,12 +114,9 @@ private void writeRequest(ChannelHandlerContext ctx, Call call, ChannelPromise p
115114

116115
@Override
117116
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
118-
throws Exception {
117+
throws Exception {
119118
if (msg instanceof Call) {
120-
Call call = (Call) msg;
121-
try (Scope scope = call.span.makeCurrent()) {
122-
writeRequest(ctx, call, promise);
123-
}
119+
writeRequest(ctx, (Call) msg, promise);
124120
} else {
125121
ctx.write(msg, promise);
126122
}

hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
*/
1818
package org.apache.hadoop.hbase.trace;
1919

20-
import io.opentelemetry.api.GlobalOpenTelemetry;
20+
import io.opentelemetry.api.OpenTelemetry;
2121
import io.opentelemetry.api.trace.Tracer;
2222
import org.apache.yetus.audience.InterfaceAudience;
2323

@@ -30,6 +30,6 @@ private TraceUtil() {
3030
}
3131

3232
public static Tracer getGlobalTracer() {
33-
return GlobalOpenTelemetry.getTracer(INSTRUMENTATION_NAME);
33+
return OpenTelemetry.getGlobalTracer(INSTRUMENTATION_NAME);
3434
}
3535
}

hbase-protocol-shaded/src/main/protobuf/Tracing.proto

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,12 @@ option java_outer_classname = "TracingProtos";
2323
option java_generate_equals_and_hash = true;
2424
option optimize_for = SPEED;
2525

26-
// OpenTelemetry propagates trace context through https://www.w3.org/TR/trace-context/, which
27-
// is a text-based approach that passes properties with http headers. Here we will also use this
28-
// approach so we just need a map to store the key value pair.
29-
26+
//Used to pass through the information necessary to continue
27+
//a trace after an RPC is made. All we need is the traceid
28+
//(so we know the overarching trace this message is a part of), and
29+
//the id of the current span when this message was sent, so we know
30+
//what span caused the new span we will create when this message is received.
3031
message RPCTInfo {
31-
optional int64 trace_id = 1 [deprecated = true];
32-
optional int64 parent_id = 2 [deprecated = true];
33-
map<string, string> headers = 3;
32+
optional int64 trace_id = 1;
33+
optional int64 parent_id = 2;
3434
}

hbase-server/pom.xml

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -440,16 +440,6 @@
440440
<artifactId>hamcrest-core</artifactId>
441441
<scope>test</scope>
442442
</dependency>
443-
<dependency>
444-
<groupId>io.opentelemetry</groupId>
445-
<artifactId>opentelemetry-sdk</artifactId>
446-
<scope>test</scope>
447-
</dependency>
448-
<dependency>
449-
<groupId>io.opentelemetry</groupId>
450-
<artifactId>opentelemetry-sdk-testing</artifactId>
451-
<scope>test</scope>
452-
</dependency>
453443
<dependency>
454444
<groupId>org.hamcrest</groupId>
455445
<artifactId>hamcrest-library</artifactId>

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818
package org.apache.hadoop.hbase.ipc;
1919

2020
import io.opentelemetry.api.trace.Span;
21-
import io.opentelemetry.api.trace.StatusCode;
22-
import io.opentelemetry.context.Context;
2321
import io.opentelemetry.context.Scope;
2422
import java.net.InetSocketAddress;
2523
import java.nio.channels.ClosedChannelException;
@@ -75,6 +73,15 @@ public RpcCall getRpcCall() {
7573
return call;
7674
}
7775

76+
/**
77+
* Keep for backward compatibility.
78+
* @deprecated As of release 2.0, this will be removed in HBase 3.0
79+
*/
80+
@Deprecated
81+
public ServerCall<?> getCall() {
82+
return (ServerCall<?>) call;
83+
}
84+
7885
public void setStatus(MonitoredRPCHandler status) {
7986
this.status = status;
8087
}
@@ -123,8 +130,7 @@ public void run() {
123130
String serviceName = getServiceName();
124131
String methodName = getMethodName();
125132
String traceString = serviceName + "." + methodName;
126-
Span span = TraceUtil.getGlobalTracer().spanBuilder(traceString)
127-
.setParent(Context.current().with(((ServerCall<?>) call).getSpan())).startSpan();
133+
Span span = TraceUtil.getGlobalTracer().spanBuilder(traceString).startSpan();
128134
try (Scope traceScope = span.makeCurrent()) {
129135
if (!this.rpcServer.isStarted()) {
130136
InetSocketAddress address = rpcServer.getListenerAddress();
@@ -135,12 +141,8 @@ public void run() {
135141
resultPair = this.rpcServer.call(call, this.status);
136142
} catch (TimeoutIOException e){
137143
RpcServer.LOG.warn("Can not complete this request in time, drop it: " + call);
138-
span.recordException(e);
139-
span.setStatus(StatusCode.ERROR);
140144
return;
141145
} catch (Throwable e) {
142-
span.recordException(e);
143-
span.setStatus(StatusCode.ERROR);
144146
if (e instanceof ServerNotRunningYetException) {
145147
// If ServerNotRunningYetException, don't spew stack trace.
146148
if (RpcServer.LOG.isTraceEnabled()) {
@@ -159,7 +161,6 @@ public void run() {
159161
RpcServer.CurCall.set(null);
160162
if (resultPair != null) {
161163
this.rpcServer.addCallSize(call.getSize() * -1);
162-
span.setStatus(StatusCode.OK);
163164
sucessful = true;
164165
}
165166
span.end();

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717
*/
1818
package org.apache.hadoop.hbase.ipc;
1919

20-
import io.opentelemetry.api.trace.Span;
21-
import io.opentelemetry.api.trace.StatusCode;
2220
import java.io.IOException;
2321
import java.net.InetAddress;
2422
import java.nio.ByteBuffer;
@@ -104,8 +102,6 @@ public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCa
104102
// from WAL side on release
105103
private final AtomicInteger reference = new AtomicInteger(0x80000000);
106104

107-
private final Span span;
108-
109105
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH",
110106
justification = "Can't figure why this complaint is happening... see below")
111107
ServerCall(int id, BlockingService service, MethodDescriptor md, RequestHeader header,
@@ -136,7 +132,6 @@ public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCa
136132
this.bbAllocator = byteBuffAllocator;
137133
this.cellBlockBuilder = cellBlockBuilder;
138134
this.reqCleanup = reqCleanup;
139-
this.span = Span.current();
140135
}
141136

142137
/**
@@ -155,7 +150,6 @@ public void done() {
155150
// If the call was run successfuly, we might have already returned the BB
156151
// back to pool. No worries..Then inputCellBlock will be null
157152
cleanup();
158-
span.end();
159153
}
160154

161155
@Override
@@ -232,10 +226,6 @@ public synchronized void setResponse(Message m, final CellScanner cells, Throwab
232226
}
233227
if (t != null) {
234228
this.isError = true;
235-
span.recordException(t);
236-
span.setStatus(StatusCode.ERROR);
237-
} else {
238-
span.setStatus(StatusCode.OK);
239229
}
240230
BufferChain bc = null;
241231
try {
@@ -570,8 +560,4 @@ public synchronized BufferChain getResponse() {
570560
return response;
571561
}
572562
}
573-
574-
public Span getSpan() {
575-
return span;
576-
}
577563
}

0 commit comments

Comments
 (0)