Merged
104 changes: 43 additions & 61 deletions hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -433,33 +433,27 @@ public Result[] get(List<Get> gets) throws IOException {
@Override
public void batch(final List<? extends Row> actions, final Object[] results)
throws InterruptedException, IOException {
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.BATCH)
.setContainerOperations(actions);
TraceUtil.traceWithIOException(() -> {
int rpcTimeout = writeRpcTimeoutMs;
boolean hasRead = false;
boolean hasWrite = false;
for (Row action : actions) {
if (action instanceof Mutation) {
hasWrite = true;
} else {
hasRead = true;
}
if (hasRead && hasWrite) {
break;
}
}
if (hasRead && !hasWrite) {
rpcTimeout = readRpcTimeoutMs;
int rpcTimeout = writeRpcTimeoutMs;
boolean hasRead = false;
boolean hasWrite = false;
for (Row action : actions) {
if (action instanceof Mutation) {
hasWrite = true;
} else {
hasRead = true;
}
try {
batch(actions, results, rpcTimeout);
} catch (InterruptedException e) {
throw (InterruptedIOException) new InterruptedIOException().initCause(e);
if (hasRead && hasWrite) {
break;
}
}, supplier);
}
if (hasRead && !hasWrite) {
rpcTimeout = readRpcTimeoutMs;
}
try {
batch(actions, results, rpcTimeout);
} catch (InterruptedException e) {
throw (InterruptedIOException) new InterruptedIOException().initCause(e);
}
}

public void batch(final List<? extends Row> actions, final Object[] results, int rpcTimeout)
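The new batch() body above picks the RPC timeout from the composition of the action list: any Mutation marks the batch as containing writes, everything else counts as a read, and only an all-read batch falls back to the read RPC timeout. Below is a minimal caller-side sketch of that behaviour, assuming the standard org.apache.hadoop.hbase.client API; the table name, row keys, and column names are made up for illustration.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

class MixedBatchSketch {
  static void mixedBatch(Connection conn) throws IOException, InterruptedException {
    try (Table table = conn.getTable(TableName.valueOf("demo"))) {
      List<Row> actions = new ArrayList<>();
      actions.add(new Get(Bytes.toBytes("row-1")));   // a read
      actions.add(new Put(Bytes.toBytes("row-2"))     // a write
          .addColumn(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v")));
      Object[] results = new Object[actions.size()];
      // The list mixes reads and writes, so hasRead and hasWrite both end up true
      // and the write RPC timeout is applied to the call below.
      table.batch(actions, results);
    }
  }
}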
@@ -555,29 +549,23 @@ protected Void rpcCall() throws Exception {

@Override
public void delete(final List<Delete> deletes) throws IOException {
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.BATCH)
.setContainerOperations(deletes);
TraceUtil.traceWithIOException(() -> {
Object[] results = new Object[deletes.size()];
try {
batch(deletes, results, writeRpcTimeoutMs);
} catch (InterruptedException e) {
throw (InterruptedIOException) new InterruptedIOException().initCause(e);
} finally {
// TODO: to be consistent with batch put(), do not modify input list
// mutate list so that it is empty for complete success, or contains only failed records
// results are returned in the same order as the requests in list walk the list backwards,
// so we can remove from list without impacting the indexes of earlier members
for (int i = results.length - 1; i >= 0; i--) {
// if result is not null, it succeeded
if (results[i] instanceof Result) {
deletes.remove(i);
}
Object[] results = new Object[deletes.size()];
try {
batch(deletes, results, writeRpcTimeoutMs);
} catch (InterruptedException e) {
throw (InterruptedIOException) new InterruptedIOException().initCause(e);
} finally {
// TODO: to be consistent with batch put(), do not modify input list
// mutate list so that it is empty for complete success, or contains only failed records
// results are returned in the same order as the requests in list walk the list backwards,
// so we can remove from list without impacting the indexes of earlier members
for (int i = results.length - 1; i >= 0; i--) {
// if result is not null, it succeeded
if (results[i] instanceof Result) {
deletes.remove(i);
}
}
}, supplier);
}
}

@Override
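As the TODO in the hunk above notes, delete(List&lt;Delete&gt;) mutates its argument: the results are walked backwards and every successful Delete is removed, so after the call the list is empty on complete success or holds only the failed operations, in their original relative order. A short caller-side sketch under the same assumptions (hypothetical row keys, an already-opened Table):

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

class DeleteListSketch {
  static void deleteRows(Table table) throws IOException {
    List<Delete> deletes = new ArrayList<>();
    deletes.add(new Delete(Bytes.toBytes("row-1")));
    deletes.add(new Delete(Bytes.toBytes("row-2")));
    table.delete(deletes);
    // On complete success the list is now empty; otherwise it holds only the
    // Delete operations that failed.
    if (!deletes.isEmpty()) {
      // retry or report the failed deletes here
    }
  }
}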
Expand Down Expand Up @@ -605,21 +593,15 @@ protected Void rpcCall() throws Exception {

@Override
public void put(final List<Put> puts) throws IOException {
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.BATCH)
.setContainerOperations(puts);
TraceUtil.traceWithIOException(() -> {
for (Put put : puts) {
validatePut(put);
}
Object[] results = new Object[puts.size()];
try {
batch(puts, results, writeRpcTimeoutMs);
} catch (InterruptedException e) {
throw (InterruptedIOException) new InterruptedIOException().initCause(e);
}
}, supplier);
for (Put put : puts) {
validatePut(put);
}
Object[] results = new Object[puts.size()];
try {
batch(puts, results, writeRpcTimeoutMs);
} catch (InterruptedException e) {
throw (InterruptedIOException) new InterruptedIOException().initCause(e);
}
}

@Override
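In the same way, put(List&lt;Put&gt;) above validates every Put client-side and then hands the whole list to batch() with the write RPC timeout. A brief sketch, again with made-up names; the detail that a Put with no cells is rejected by validatePut with an IllegalArgumentException is my assumption, not something stated in this diff.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

class PutListSketch {
  static void putRows(Table table) throws IOException {
    List<Put> puts = new ArrayList<>();
    for (int i = 0; i < 3; i++) {
      puts.add(new Put(Bytes.toBytes("row-" + i))
          .addColumn(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v" + i)));
    }
    // Each Put is checked by validatePut(...) before the batch is sent; an empty Put
    // is assumed to fail that check client-side rather than reaching the server.
    table.put(puts);
  }
}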
@@ -29,6 +29,7 @@
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
@@ -41,9 +42,11 @@
import io.opentelemetry.sdk.trace.data.SpanData;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderFactory;
@@ -253,11 +256,12 @@ private void assertTrace(String tableOperation, Matcher<SpanData> matcher) {
Waiter.waitFor(CONF, 1000, new MatcherPredicate<>(
"waiting for span to emit",
() -> traceRule.getSpans(), hasItem(spanLocator)));
SpanData data = traceRule.getSpans()
List<SpanData> candidateSpans = traceRule.getSpans()
.stream()
.filter(spanLocator::matches)
.findFirst()
.orElseThrow(AssertionError::new);
.collect(Collectors.toList());
assertThat(candidateSpans, hasSize(1));
SpanData data = candidateSpans.iterator().next();
assertThat(data, allOf(
hasName(expectedName),
hasKind(SpanKind.CLIENT),
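The test change above replaces findFirst().orElseThrow(AssertionError::new) with collecting every span that matches the locator and asserting the list has exactly one element, so a duplicated span now fails the assertion instead of being silently ignored. A generic sketch of the same Hamcrest pattern; the class and method names here are hypothetical and not taken from the HBase tests.

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class ExactlyOneMatchSketch {
  static void assertExactlyOne(Stream<String> spanNames, String expectedName) {
    // Collect everything that matches, then assert on the count: a second element
    // with the same name makes this fail, where findFirst() would have hidden it.
    List<String> matches = spanNames
        .filter(expectedName::equals)
        .collect(Collectors.toList());
    assertThat(matches, hasSize(1));
  }
}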
@@ -29,6 +29,7 @@
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
@@ -42,8 +43,10 @@
import io.opentelemetry.sdk.trace.data.SpanData;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
@@ -251,11 +254,12 @@ private void assertTrace(String tableOperation, Matcher<SpanData> matcher) {
Waiter.waitFor(conf, 1000, new MatcherPredicate<>(
"waiting for span to emit",
() -> TRACE_RULE.getSpans(), hasItem(spanLocator)));
SpanData data = TRACE_RULE.getSpans()
List<SpanData> candidateSpans = TRACE_RULE.getSpans()
.stream()
.filter(spanLocator::matches)
.findFirst()
.orElseThrow(AssertionError::new);
.collect(Collectors.toList());
assertThat(candidateSpans, hasSize(1));
SpanData data = candidateSpans.iterator().next();
assertThat(data, allOf(
hasName(expectedName),
hasKind(SpanKind.CLIENT),