Skip to content

Commit 6c5ebff

Browse files
committed
HBASE-21725 Implement BufferedMutator Based on AsyncBufferedMutator
1 parent f47f6df commit 6c5ebff

File tree

6 files changed

+309
-90
lines changed

6 files changed

+309
-90
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,11 @@
6262
public interface BufferedMutator extends Closeable {
6363
/**
6464
* Key to use setting non-default BufferedMutator implementation in Configuration.
65+
* <p/>
66+
* @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use only, do not use it
67+
* any more.
6568
*/
69+
@Deprecated
6670
String CLASSNAME_KEY = "hbase.client.bufferedmutator.classname";
6771

6872
/**
@@ -179,12 +183,18 @@ default long getWriteBufferPeriodicFlushTimerTickMs() {
179183

180184
/**
181185
* Set rpc timeout for this mutator instance
186+
* @deprecated Since 3.0.0, will be removed in 4.0.0. Please set this through the
187+
* {@link BufferedMutatorParams}.
182188
*/
189+
@Deprecated
183190
void setRpcTimeout(int timeout);
184191

185192
/**
186193
* Set operation timeout for this mutator instance
194+
* @deprecated Since 3.0.0, will be removed in 4.0.0. Please set this through the
195+
* {@link BufferedMutatorParams}.
187196
*/
197+
@Deprecated
188198
void setOperationTimeout(int timeout);
189199

190200
/**
Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.client;
19+
20+
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
21+
22+
import java.io.IOException;
23+
import java.util.ArrayList;
24+
import java.util.Collections;
25+
import java.util.List;
26+
import java.util.concurrent.CompletableFuture;
27+
import java.util.concurrent.CompletionException;
28+
import java.util.concurrent.ConcurrentLinkedQueue;
29+
import java.util.regex.Matcher;
30+
import java.util.regex.Pattern;
31+
import java.util.stream.Collectors;
32+
import org.apache.hadoop.conf.Configuration;
33+
import org.apache.hadoop.hbase.TableName;
34+
import org.apache.hadoop.hbase.util.Pair;
35+
import org.apache.yetus.audience.InterfaceAudience;
36+
37+
/**
38+
* {@link BufferedMutator} implementation based on {@link AsyncBufferedMutator}.
39+
*/
40+
@InterfaceAudience.Private
41+
class BufferedMutatorOverAsyncBufferedMutator implements BufferedMutator {
42+
43+
private final AsyncBufferedMutator mutator;
44+
45+
private final ExceptionListener listener;
46+
47+
private List<CompletableFuture<Void>> futures = new ArrayList<>();
48+
49+
private final ConcurrentLinkedQueue<Pair<Mutation, Throwable>> errors =
50+
new ConcurrentLinkedQueue<>();
51+
52+
private final static int BUFFERED_FUTURES_THRESHOLD = 1024;
53+
54+
BufferedMutatorOverAsyncBufferedMutator(AsyncBufferedMutator mutator,
55+
ExceptionListener listener) {
56+
this.mutator = mutator;
57+
this.listener = listener;
58+
}
59+
60+
@Override
61+
public TableName getName() {
62+
return mutator.getName();
63+
}
64+
65+
@Override
66+
public Configuration getConfiguration() {
67+
return mutator.getConfiguration();
68+
}
69+
70+
@Override
71+
public void mutate(Mutation mutation) throws IOException {
72+
mutate(Collections.singletonList(mutation));
73+
}
74+
75+
private static final Pattern ADDR_MSG_MATCHER = Pattern.compile("Call to (\\S+) failed");
76+
77+
// not always work, so may return an empty string
78+
private String getHostnameAndPort(Throwable error) {
79+
Matcher matcher = ADDR_MSG_MATCHER.matcher(error.getMessage());
80+
if (matcher.matches()) {
81+
return matcher.group(1);
82+
} else {
83+
return "";
84+
}
85+
}
86+
87+
private RetriesExhaustedWithDetailsException makeError() {
88+
List<Row> rows = new ArrayList<>();
89+
List<Throwable> throwables = new ArrayList<>();
90+
List<String> hostnameAndPorts = new ArrayList<>();
91+
for (;;) {
92+
Pair<Mutation, Throwable> pair = errors.poll();
93+
if (pair == null) {
94+
break;
95+
}
96+
rows.add(pair.getFirst());
97+
throwables.add(pair.getSecond());
98+
hostnameAndPorts.add(getHostnameAndPort(pair.getSecond()));
99+
}
100+
return new RetriesExhaustedWithDetailsException(throwables, rows, hostnameAndPorts);
101+
}
102+
103+
@Override
104+
public void mutate(List<? extends Mutation> mutations) throws IOException {
105+
List<CompletableFuture<Void>> toBuffered = new ArrayList<>();
106+
List<CompletableFuture<Void>> fs = mutator.mutate(mutations);
107+
for (int i = 0, n = fs.size(); i < n; i++) {
108+
CompletableFuture<Void> toComplete = new CompletableFuture<>();
109+
final int index = i;
110+
addListener(fs.get(index), (r, e) -> {
111+
if (e != null) {
112+
errors.add(Pair.newPair(mutations.get(index), e));
113+
toComplete.completeExceptionally(e);
114+
} else {
115+
toComplete.complete(r);
116+
}
117+
});
118+
toBuffered.add(toComplete);
119+
}
120+
synchronized (this) {
121+
futures.addAll(toBuffered);
122+
if (futures.size() > BUFFERED_FUTURES_THRESHOLD) {
123+
tryCompleteFuture();
124+
}
125+
if (!errors.isEmpty()) {
126+
RetriesExhaustedWithDetailsException error = makeError();
127+
listener.onException(error, this);
128+
}
129+
}
130+
}
131+
132+
private void tryCompleteFuture() {
133+
futures = futures.stream().filter(f -> !f.isDone()).collect(Collectors.toList());
134+
}
135+
136+
@Override
137+
public void close() throws IOException {
138+
flush();
139+
mutator.close();
140+
}
141+
142+
@Override
143+
public void flush() throws IOException {
144+
mutator.flush();
145+
synchronized (this) {
146+
List<CompletableFuture<Void>> toComplete = this.futures;
147+
this.futures = new ArrayList<>();
148+
try {
149+
CompletableFuture.allOf(toComplete.toArray(new CompletableFuture<?>[toComplete.size()]))
150+
.join();
151+
} catch (CompletionException e) {
152+
// just ignore, we will record the actual error in the errors field
153+
}
154+
if (!errors.isEmpty()) {
155+
RetriesExhaustedWithDetailsException error = makeError();
156+
listener.onException(error, this);
157+
}
158+
}
159+
}
160+
161+
@Override
162+
public long getWriteBufferSize() {
163+
return mutator.getWriteBufferSize();
164+
}
165+
166+
@Override
167+
public void setRpcTimeout(int timeout) {
168+
// no effect
169+
}
170+
171+
@Override
172+
public void setOperationTimeout(int timeout) {
173+
// no effect
174+
}
175+
}

hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -101,13 +101,21 @@ public BufferedMutatorParams setWriteBufferPeriodicFlushTimeoutMs(long timeoutMs
101101
return this;
102102
}
103103

104+
/**
105+
* @deprecated Since 3.0.0, will be removed in 4.0.0. We use a common timer in the whole client
106+
* implementation so you can not set it any more.
107+
*/
108+
@Deprecated
104109
public long getWriteBufferPeriodicFlushTimerTickMs() {
105110
return writeBufferPeriodicFlushTimerTickMs;
106111
}
107112

108113
/**
109114
* Set the TimerTick how often the buffer timeout if checked.
115+
* @deprecated Since 3.0.0, will be removed in 4.0.0. We use a common timer in the whole client
116+
* implementation so you can not set it any more.
110117
*/
118+
@Deprecated
111119
public BufferedMutatorParams setWriteBufferPeriodicFlushTimerTickMs(long timerTickMs) {
112120
this.writeBufferPeriodicFlushTimerTickMs = timerTickMs;
113121
return this;
@@ -141,17 +149,23 @@ public BufferedMutatorParams pool(ExecutorService pool) {
141149
}
142150

143151
/**
144-
* @return Name of the class we will use when we construct a
145-
* {@link BufferedMutator} instance or null if default implementation.
152+
* @return Name of the class we will use when we construct a {@link BufferedMutator} instance or
153+
* null if default implementation.
154+
* @deprecated Since 3.0.0, will be removed in 4.0.0. You can not set it any more as the
155+
* implementation has to use too many internal stuffs in HBase.
146156
*/
157+
@Deprecated
147158
public String getImplementationClassName() {
148159
return this.implementationClassName;
149160
}
150161

151162
/**
152163
* Specify a BufferedMutator implementation other than the default.
153164
* @param implementationClassName Name of the BufferedMutator implementation class
165+
* @deprecated Since 3.0.0, will be removed in 4.0.0. You can not set it any more as the
166+
* implementation has to use too many internal stuffs in HBase.
154167
*/
168+
@Deprecated
155169
public BufferedMutatorParams implementationClassName(String implementationClassName) {
156170
this.implementationClassName = implementationClassName;
157171
return this;
@@ -169,11 +183,6 @@ public BufferedMutatorParams listener(BufferedMutator.ExceptionListener listener
169183
return this;
170184
}
171185

172-
/*
173-
* (non-Javadoc)
174-
*
175-
* @see java.lang.Object#clone()
176-
*/
177186
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="CN_IDIOM_NO_SUPER_CALL",
178187
justification="The clone below is complete")
179188
@Override

hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,24 @@ public Configuration getConfiguration() {
8787

8888
@Override
8989
public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
90-
return oldConn.getBufferedMutator(params);
90+
AsyncBufferedMutatorBuilder builder = conn.getBufferedMutatorBuilder(params.getTableName());
91+
if (params.getRpcTimeout() != BufferedMutatorParams.UNSET) {
92+
builder.setRpcTimeout(params.getRpcTimeout(), TimeUnit.MILLISECONDS);
93+
}
94+
if (params.getOperationTimeout() != BufferedMutatorParams.UNSET) {
95+
builder.setOperationTimeout(params.getOperationTimeout(), TimeUnit.MILLISECONDS);
96+
}
97+
if (params.getWriteBufferSize() != BufferedMutatorParams.UNSET) {
98+
builder.setWriteBufferSize(params.getWriteBufferSize());
99+
}
100+
if (params.getWriteBufferPeriodicFlushTimeoutMs() != BufferedMutatorParams.UNSET) {
101+
builder.setWriteBufferPeriodicFlush(params.getWriteBufferPeriodicFlushTimeoutMs(),
102+
TimeUnit.MILLISECONDS);
103+
}
104+
if (params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET) {
105+
builder.setMaxKeyValueSize(params.getMaxKeyValueSize());
106+
}
107+
return new BufferedMutatorOverAsyncBufferedMutator(builder.build(), params.getListener());
91108
}
92109

93110
@Override

hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator.java

Lines changed: 0 additions & 82 deletions
This file was deleted.

0 commit comments

Comments
 (0)