diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/CheckAndMutate.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/CheckAndMutate.java new file mode 100644 index 000000000000..b3272b5653ac --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/CheckAndMutate.java @@ -0,0 +1,392 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import java.util.Collections; +import java.util.List; +import java.util.NavigableMap; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.OperationWithAttributes; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Row; +import org.apache.hadoop.hbase.client.RowMutations; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.io.TimeRange; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; + +/** + * Used to perform CheckAndMutate operations. + * + * Use the builder classes to instantiate a CheckAndMutate object. + * This builder classes are fluent style APIs, the code are like: + * + *
+ * 
+ * CheckAndMutate checkAndMutate = CheckAndMutate.builder(row, family).qualifier(qualifier)
+ *   .ifNotExists().build(put);
+ * 
+ * 
+ * CheckAndMutate checkAndMutate = CheckAndMutate.builder(row, filter).build(put);
+ * 
+ * 
+ */ +@InterfaceAudience.Public +public final class CheckAndMutate extends Mutation { + + /** + * A builder class for building a CheckAndMutate object. + */ + public static final class Builder { + private final byte[] row; + private final byte[] family; + private byte[] qualifier; + private CompareOperator op; + private byte[] value; + private TimeRange timeRange; + + private Builder(byte[] row, byte[] family) { + this.row = Preconditions.checkNotNull(row, "row is null"); + this.family = Preconditions.checkNotNull(family, "family is null"); + } + + /** + * @param qualifier column qualifier to check + * @return the builder object + */ + public Builder qualifier(byte[] qualifier) { + this.qualifier = Preconditions.checkNotNull(qualifier, "qualifier is null. Consider using" + + " an empty byte array, or just do not call this method if you want a null qualifier"); + return this; + } + + /** + * @param timeRange time range to check + * @return the builder object + */ + public Builder timeRange(TimeRange timeRange) { + this.timeRange = timeRange; + return this; + } + + /** + * Check for lack of column + * @return the builder object + */ + public Builder ifNotExists() { + this.op = CompareOperator.EQUAL; + this.value = null; + return this; + } + + /** + * Check for equality + * @param value the expected value + * @return the builder object + */ + public Builder ifEquals(byte[] value) { + return ifMatches(CompareOperator.EQUAL, value); + } + + /** + * @param compareOp comparison operator to use + * @param value the expected value + * @return the builder object + */ + public Builder ifMatches(CompareOperator compareOp, byte[] value) { + this.op = Preconditions.checkNotNull(compareOp, "compareOp is null"); + this.value = Preconditions.checkNotNull(value, "value is null"); + return this; + } + + private void preCheck() { + Preconditions.checkNotNull(op, "condition is null. You need to specify the condition by" + + " calling ifNotExists/ifEquals/ifMatches before executing the request"); + } + + /** + * @param put data to put if check succeeds + * @return a CheckAndMutate object + */ + public CheckAndMutate build(Put put) { + preCheck(); + return new CheckAndMutate(row, family, qualifier, op, value, timeRange, put); + } + + /** + * @param delete data to delete if check succeeds + * @return a CheckAndMutate object + */ + public CheckAndMutate build(Delete delete) { + preCheck(); + return new CheckAndMutate(row, family, qualifier, op, value, timeRange, delete); + } + + /** + * @param mutation mutations to perform if check succeeds + * @return a CheckAndMutate object + */ + public CheckAndMutate build(RowMutations mutation) { + preCheck(); + return new CheckAndMutate(row, family, qualifier, op, value, timeRange, mutation); + } + } + + /** + * A builder class for building a CheckAndMutate object with a filter. + */ + public static final class WithFilterBuilder { + private final byte[] row; + private final Filter filter; + private TimeRange timeRange; + + private WithFilterBuilder(byte[] row, Filter filter) { + this.row = Preconditions.checkNotNull(row, "row is null"); + this.filter = Preconditions.checkNotNull(filter, "filter is null"); + } + + /** + * @param timeRange timeRange to check + * @return the builder object + */ + public WithFilterBuilder timeRange(TimeRange timeRange) { + this.timeRange = timeRange; + return this; + } + + /** + * @param put data to put if check succeeds + * @return a CheckAndMutate object + */ + public CheckAndMutate build(Put put) { + return new CheckAndMutate(row, filter, timeRange, put); + } + + /** + * @param delete data to delete if check succeeds + * @return a CheckAndMutate object + */ + public CheckAndMutate build(Delete delete) { + return new CheckAndMutate(row, filter, timeRange, delete); + } + + /** + * @param mutation mutations to perform if check succeeds + * @return a CheckAndMutate object + */ + public CheckAndMutate build(RowMutations mutation) { + return new CheckAndMutate(row, filter, timeRange, mutation); + } + } + + /** + * returns a builder object to build a CheckAndMutate object + * + * @param row row + * @param family column family to check + * @return a builder object + */ + public static Builder builder(byte[] row, byte[] family) { + return new Builder(row, family); + } + + /** + * returns a builder object to build a CheckAndMutate object with a filter + * + * @param row row + * @param filter filter to check + * @return a builder object + */ + public static WithFilterBuilder builder(byte[] row, Filter filter) { + return new WithFilterBuilder(row, filter); + } + + private final byte[] family; + private final byte[] qualifier; + private final CompareOperator op; + private final byte[] value; + private final Filter filter; + private final TimeRange timeRange; + private final Row action; + + private CheckAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op, + byte[] value, TimeRange timeRange, Row action) { + super(row, HConstants.LATEST_TIMESTAMP, Collections.emptyNavigableMap()); + + this.family = family; + this.qualifier = qualifier; + this.timeRange = timeRange; + this.op = op; + this.value = value; + this.filter = null; + this.action = action; + } + + private CheckAndMutate(byte[] row, Filter filter, TimeRange timeRange, Row action) { + super(row, HConstants.LATEST_TIMESTAMP, Collections.emptyNavigableMap()); + + this.family = null; + this.qualifier = null; + this.timeRange = timeRange; + this.op = null; + this.value = null; + this.filter = filter; + this.action = action; + } + + /** + * @return the family to check + */ + public byte[] getFamily() { + return family; + } + + /** + * @return the qualifier to check + */ + public byte[] getQualifier() { + return qualifier; + } + + /** + * @return the comparison operator + */ + public CompareOperator getCompareOp() { + return op; + } + + /** + * @return the expected value + */ + public byte[] getValue() { + return value; + } + + /** + * @return the filter to check + */ + public Filter getFilter() { + return filter; + } + + /** + * @return the timeRange to check + */ + public TimeRange getTimeRange() { + return timeRange; + } + + /** + * @return the action done if check succeeds + */ + public Row getAction() { + return action; + } + + @Override + public NavigableMap> getFamilyCellMap() { + if (action instanceof Mutation) { + return ((Mutation) action).getFamilyCellMap(); + } else { + throw new UnsupportedOperationException(); + } + } + + @Override + public CellBuilder getCellBuilder(CellBuilderType cellBuilderType) { + if (action instanceof Mutation) { + return ((Mutation) action).getCellBuilder(); + } else { + throw new UnsupportedOperationException(); + } + } + + @Override + public long getTimestamp() { + if (action instanceof Mutation) { + return ((Mutation) action).getTimestamp(); + } else { + throw new UnsupportedOperationException(); + } + } + + @Override + public Mutation setTimestamp(long timestamp) { + if (action instanceof Mutation) { + return ((Mutation) action).setTimestamp(timestamp); + } else { + throw new UnsupportedOperationException(); + } + } + + @Override + public Durability getDurability() { + if (action instanceof Mutation) { + return ((Mutation) action).getDurability(); + } else { + throw new UnsupportedOperationException(); + } + } + + @Override + public Mutation setDurability(Durability d) { + if (action instanceof Mutation) { + return ((Mutation) action).setDurability(d); + } else { + throw new UnsupportedOperationException(); + } + } + + @Override + public byte[] getAttribute(String name) { + if (action instanceof Mutation) { + return ((Mutation) action).getAttribute(name); + } else { + throw new UnsupportedOperationException(); + } + } + + @Override + public OperationWithAttributes setAttribute(String name, byte[] value) { + if (action instanceof Mutation) { + return ((Mutation) action).setAttribute(name, value); + } else { + throw new UnsupportedOperationException(); + } + } + + @Override + public int getPriority() { + if (action instanceof Mutation) { + return ((Mutation) action).getPriority(); + } else { + return ((RowMutations) action).getMaxPriority(); + } + } + + @Override + public OperationWithAttributes setPriority(int priority) { + if (action instanceof Mutation) { + return ((Mutation) action).setPriority(priority); + } else { + throw new UnsupportedOperationException(); + } + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java index 464eff54fcbb..7e05b05c8154 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java @@ -256,7 +256,7 @@ private void failAll(Stream actions, int tries) { } private ClientProtos.MultiRequest buildReq(Map actionsByRegion, - List cells, Map rowMutationsIndexMap) throws IOException { + List cells, Map indexMap) throws IOException { ClientProtos.MultiRequest.Builder multiRequestBuilder = ClientProtos.MultiRequest.newBuilder(); ClientProtos.RegionAction.Builder regionActionBuilder = ClientProtos.RegionAction.newBuilder(); ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder(); @@ -264,14 +264,14 @@ private ClientProtos.MultiRequest buildReq(Map actionsByR for (Map.Entry entry : actionsByRegion.entrySet()) { long nonceGroup = conn.getNonceGenerator().getNonceGroup(); // multiRequestBuilder will be populated with region actions. - // rowMutationsIndexMap will be non-empty after the call if there is RowMutations in the + // indexMap will be non-empty after the call if there is RowMutations/CheckAndMutate in the // action list. RequestConverter.buildNoDataRegionActions(entry.getKey(), entry.getValue().actions.stream() .sorted((a1, a2) -> Integer.compare(a1.getOriginalIndex(), a2.getOriginalIndex())) .collect(Collectors.toList()), - cells, multiRequestBuilder, regionActionBuilder, actionBuilder, mutationBuilder, nonceGroup, - rowMutationsIndexMap); + cells, multiRequestBuilder, regionActionBuilder, actionBuilder, mutationBuilder, + nonceGroup, indexMap); } return multiRequestBuilder.build(); } @@ -367,10 +367,10 @@ private void sendToServer(ServerName serverName, ServerRequest serverReq, int tr List cells = new ArrayList<>(); // Map from a created RegionAction to the original index for a RowMutations within // the original list of actions. This will be used to process the results when there - // is RowMutations in the action list. - Map rowMutationsIndexMap = new HashMap<>(); + // is RowMutations/CheckAndMutate in the action list. + Map indexMap = new HashMap<>(); try { - req = buildReq(serverReq.actionsByRegion, cells, rowMutationsIndexMap); + req = buildReq(serverReq.actionsByRegion, cells, indexMap); } catch (IOException e) { onError(serverReq.actionsByRegion, tries, e, serverName); return; @@ -387,7 +387,7 @@ private void sendToServer(ServerName serverName, ServerRequest serverReq, int tr } else { try { onComplete(serverReq.actionsByRegion, tries, serverName, ResponseConverter.getResults(req, - rowMutationsIndexMap, resp, controller.cellScanner())); + indexMap, resp, controller.cellScanner())); } catch (Exception e) { onError(serverReq.actionsByRegion, tries, e, serverName); return; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java index ce1c1dc4240c..81adc795877f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java @@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit; import java.util.function.Function; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CheckAndMutate; import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.filter.Filter; @@ -231,12 +232,20 @@ default CompletableFuture incrementColumnValue(byte[] row, byte[] family, * }); * * + * + * @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use only, do not use it + * any more. */ + @Deprecated CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family); /** * A helper class for sending checkAndMutate request. + * + * @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use only, do not use it + * any more. */ + @Deprecated interface CheckAndMutateBuilder { /** @@ -309,12 +318,20 @@ default CheckAndMutateBuilder ifEquals(byte[] value) { * }); * * + * + * @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use only, do not use it + * any more. */ + @Deprecated CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter); /** * A helper class for sending checkAndMutate request with a filter. + * + * @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use only, do not use it + * any more. */ + @Deprecated interface CheckAndMutateWithFilterBuilder { /** @@ -344,6 +361,35 @@ interface CheckAndMutateWithFilterBuilder { CompletableFuture thenMutate(RowMutations mutation); } + /** + * checkAndMutate that atomically checks if a row matches the specified condition. If it does, + * it adds the Put/Delete/RowMutations. + * + * @param checkAndMutate The CheckAndMutate object. + * @return A {@link CompletableFuture}s that represent the result for the CheckAndMutate. + */ + CompletableFuture checkAndMutate(CheckAndMutate checkAndMutate); + + /** + * Batch version of checkAndMutate. + * + * @param checkAndMutates The list of CheckAndMutate. + * @return A list of {@link CompletableFuture}s that represent the result for each + * CheckAndMutate. + */ + List> checkAndMutate(List checkAndMutates); + + /** + * A simple version of batch checkAndMutate. It will fail if there are any failures. + * + * @param checkAndMutates The list of rows to apply. + * @return A {@link CompletableFuture} that wrapper the result boolean list. + */ + default CompletableFuture> checkAndMutateAll( + List checkAndMutates) { + return allOf(checkAndMutate(checkAndMutates)); + } + /** * Performs multiple mutations atomically on a single row. Currently {@link Put} and * {@link Delete} are supported. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java index afd0fac1f855..d49d48bfa875 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java @@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit; import java.util.function.Function; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CheckAndMutate; import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.filter.Filter; @@ -205,6 +206,17 @@ public CompletableFuture thenMutate(RowMutations mutation) { }; } + @Override + public CompletableFuture checkAndMutate(CheckAndMutate checkAndMutate) { + return wrap(rawTable.checkAndMutate(checkAndMutate)); + } + + @Override + public List> checkAndMutate(List checkAndMutates) { + return rawTable.checkAndMutate(checkAndMutates).stream() + .map(this::wrap).collect(toList()); + } + @Override public CompletableFuture mutateRow(RowMutations mutation) { return wrap(rawTable.mutateRow(mutation)); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java index 2bfa49dfbed3..d575d0b9517c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java @@ -153,10 +153,10 @@ public CellScanner cellScanner() { * @return a list of Cell objects, returns an empty list if one doesn't exist. */ List getCellList(byte[] family) { - List list = this.familyMap.get(family); + List list = getFamilyCellMap().get(family); if (list == null) { list = new ArrayList<>(); - this.familyMap.put(family, list); + getFamilyCellMap().put(family, list); } return list; } @@ -205,11 +205,11 @@ KeyValue createPutKeyValue(byte[] family, ByteBuffer qualifier, long ts, ByteBuf @Override public Map getFingerprint() { Map map = new HashMap<>(); - List families = new ArrayList<>(this.familyMap.entrySet().size()); + List families = new ArrayList<>(getFamilyCellMap().entrySet().size()); // ideally, we would also include table information, but that information // is not stored in each Operation instance. map.put("families", families); - for (Map.Entry> entry : this.familyMap.entrySet()) { + for (Map.Entry> entry : getFamilyCellMap().entrySet()) { families.add(Bytes.toStringBinary(entry.getKey())); } return map; @@ -233,7 +233,7 @@ public Map toMap(int maxCols) { map.put("row", Bytes.toStringBinary(this.row)); int colCount = 0; // iterate through all column families affected - for (Map.Entry> entry : this.familyMap.entrySet()) { + for (Map.Entry> entry : getFamilyCellMap().entrySet()) { // map from this family to details for each cell affected within the family List> qualifierDetails = new ArrayList<>(); columns.put(Bytes.toStringBinary(entry.getKey()), qualifierDetails); @@ -310,7 +310,7 @@ public Durability getDurability() { * @return true if empty, false otherwise */ public boolean isEmpty() { - return familyMap.isEmpty(); + return getFamilyCellMap().isEmpty(); } /** @@ -441,7 +441,7 @@ private static CellVisibility toCellVisibility(byte[] protoBytes) throws Deseria */ public int size() { int size = 0; - for (List cells : this.familyMap.values()) { + for (List cells : getFamilyCellMap().values()) { size += cells.size(); } return size; @@ -451,7 +451,7 @@ public int size() { * @return the number of different families */ public int numFamilies() { - return familyMap.size(); + return getFamilyCellMap().size(); } /** @@ -465,8 +465,8 @@ public long heapSize() { // Adding map overhead heapsize += - ClassSize.align(this.familyMap.size() * ClassSize.MAP_ENTRY); - for(Map.Entry> entry : this.familyMap.entrySet()) { + ClassSize.align(getFamilyCellMap().size() * ClassSize.MAP_ENTRY); + for(Map.Entry> entry : getFamilyCellMap().entrySet()) { //Adding key overhead heapsize += ClassSize.align(ClassSize.ARRAY + entry.getKey().length); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java index 0c861617d909..fbe64045b088 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java @@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CheckAndMutate; import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; @@ -265,91 +266,49 @@ public CompletableFuture increment(Increment increment) { private final class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder { - private final byte[] row; - - private final byte[] family; - - private byte[] qualifier; - - private TimeRange timeRange; - - private CompareOperator op; - - private byte[] value; + private final CheckAndMutate.Builder builder; public CheckAndMutateBuilderImpl(byte[] row, byte[] family) { - this.row = Preconditions.checkNotNull(row, "row is null"); - this.family = Preconditions.checkNotNull(family, "family is null"); + builder = CheckAndMutate.builder(row, family); } @Override public CheckAndMutateBuilder qualifier(byte[] qualifier) { - this.qualifier = Preconditions.checkNotNull(qualifier, "qualifier is null. Consider using" + - " an empty byte array, or just do not call this method if you want a null qualifier"); + builder.qualifier(qualifier); return this; } @Override public CheckAndMutateBuilder timeRange(TimeRange timeRange) { - this.timeRange = timeRange; + builder.timeRange(timeRange); return this; } @Override public CheckAndMutateBuilder ifNotExists() { - this.op = CompareOperator.EQUAL; - this.value = null; + builder.ifNotExists(); return this; } @Override public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) { - this.op = Preconditions.checkNotNull(compareOp, "compareOp is null"); - this.value = Preconditions.checkNotNull(value, "value is null"); + builder.ifMatches(compareOp, value); return this; } - private void preCheck() { - Preconditions.checkNotNull(op, "condition is null. You need to specify the condition by" + - " calling ifNotExists/ifEquals/ifMatches before executing the request"); - } - @Override public CompletableFuture thenPut(Put put) { - validatePut(put, conn.connConf.getMaxKeyValueSize()); - preCheck(); - return RawAsyncTableImpl.this. newCaller(row, put.getPriority(), rpcTimeoutNs) - .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, - stub, put, - (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value, - null, timeRange, p), - (c, r) -> r.getProcessed())) - .call(); + return checkAndMutate(builder.build(put)); } @Override public CompletableFuture thenDelete(Delete delete) { - preCheck(); - return RawAsyncTableImpl.this. newCaller(row, delete.getPriority(), rpcTimeoutNs) - .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, - loc, stub, delete, - (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value, - null, timeRange, d), - (c, r) -> r.getProcessed())) - .call(); + return checkAndMutate(builder.build(delete)); } @Override public CompletableFuture thenMutate(RowMutations mutation) { - preCheck(); - return RawAsyncTableImpl.this. newCaller(row, mutation.getMaxPriority(), - rpcTimeoutNs) - .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller, - loc, stub, mutation, - (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value, - null, timeRange, rm), - resp -> resp.getExists())) - .call(); + return checkAndMutate(builder.build(mutation)); } } @@ -358,66 +317,79 @@ public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) { return new CheckAndMutateBuilderImpl(row, family); } - private final class CheckAndMutateWithFilterBuilderImpl implements CheckAndMutateWithFilterBuilder { - private final byte[] row; - - private final Filter filter; - - private TimeRange timeRange; + private final CheckAndMutate.WithFilterBuilder builder; public CheckAndMutateWithFilterBuilderImpl(byte[] row, Filter filter) { - this.row = Preconditions.checkNotNull(row, "row is null"); - this.filter = Preconditions.checkNotNull(filter, "filter is null"); + builder = CheckAndMutate.builder(row, filter); } @Override public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) { - this.timeRange = timeRange; + builder.timeRange(timeRange); return this; } @Override public CompletableFuture thenPut(Put put) { - validatePut(put, conn.connConf.getMaxKeyValueSize()); - return RawAsyncTableImpl.this. newCaller(row, put.getPriority(), rpcTimeoutNs) - .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, - stub, put, - (rn, p) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null, - filter, timeRange, p), - (c, r) -> r.getProcessed())) - .call(); + return checkAndMutate(builder.build(put)); } @Override public CompletableFuture thenDelete(Delete delete) { - return RawAsyncTableImpl.this. newCaller(row, delete.getPriority(), rpcTimeoutNs) - .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, - loc, stub, delete, - (rn, d) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null, - filter, timeRange, d), - (c, r) -> r.getProcessed())) - .call(); + return checkAndMutate(builder.build(delete)); } @Override public CompletableFuture thenMutate(RowMutations mutation) { - return RawAsyncTableImpl.this. newCaller(row, mutation.getMaxPriority(), - rpcTimeoutNs) - .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller, + return checkAndMutate(builder.build(mutation)); + } + } + + @Override + public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) { + return new CheckAndMutateWithFilterBuilderImpl(row, filter); + } + + @Override + public CompletableFuture checkAndMutate(CheckAndMutate checkAndMutate) { + if (checkAndMutate.getAction() instanceof Mutation) { + Mutation mutation = (Mutation) checkAndMutate.getAction(); + if (mutation instanceof Put) { + validatePut((Put) mutation, conn.connConf.getMaxKeyValueSize()); + } + return RawAsyncTableImpl.this. newCaller(checkAndMutate.getRow(), + mutation.getPriority(), rpcTimeoutNs) + .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, mutation, - (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null, - filter, timeRange, rm), + (rn, m) -> RequestConverter.buildMutateRequest(rn, checkAndMutate.getRow(), + checkAndMutate.getFamily(), checkAndMutate.getQualifier(), + checkAndMutate.getCompareOp(), checkAndMutate.getValue(), checkAndMutate.getFilter(), + checkAndMutate.getTimeRange(), m), + (c, r) -> r.getProcessed())) + .call(); + } else { + RowMutations rowMutations = (RowMutations) checkAndMutate.getAction(); + return RawAsyncTableImpl.this. newCaller(checkAndMutate.getRow(), + rowMutations.getMaxPriority(), rpcTimeoutNs) + .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller, + loc, stub, rowMutations, + (rn, rm) -> RequestConverter.buildMutateRequest(rn, checkAndMutate.getRow(), + checkAndMutate.getFamily(), checkAndMutate.getQualifier(), + checkAndMutate.getCompareOp(), checkAndMutate.getValue(), checkAndMutate.getFilter(), + checkAndMutate.getTimeRange(), rm), resp -> resp.getExists())) .call(); } } @Override - public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) { - return new CheckAndMutateWithFilterBuilderImpl(row, filter); + public List> checkAndMutate(List checkAndMutates) { + return batch(checkAndMutates, rpcTimeoutNs).stream() + .map(f -> f.thenApply(r -> ((Result)r).getExists())) + .collect(toList()); } // We need the MultiRequest when constructing the org.apache.hadoop.hbase.client.MultiResponse, @@ -556,8 +528,15 @@ private List> voidMutate(List actions) { } private List> batch(List actions, long rpcTimeoutNs) { - actions.stream().filter(action -> action instanceof Put).map(action -> (Put) action) - .forEach(put -> validatePut(put, conn.connConf.getMaxKeyValueSize())); + for (Row action : actions) { + if (action instanceof Put) { + validatePut((Put) action, conn.connConf.getMaxKeyValueSize()); + } else if (action instanceof CheckAndMutate && + ((CheckAndMutate) action).getAction() instanceof Put) { + validatePut((Put) ((CheckAndMutate) action).getAction(), + conn.connConf.getMaxKeyValueSize()); + } + } return conn.callerFactory.batch().table(tableName).actions(actions) .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java index 870d83d9149e..81ea7e12fc34 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java @@ -28,6 +28,7 @@ import org.apache.commons.lang3.NotImplementedException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CheckAndMutate; import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.coprocessor.Batch; @@ -307,14 +308,22 @@ default void delete(List deletes) throws IOException { * table.checkAndMutate(row, family).qualifier(qualifier).ifNotExists().thenPut(put); * * + * + * @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use only, do not use it + * any more. */ + @Deprecated default CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) { throw new NotImplementedException("Add an implementation!"); } /** * A helper class for sending checkAndMutate request. + * + * @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use only, do not use it + * any more. */ + @Deprecated interface CheckAndMutateBuilder { /** @@ -377,14 +386,22 @@ default CheckAndMutateBuilder ifEquals(byte[] value) { * table.checkAndMutate(row, filter).thenPut(put); * * + * + * @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use only, do not use it + * any more. */ + @Deprecated default CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) { throw new NotImplementedException("Add an implementation!"); } /** * A helper class for sending checkAndMutate request with a filter. + * + * @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use only, do not use it + * any more. */ + @Deprecated interface CheckAndMutateWithFilterBuilder { /** @@ -411,6 +428,29 @@ interface CheckAndMutateWithFilterBuilder { boolean thenMutate(RowMutations mutation) throws IOException; } + /** + * checkAndMutate that atomically checks if a row matches the specified condition. If it does, + * it adds the Put/Delete/RowMutations. + * + * @param checkAndMutate The CheckAndMutate object. + * @return boolean that represents the result for the CheckAndMutate. + * @throws IOException if a remote or network exception occurs. + */ + default boolean checkAndMutate(CheckAndMutate checkAndMutate) throws IOException { + return checkAndMutate(Collections.singletonList(checkAndMutate))[0]; + } + + /** + * Batch version of checkAndMutate. + * + * @param checkAndMutates The list of CheckAndMutate. + * @return A array of boolean that represents the result for each CheckAndMutate. + * @throws IOException if a remote or network exception occurs. + */ + default boolean[] checkAndMutate(List checkAndMutates) throws IOException { + throw new NotImplementedException("Add an implementation!"); + } + /** * Performs multiple mutations atomically on a single row. Currently * {@link Put} and {@link Delete} are supported. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java index 841f8ba5733a..9ea1b7597c6c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java @@ -37,6 +37,7 @@ import java.util.stream.Collectors; import org.apache.commons.lang3.ArrayUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CheckAndMutate; import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseIOException; @@ -299,6 +300,16 @@ public boolean thenMutate(RowMutations mutation) throws IOException { }; } + @Override + public boolean checkAndMutate(CheckAndMutate checkAndMutate) throws IOException { + return FutureUtils.get(table.checkAndMutate(checkAndMutate)); + } + + @Override + public boolean[] checkAndMutate(List checkAndMutates) throws IOException { + return Booleans.toArray(FutureUtils.get(table.checkAndMutateAll(checkAndMutates))); + } + @Override public void mutateRow(RowMutations rm) throws IOException { FutureUtils.get(table.mutateRow(rm)); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java index 9a5156d992f1..8f05023a8ff2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java @@ -28,6 +28,7 @@ import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hbase.CellScannable; +import org.apache.hadoop.hbase.CheckAndMutate; import org.apache.hadoop.hbase.ClusterMetrics.Option; import org.apache.hadoop.hbase.ClusterMetricsBuilder; import org.apache.hadoop.hbase.CompareOperator; @@ -193,37 +194,20 @@ public static GetRequest buildGetRequest(final byte[] regionName, } /** - * Create a protocol buffer MutateRequest for a conditioned put + * Create a protocol buffer MutateRequest for a conditioned put/delete * * @return a mutate request * @throws IOException */ - public static MutateRequest buildMutateRequest( - final byte[] regionName, final byte[] row, final byte[] family, - final byte [] qualifier, final CompareOperator op, final byte[] value, final Filter filter, - final TimeRange timeRange, final Put put) throws IOException { - return buildMutateRequest(regionName, row, family, qualifier, op, value, filter, timeRange, - put, MutationType.PUT); - } - - /** - * Create a protocol buffer MutateRequest for a conditioned delete - * - * @return a mutate request - * @throws IOException - */ - public static MutateRequest buildMutateRequest( - final byte[] regionName, final byte[] row, final byte[] family, - final byte [] qualifier, final CompareOperator op, final byte[] value, final Filter filter, - final TimeRange timeRange, final Delete delete) throws IOException { - return buildMutateRequest(regionName, row, family, qualifier, op, value, filter, timeRange, - delete, MutationType.DELETE); - } - public static MutateRequest buildMutateRequest(final byte[] regionName, final byte[] row, final byte[] family, final byte[] qualifier, final CompareOperator op, final byte[] value, - final Filter filter, final TimeRange timeRange, final Mutation mutation, - final MutationType type) throws IOException { + final Filter filter, final TimeRange timeRange, final Mutation mutation) throws IOException { + MutationType type; + if (mutation instanceof Put) { + type = MutationType.PUT; + } else { + type = MutationType.DELETE; + } return MutateRequest.newBuilder() .setRegion(buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName)) .setMutation(ProtobufUtil.toMutation(type, mutation)) @@ -262,9 +246,8 @@ public static ClientProtos.MultiRequest buildMutateRequest(final byte[] regionNa actionBuilder.setMutation(mp); builder.addAction(actionBuilder.build()); } - return ClientProtos.MultiRequest.newBuilder().addRegionAction(builder.build()) - .setCondition(buildCondition(row, family, qualifier, op, value, filter, timeRange)) - .build(); + return ClientProtos.MultiRequest.newBuilder().addRegionAction(builder.setCondition( + buildCondition(row, family, qualifier, op, value, filter, timeRange)).build()).build(); } /** @@ -382,42 +365,6 @@ public static RegionAction.Builder buildRegionAction(final byte [] regionName, return builder; } - /** - * Create a protocol buffer MultiRequest for row mutations that does not hold data. Data/Cells - * are carried outside of protobuf. Return references to the Cells in cells param. - * Does not propagate Action absolute position. Does not set atomic action on the created - * RegionAtomic. Caller should do that if wanted. - * @param regionName - * @param rowMutations - * @param cells Return in here a list of Cells as CellIterable. - * @return a region mutation minus data - * @throws IOException - */ - public static RegionAction.Builder buildNoDataRegionAction(final byte[] regionName, - final RowMutations rowMutations, final List cells, - final RegionAction.Builder regionActionBuilder, - final ClientProtos.Action.Builder actionBuilder, - final MutationProto.Builder mutationBuilder) - throws IOException { - for (Mutation mutation: rowMutations.getMutations()) { - MutationType type = null; - if (mutation instanceof Put) { - type = MutationType.PUT; - } else if (mutation instanceof Delete) { - type = MutationType.DELETE; - } else { - throw new DoNotRetryIOException("RowMutations supports only put and delete, not " + - mutation.getClass().getName()); - } - mutationBuilder.clear(); - MutationProto mp = ProtobufUtil.toMutationNoData(type, mutation, mutationBuilder); - cells.add(mutation); - actionBuilder.clear(); - regionActionBuilder.addAction(actionBuilder.setMutation(mp).build()); - } - return regionActionBuilder; - } - public static RegionAction.Builder getRegionActionBuilderWithRegion( final RegionAction.Builder regionActionBuilder, final byte [] regionName) { RegionSpecifier region = buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName); @@ -572,8 +519,8 @@ public static BulkLoadHFileRequest buildBulkLoadHFileRequest( * @param actionBuilder actionBuilder to be used to build action. * @param mutationBuilder mutationBuilder to be used to build mutation. * @param nonceGroup nonceGroup to be applied. - * @param rowMutationsIndexMap Map of created RegionAction to the original index for a - * RowMutations within the original list of actions + * @param indexMap Map of created RegionAction to the original index for a + * RowMutations/CheckAndMutate within the original list of actions * @throws IOException */ public static void buildNoDataRegionActions(final byte[] regionName, @@ -582,14 +529,14 @@ public static void buildNoDataRegionActions(final byte[] regionName, final RegionAction.Builder regionActionBuilder, final ClientProtos.Action.Builder actionBuilder, final MutationProto.Builder mutationBuilder, - long nonceGroup, final Map rowMutationsIndexMap) throws IOException { + long nonceGroup, final Map indexMap) throws IOException { regionActionBuilder.clear(); RegionAction.Builder builder = getRegionActionBuilderWithRegion( regionActionBuilder, regionName); ClientProtos.CoprocessorServiceCall.Builder cpBuilder = null; - RegionAction.Builder rowMutationsRegionActionBuilder = null; boolean hasNonce = false; List rowMutationsList = new ArrayList<>(); + List checkAndMutates = new ArrayList<>(); for (Action action: actions) { Row row = action.getAction(); @@ -600,26 +547,9 @@ public static void buildNoDataRegionActions(final byte[] regionName, Get g = (Get)row; builder.addAction(actionBuilder.setGet(ProtobufUtil.toGet(g))); } else if (row instanceof Put) { - Put p = (Put)row; - cells.add(p); - builder.addAction(actionBuilder. - setMutation(ProtobufUtil.toMutationNoData(MutationType.PUT, p, mutationBuilder))); + buildNoDataRegionAction((Put) row, cells, builder, actionBuilder, mutationBuilder); } else if (row instanceof Delete) { - Delete d = (Delete)row; - int size = d.size(); - // Note that a legitimate Delete may have a size of zero; i.e. a Delete that has nothing - // in it but the row to delete. In this case, the current implementation does not make - // a KeyValue to represent a delete-of-all-the-row until we serialize... For such cases - // where the size returned is zero, we will send the Delete fully pb'd rather than have - // metadata only in the pb and then send the kv along the side in cells. - if (size > 0) { - cells.add(d); - builder.addAction(actionBuilder. - setMutation(ProtobufUtil.toMutationNoData(MutationType.DELETE, d, mutationBuilder))); - } else { - builder.addAction(actionBuilder. - setMutation(ProtobufUtil.toMutation(MutationType.DELETE, d, mutationBuilder))); - } + buildNoDataRegionAction((Delete) row, cells, builder, actionBuilder, mutationBuilder); } else if (row instanceof Append) { Append a = (Append)row; cells.add(a); @@ -650,6 +580,8 @@ public static void buildNoDataRegionActions(final byte[] regionName, .setRequest(value))); } else if (row instanceof RowMutations) { rowMutationsList.add(action); + } else if (row instanceof CheckAndMutate) { + checkAndMutates.add(action); } else { throw new DoNotRetryIOException("Multi doesn't support " + row.getClass().getName()); } @@ -665,23 +597,104 @@ public static void buildNoDataRegionActions(final byte[] regionName, // on the one row. We do separate RegionAction for each RowMutations. // We maintain a map to keep track of this RegionAction and the original Action index. for (Action action : rowMutationsList) { - RowMutations rms = (RowMutations) action.getAction(); - if (rowMutationsRegionActionBuilder == null) { - rowMutationsRegionActionBuilder = ClientProtos.RegionAction.newBuilder(); + builder.clear(); + getRegionActionBuilderWithRegion(builder, regionName); + actionBuilder.clear(); + mutationBuilder.clear(); + + buildNoDataRegionAction((RowMutations) action.getAction(), cells, builder, actionBuilder, + mutationBuilder); + builder.setAtomic(true); + + multiRequestBuilder.addRegionAction(builder.build()); + + // This rowMutations region action is at (multiRequestBuilder.getRegionActionCount() - 1) + // in the overall multiRequest. + indexMap.put(multiRequestBuilder.getRegionActionCount() - 1, action.getOriginalIndex()); + } + + // Process CheckAndMutate here. Similar to RowMutations, we do separate RegionAction for each + // CheckAndMutate and maintain a map to keep track of this RegionAction and the original + // Action index. + for (Action action : checkAndMutates) { + builder.clear(); + getRegionActionBuilderWithRegion(builder, regionName); + actionBuilder.clear(); + mutationBuilder.clear(); + + CheckAndMutate cam = (CheckAndMutate) action.getAction(); + builder.setCondition(buildCondition(cam.getRow(), cam.getFamily(), cam.getQualifier(), + cam.getCompareOp(), cam.getValue(), cam.getFilter(), cam.getTimeRange())); + + if (cam.getAction() instanceof Put) { + buildNoDataRegionAction((Put) cam.getAction(), cells, builder, actionBuilder, + mutationBuilder); + } else if (cam.getAction() instanceof Delete) { + buildNoDataRegionAction((Delete) cam.getAction(), cells, builder, actionBuilder, + mutationBuilder); + } else if (cam.getAction() instanceof RowMutations) { + buildNoDataRegionAction((RowMutations) cam.getAction(), cells, builder, actionBuilder, + mutationBuilder); + builder.setAtomic(true); } else { - rowMutationsRegionActionBuilder.clear(); + throw new DoNotRetryIOException("CheckAndMutate doesn't support " + + cam.getAction().getClass().getName()); } - rowMutationsRegionActionBuilder.setRegion( - RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName)); - rowMutationsRegionActionBuilder = RequestConverter.buildNoDataRegionAction(regionName, rms, - cells, rowMutationsRegionActionBuilder, actionBuilder, mutationBuilder); - rowMutationsRegionActionBuilder.setAtomic(true); - // Put it in the multiRequestBuilder - multiRequestBuilder.addRegionAction(rowMutationsRegionActionBuilder.build()); - // This rowMutations region action is at (multiRequestBuilder.getRegionActionCount() - 1) + + multiRequestBuilder.addRegionAction(builder.build()); + + // This CheckAndMutate region action is at (multiRequestBuilder.getRegionActionCount() - 1) // in the overall multiRequest. - rowMutationsIndexMap.put(multiRequestBuilder.getRegionActionCount() - 1, - action.getOriginalIndex()); + indexMap.put(multiRequestBuilder.getRegionActionCount() - 1, action.getOriginalIndex()); + } + } + + private static void buildNoDataRegionAction(final Put put, final List cells, + final RegionAction.Builder regionActionBuilder, + final ClientProtos.Action.Builder actionBuilder, + final MutationProto.Builder mutationBuilder) throws IOException { + cells.add(put); + regionActionBuilder.addAction(actionBuilder. + setMutation(ProtobufUtil.toMutationNoData(MutationType.PUT, put, mutationBuilder))); + } + + private static void buildNoDataRegionAction(final Delete delete, + final List cells, final RegionAction.Builder regionActionBuilder, + final ClientProtos.Action.Builder actionBuilder, final MutationProto.Builder mutationBuilder) + throws IOException { + int size = delete.size(); + // Note that a legitimate Delete may have a size of zero; i.e. a Delete that has nothing + // in it but the row to delete. In this case, the current implementation does not make + // a KeyValue to represent a delete-of-all-the-row until we serialize... For such cases + // where the size returned is zero, we will send the Delete fully pb'd rather than have + // metadata only in the pb and then send the kv along the side in cells. + if (size > 0) { + cells.add(delete); + regionActionBuilder.addAction(actionBuilder. + setMutation(ProtobufUtil.toMutationNoData(MutationType.DELETE, delete, mutationBuilder))); + } else { + regionActionBuilder.addAction(actionBuilder. + setMutation(ProtobufUtil.toMutation(MutationType.DELETE, delete, mutationBuilder))); + } + } + + private static void buildNoDataRegionAction(final RowMutations rowMutations, + final List cells, final RegionAction.Builder regionActionBuilder, + final ClientProtos.Action.Builder actionBuilder, final MutationProto.Builder mutationBuilder) + throws IOException { + for (Mutation mutation: rowMutations.getMutations()) { + MutationType type; + if (mutation instanceof Put) { + type = MutationType.PUT; + } else if (mutation instanceof Delete) { + type = MutationType.DELETE; + } else { + throw new DoNotRetryIOException("RowMutations supports only put and delete, not " + + mutation.getClass().getName()); + } + MutationProto mp = ProtobufUtil.toMutationNoData(type, mutation, mutationBuilder); + cells.add(mutation); + regionActionBuilder.addAction(actionBuilder.setMutation(mp).build()); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java index d7378a685f3c..19e67351470f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java @@ -105,14 +105,14 @@ public static org.apache.hadoop.hbase.client.MultiResponse getResults(final Mult * Get the results from a protocol buffer MultiResponse * * @param request the original protocol buffer MultiRequest - * @param rowMutationsIndexMap Used to support RowMutations in batch + * @param indexMap Used to support RowMutations/CheckAndMutate in batch * @param response the protocol buffer MultiResponse to convert * @param cells Cells to go with the passed in proto. Can be null. * @return the results that were in the MultiResponse (a Result or an Exception). * @throws IOException */ public static org.apache.hadoop.hbase.client.MultiResponse getResults(final MultiRequest request, - final Map rowMutationsIndexMap, final MultiResponse response, + final Map indexMap, final MultiResponse response, final CellScanner cells) throws IOException { int requestRegionActionCount = request.getRegionActionCount(); int responseRegionActionResultCount = response.getRegionActionResultCount(); @@ -149,18 +149,17 @@ public static org.apache.hadoop.hbase.client.MultiResponse getResults(final Mult Object responseValue; - // For RowMutations action, if there is an exception, the exception is set + // For RowMutations/CheckAndMutate action, if there is an exception, the exception is set // at the RegionActionResult level and the ResultOrException is null at the original index - Integer rowMutationsIndex = - (rowMutationsIndexMap == null ? null : rowMutationsIndexMap.get(i)); - if (rowMutationsIndex != null) { - // This RegionAction is from a RowMutations in a batch. + Integer index = (indexMap == null ? null : indexMap.get(i)); + if (index != null) { + // This RegionAction is from a RowMutations/CheckAndMutate in a batch. // If there is an exception from the server, the exception is set at // the RegionActionResult level, which has been handled above. - responseValue = response.getProcessed() ? + responseValue = actionResult.getProcessed() ? ProtobufUtil.EMPTY_RESULT_EXISTS_TRUE : ProtobufUtil.EMPTY_RESULT_EXISTS_FALSE; - results.add(regionName, rowMutationsIndex, responseValue); + results.add(regionName, index, responseValue); continue; } @@ -171,11 +170,11 @@ public static org.apache.hadoop.hbase.client.MultiResponse getResults(final Mult responseValue = ProtobufUtil.toResult(roe.getResult(), cells); } else if (roe.hasServiceResult()) { responseValue = roe.getServiceResult(); - } else{ + } else { // Sometimes, the response is just "it was processed". Generally, this occurs for things // like mutateRows where either we get back 'processed' (or not) and optionally some // statistics about the regions we touched. - responseValue = response.getProcessed() ? + responseValue = actionResult.getProcessed() ? ProtobufUtil.EMPTY_RESULT_EXISTS_TRUE : ProtobufUtil.EMPTY_RESULT_EXISTS_FALSE; } diff --git a/hbase-protocol-shaded/src/main/protobuf/client/Client.proto b/hbase-protocol-shaded/src/main/protobuf/client/Client.proto index fbb07698ae8e..7678211ea382 100644 --- a/hbase-protocol-shaded/src/main/protobuf/client/Client.proto +++ b/hbase-protocol-shaded/src/main/protobuf/client/Client.proto @@ -455,6 +455,7 @@ message RegionAction { // When set, run mutations as atomic unit. optional bool atomic = 2; repeated Action action = 3; + optional Condition condition = 4; } /* @@ -499,6 +500,7 @@ message RegionActionResult { repeated ResultOrException resultOrException = 1; // If the operation failed globally for this region, this exception is set optional NameBytesPair exception = 2; + optional bool processed = 3; } /** @@ -511,13 +513,16 @@ message RegionActionResult { message MultiRequest { repeated RegionAction regionAction = 1; optional uint64 nonceGroup = 2; - optional Condition condition = 3; + // Moved this to RegionAction in HBASE-8458. Keep it for backward compatibility. Need to remove + // it in the future. + optional Condition condition = 3 [deprecated=true]; } message MultiResponse { repeated RegionActionResult regionActionResult = 1; - // used for mutate to indicate processed only - optional bool processed = 2; + // Moved this to RegionActionResult in HBASE-8458. Keep it for backward compatibility. Need to + // remove it in the future. + optional bool processed = 2 [deprecated=true]; optional MultiRegionLoadStats regionStatistics = 3; } diff --git a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java index aaf1954afde8..2ca0b4d32d23 100644 --- a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java +++ b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java @@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.CheckAndMutate; import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; @@ -744,6 +745,16 @@ public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) throw new NotImplementedException("Implement later"); } + @Override + public boolean checkAndMutate(CheckAndMutate checkAndMutate) { + throw new NotImplementedException("Implement later"); + } + + @Override + public boolean[] checkAndMutate(List checkAndMutates) { + throw new NotImplementedException("Implement later"); + } + @Override public Result increment(Increment increment) throws IOException { throw new IOException("Increment not supported"); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index d7381d1bbb93..ed4bc7727068 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -2753,18 +2753,93 @@ public MultiResponse multi(final RpcController rpcc, final MultiRequest request) long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE; - // this will contain all the cells that we need to return. It's created later, if needed. - List cellsToReturn = null; MultiResponse.Builder responseBuilder = MultiResponse.newBuilder(); RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder(); - Boolean processed = null; - RegionScannersCloseCallBack closeCallBack = null; - RpcCallContext context = RpcServer.getCurrentCall().orElse(null); this.rpcMultiRequestCount.increment(); this.requestCount.increment(); + ActivePolicyEnforcement spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements(); + + // We no longer use MultiRequest#condition. Instead, we use RegionAction#condition. The + // following logic is for backward compatibility as old clients still use + // MultiRequest#condition in case of checkAndMutate with RowMutations. + if (request.hasCondition()) { + if (request.getRegionActionList().isEmpty()) { + // If the region action list is empty, do nothing. + responseBuilder.setProcessed(true); + return responseBuilder.build(); + } + + RegionAction regionAction = request.getRegionAction(0); + + // When request.hasCondition() is true, regionAction.getAtomic() should be always true. So + // we can assume regionAction.getAtomic() is true here. + assert regionAction.getAtomic(); + + OperationQuota quota; + HRegion region; + RegionSpecifier regionSpecifier = regionAction.getRegion(); + + try { + region = getRegion(regionSpecifier); + quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList()); + } catch (IOException e) { + failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e); + return responseBuilder.build(); + } + + boolean rejectIfFromClient = shouldRejectRequestsFromClient(region); + // We only allow replication in standby state and it will not set the atomic flag. + if (rejectIfFromClient) { + failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, + new DoNotRetryIOException( + region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state")); + quota.close(); + return responseBuilder.build(); + } + + try { + Condition condition = request.getCondition(); + byte[] row = condition.getRow().toByteArray(); + byte[] family = condition.hasFamily() ? condition.getFamily().toByteArray() : null; + byte[] qualifier = condition.hasQualifier() ? + condition.getQualifier().toByteArray() : null; + CompareOperator op = condition.hasCompareType() ? + CompareOperator.valueOf(condition.getCompareType().name()) : null; + ByteArrayComparable comparator = condition.hasComparator() ? + ProtobufUtil.toComparator(condition.getComparator()) : null; + Filter filter = condition.hasFilter() ? + ProtobufUtil.toFilter(condition.getFilter()) : null; + TimeRange timeRange = condition.hasTimeRange() ? + ProtobufUtil.toTimeRange(condition.getTimeRange()) : + TimeRange.allTime(); + boolean processed = + checkAndRowMutate(region, regionAction.getActionList(), cellScanner, row, family, + qualifier, op, comparator, filter, timeRange, regionActionResultBuilder, + spaceQuotaEnforcement); + responseBuilder.setProcessed(processed); + } catch (IOException e) { + rpcServer.getMetrics().exception(e); + // As it's an atomic operation with a condition, we may expect it's a global failure. + regionActionResultBuilder.setException(ResponseConverter.buildException(e)); + } + + responseBuilder.addRegionActionResult(regionActionResultBuilder.build()); + quota.close(); + ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics(); + if (regionLoadStats != null) { + responseBuilder.setRegionStatistics(MultiRegionLoadStats.newBuilder() + .addRegion(regionSpecifier).addStat(regionLoadStats).build()); + } + return responseBuilder.build(); + } + + // this will contain all the cells that we need to return. It's created later, if needed. + List cellsToReturn = null; + RegionScannersCloseCallBack closeCallBack = null; + RpcCallContext context = RpcServer.getCurrentCall().orElse(null); Map regionStats = new HashMap<>(request .getRegionActionCount()); - ActivePolicyEnforcement spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements(); + for (RegionAction regionAction : request.getRegionActionList()) { OperationQuota quota; HRegion region; @@ -2777,8 +2852,10 @@ public MultiResponse multi(final RpcController rpcc, final MultiRequest request) failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e); continue; // For this region it's a failure. } + boolean rejectIfFromClient = shouldRejectRequestsFromClient(region); - if (regionAction.hasAtomic() && regionAction.getAtomic()) { + + if (regionAction.hasCondition()) { // We only allow replication in standby state and it will not set the atomic flag. if (rejectIfFromClient) { failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, @@ -2787,33 +2864,104 @@ public MultiResponse multi(final RpcController rpcc, final MultiRequest request) quota.close(); continue; } - // How does this call happen? It may need some work to play well w/ the surroundings. - // Need to return an item per Action along w/ Action index. TODO. + try { - if (request.hasCondition()) { - Condition condition = request.getCondition(); - byte[] row = condition.getRow().toByteArray(); - byte[] family = condition.hasFamily() ? condition.getFamily().toByteArray() : null; - byte[] qualifier = condition.hasQualifier() ? - condition.getQualifier().toByteArray() : null; - CompareOperator op = condition.hasCompareType() ? - CompareOperator.valueOf(condition.getCompareType().name()) : null; - ByteArrayComparable comparator = condition.hasComparator() ? - ProtobufUtil.toComparator(condition.getComparator()) : null; - Filter filter = condition.hasFilter() ? - ProtobufUtil.toFilter(condition.getFilter()) : null; - TimeRange timeRange = condition.hasTimeRange() ? - ProtobufUtil.toTimeRange(condition.getTimeRange()) : - TimeRange.allTime(); + Condition condition = regionAction.getCondition(); + byte[] row = condition.getRow().toByteArray(); + byte[] family = condition.hasFamily() ? condition.getFamily().toByteArray() : null; + byte[] qualifier = condition.hasQualifier() ? + condition.getQualifier().toByteArray() : null; + CompareOperator op = condition.hasCompareType() ? + CompareOperator.valueOf(condition.getCompareType().name()) : null; + ByteArrayComparable comparator = condition.hasComparator() ? + ProtobufUtil.toComparator(condition.getComparator()) : null; + Filter filter = condition.hasFilter() ? + ProtobufUtil.toFilter(condition.getFilter()) : null; + TimeRange timeRange = condition.hasTimeRange() ? + ProtobufUtil.toTimeRange(condition.getTimeRange()) : + TimeRange.allTime(); + + boolean processed; + if (regionAction.hasAtomic() && regionAction.getAtomic()) { + // RowMutations processed = checkAndRowMutate(region, regionAction.getActionList(), cellScanner, row, family, qualifier, op, comparator, filter, timeRange, regionActionResultBuilder, spaceQuotaEnforcement); } else { - doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(), - cellScanner, spaceQuotaEnforcement); - processed = Boolean.TRUE; + if (regionAction.getActionList().isEmpty()) { + // If the region action list is empty, do nothing. + regionActionResultBuilder.setProcessed(true); + continue; + } + Action action = regionAction.getAction(0); + if (action.hasGet()) { + throw new DoNotRetryIOException("CheckAndMutate doesn't support GET=" + + action.getGet()); + } + MutationProto mutation = action.getMutation(); + switch (mutation.getMutateType()) { + case PUT: + Put put = ProtobufUtil.toPut(mutation, cellScanner); + checkCellSizeLimit(region, put); + // Throws an exception when violated + spaceQuotaEnforcement.getPolicyEnforcement(region).check(put); + quota.addMutation(put); + + if (filter != null) { + processed = region.checkAndMutate(row, filter, timeRange, put); + } else { + processed = region.checkAndMutate(row, family, qualifier, op, comparator, + timeRange, put); + } + break; + + case DELETE: + Delete delete = ProtobufUtil.toDelete(mutation, cellScanner); + checkCellSizeLimit(region, delete); + spaceQuotaEnforcement.getPolicyEnforcement(region).check(delete); + quota.addMutation(delete); + + if (filter != null) { + processed = region.checkAndMutate(row, filter, timeRange, delete); + } else { + processed = region.checkAndMutate(row, family, qualifier, op, comparator, + timeRange, delete); + } + break; + + default: + throw new DoNotRetryIOException("CheckAndMutate doesn't support " + + mutation.getMutateType()); + } + + // To unify the response format with doNonAtomicRegionMutation and read through + // client's AsyncProcess we have to add an empty result instance per operation + regionActionResultBuilder.addResultOrException(ClientProtos.ResultOrException + .newBuilder().setIndex(0).build()); } + regionActionResultBuilder.setProcessed(processed); + } catch (IOException e) { + rpcServer.getMetrics().exception(e); + // As it's an atomic operation with a condition, we may expect it's a global failure. + regionActionResultBuilder.setException(ResponseConverter.buildException(e)); + } + } else if (regionAction.hasAtomic() && regionAction.getAtomic()) { + // We only allow replication in standby state and it will not set the atomic flag. + if (rejectIfFromClient) { + failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, + new DoNotRetryIOException( + region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state")); + quota.close(); + continue; + } + try { + doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(), + cellScanner, spaceQuotaEnforcement); + regionActionResultBuilder.setProcessed(true); + // We no longer use MultiResponse#processed. Instead, we use + // RegionActionResult#condition. This is for backward compatibility for old clients. + responseBuilder.setProcessed(true); } catch (IOException e) { rpcServer.getMetrics().exception(e); // As it's atomic, we may expect it's a global failure. @@ -2853,10 +3001,6 @@ public MultiResponse multi(final RpcController rpcc, final MultiRequest request) controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn)); } - if (processed != null) { - responseBuilder.setProcessed(processed); - } - MultiRegionLoadStats.Builder builder = MultiRegionLoadStats.newBuilder(); for(Entry stat: regionStats.entrySet()){ builder.addRegion(stat.getKey()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncTable.java index 964e929f5d09..29b4bf76d146 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncTable.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncTable.java @@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit; import java.util.function.Function; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CheckAndMutate; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.filter.Filter; @@ -112,6 +113,16 @@ public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) return null; } + @Override + public CompletableFuture checkAndMutate(CheckAndMutate checkAndMutate) { + return null; + } + + @Override + public List> checkAndMutate(List checkAndMutates) { + return null; + } + @Override public CompletableFuture mutateRow(RowMutations mutation) { return null; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java index b9fb81195756..5241d08d48e8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java @@ -42,6 +42,7 @@ import java.util.function.Supplier; import java.util.stream.IntStream; import org.apache.commons.io.IOUtils; +import org.apache.hadoop.hbase.CheckAndMutate; import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; @@ -634,4 +635,319 @@ public void testInvalidPut() { assertThat(e.getMessage(), containsString("KeyValue size too large")); } } + + @Test + public void testCheckAndMutateBatch() throws Throwable { + AsyncTable table = getTable.get(); + byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2"); + byte[] row3 = Bytes.toBytes(Bytes.toString(row) + "3"); + byte[] row4 = Bytes.toBytes(Bytes.toString(row) + "4"); + + table.putAll(Arrays.asList( + new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")), + new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")), + new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")), + new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))).get(); + + // Test for Put + CheckAndMutate checkAndMutate1 = CheckAndMutate.builder(row, FAMILY) + .qualifier(Bytes.toBytes("A")) + .ifEquals(Bytes.toBytes("a")) + .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("e"))); + + CheckAndMutate checkAndMutate2 = CheckAndMutate.builder(row2, FAMILY) + .qualifier(Bytes.toBytes("B")) + .ifEquals(Bytes.toBytes("a")) + .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("f"))); + + List results = + table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); + + assertTrue(results.get(0)); + assertFalse(results.get(1)); + + Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get(); + assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A")))); + + result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"))).get(); + assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); + + // Test for Delete + checkAndMutate1 = CheckAndMutate.builder(row, FAMILY) + .qualifier(Bytes.toBytes("A")) + .ifEquals(Bytes.toBytes("e")) + .build(new Delete(row)); + + checkAndMutate2 = CheckAndMutate.builder(row2, FAMILY) + .qualifier(Bytes.toBytes("B")) + .ifEquals(Bytes.toBytes("a")) + .build(new Delete(row2)); + + results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); + + assertTrue(results.get(0)); + assertFalse(results.get(1)); + + assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get()); + + result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"))).get(); + assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); + + // Test for RowMutations + checkAndMutate1 = CheckAndMutate.builder(row3, FAMILY) + .qualifier(Bytes.toBytes("C")) + .ifEquals(Bytes.toBytes("c")) + .build(new RowMutations(row3) + .add((Mutation) new Put(row3) + .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f"))) + .add((Mutation) new Delete(row3).addColumns(FAMILY, Bytes.toBytes("C")))); + + checkAndMutate2 = CheckAndMutate.builder(row4, FAMILY) + .qualifier(Bytes.toBytes("D")) + .ifEquals(Bytes.toBytes("f")) + .build(new RowMutations(row4) + .add((Mutation) new Put(row4) + .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f"))) + .add((Mutation) new Delete(row4).addColumns(FAMILY, Bytes.toBytes("D")))); + + results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); + + assertTrue(results.get(0)); + assertFalse(results.get(1)); + + result = table.get(new Get(row3)).get(); + assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F")))); + assertNull(result.getValue(FAMILY, Bytes.toBytes("D"))); + + result = table.get(new Get(row4)).get(); + assertNull(result.getValue(FAMILY, Bytes.toBytes("F"))); + assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); + } + + @Test + public void testCheckAndMutateBatch2() throws Throwable { + AsyncTable table = getTable.get(); + byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2"); + byte[] row3 = Bytes.toBytes(Bytes.toString(row) + "3"); + byte[] row4 = Bytes.toBytes(Bytes.toString(row) + "4"); + + table.putAll(Arrays.asList( + new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")), + new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")), + new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), 100, Bytes.toBytes("c")), + new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), 100, Bytes.toBytes("d")))).get(); + + // Test for ifNotExists() + CheckAndMutate checkAndMutate1 = CheckAndMutate.builder(row, FAMILY) + .qualifier(Bytes.toBytes("B")) + .ifNotExists() + .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("e"))); + + CheckAndMutate checkAndMutate2 = CheckAndMutate.builder(row2, FAMILY) + .qualifier(Bytes.toBytes("B")) + .ifNotExists() + .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("f"))); + + List results = + table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); + + assertTrue(results.get(0)); + assertFalse(results.get(1)); + + Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get(); + assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A")))); + + result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"))).get(); + assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); + + // Test for ifMatches() + checkAndMutate1 = CheckAndMutate.builder(row, FAMILY) + .qualifier(Bytes.toBytes("A")) + .ifMatches(CompareOperator.NOT_EQUAL, Bytes.toBytes("a")) + .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))); + + checkAndMutate2 = CheckAndMutate.builder(row2, FAMILY) + .qualifier(Bytes.toBytes("B")) + .ifMatches(CompareOperator.GREATER, Bytes.toBytes("b")) + .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("f"))); + + results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); + + assertTrue(results.get(0)); + assertFalse(results.get(1)); + + result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get(); + assertEquals("a", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A")))); + + result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"))).get(); + assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); + + // Test for timeRange() + checkAndMutate1 = CheckAndMutate.builder(row3, FAMILY) + .qualifier(Bytes.toBytes("C")) + .timeRange(TimeRange.between(0, 101)) + .ifEquals(Bytes.toBytes("c")) + .build(new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("e"))); + + checkAndMutate2 = CheckAndMutate.builder(row4, FAMILY) + .qualifier(Bytes.toBytes("D")) + .timeRange(TimeRange.between(0, 100)) + .ifEquals(Bytes.toBytes("d")) + .build(new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("f"))); + + results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); + + assertTrue(results.get(0)); + assertFalse(results.get(1)); + + result = table.get(new Get(row3).addColumn(FAMILY, Bytes.toBytes("C"))).get(); + assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C")))); + + result = table.get(new Get(row4).addColumn(FAMILY, Bytes.toBytes("D"))).get(); + assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); + } + + @Test + public void testCheckAndMutateBatchWithFilter() throws Throwable { + AsyncTable table = getTable.get(); + byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2"); + + table.putAll(Arrays.asList( + new Put(row) + .addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")) + .addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")) + .addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")), + new Put(row2) + .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")) + .addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")) + .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))).get(); + + // Test for Put + CheckAndMutate checkAndMutate1 = CheckAndMutate.builder(row, new FilterList( + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, + Bytes.toBytes("a")), + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, + Bytes.toBytes("b")))) + .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("g"))); + + CheckAndMutate checkAndMutate2 = CheckAndMutate.builder(row2, new FilterList( + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL, + Bytes.toBytes("a")), + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL, + Bytes.toBytes("b")))) + .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("h"))); + + List results = + table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); + + assertTrue(results.get(0)); + assertFalse(results.get(1)); + + Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get(); + assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C")))); + + result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("F"))).get(); + assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F")))); + + // Test for Delete + checkAndMutate1 = CheckAndMutate.builder(row, new FilterList( + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, + Bytes.toBytes("a")), + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, + Bytes.toBytes("b")))) + .build(new Delete(row).addColumns(FAMILY, Bytes.toBytes("C"))); + + checkAndMutate2 = CheckAndMutate.builder(row2, new FilterList( + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL, + Bytes.toBytes("a")), + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL, + Bytes.toBytes("b")))) + .build(new Delete(row2).addColumn(FAMILY, Bytes.toBytes("F"))); + + results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); + + assertTrue(results.get(0)); + assertFalse(results.get(1)); + + assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get()); + + result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("F"))).get(); + assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F")))); + + // Test for RowMutations + checkAndMutate1 = CheckAndMutate.builder(row, new FilterList( + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, + Bytes.toBytes("a")), + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, + Bytes.toBytes("b")))) + .build(new RowMutations(row) + .add((Mutation) new Put(row) + .addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))) + .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A")))); + + checkAndMutate2 = CheckAndMutate.builder(row2, new FilterList( + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL, + Bytes.toBytes("a")), + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL, + Bytes.toBytes("b")))) + .build(new RowMutations(row2) + .add((Mutation) new Put(row2) + .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("g"))) + .add((Mutation) new Delete(row2).addColumns(FAMILY, Bytes.toBytes("D")))); + + results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); + + assertTrue(results.get(0)); + assertFalse(results.get(1)); + + result = table.get(new Get(row)).get(); + assertNull(result.getValue(FAMILY, Bytes.toBytes("A"))); + assertEquals("c", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C")))); + + result = table.get(new Get(row2)).get(); + assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); + assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F")))); + } + + @Test + public void testCheckAndMutateBatchWithFilterAndTimeRange() throws Throwable { + AsyncTable table = getTable.get(); + byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2"); + + table.putAll(Arrays.asList( + new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a")) + .addColumn(FAMILY, Bytes.toBytes("B"), 100, Bytes.toBytes("b")) + .addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")), + new Put(row2).addColumn(FAMILY, Bytes.toBytes("D"), 100, Bytes.toBytes("d")) + .addColumn(FAMILY, Bytes.toBytes("E"), 100, Bytes.toBytes("e")) + .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))).get(); + + CheckAndMutate checkAndMutate1 = CheckAndMutate.builder(row, new FilterList( + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, + Bytes.toBytes("a")), + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, + Bytes.toBytes("b")))) + .timeRange(TimeRange.between(0, 101)) + .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("g"))); + + CheckAndMutate checkAndMutate2 = CheckAndMutate.builder(row2, new FilterList( + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL, + Bytes.toBytes("d")), + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL, + Bytes.toBytes("e")))) + .timeRange(TimeRange.between(0, 100)) + .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("h"))); + + List results = + table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); + + assertTrue(results.get(0)); + assertFalse(results.get(1)); + + Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get(); + assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C")))); + + result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("F"))).get(); + assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F")))); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java index 42e61d7456e6..7eab0f0a3ccf 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -42,6 +43,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CheckAndMutate; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.TableName; @@ -333,4 +335,59 @@ public void testInvalidPut() { assertThat(e.getMessage(), containsString("KeyValue size too large")); } } + + @Test + public void testWithCheckAndMutate() throws Exception { + AsyncTable table = tableGetter.apply(TABLE_NAME); + + byte[] row1 = Bytes.toBytes("row1"); + byte[] row2 = Bytes.toBytes("row2"); + byte[] row3 = Bytes.toBytes("row3"); + byte[] row4 = Bytes.toBytes("row4"); + byte[] row5 = Bytes.toBytes("row5"); + + table.putAll(Arrays.asList( + new Put(row1).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")), + new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")), + new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")), + new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")), + new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))).get(); + + CheckAndMutate checkAndMutate1 = CheckAndMutate.builder(row1, FAMILY) + .qualifier(Bytes.toBytes("A")) + .ifEquals(Bytes.toBytes("a")) + .build(new Put(row1).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("g"))); + Get get = new Get(row2).addColumn(FAMILY, Bytes.toBytes("B")); + RowMutations mutations = new RowMutations(row3) + .add((Mutation) new Delete(row3).addColumns(FAMILY, Bytes.toBytes("C"))) + .add((Mutation) new Put(row3).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f"))); + CheckAndMutate checkAndMutate2 = CheckAndMutate.builder(row4, FAMILY) + .qualifier(Bytes.toBytes("D")) + .ifEquals(Bytes.toBytes("a")) + .build(new Put(row4).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("h"))); + Put put = new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("f")); + + List actions = Arrays.asList(checkAndMutate1, get, mutations, checkAndMutate2, put); + List results = table.batchAll(actions).get(); + + assertTrue(((Result) results.get(0)).getExists()); + assertEquals("b", + Bytes.toString(((Result) results.get(1)).getValue(FAMILY, Bytes.toBytes("B")))); + assertTrue(((Result) results.get(2)).getExists()); + assertFalse(((Result) results.get(3)).getExists()); + assertTrue(((Result) results.get(4)).isEmpty()); + + Result result = table.get(new Get(row1)).get(); + assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A")))); + + result = table.get(new Get(row3)).get(); + assertNull(result.getValue(FAMILY, Bytes.toBytes("C"))); + assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F")))); + + result = table.get(new Get(row4)).get(); + assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); + + result = table.get(new Get(row5)).get(); + assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("E")))); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCheckAndMutate.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCheckAndMutate.java index f399e8619f0c..f4bf8263b502 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCheckAndMutate.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCheckAndMutate.java @@ -20,12 +20,15 @@ import static org.hamcrest.CoreMatchers.instanceOf; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.IOException; +import java.util.Arrays; import java.util.Collections; +import org.apache.hadoop.hbase.CheckAndMutate; import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; @@ -57,6 +60,9 @@ public class TestCheckAndMutate { private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private static final byte[] ROWKEY = Bytes.toBytes("12345"); + private static final byte[] ROWKEY2 = Bytes.toBytes("67890"); + private static final byte[] ROWKEY3 = Bytes.toBytes("abcde"); + private static final byte[] ROWKEY4 = Bytes.toBytes("fghij"); private static final byte[] FAMILY = Bytes.toBytes("cf"); @Rule @@ -372,4 +378,307 @@ public void testCheckAndMutateWithNotSpecifyingCondition() throws Throwable { .thenPut(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))); } } + + @Test + public void testCheckAndMutateBatch() throws Throwable { + try (Table table = createTable()) { + table.put(Arrays.asList( + new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")), + new Put(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")), + new Put(ROWKEY3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")), + new Put(ROWKEY4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))); + + // Test for Put + CheckAndMutate checkAndMutate1 = CheckAndMutate.builder(ROWKEY, FAMILY) + .qualifier(Bytes.toBytes("A")) + .ifEquals(Bytes.toBytes("a")) + .build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("e"))); + + CheckAndMutate checkAndMutate2 = CheckAndMutate.builder(ROWKEY2, FAMILY) + .qualifier(Bytes.toBytes("B")) + .ifEquals(Bytes.toBytes("a")) + .build(new Put(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("f"))); + + boolean[] results = table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2)); + + assertTrue(results[0]); + assertFalse(results[1]); + + Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"))); + assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A")))); + + result = table.get(new Get(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("B"))); + assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); + + // Test for Delete + checkAndMutate1 = CheckAndMutate.builder(ROWKEY, FAMILY) + .qualifier(Bytes.toBytes("A")) + .ifEquals(Bytes.toBytes("e")) + .build(new Delete(ROWKEY)); + + checkAndMutate2 = CheckAndMutate.builder(ROWKEY2, FAMILY) + .qualifier(Bytes.toBytes("B")) + .ifEquals(Bytes.toBytes("a")) + .build(new Delete(ROWKEY2)); + + results = table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2)); + + assertTrue(results[0]); + assertFalse(results[1]); + + assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A")))); + + result = table.get(new Get(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("B"))); + assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); + + // Test for RowMutations + checkAndMutate1 = CheckAndMutate.builder(ROWKEY3, FAMILY) + .qualifier(Bytes.toBytes("C")) + .ifEquals(Bytes.toBytes("c")) + .build(new RowMutations(ROWKEY3) + .add((Mutation) new Put(ROWKEY3) + .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f"))) + .add((Mutation) new Delete(ROWKEY3).addColumns(FAMILY, Bytes.toBytes("C")))); + + checkAndMutate2 = CheckAndMutate.builder(ROWKEY4, FAMILY) + .qualifier(Bytes.toBytes("D")) + .ifEquals(Bytes.toBytes("f")) + .build(new RowMutations(ROWKEY4) + .add((Mutation) new Put(ROWKEY4) + .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f"))) + .add((Mutation) new Delete(ROWKEY4).addColumns(FAMILY, Bytes.toBytes("D")))); + + results = table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2)); + + assertTrue(results[0]); + assertFalse(results[1]); + + result = table.get(new Get(ROWKEY3)); + assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F")))); + assertNull(result.getValue(FAMILY, Bytes.toBytes("D"))); + + result = table.get(new Get(ROWKEY4)); + assertNull(result.getValue(FAMILY, Bytes.toBytes("F"))); + assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); + } + } + + @Test + public void testCheckAndMutateBatch2() throws Throwable { + try (Table table = createTable()) { + table.put(Arrays.asList( + new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")), + new Put(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")), + new Put(ROWKEY3).addColumn(FAMILY, Bytes.toBytes("C"), 100, Bytes.toBytes("c")), + new Put(ROWKEY4).addColumn(FAMILY, Bytes.toBytes("D"), 100, Bytes.toBytes("d")))); + + // Test for ifNotExists() + CheckAndMutate checkAndMutate1 = CheckAndMutate.builder(ROWKEY, FAMILY) + .qualifier(Bytes.toBytes("B")) + .ifNotExists() + .build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("e"))); + + CheckAndMutate checkAndMutate2 = CheckAndMutate.builder(ROWKEY2, FAMILY) + .qualifier(Bytes.toBytes("B")) + .ifNotExists() + .build(new Put(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("f"))); + + boolean[] results = table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2)); + + assertTrue(results[0]); + assertFalse(results[1]); + + Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"))); + assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A")))); + + result = table.get(new Get(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("B"))); + assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); + + // Test for ifMatches() + checkAndMutate1 = CheckAndMutate.builder(ROWKEY, FAMILY) + .qualifier(Bytes.toBytes("A")) + .ifMatches(CompareOperator.NOT_EQUAL, Bytes.toBytes("a")) + .build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))); + + checkAndMutate2 = CheckAndMutate.builder(ROWKEY2, FAMILY) + .qualifier(Bytes.toBytes("B")) + .ifMatches(CompareOperator.GREATER, Bytes.toBytes("b")) + .build(new Put(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("f"))); + + results = table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2)); + + assertTrue(results[0]); + assertFalse(results[1]); + + result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"))); + assertEquals("a", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A")))); + + result = table.get(new Get(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("B"))); + assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); + + // Test for timeRange() + checkAndMutate1 = CheckAndMutate.builder(ROWKEY3, FAMILY) + .qualifier(Bytes.toBytes("C")) + .timeRange(TimeRange.between(0, 101)) + .ifEquals(Bytes.toBytes("c")) + .build(new Put(ROWKEY3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("e"))); + + checkAndMutate2 = CheckAndMutate.builder(ROWKEY4, FAMILY) + .qualifier(Bytes.toBytes("D")) + .timeRange(TimeRange.between(0, 100)) + .ifEquals(Bytes.toBytes("d")) + .build(new Put(ROWKEY4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("f"))); + + results = table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2)); + + assertTrue(results[0]); + assertFalse(results[1]); + + result = table.get(new Get(ROWKEY3).addColumn(FAMILY, Bytes.toBytes("C"))); + assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C")))); + + result = table.get(new Get(ROWKEY4).addColumn(FAMILY, Bytes.toBytes("D"))); + assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); + } + } + + @Test + public void testCheckAndMutateBatchWithFilter() throws Throwable { + try (Table table = createTable()) { + table.put(Arrays.asList( + new Put(ROWKEY) + .addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")) + .addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")) + .addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")), + new Put(ROWKEY2) + .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")) + .addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")) + .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))); + + // Test for Put + CheckAndMutate checkAndMutate1 = CheckAndMutate.builder(ROWKEY, new FilterList( + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, + Bytes.toBytes("a")), + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, + Bytes.toBytes("b")))) + .build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("g"))); + + CheckAndMutate checkAndMutate2 = CheckAndMutate.builder(ROWKEY2, new FilterList( + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL, + Bytes.toBytes("a")), + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL, + Bytes.toBytes("b")))) + .build(new Put(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("h"))); + + boolean[] results = table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2)); + + assertTrue(results[0]); + assertFalse(results[1]); + + Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"))); + assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C")))); + + result = table.get(new Get(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("F"))); + assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F")))); + + // Test for Delete + checkAndMutate1 = CheckAndMutate.builder(ROWKEY, new FilterList( + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, + Bytes.toBytes("a")), + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, + Bytes.toBytes("b")))) + .build(new Delete(ROWKEY).addColumns(FAMILY, Bytes.toBytes("C"))); + + checkAndMutate2 = CheckAndMutate.builder(ROWKEY2, new FilterList( + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL, + Bytes.toBytes("a")), + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL, + Bytes.toBytes("b")))) + .build(new Delete(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("F"))); + + results = table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2)); + + assertTrue(results[0]); + assertFalse(results[1]); + + assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C")))); + + result = table.get(new Get(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("F"))); + assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F")))); + + // Test for RowMutations + checkAndMutate1 = CheckAndMutate.builder(ROWKEY, new FilterList( + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, + Bytes.toBytes("a")), + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, + Bytes.toBytes("b")))) + .build(new RowMutations(ROWKEY) + .add((Mutation) new Put(ROWKEY) + .addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))) + .add((Mutation) new Delete(ROWKEY).addColumns(FAMILY, Bytes.toBytes("A")))); + + checkAndMutate2 = CheckAndMutate.builder(ROWKEY2, new FilterList( + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL, + Bytes.toBytes("a")), + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL, + Bytes.toBytes("b")))) + .build(new RowMutations(ROWKEY2) + .add((Mutation) new Put(ROWKEY2) + .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("g"))) + .add((Mutation) new Delete(ROWKEY2).addColumns(FAMILY, Bytes.toBytes("D")))); + + results = table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2)); + + assertTrue(results[0]); + assertFalse(results[1]); + + result = table.get(new Get(ROWKEY)); + assertNull(result.getValue(FAMILY, Bytes.toBytes("A"))); + assertEquals("c", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C")))); + + result = table.get(new Get(ROWKEY2)); + assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); + assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F")))); + } + } + + @Test + public void testCheckAndMutateBatchWithFilterAndTimeRange() throws Throwable { + try (Table table = createTable()) { + table.put(Arrays.asList( + new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a")) + .addColumn(FAMILY, Bytes.toBytes("B"), 100, Bytes.toBytes("b")) + .addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")), + new Put(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("D"), 100, Bytes.toBytes("d")) + .addColumn(FAMILY, Bytes.toBytes("E"), 100, Bytes.toBytes("e")) + .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))); + + CheckAndMutate checkAndMutate1 = CheckAndMutate.builder(ROWKEY, new FilterList( + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, + Bytes.toBytes("a")), + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, + Bytes.toBytes("b")))) + .timeRange(TimeRange.between(0, 101)) + .build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("g"))); + + CheckAndMutate checkAndMutate2 = CheckAndMutate.builder(ROWKEY2, new FilterList( + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL, + Bytes.toBytes("d")), + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL, + Bytes.toBytes("e")))) + .timeRange(TimeRange.between(0, 100)) + .build(new Put(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("h"))); + + boolean[] results = table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2)); + + assertTrue(results[0]); + assertFalse(results[1]); + + Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"))); + assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C")))); + + result = table.get(new Get(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("F"))); + assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F")))); + } + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java index 3de5c1bf3ce4..159c93f859fd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java @@ -40,6 +40,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.CheckAndMutate; import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; @@ -452,6 +453,62 @@ public void testBatchWithRowMutation() throws Exception { } } + @Test + public void testBatchWithCheckAndMutate() throws Exception { + try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { + byte[] row1 = Bytes.toBytes("row1"); + byte[] row2 = Bytes.toBytes("row2"); + byte[] row3 = Bytes.toBytes("row3"); + byte[] row4 = Bytes.toBytes("row4"); + byte[] row5 = Bytes.toBytes("row5"); + + table.put(Arrays.asList( + new Put(row1).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")), + new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")), + new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")), + new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")), + new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))); + + CheckAndMutate checkAndMutate1 = CheckAndMutate.builder(row1, FAMILY) + .qualifier(Bytes.toBytes("A")) + .ifEquals(Bytes.toBytes("a")) + .build(new Put(row1).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("g"))); + Get get = new Get(row2).addColumn(FAMILY, Bytes.toBytes("B")); + RowMutations mutations = new RowMutations(row3) + .add((Mutation) new Delete(row3).addColumns(FAMILY, Bytes.toBytes("C"))) + .add((Mutation) new Put(row3).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f"))); + CheckAndMutate checkAndMutate2 = CheckAndMutate.builder(row4, FAMILY) + .qualifier(Bytes.toBytes("D")) + .ifEquals(Bytes.toBytes("a")) + .build(new Put(row4).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("h"))); + Put put = new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("f")); + + List actions = Arrays.asList(checkAndMutate1, get, mutations, checkAndMutate2, put); + Object[] results = new Object[actions.size()]; + table.batch(actions, results); + + assertTrue(((Result) results[0]).getExists()); + assertEquals("b", + Bytes.toString(((Result) results[1]).getValue(FAMILY, Bytes.toBytes("B")))); + assertTrue(((Result) results[2]).getExists()); + assertFalse(((Result) results[3]).getExists()); + assertTrue(((Result) results[4]).isEmpty()); + + Result result = table.get(new Get(row1)); + assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A")))); + + result = table.get(new Get(row3)); + assertNull(result.getValue(FAMILY, Bytes.toBytes("C"))); + assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F")))); + + result = table.get(new Get(row4)); + assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); + + result = table.get(new Get(row5)); + assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("E")))); + } + } + @Test public void testHTableExistsMethodSingleRegionSingleGet() throws IOException, InterruptedException { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java index 655225a776fa..dc8c4ef70c54 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java @@ -252,9 +252,8 @@ private static ClientProtos.MultiRequest createRequest(RowMutations rm, byte[] r actionBuilder.setMutation(mp); builder.addAction(actionBuilder.build()); } - ClientProtos.MultiRequest request = - ClientProtos.MultiRequest.newBuilder().addRegionAction(builder.build()) - .setCondition(condition).build(); + ClientProtos.MultiRequest request = ClientProtos.MultiRequest.newBuilder() + .addRegionAction(builder.setCondition(condition).build()).build(); return request; } diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftTable.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftTable.java index 30b1fa1dbd98..795b2b36ce38 100644 --- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftTable.java +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftTable.java @@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.NotImplementedException; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CheckAndMutate; import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; @@ -431,6 +432,16 @@ public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) throw new NotImplementedException("Implement later"); } + @Override + public boolean checkAndMutate(CheckAndMutate checkAndMutate) { + throw new NotImplementedException("Implement later"); + } + + @Override + public boolean[] checkAndMutate(List checkAndMutates) { + throw new NotImplementedException("Implement later"); + } + @Override public void mutateRow(RowMutations rm) throws IOException { TRowMutations tRowMutations = ThriftUtilities.rowMutationsFromHBase(rm);