Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@
import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;
import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.net.SocketAddress;
import java.util.Collection;
Expand All @@ -41,7 +38,6 @@
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.PoolMap;
import org.apache.hadoop.hbase.util.Threads;
Expand Down Expand Up @@ -369,7 +365,7 @@ private T getConnection(ConnectionId remoteId) throws IOException {
protected abstract T createConnection(ConnectionId remoteId) throws IOException;

private void onCallFinished(Call call, HBaseRpcController hrc, Address addr,
RpcCallback<Message> callback) {
RpcCallback<Message> callback) {
call.callStats.setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.getStartTime());
if (metrics != null) {
metrics.updateRpc(call.md, call.param, call.callStats);
Expand All @@ -392,59 +388,44 @@ private void onCallFinished(Call call, HBaseRpcController hrc, Address addr,
}
}

private Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc,
Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc,
final Message param, Message returnType, final User ticket,
final Address addr, final RpcCallback<Message> callback) {
Span span = TraceUtil.getGlobalTracer().spanBuilder("RpcClient.callMethod." + md.getFullName())
.startSpan();
try (Scope scope = span.makeCurrent()) {
final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
cs.setStartTime(EnvironmentEdgeManager.currentTime());

if (param instanceof ClientProtos.MultiRequest) {
ClientProtos.MultiRequest req = (ClientProtos.MultiRequest) param;
int numActions = 0;
for (ClientProtos.RegionAction regionAction : req.getRegionActionList()) {
numActions += regionAction.getActionCount();
}

cs.setNumActionsPerServer(numActions);
final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
cs.setStartTime(EnvironmentEdgeManager.currentTime());

if (param instanceof ClientProtos.MultiRequest) {
ClientProtos.MultiRequest req = (ClientProtos.MultiRequest) param;
int numActions = 0;
for (ClientProtos.RegionAction regionAction : req.getRegionActionList()) {
numActions += regionAction.getActionCount();
}

final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr);
Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType,
cs.setNumActionsPerServer(numActions);
}

final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr);
Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType,
hrc.getCallTimeout(), hrc.getPriority(), new RpcCallback<Call>() {
@Override
public void run(Call call) {
try (Scope scope = call.span.makeCurrent()) {
counter.decrementAndGet();
onCallFinished(call, hrc, addr, callback);
} finally {
if (hrc.failed()) {
span.setStatus(StatusCode.ERROR);
span.recordException(hrc.getFailed());
} else {
span.setStatus(StatusCode.OK);
}
span.end();
}
counter.decrementAndGet();
onCallFinished(call, hrc, addr, callback);
}
}, cs);
ConnectionId remoteId = new ConnectionId(ticket, md.getService().getName(), addr);
int count = counter.incrementAndGet();
try {
if (count > maxConcurrentCallsPerServer) {
throw new ServerTooBusyException(addr, count);
}
cs.setConcurrentCallsPerServer(count);
T connection = getConnection(remoteId);
connection.sendRequest(call, hrc);
} catch (Exception e) {
call.setException(toIOE(e));
span.end();
ConnectionId remoteId = new ConnectionId(ticket, md.getService().getName(), addr);
int count = counter.incrementAndGet();
try {
if (count > maxConcurrentCallsPerServer) {
throw new ServerTooBusyException(addr, count);
}
return call;
cs.setConcurrentCallsPerServer(count);
T connection = getConnection(remoteId);
connection.sendRequest(call, hrc);
} catch (Exception e) {
call.setException(toIOE(e));
}
return call;
}

private static Address createAddr(ServerName sn) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import static org.apache.hadoop.hbase.ipc.IPCUtil.setCancelled;
import static org.apache.hadoop.hbase.ipc.IPCUtil.write;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
Expand Down Expand Up @@ -55,6 +57,7 @@
import org.apache.hadoop.hbase.security.SaslUtil;
import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.io.IOUtils;
Expand Down Expand Up @@ -189,8 +192,8 @@ public void run() {
if (call.isDone()) {
continue;
}
try (Scope scope = call.span.makeCurrent()) {
writeRequest(call);
try {
tracedWriteRequest(call);
} catch (IOException e) {
// exception here means the call has not been added to the pendingCalls yet, so we need
// to fail it by our own.
Expand Down Expand Up @@ -591,6 +594,16 @@ private void negotiateCryptoAes(RPCProtos.CryptoCipherMeta cryptoCipherMeta)
this.out = new DataOutputStream(new BufferedOutputStream(saslRpcClient.getOutputStream()));
}

private void tracedWriteRequest(Call call) throws IOException {
Span span = TraceUtil.getGlobalTracer().spanBuilder("RpcClientImpl.tracedWriteRequest")
.setParent(Context.current().with(call.span)).startSpan();
try (Scope scope = span.makeCurrent()) {
writeRequest(call);
} finally {
span.end();
}
}

/**
* Initiates a call by sending the parameter to the remote server. Note: this is not called from
* the Connection thread, but by other threads.
Expand Down Expand Up @@ -798,9 +811,7 @@ public void run(boolean cancelled) throws IOException {
if (callSender != null) {
callSender.sendCall(call);
} else {
// this is in the same thread with the caller so do not need to attach the trace context
// again.
writeRequest(call);
tracedWriteRequest(call);
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class Call {
final Span span;
Timeout timeoutTask;

Call(int id, final Descriptors.MethodDescriptor md, Message param,
protected Call(int id, final Descriptors.MethodDescriptor md, Message param,
final CellScanner cells, final Message responseDefaultType, int timeout, int priority,
RpcCallback<Call> callback, MetricsConnection.CallStats callStats) {
this.param = param;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hbase.ipc;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.context.Context;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
Expand Down Expand Up @@ -51,7 +49,6 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.TracingProtos.RPCTInfo;

/**
* Utility to help ipc'ing.
Expand Down Expand Up @@ -115,10 +112,11 @@ public static int getTotalSizeWhenWrittenDelimited(Message... messages) {
static RequestHeader buildRequestHeader(Call call, CellBlockMeta cellBlockMeta) {
RequestHeader.Builder builder = RequestHeader.newBuilder();
builder.setCallId(call.id);
RPCTInfo.Builder traceBuilder = RPCTInfo.newBuilder();
GlobalOpenTelemetry.getPropagators().getTextMapPropagator().inject(Context.current(),
traceBuilder, (carrier, key, value) -> carrier.putHeaders(key, value));
builder.setTraceInfo(traceBuilder.build());
//TODO handle htrace API change, see HBASE-18895
/*if (call.span != null) {
builder.setTraceInfo(RPCTInfo.newBuilder().setParentId(call.span.getSpanId())
.setTraceId(call.span.getTracerId()));
}*/
builder.setMethodName(call.md.getName());
builder.setRequestParam(call.param != null);
if (cellBlockMeta != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hbase.ipc;

import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -115,12 +114,9 @@ private void writeRequest(ChannelHandlerContext ctx, Call call, ChannelPromise p

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
throws Exception {
if (msg instanceof Call) {
Call call = (Call) msg;
try (Scope scope = call.span.makeCurrent()) {
writeRequest(ctx, call, promise);
}
writeRequest(ctx, (Call) msg, promise);
} else {
ctx.write(msg, promise);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/
package org.apache.hadoop.hbase.trace;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Tracer;
import org.apache.yetus.audience.InterfaceAudience;

Expand All @@ -30,6 +30,6 @@ private TraceUtil() {
}

public static Tracer getGlobalTracer() {
return GlobalOpenTelemetry.getTracer(INSTRUMENTATION_NAME);
return OpenTelemetry.getGlobalTracer(INSTRUMENTATION_NAME);
}
}
14 changes: 7 additions & 7 deletions hbase-protocol-shaded/src/main/protobuf/Tracing.proto
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ option java_outer_classname = "TracingProtos";
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;

// OpenTelemetry propagates trace context through https://www.w3.org/TR/trace-context/, which
// is a text-based approach that passes properties with http headers. Here we will also use this
// approach so we just need a map to store the key value pair.

//Used to pass through the information necessary to continue
//a trace after an RPC is made. All we need is the traceid
//(so we know the overarching trace this message is a part of), and
//the id of the current span when this message was sent, so we know
//what span caused the new span we will create when this message is received.
message RPCTInfo {
optional int64 trace_id = 1 [deprecated = true];
optional int64 parent_id = 2 [deprecated = true];
map<string, string> headers = 3;
optional int64 trace_id = 1;
optional int64 parent_id = 2;
}
10 changes: 0 additions & 10 deletions hbase-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -440,16 +440,6 @@
<artifactId>hamcrest-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-testing</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-library</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
package org.apache.hadoop.hbase.ipc;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
Expand Down Expand Up @@ -75,6 +73,15 @@ public RpcCall getRpcCall() {
return call;
}

/**
* Keep for backward compatibility.
* @deprecated As of release 2.0, this will be removed in HBase 3.0
*/
@Deprecated
public ServerCall<?> getCall() {
return (ServerCall<?>) call;
}

public void setStatus(MonitoredRPCHandler status) {
this.status = status;
}
Expand Down Expand Up @@ -123,8 +130,7 @@ public void run() {
String serviceName = getServiceName();
String methodName = getMethodName();
String traceString = serviceName + "." + methodName;
Span span = TraceUtil.getGlobalTracer().spanBuilder(traceString)
.setParent(Context.current().with(((ServerCall<?>) call).getSpan())).startSpan();
Span span = TraceUtil.getGlobalTracer().spanBuilder(traceString).startSpan();
try (Scope traceScope = span.makeCurrent()) {
if (!this.rpcServer.isStarted()) {
InetSocketAddress address = rpcServer.getListenerAddress();
Expand All @@ -135,12 +141,8 @@ public void run() {
resultPair = this.rpcServer.call(call, this.status);
} catch (TimeoutIOException e){
RpcServer.LOG.warn("Can not complete this request in time, drop it: " + call);
span.recordException(e);
span.setStatus(StatusCode.ERROR);
return;
} catch (Throwable e) {
span.recordException(e);
span.setStatus(StatusCode.ERROR);
if (e instanceof ServerNotRunningYetException) {
// If ServerNotRunningYetException, don't spew stack trace.
if (RpcServer.LOG.isTraceEnabled()) {
Expand All @@ -159,7 +161,6 @@ public void run() {
RpcServer.CurCall.set(null);
if (resultPair != null) {
this.rpcServer.addCallSize(call.getSize() * -1);
span.setStatus(StatusCode.OK);
sucessful = true;
}
span.end();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hbase.ipc;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -104,8 +102,6 @@ public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCa
// from WAL side on release
private final AtomicInteger reference = new AtomicInteger(0x80000000);

private final Span span;

@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH",
justification = "Can't figure why this complaint is happening... see below")
ServerCall(int id, BlockingService service, MethodDescriptor md, RequestHeader header,
Expand Down Expand Up @@ -136,7 +132,6 @@ public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCa
this.bbAllocator = byteBuffAllocator;
this.cellBlockBuilder = cellBlockBuilder;
this.reqCleanup = reqCleanup;
this.span = Span.current();
}

/**
Expand All @@ -155,7 +150,6 @@ public void done() {
// If the call was run successfuly, we might have already returned the BB
// back to pool. No worries..Then inputCellBlock will be null
cleanup();
span.end();
}

@Override
Expand Down Expand Up @@ -232,10 +226,6 @@ public synchronized void setResponse(Message m, final CellScanner cells, Throwab
}
if (t != null) {
this.isError = true;
span.recordException(t);
span.setStatus(StatusCode.ERROR);
} else {
span.setStatus(StatusCode.OK);
}
BufferChain bc = null;
try {
Expand Down Expand Up @@ -570,8 +560,4 @@ public synchronized BufferChain getResponse() {
return response;
}
}

public Span getSpan() {
return span;
}
}
Loading