Skip to content

Commit 620e5c6

Browse files
committed
HBASE-26545 Implement tracing of scan
* On `AsyncTable`, both the `scan` and `scanAll` methods should result in `SCAN` table operations.
* The span of the `SCAN` table operation should have children representing all the RPC calls involved in servicing the scan.
* When a user provides a custom implementation of `AdvancedScanResultConsumer`, any spans emitted from the callback methods should also be tied to the span that represents the `SCAN` table operation. This is easily done because these callbacks are executed on the RPC thread.
* When a user provides a custom implementation of `ScanResultConsumer`, any spans emitted from the callback methods should also be tied to the span that represents the `SCAN` table operation. This is accomplished by carefully passing the span instance around after it is created.

Signed-off-by: Andrew Purtell <[email protected]>
Signed-off-by: Duo Zhang <[email protected]>
1 parent 4f491fd commit 620e5c6

20 files changed

+1033
-234
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java

Lines changed: 94 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/**
1+
/*
22
* Licensed to the Apache Software Foundation (ASF) under one
33
* or more contributor license agreements. See the NOTICE file
44
* distributed with this work for additional information
@@ -27,19 +27,21 @@
2727
import static org.apache.hadoop.hbase.client.ConnectionUtils.isRemote;
2828
import static org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead;
2929
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
30-
30+
import io.opentelemetry.api.trace.Span;
31+
import io.opentelemetry.api.trace.StatusCode;
32+
import io.opentelemetry.context.Scope;
3133
import java.io.IOException;
3234
import java.util.concurrent.CompletableFuture;
3335
import java.util.concurrent.TimeUnit;
3436
import java.util.concurrent.atomic.AtomicInteger;
3537
import org.apache.hadoop.hbase.HRegionLocation;
3638
import org.apache.hadoop.hbase.TableName;
3739
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
40+
import org.apache.hadoop.hbase.client.trace.TableOperationSpanBuilder;
3841
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
42+
import org.apache.hadoop.hbase.trace.TraceUtil;
3943
import org.apache.yetus.audience.InterfaceAudience;
40-
4144
import org.apache.hbase.thirdparty.io.netty.util.Timer;
42-
4345
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
4446
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
4547
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface;
@@ -85,6 +87,8 @@ class AsyncClientScanner {
8587

8688
private final ScanResultCache resultCache;
8789

90+
private final Span span;
91+
8892
public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableName tableName,
8993
AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, long pauseForCQTBENs,
9094
int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
@@ -112,6 +116,21 @@ public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableN
112116
} else {
113117
this.scanMetrics = null;
114118
}
119+
120+
/*
121+
* Assumes that the `start()` method is called immediately after construction. If this is no
122+
* longer the case, for tracing correctness, we should move the start of the span into the
123+
* `start()` method. The cost of doing so would be making access to the `span` safe for
124+
* concurrent threads.
125+
*/
126+
span = new TableOperationSpanBuilder(conn)
127+
.setTableName(tableName)
128+
.setOperation(scan)
129+
.build();
130+
if (consumer instanceof AsyncTableResultScanner) {
131+
AsyncTableResultScanner scanner = (AsyncTableResultScanner) consumer;
132+
scanner.setSpan(span);
133+
}
115134
}
116135

117136
private static final class OpenScannerResponse {
@@ -140,26 +159,35 @@ public OpenScannerResponse(HRegionLocation loc, boolean isRegionServerRemote, In
140159

141160
private CompletableFuture<OpenScannerResponse> callOpenScanner(HBaseRpcController controller,
142161
HRegionLocation loc, ClientService.Interface stub) {
143-
boolean isRegionServerRemote = isRemote(loc.getHostname());
144-
incRPCCallsMetrics(scanMetrics, isRegionServerRemote);
145-
if (openScannerTries.getAndIncrement() > 1) {
146-
incRPCRetriesMetrics(scanMetrics, isRegionServerRemote);
162+
try (Scope ignored = span.makeCurrent()) {
163+
boolean isRegionServerRemote = isRemote(loc.getHostname());
164+
incRPCCallsMetrics(scanMetrics, isRegionServerRemote);
165+
if (openScannerTries.getAndIncrement() > 1) {
166+
incRPCRetriesMetrics(scanMetrics, isRegionServerRemote);
167+
}
168+
CompletableFuture<OpenScannerResponse> future = new CompletableFuture<>();
169+
try {
170+
ScanRequest request = RequestConverter.buildScanRequest(
171+
loc.getRegion().getRegionName(), scan, scan.getCaching(), false);
172+
stub.scan(controller, request, resp -> {
173+
try (Scope ignored1 = span.makeCurrent()) {
174+
if (controller.failed()) {
175+
final IOException e = controller.getFailed();
176+
future.completeExceptionally(e);
177+
TraceUtil.setError(span, e);
178+
span.end();
179+
return;
180+
}
181+
future.complete(new OpenScannerResponse(
182+
loc, isRegionServerRemote, stub, controller, resp));
183+
}
184+
});
185+
} catch (IOException e) {
186+
// span is closed by listener attached to the Future in `openScanner()`
187+
future.completeExceptionally(e);
188+
}
189+
return future;
147190
}
148-
CompletableFuture<OpenScannerResponse> future = new CompletableFuture<>();
149-
try {
150-
ScanRequest request = RequestConverter.buildScanRequest(loc.getRegion().getRegionName(), scan,
151-
scan.getCaching(), false);
152-
stub.scan(controller, request, resp -> {
153-
if (controller.failed()) {
154-
future.completeExceptionally(controller.getFailed());
155-
return;
156-
}
157-
future.complete(new OpenScannerResponse(loc, isRegionServerRemote, stub, controller, resp));
158-
});
159-
} catch (IOException e) {
160-
future.completeExceptionally(e);
161-
}
162-
return future;
163191
}
164192

165193
private void startScan(OpenScannerResponse resp) {
@@ -173,26 +201,40 @@ private void startScan(OpenScannerResponse resp) {
173201
.pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
174202
.startLogErrorsCnt(startLogErrorsCnt).start(resp.controller, resp.resp),
175203
(hasMore, error) -> {
176-
if (error != null) {
177-
consumer.onError(error);
178-
return;
179-
}
180-
if (hasMore) {
181-
openScanner();
182-
} else {
183-
consumer.onComplete();
204+
try (Scope ignored = span.makeCurrent()) {
205+
if (error != null) {
206+
try {
207+
consumer.onError(error);
208+
return;
209+
} finally {
210+
TraceUtil.setError(span, error);
211+
span.end();
212+
}
213+
}
214+
if (hasMore) {
215+
openScanner();
216+
} else {
217+
try {
218+
consumer.onComplete();
219+
} finally {
220+
span.setStatus(StatusCode.OK);
221+
span.end();
222+
}
223+
}
184224
}
185225
});
186226
}
187227

188228
private CompletableFuture<OpenScannerResponse> openScanner(int replicaId) {
189-
return conn.callerFactory.<OpenScannerResponse> single().table(tableName)
190-
.row(scan.getStartRow()).replicaId(replicaId).locateType(getLocateType(scan))
191-
.priority(scan.getPriority())
192-
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
193-
.operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
194-
.pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
195-
.startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner).call();
229+
try (Scope ignored = span.makeCurrent()) {
230+
return conn.callerFactory.<OpenScannerResponse> single().table(tableName)
231+
.row(scan.getStartRow()).replicaId(replicaId).locateType(getLocateType(scan))
232+
.priority(scan.getPriority())
233+
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
234+
.operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
235+
.pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
236+
.startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner).call();
237+
}
196238
}
197239

198240
private long getPrimaryTimeoutNs() {
@@ -206,15 +248,24 @@ private void openScanner() {
206248
addListener(timelineConsistentRead(conn.getLocator(), tableName, scan, scan.getStartRow(),
207249
getLocateType(scan), this::openScanner, rpcTimeoutNs, getPrimaryTimeoutNs(), retryTimer,
208250
conn.getConnectionMetrics()), (resp, error) -> {
209-
if (error != null) {
210-
consumer.onError(error);
211-
return;
251+
try (Scope ignored = span.makeCurrent()) {
252+
if (error != null) {
253+
try {
254+
consumer.onError(error);
255+
return;
256+
} finally {
257+
TraceUtil.setError(span, error);
258+
span.end();
259+
}
260+
}
261+
startScan(resp);
212262
}
213-
startScan(resp);
214263
});
215264
}
216265

217266
public void start() {
218-
openScanner();
267+
try (Scope ignored = span.makeCurrent()) {
268+
openScanner();
269+
}
219270
}
220271
}

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@
2727
import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
2828
import static org.apache.hadoop.hbase.client.ConnectionUtils.updateResultsMetrics;
2929
import static org.apache.hadoop.hbase.client.ConnectionUtils.updateServerSideMetrics;
30-
30+
import io.opentelemetry.context.Context;
31+
import io.opentelemetry.context.Scope;
3132
import java.io.IOException;
3233
import java.util.ArrayList;
3334
import java.util.List;
@@ -50,11 +51,9 @@
5051
import org.apache.yetus.audience.InterfaceAudience;
5152
import org.slf4j.Logger;
5253
import org.slf4j.LoggerFactory;
53-
5454
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
5555
import org.apache.hbase.thirdparty.io.netty.util.Timeout;
5656
import org.apache.hbase.thirdparty.io.netty.util.Timer;
57-
5857
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
5958
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
6059
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
@@ -573,7 +572,12 @@ private void call() {
573572
resetController(controller, callTimeoutNs, priority);
574573
ScanRequest req = RequestConverter.buildScanRequest(scannerId, scan.getCaching(), false,
575574
nextCallSeq, scan.isScanMetricsEnabled(), false, scan.getLimit());
576-
stub.scan(controller, req, resp -> onComplete(controller, resp));
575+
final Context context = Context.current();
576+
stub.scan(controller, req, resp -> {
577+
try (Scope ignored = context.makeCurrent()) {
578+
onComplete(controller, resp);
579+
}
580+
});
577581
}
578582

579583
private void next() {

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
package org.apache.hadoop.hbase.client;
1919

2020
import static java.util.stream.Collectors.toList;
21+
import io.opentelemetry.api.trace.Span;
2122
import io.opentelemetry.context.Context;
23+
import io.opentelemetry.context.Scope;
2224
import java.io.IOException;
2325
import java.util.List;
2426
import java.util.concurrent.CompletableFuture;
@@ -231,22 +233,29 @@ public ResultScanner getScanner(Scan scan) {
231233
}
232234

233235
private void scan0(Scan scan, ScanResultConsumer consumer) {
234-
try (ResultScanner scanner = getScanner(scan)) {
235-
consumer.onScanMetricsCreated(scanner.getScanMetrics());
236-
for (Result result; (result = scanner.next()) != null;) {
237-
if (!consumer.onNext(result)) {
238-
break;
236+
Span span = null;
237+
try (AsyncTableResultScanner scanner = rawTable.getScanner(scan)) {
238+
span = scanner.getSpan();
239+
try (Scope ignored = span.makeCurrent()) {
240+
consumer.onScanMetricsCreated(scanner.getScanMetrics());
241+
for (Result result; (result = scanner.next()) != null; ) {
242+
if (!consumer.onNext(result)) {
243+
break;
244+
}
239245
}
246+
consumer.onComplete();
240247
}
241-
consumer.onComplete();
242248
} catch (IOException e) {
243-
consumer.onError(e);
249+
try (Scope ignored = span.makeCurrent()) {
250+
consumer.onError(e);
251+
}
244252
}
245253
}
246254

247255
@Override
248256
public void scan(Scan scan, ScanResultConsumer consumer) {
249-
pool.execute(() -> scan0(scan, consumer));
257+
final Context context = Context.current();
258+
pool.execute(context.wrap(() -> scan0(scan, consumer)));
250259
}
251260

252261
@Override

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package org.apache.hadoop.hbase.client;
1919

2020
import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;
21-
21+
import io.opentelemetry.api.trace.Span;
2222
import java.io.IOException;
2323
import java.io.InterruptedIOException;
2424
import java.util.ArrayDeque;
@@ -58,6 +58,9 @@ class AsyncTableResultScanner implements ResultScanner, AdvancedScanResultConsum
5858

5959
private ScanResumer resumer;
6060

61+
// Used to pass the span instance to the `AsyncTableImpl` from its underlying `rawAsyncTable`.
62+
private Span span = null;
63+
6164
public AsyncTableResultScanner(TableName tableName, Scan scan, long maxCacheSize) {
6265
this.tableName = tableName;
6366
this.maxCacheSize = maxCacheSize;
@@ -79,6 +82,14 @@ private void stopPrefetch(ScanController controller) {
7982
resumer = controller.suspend();
8083
}
8184

85+
Span getSpan() {
86+
return span;
87+
}
88+
89+
void setSpan(final Span span) {
90+
this.span = span;
91+
}
92+
8293
@Override
8394
public synchronized void onNext(Result[] results, ScanController controller) {
8495
assert results.length > 0;

hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java

Lines changed: 17 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -640,30 +640,26 @@ public AsyncTableResultScanner getScanner(Scan scan) {
640640

641641
@Override
642642
public CompletableFuture<List<Result>> scanAll(Scan scan) {
643-
final Supplier<Span> supplier = newTableOperationSpanBuilder()
644-
.setOperation(scan);
645-
return tracedFuture(() -> {
646-
CompletableFuture<List<Result>> future = new CompletableFuture<>();
647-
List<Result> scanResults = new ArrayList<>();
648-
scan(scan, new AdvancedScanResultConsumer() {
643+
CompletableFuture<List<Result>> future = new CompletableFuture<>();
644+
List<Result> scanResults = new ArrayList<>();
645+
scan(scan, new AdvancedScanResultConsumer() {
649646

650-
@Override
651-
public void onNext(Result[] results, ScanController controller) {
652-
scanResults.addAll(Arrays.asList(results));
653-
}
647+
@Override
648+
public void onNext(Result[] results, ScanController controller) {
649+
scanResults.addAll(Arrays.asList(results));
650+
}
654651

655-
@Override
656-
public void onError(Throwable error) {
657-
future.completeExceptionally(error);
658-
}
652+
@Override
653+
public void onError(Throwable error) {
654+
future.completeExceptionally(error);
655+
}
659656

660-
@Override
661-
public void onComplete() {
662-
future.complete(scanResults);
663-
}
664-
});
665-
return future;
666-
}, supplier);
657+
@Override
658+
public void onComplete() {
659+
future.complete(scanResults);
660+
}
661+
});
662+
return future;
667663
}
668664

669665
@Override

0 commit comments

Comments
 (0)