
Commit 9396d72

HBASE-29658: Add RISC-V Vector (RVV) optimizations
1 parent e575525 commit 9396d72

File tree

18 files changed: +6582 -0 lines changed


hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparatorImpl.java

Lines changed: 491 additions & 0 deletions
Large diffs are not rendered by default.
hbase-common/src/main/java/org/apache/hadoop/hbase/util/RVVByteBufferUtils.java

Lines changed: 55 additions & 0 deletions
@@ -0,0 +1,55 @@
package org.apache.hadoop.hbase.util;

import java.nio.ByteBuffer;

import org.apache.yetus.audience.InterfaceAudience;

/**
 * JNI bridge to RISC-V Vector (RVV) accelerated ByteBuffer routines, with pure-Java
 * fallbacks when the native library cannot be loaded.
 */
@InterfaceAudience.Private
public class RVVByteBufferUtils {

  static {
    try {
      System.loadLibrary("scan_rvv_jni");
    } catch (Throwable t) {
      // ignore; callers must consult available() before invoking the native methods
    }
  }

  public static boolean available() {
    return ScanRVV.available();
  }

  public static native int compareToRvv(ByteBuffer a, int aOffset, int aLen,
    ByteBuffer b, int bOffset, int bLen);

  public static native int commonPrefixRvv(byte[] left, int leftOffset,
    byte[] right, int rightOffset, int maxLen);

  public static native int findCommonPrefixRvv(ByteBuffer a, int aOffset, int aLen,
    ByteBuffer b, int bOffset, int bLen);

  public static native int findCommonPrefixRvv(ByteBuffer a, int aOffset, int aLen,
    byte[] b, int bOffset, int bLen);

  public static byte[] readBytesRvv(ByteBuffer buf) {
    int len = buf.remaining();
    byte[] dst = new byte[len];
    if (!available()) {
      // Pure-Java fallback: copy without touching the native library.
      if (buf.hasArray()) {
        System.arraycopy(buf.array(), buf.arrayOffset() + buf.position(), dst, 0, len);
      } else {
        int pos = buf.position();
        for (int i = 0; i < len; i++) {
          dst[i] = buf.get(pos + i);
        }
      }
      return dst;
    }
    if (buf.hasArray()) {
      // On-heap buffers are cheapest to copy with System.arraycopy.
      System.arraycopy(buf.array(), buf.arrayOffset() + buf.position(), dst, 0, len);
    } else {
      // Direct buffers go through the native vectorized copy.
      ScanRVV.rvvMemcpy(dst, 0, buf, buf.position(), len);
    }
    return dst;
  }
}
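A minimal usage sketch follows, for illustration only: RVVByteBufferUtils is @InterfaceAudience.Private, so regular client code would not call it directly, and the class name and values below are made up to show the fallback behavior.

import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.util.RVVByteBufferUtils;

public class ReadBytesRvvExample {
  public static void main(String[] args) {
    // Direct buffer: copied via the native rvvMemcpy path when scan_rvv_jni is loaded,
    // otherwise byte by byte in Java.
    ByteBuffer direct = ByteBuffer.allocateDirect(8);
    direct.put(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8 }).flip();
    byte[] fromDirect = RVVByteBufferUtils.readBytesRvv(direct);

    // Heap buffer: always copied with System.arraycopy, native library or not.
    ByteBuffer heap = ByteBuffer.wrap(new byte[] { 9, 10, 11, 12 });
    byte[] fromHeap = RVVByteBufferUtils.readBytesRvv(heap);

    System.out.println(fromDirect.length + " bytes, " + fromHeap.length + " bytes");
  }
}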
hbase-common/src/main/java/org/apache/hadoop/hbase/util/ScanRVV.java

Lines changed: 130 additions & 0 deletions
@@ -0,0 +1,130 @@
package org.apache.hadoop.hbase.util;

import java.nio.ByteBuffer;

import org.apache.yetus.audience.InterfaceAudience;

/**
 * Utility class for RISC-V RVV vectorized scan optimizations.
 */
@InterfaceAudience.Private
public class ScanRVV {

  private static boolean rvvEnabled = false;

  static {
    try {
      System.loadLibrary("scan_rvv_jni");
      rvvEnabled = true;
    } catch (Throwable t) {
      // Library missing or failed to initialize; fall back to the pure-Java paths.
      rvvEnabled = false;
    }
  }

  public static native boolean isEnabled();

  public static void setEnabled(boolean enabled) {
    // Toggles only the Java-side gate; the native isEnabled() probe is unaffected.
    rvvEnabled = enabled;
  }

  public static boolean available() {
    if (!rvvEnabled) {
      return false;
    }
    try {
      return isEnabled();
    } catch (Throwable t) {
      return false;
    }
  }

  public static native int compareCells(byte[] aKey, int aLen, byte[] bKey, int bLen);

  public static native int compareKeyForNextRow(byte[] indexedKey, int idxLen, byte[] curKey, int curLen);

  public static native int compareKeyForNextColumn(byte[] indexedKey, int idxLen, byte[] curKey, int curLen);

  public static native int memcmp(byte[] a, int offsetA, int lengthA,
    byte[] b, int offsetB, int lengthB);

  public static native boolean prefixMatch(byte[] a, int offsetA,
    byte[] b, int offsetB, int prefixLen);

  public static int memcmp(byte[] a, byte[] b, int length) {
    if (a == null || b == null) {
      throw new IllegalArgumentException("Input arrays cannot be null");
    }
    if (length < 0) {
      throw new IllegalArgumentException("Length cannot be negative");
    }
    return memcmp(a, 0, length, b, 0, length);
  }

  public static boolean prefixMatch(byte[] a, byte[] b, int prefixLen) {
    if (a == null || b == null) {
      return false;
    }
    if (prefixLen <= 0 || prefixLen > a.length || prefixLen > b.length) {
      return false;
    }
    return prefixMatch(a, 0, b, 0, prefixLen);
  }

  public static native int rvvCommonPrefix(byte[] a, int offsetA, int lengthA,
    byte[] b, int offsetB, int lengthB);

  public static native void rvvMemcpy(byte[] dst, int dstOffset,
    byte[] src, int srcOffset, int length);

  public static void rvvMemcpy(byte[] dst, int dstOffset,
    ByteBuffer src, int srcOffset, int length) {
    if (dst == null || src == null) {
      throw new IllegalArgumentException("Input parameters cannot be null");
    }
    if (length < 0) {
      throw new IllegalArgumentException("Length cannot be negative");
    }

    if (src.hasArray()) {
      System.arraycopy(src.array(), src.arrayOffset() + srcOffset, dst, dstOffset, length);
    } else {
      // Copy through a duplicate so the caller's buffer position is not disturbed.
      byte[] tmp = new byte[length];
      ByteBuffer dup = src.duplicate();
      dup.position(srcOffset);
      dup.get(tmp);
      if (available()) {
        rvvMemcpy(dst, dstOffset, tmp, 0, length);
      } else {
        System.arraycopy(tmp, 0, dst, dstOffset, length);
      }
    }
  }

  public static byte[] copyToArray(byte[] src, int offset, int length) {
    if (src == null) {
      throw new IllegalArgumentException("Source array cannot be null");
    }
    if (offset < 0 || length < 0 || offset + length > src.length) {
      throw new IllegalArgumentException("Invalid offset or length");
    }

    byte[] dst = new byte[length];
    if (available()) {
      rvvMemcpy(dst, 0, src, offset, length);
    } else {
      // Native library not loaded; plain arraycopy fallback.
      System.arraycopy(src, offset, dst, 0, length);
    }
    return dst;
  }

  public static byte[] copyToArray(ByteBuffer src) {
    if (src == null) {
      throw new IllegalArgumentException("Source ByteBuffer cannot be null");
    }

    int len = src.remaining();
    byte[] dst = new byte[len];
    if (src.hasArray()) {
      System.arraycopy(src.array(), src.arrayOffset() + src.position(), dst, 0, len);
    } else {
      rvvMemcpy(dst, 0, src, src.position(), len);
    }
    return dst;
  }
}
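The CellComparatorImpl diff listed above is not rendered, so the actual integration is not visible here. As a rough illustration only, the kind of gating a caller would need looks like the sketch below: probe ScanRVV.available() once, use the native comparison when it holds, and fall back to Bytes.compareTo otherwise. The helper class and the assumption that compareCells operates on serialized key bytes are mine, not the committed code.

import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ScanRVV;

// Hypothetical helper; not part of this commit.
final class RvvKeyComparator {
  // Cache the probe so the JNI availability check runs once, not per comparison.
  private static final boolean RVV = ScanRVV.available();

  static int compareKeys(byte[] left, byte[] right) {
    if (RVV) {
      // Vectorized comparison in native code.
      return ScanRVV.compareCells(left, left.length, right, right.length);
    }
    // Pure-Java fallback.
    return Bytes.compareTo(left, right);
  }

  private RvvKeyComparator() {}
}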

hbase-compression/hbase-compression-lz4/src/main/java/org/apache/hadoop/hbase/io/compress/lz4/Lz4Compressor.java

Lines changed: 78 additions & 0 deletions
@@ -39,11 +39,21 @@ public class Lz4Compressor implements CanReinit, Compressor {

  protected boolean finish, finished;
  protected long bytesRead, bytesWritten;

  private final boolean useNative = Lz4Native.available();

  Lz4Compressor(int bufferSize) {
    compressor = LZ4Factory.fastestInstance().fastCompressor();
    this.bufferSize = bufferSize;
    // Direct buffers so the native compressor can address them without copying.
    this.inBuf = ByteBuffer.allocateDirect(bufferSize);
    this.outBuf = ByteBuffer.allocateDirect(bufferSize);
    this.outBuf.position(bufferSize);
  }
@@ -61,6 +71,7 @@ public int compress(byte[] b, int off, int len) throws IOException {

    if (inBuf.position() > 0) {
      inBuf.flip();
      int uncompressed = inBuf.remaining();
      if (useNative && inBuf.isDirect() && outBuf.isDirect()) {
        // Native RVV path: both buffers must be direct (see the constructor and reinit()).
        int needed = Lz4Native.maxCompressedLength(uncompressed);
        if (outBuf.capacity() < needed) {
          needed = CompressionUtil.roundInt2(needed);
          outBuf = ByteBuffer.allocateDirect(needed);
        } else {
          outBuf.clear();
        }
        int written = Lz4Native.compressDirect(inBuf, inBuf.position(), uncompressed,
          outBuf, outBuf.position(), outBuf.remaining());
        if (written < 0) {
          throw new IOException("LZ4 native compress failed: " + written);
        }
        bytesWritten += written;
        inBuf.clear();
        finished = true;
        outBuf.limit(outBuf.position() + written);
        int n = Math.min(written, len);
        outBuf.get(b, off, n);
        return n;
      } else {
        // Fall back to the original lz4-java path.
        int needed = maxCompressedLength(uncompressed);
        // Can we compress directly into the provided array?
        ByteBuffer writeBuffer;
        boolean direct = false;
        if (len <= needed) {
          writeBuffer = ByteBuffer.wrap(b, off, len);
          direct = true;
        } else {
          if (outBuf.capacity() < needed) {
            needed = CompressionUtil.roundInt2(needed);
            outBuf = ByteBuffer.allocate(needed);
          } else {
            outBuf.clear();
          }
          writeBuffer = outBuf;
        }
        final int oldPos = writeBuffer.position();
        compressor.compress(inBuf, writeBuffer);
        final int written = writeBuffer.position() - oldPos;
        bytesWritten += written;
        inBuf.clear();
        finished = true;
        if (!direct) {
          outBuf.flip();
          int n = Math.min(written, len);
          outBuf.get(b, off, n);
          return n;
        } else {
          return written;
        }
      }
    } else {
      finished = true;
@@ -136,8 +200,13 @@ public void reinit(Configuration conf) {

      int newBufferSize = Lz4Codec.getBufferSize(conf);
      if (bufferSize != newBufferSize) {
        bufferSize = newBufferSize;
        this.inBuf = ByteBuffer.allocateDirect(bufferSize);
        this.outBuf = ByteBuffer.allocateDirect(bufferSize);
      }
    }
    reset();
@@ -162,11 +231,20 @@ public void setInput(byte[] b, int off, int len) {

  @Override
  public void setInput(byte[] b, int off, int len) {
    if (inBuf.remaining() < len) {
      // Get a new buffer that can accommodate the accumulated input plus the additional
      // input that would cause a buffer overflow without reallocation.
      // This condition should be fortunately rare, because it is expensive.
      int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
      ByteBuffer newBuf = ByteBuffer.allocateDirect(needed);
      inBuf.flip();
      newBuf.put(inBuf);
      inBuf = newBuf;
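The recurring change in this file is ByteBuffer.allocate being replaced by ByteBuffer.allocateDirect wherever the input and output buffers are created or regrown. Direct buffers are what let Lz4Native.compressDirect hand the JNI side a stable native address instead of copying a heap array, which is why the native path is guarded by inBuf.isDirect() && outBuf.isDirect() in addition to the Lz4Native.available() check.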
hbase-compression/hbase-compression-lz4/src/main/java/org/apache/hadoop/hbase/io/compress/lz4/Lz4Native.java

Lines changed: 27 additions & 0 deletions
@@ -0,0 +1,27 @@
package org.apache.hadoop.hbase.io.compress.lz4;

import java.nio.ByteBuffer;

/**
 * JNI entry points for the native LZ4 implementation. The buffer arguments are
 * expected to be direct ByteBuffers.
 */
final class Lz4Native {
  static {
    NativeLoader.load();
  }

  static boolean available() {
    return NativeLoader.isLoaded();
  }

  static boolean isAvailable() {
    return available();
  }

  static native int maxCompressedLength(int srcLen);

  static native int compressDirect(ByteBuffer src, int srcOff, int srcLen,
    ByteBuffer dst, int dstOff, int dstCap);

  static native int decompressDirect(ByteBuffer src, int srcOff, int srcLen,
    ByteBuffer dst, int dstOff, int dstCap);

  private Lz4Native() {}
}
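A round-trip sketch of how these entry points compose, only meaningful when Lz4Native.available() returns true. The return-value convention assumed here (bytes written on success, negative on error) is taken from how Lz4Compressor checks compressDirect above; the class itself is hypothetical and sits in the same package because Lz4Native is package-private.

package org.apache.hadoop.hbase.io.compress.lz4;

import java.nio.ByteBuffer;

// Hypothetical usage sketch; not part of this commit.
final class Lz4NativeRoundTrip {
  static byte[] roundTrip(byte[] input) {
    ByteBuffer src = ByteBuffer.allocateDirect(input.length);
    src.put(input).flip();

    // Worst-case output size as reported by the native library.
    ByteBuffer compressed = ByteBuffer.allocateDirect(Lz4Native.maxCompressedLength(input.length));
    int clen = Lz4Native.compressDirect(src, 0, input.length, compressed, 0, compressed.capacity());
    if (clen < 0) {
      throw new IllegalStateException("compressDirect failed: " + clen);
    }

    ByteBuffer restored = ByteBuffer.allocateDirect(input.length);
    int dlen = Lz4Native.decompressDirect(compressed, 0, clen, restored, 0, restored.capacity());
    if (dlen < 0) {
      throw new IllegalStateException("decompressDirect failed: " + dlen);
    }

    byte[] out = new byte[dlen];
    restored.get(out); // JNI wrote through the buffer's base address; position is still 0
    return out;
  }

  private Lz4NativeRoundTrip() {}
}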
hbase-compression/hbase-compression-lz4/src/main/java/org/apache/hadoop/hbase/io/compress/lz4/NativeLoader.java

Lines changed: 39 additions & 0 deletions
@@ -0,0 +1,39 @@
package org.apache.hadoop.hbase.io.compress.lz4;

import java.io.File;

final class NativeLoader {
  private static volatile boolean loaded = false;

  static synchronized boolean load() {
    if (loaded) return true;

    try {
      // Prefer an explicit location from the environment.
      String lib = System.getenv("HBASE_LZ4RVV_LIB");
      if (lib != null && !lib.isEmpty()) {
        File f = new File(lib);
        System.load(f.getAbsolutePath());
      } else {
        // Otherwise load from the working directory.
        String relativePath = "libhbase_lz4rvv.so";
        File f = new File(relativePath);
        System.load(f.getAbsolutePath());
      }
      loaded = true;
    } catch (Throwable t) {
      loaded = false;
      System.err.println("[NativeLoader ERROR] Failed to load native library: " + t);
      t.printStackTrace();
    }

    return loaded;
  }

  static boolean isLoaded() {
    return loaded;
  }

  private NativeLoader() {}
}
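A minimal way to check the loader wiring, assuming the shared object has been built as libhbase_lz4rvv.so: point HBASE_LZ4RVV_LIB at the absolute path of the library (or run from the directory containing it) before launching. The class below is a hypothetical smoke test, not part of the commit, and must live in the same package because Lz4Native and NativeLoader are package-private.

package org.apache.hadoop.hbase.io.compress.lz4;

// Hypothetical smoke test: run with HBASE_LZ4RVV_LIB=/path/to/libhbase_lz4rvv.so set.
final class Lz4NativeSmokeTest {
  public static void main(String[] args) {
    System.out.println("native lz4 available: " + Lz4Native.available());
    if (Lz4Native.available()) {
      // Worst-case compressed size for a 1 MiB input, computed by the native code.
      System.out.println("maxCompressedLength(1 MiB) = "
        + Lz4Native.maxCompressedLength(1024 * 1024));
    }
  }

  private Lz4NativeSmokeTest() {}
}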
