1- /**
1+ /*
22 * Licensed to the Apache Software Foundation (ASF) under one
33 * or more contributor license agreements. See the NOTICE file
44 * distributed with this work for additional information
2727import static org .apache .hadoop .hbase .trace .TraceUtil .tracedFutures ;
2828import static org .apache .hadoop .hbase .util .FutureUtils .addListener ;
2929
30+ import io .opentelemetry .api .trace .Span ;
3031import java .io .IOException ;
3132import java .util .ArrayList ;
3233import java .util .Arrays ;
3637import java .util .concurrent .atomic .AtomicBoolean ;
3738import java .util .concurrent .atomic .AtomicInteger ;
3839import java .util .function .Function ;
40+ import java .util .function .Supplier ;
3941import org .apache .hadoop .conf .Configuration ;
4042import org .apache .hadoop .hbase .CompareOperator ;
4143import org .apache .hadoop .hbase .DoNotRetryIOException ;
4446import org .apache .hadoop .hbase .TableName ;
4547import org .apache .hadoop .hbase .client .AsyncRpcRetryingCallerFactory .SingleRequestCallerBuilder ;
4648import org .apache .hadoop .hbase .client .ConnectionUtils .Converter ;
49+ import org .apache .hadoop .hbase .client .trace .TableOperationSpanBuilder ;
4750import org .apache .hadoop .hbase .filter .Filter ;
4851import org .apache .hadoop .hbase .io .TimeRange ;
4952import org .apache .hadoop .hbase .ipc .HBaseRpcController ;
53+ import org .apache .hadoop .hbase .trace .HBaseSemanticAttributes ;
5054import org .apache .hadoop .hbase .util .Bytes ;
5155import org .apache .hadoop .hbase .util .ReflectionUtils ;
5256import org .apache .yetus .audience .InterfaceAudience ;
@@ -220,35 +224,47 @@ private CompletableFuture<Result> get(Get get, int replicaId) {
220224
221225 @ Override
222226 public CompletableFuture <Result > get (Get get ) {
227+ final Supplier <Span > supplier = new TableOperationSpanBuilder ()
228+ .setTableName (tableName )
229+ .setOperation (get );
223230 return tracedFuture (
224231 () -> timelineConsistentRead (conn .getLocator (), tableName , get , get .getRow (),
225232 RegionLocateType .CURRENT , replicaId -> get (get , replicaId ), readRpcTimeoutNs ,
226233 conn .connConf .getPrimaryCallTimeoutNs (), retryTimer , conn .getConnectionMetrics ()),
227- "AsyncTable.get" , tableName );
234+ supplier );
228235 }
229236
230237 @ Override
231238 public CompletableFuture <Void > put (Put put ) {
232239 validatePut (put , conn .connConf .getMaxKeyValueSize ());
240+ final Supplier <Span > supplier = new TableOperationSpanBuilder ()
241+ .setTableName (tableName )
242+ .setOperation (put );
233243 return tracedFuture (() -> this .<Void , Put > newCaller (put , writeRpcTimeoutNs )
234244 .action ((controller , loc , stub ) -> RawAsyncTableImpl .<Put > voidMutate (controller , loc , stub ,
235245 put , RequestConverter ::buildMutateRequest ))
236- .call (), "AsyncTable.put" , tableName );
246+ .call (), supplier );
237247 }
238248
239249 @ Override
240250 public CompletableFuture <Void > delete (Delete delete ) {
251+ final Supplier <Span > supplier = new TableOperationSpanBuilder ()
252+ .setTableName (tableName )
253+ .setOperation (delete );
241254 return tracedFuture (
242255 () -> this .<Void , Delete > newCaller (delete , writeRpcTimeoutNs )
243256 .action ((controller , loc , stub ) -> RawAsyncTableImpl .<Delete > voidMutate (controller , loc ,
244257 stub , delete , RequestConverter ::buildMutateRequest ))
245258 .call (),
246- "AsyncTable.delete" , tableName );
259+ supplier );
247260 }
248261
249262 @ Override
250263 public CompletableFuture <Result > append (Append append ) {
251264 checkHasFamilies (append );
265+ final Supplier <Span > supplier = new TableOperationSpanBuilder ()
266+ .setTableName (tableName )
267+ .setOperation (append );
252268 return tracedFuture (() -> {
253269 long nonceGroup = conn .getNonceGenerator ().getNonceGroup ();
254270 long nonce = conn .getNonceGenerator ().newNonce ();
@@ -257,12 +273,15 @@ public CompletableFuture<Result> append(Append append) {
257273 controller , loc , stub , append , RequestConverter ::buildMutateRequest ,
258274 RawAsyncTableImpl ::toResult ))
259275 .call ();
260- }, "AsyncTable.append" , tableName );
276+ }, supplier );
261277 }
262278
263279 @ Override
264280 public CompletableFuture <Result > increment (Increment increment ) {
265281 checkHasFamilies (increment );
282+ final Supplier <Span > supplier = new TableOperationSpanBuilder ()
283+ .setTableName (tableName )
284+ .setOperation (increment );
266285 return tracedFuture (() -> {
267286 long nonceGroup = conn .getNonceGenerator ().getNonceGroup ();
268287 long nonce = conn .getNonceGenerator ().newNonce ();
@@ -271,7 +290,7 @@ public CompletableFuture<Result> increment(Increment increment) {
271290 controller , loc , stub , increment , RequestConverter ::buildMutateRequest ,
272291 RawAsyncTableImpl ::toResult ))
273292 .call ();
274- }, "AsyncTable.increment" , tableName );
293+ }, supplier );
275294 }
276295
277296 private final class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder {
@@ -329,43 +348,52 @@ private void preCheck() {
329348 public CompletableFuture <Boolean > thenPut (Put put ) {
330349 validatePut (put , conn .connConf .getMaxKeyValueSize ());
331350 preCheck ();
351+ final Supplier <Span > supplier = new TableOperationSpanBuilder ()
352+ .setTableName (tableName )
353+ .setOperation (HBaseSemanticAttributes .Operation .CHECK_AND_MUTATE );
332354 return tracedFuture (
333355 () -> RawAsyncTableImpl .this .<Boolean > newCaller (row , put .getPriority (), rpcTimeoutNs )
334356 .action ((controller , loc , stub ) -> RawAsyncTableImpl .mutate (controller , loc , stub , put ,
335357 (rn , p ) -> RequestConverter .buildMutateRequest (rn , row , family , qualifier , op , value ,
336358 null , timeRange , p , HConstants .NO_NONCE , HConstants .NO_NONCE ),
337359 (c , r ) -> r .getProcessed ()))
338360 .call (),
339- "AsyncTable.CheckAndMutateBuilder.thenPut" , tableName );
361+ supplier );
340362 }
341363
342364 @ Override
343365 public CompletableFuture <Boolean > thenDelete (Delete delete ) {
344366 preCheck ();
367+ final Supplier <Span > supplier = new TableOperationSpanBuilder ()
368+ .setTableName (tableName )
369+ .setOperation (HBaseSemanticAttributes .Operation .CHECK_AND_MUTATE );
345370 return tracedFuture (
346371 () -> RawAsyncTableImpl .this .<Boolean > newCaller (row , delete .getPriority (), rpcTimeoutNs )
347372 .action ((controller , loc , stub ) -> RawAsyncTableImpl .mutate (controller , loc , stub , delete ,
348373 (rn , d ) -> RequestConverter .buildMutateRequest (rn , row , family , qualifier , op , value ,
349374 null , timeRange , d , HConstants .NO_NONCE , HConstants .NO_NONCE ),
350375 (c , r ) -> r .getProcessed ()))
351376 .call (),
352- "AsyncTable.CheckAndMutateBuilder.thenDelete" , tableName );
377+ supplier );
353378 }
354379
355380 @ Override
356- public CompletableFuture <Boolean > thenMutate (RowMutations mutation ) {
381+ public CompletableFuture <Boolean > thenMutate (RowMutations mutations ) {
357382 preCheck ();
358- validatePutsInRowMutations (mutation , conn .connConf .getMaxKeyValueSize ());
383+ validatePutsInRowMutations (mutations , conn .connConf .getMaxKeyValueSize ());
384+ final Supplier <Span > supplier = new TableOperationSpanBuilder ()
385+ .setTableName (tableName )
386+ .setOperation (HBaseSemanticAttributes .Operation .BATCH );
359387 return tracedFuture (
360388 () -> RawAsyncTableImpl .this
361- .<Boolean > newCaller (row , mutation .getMaxPriority (), rpcTimeoutNs )
389+ .<Boolean > newCaller (row , mutations .getMaxPriority (), rpcTimeoutNs )
362390 .action ((controller , loc , stub ) -> RawAsyncTableImpl .this .mutateRow (controller , loc , stub ,
363- mutation ,
391+ mutations ,
364392 (rn , rm ) -> RequestConverter .buildMultiRequest (rn , row , family , qualifier , op , value ,
365393 null , timeRange , rm , HConstants .NO_NONCE , HConstants .NO_NONCE ),
366394 CheckAndMutateResult ::isSuccess ))
367395 .call (),
368- "AsyncTable.CheckAndMutateBuilder.thenMutate" , tableName );
396+ supplier );
369397 }
370398 }
371399
@@ -397,6 +425,9 @@ public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) {
397425 @ Override
398426 public CompletableFuture <Boolean > thenPut (Put put ) {
399427 validatePut (put , conn .connConf .getMaxKeyValueSize ());
428+ final Supplier <Span > supplier = new TableOperationSpanBuilder ()
429+ .setTableName (tableName )
430+ .setOperation (HBaseSemanticAttributes .Operation .CHECK_AND_MUTATE );
400431 return tracedFuture (
401432 () -> RawAsyncTableImpl .this .<Boolean > newCaller (row , put .getPriority (), rpcTimeoutNs )
402433 .action ((controller , loc , stub ) -> RawAsyncTableImpl .mutate (controller , loc ,
@@ -405,34 +436,40 @@ public CompletableFuture<Boolean> thenPut(Put put) {
405436 filter , timeRange , p , HConstants .NO_NONCE , HConstants .NO_NONCE ),
406437 (c , r ) -> r .getProcessed ()))
407438 .call (),
408- "AsyncTable.CheckAndMutateWithFilterBuilder.thenPut" , tableName );
439+ supplier );
409440 }
410441
411442 @ Override
412443 public CompletableFuture <Boolean > thenDelete (Delete delete ) {
444+ final Supplier <Span > supplier = new TableOperationSpanBuilder ()
445+ .setTableName (tableName )
446+ .setOperation (HBaseSemanticAttributes .Operation .CHECK_AND_MUTATE );
413447 return tracedFuture (
414448 () -> RawAsyncTableImpl .this .<Boolean > newCaller (row , delete .getPriority (), rpcTimeoutNs )
415449 .action ((controller , loc , stub ) -> RawAsyncTableImpl .mutate (controller , loc , stub , delete ,
416450 (rn , d ) -> RequestConverter .buildMutateRequest (rn , row , null , null , null , null , filter ,
417451 timeRange , d , HConstants .NO_NONCE , HConstants .NO_NONCE ),
418452 (c , r ) -> r .getProcessed ()))
419453 .call (),
420- "AsyncTable.CheckAndMutateWithFilterBuilder.thenDelete" , tableName );
454+ supplier );
421455 }
422456
423457 @ Override
424- public CompletableFuture <Boolean > thenMutate (RowMutations mutation ) {
425- validatePutsInRowMutations (mutation , conn .connConf .getMaxKeyValueSize ());
458+ public CompletableFuture <Boolean > thenMutate (RowMutations mutations ) {
459+ validatePutsInRowMutations (mutations , conn .connConf .getMaxKeyValueSize ());
460+ final Supplier <Span > supplier = new TableOperationSpanBuilder ()
461+ .setTableName (tableName )
462+ .setOperation (HBaseSemanticAttributes .Operation .BATCH );
426463 return tracedFuture (
427464 () -> RawAsyncTableImpl .this
428- .<Boolean > newCaller (row , mutation .getMaxPriority (), rpcTimeoutNs )
465+ .<Boolean > newCaller (row , mutations .getMaxPriority (), rpcTimeoutNs )
429466 .action ((controller , loc , stub ) -> RawAsyncTableImpl .this .mutateRow (controller , loc , stub ,
430- mutation ,
467+ mutations ,
431468 (rn , rm ) -> RequestConverter .buildMultiRequest (rn , row , null , null , null , null , filter ,
432469 timeRange , rm , HConstants .NO_NONCE , HConstants .NO_NONCE ),
433470 CheckAndMutateResult ::isSuccess ))
434471 .call (),
435- "AsyncTable.CheckAndMutateWithFilterBuilder.thenMutate" , tableName );
472+ supplier );
436473 }
437474 }
438475
@@ -443,6 +480,9 @@ public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter)
443480
444481 @ Override
445482 public CompletableFuture <CheckAndMutateResult > checkAndMutate (CheckAndMutate checkAndMutate ) {
483+ final Supplier <Span > supplier = new TableOperationSpanBuilder ()
484+ .setTableName (tableName )
485+ .setOperation (checkAndMutate );
446486 return tracedFuture (() -> {
447487 if (checkAndMutate .getAction () instanceof Put ||
448488 checkAndMutate .getAction () instanceof Delete ||
@@ -488,16 +528,19 @@ public CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate che
488528 "CheckAndMutate doesn't support " + checkAndMutate .getAction ().getClass ().getName ()));
489529 return future ;
490530 }
491- }, "AsyncTable.checkAndMutate" , tableName );
531+ }, supplier );
492532 }
493533
494534 @ Override
495535 public List <CompletableFuture <CheckAndMutateResult >>
496536 checkAndMutate (List <CheckAndMutate > checkAndMutates ) {
537+ final Supplier <Span > supplier = new TableOperationSpanBuilder ()
538+ .setTableName (tableName )
539+ .setOperation (HBaseSemanticAttributes .Operation .BATCH );
497540 return tracedFutures (
498541 () -> batch (checkAndMutates , rpcTimeoutNs ).stream ()
499542 .map (f -> f .thenApply (r -> (CheckAndMutateResult ) r )).collect (toList ()),
500- "AsyncTable.checkAndMutateList" , tableName );
543+ supplier );
501544 }
502545
503546 // We need the MultiRequest when constructing the org.apache.hadoop.hbase.client.MultiResponse,
@@ -548,14 +591,17 @@ public CompletableFuture<Result> mutateRow(RowMutations mutations) {
548591 validatePutsInRowMutations (mutations , conn .connConf .getMaxKeyValueSize ());
549592 long nonceGroup = conn .getNonceGenerator ().getNonceGroup ();
550593 long nonce = conn .getNonceGenerator ().newNonce ();
594+ final Supplier <Span > supplier = new TableOperationSpanBuilder ()
595+ .setTableName (tableName )
596+ .setOperation (HBaseSemanticAttributes .Operation .BATCH );
551597 return tracedFuture (
552598 () -> this
553599 .<Result > newCaller (mutations .getRow (), mutations .getMaxPriority (), writeRpcTimeoutNs )
554600 .action ((controller , loc , stub ) -> this .<Result , Result > mutateRow (controller , loc , stub ,
555601 mutations , (rn , rm ) -> RequestConverter .buildMultiRequest (rn , rm , nonceGroup , nonce ),
556602 resp -> resp ))
557603 .call (),
558- "AsyncTable.mutateRow" , tableName );
604+ supplier );
559605 }
560606
561607 private Scan setDefaultScanConfig (Scan scan ) {
@@ -591,6 +637,9 @@ public ResultScanner getScanner(Scan scan) {
591637
592638 @ Override
593639 public CompletableFuture <List <Result >> scanAll (Scan scan ) {
640+ final Supplier <Span > supplier = new TableOperationSpanBuilder ()
641+ .setTableName (tableName )
642+ .setOperation (scan );
594643 return tracedFuture (() -> {
595644 CompletableFuture <List <Result >> future = new CompletableFuture <>();
596645 List <Result > scanResults = new ArrayList <>();
@@ -612,27 +661,39 @@ public void onComplete() {
612661 }
613662 });
614663 return future ;
615- }, "AsyncTable.scanAll" , tableName );
664+ }, supplier );
616665 }
617666
618667 @ Override
619668 public List <CompletableFuture <Result >> get (List <Get > gets ) {
620- return tracedFutures (() -> batch (gets , readRpcTimeoutNs ), "AsyncTable.getList" , tableName );
669+ final Supplier <Span > supplier = new TableOperationSpanBuilder ()
670+ .setTableName (tableName )
671+ .setOperation (HBaseSemanticAttributes .Operation .BATCH );
672+ return tracedFutures (() -> batch (gets , readRpcTimeoutNs ), supplier );
621673 }
622674
623675 @ Override
624676 public List <CompletableFuture <Void >> put (List <Put > puts ) {
625- return tracedFutures (() -> voidMutate (puts ), "AsyncTable.putList" , tableName );
677+ final Supplier <Span > supplier = new TableOperationSpanBuilder ()
678+ .setTableName (tableName )
679+ .setOperation (HBaseSemanticAttributes .Operation .BATCH );
680+ return tracedFutures (() -> voidMutate (puts ), supplier );
626681 }
627682
628683 @ Override
629684 public List <CompletableFuture <Void >> delete (List <Delete > deletes ) {
630- return tracedFutures (() -> voidMutate (deletes ), "AsyncTable.deleteList" , tableName );
685+ final Supplier <Span > supplier = new TableOperationSpanBuilder ()
686+ .setTableName (tableName )
687+ .setOperation (HBaseSemanticAttributes .Operation .BATCH );
688+ return tracedFutures (() -> voidMutate (deletes ), supplier );
631689 }
632690
633691 @ Override
634692 public <T > List <CompletableFuture <T >> batch (List <? extends Row > actions ) {
635- return tracedFutures (() -> batch (actions , rpcTimeoutNs ), "AsyncTable.batch" , tableName );
693+ final Supplier <Span > supplier = new TableOperationSpanBuilder ()
694+ .setTableName (tableName )
695+ .setOperation (HBaseSemanticAttributes .Operation .BATCH );
696+ return tracedFutures (() -> batch (actions , rpcTimeoutNs ), supplier );
636697 }
637698
638699 private List <CompletableFuture <Void >> voidMutate (List <? extends Row > actions ) {
0 commit comments