Commit f2202ac

Avoid a copy during decompression with new BoundedDelegatingInputStream.
1 parent a0a3384 commit f2202ac

3 files changed: +122 -22 lines changed
hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedDelegatingInputStream.java

Lines changed: 111 additions & 0 deletions
@@ -0,0 +1,111 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.io;

import java.io.IOException;
import java.io.InputStream;

import org.apache.yetus.audience.InterfaceAudience;

/**
 * This is a stream that will only supply bytes from its delegate up to a certain limit.
 * When there is an attempt to set the position beyond that it will signal that the input
 * is finished.
 */
@InterfaceAudience.Private
public class BoundedDelegatingInputStream extends DelegatingInputStream {

  protected long limit;
  protected long pos;

  public BoundedDelegatingInputStream(InputStream in, long limit) {
    super(in);
    this.limit = limit;
    this.pos = 0;
  }

  public void setDelegate(InputStream in, long limit) {
    this.in = in;
    this.limit = limit;
    this.pos = 0;
  }

  /**
   * Call the delegate's {@code read()} method if the current position is less than the limit.
   * @return the byte read or -1 if the end of stream or the limit has been reached.
   */
  @Override
  public int read() throws IOException {
    if (pos >= limit) {
      return -1;
    }
    int result = in.read();
    pos++;
    return result;
  }

  /**
   * Call the delegate's {@code read(byte[], int, int)} method if the current position is less
   * than the limit.
   * @param b read buffer
   * @param off Start offset
   * @param len The number of bytes to read
   * @return the number of bytes read or -1 if the end of stream or the limit has been reached.
   */
  @Override
  public int read(final byte[] b, final int off, final int len) throws IOException {
    if (pos >= limit) {
      return -1;
    }
    long readLen = Math.min(len, limit - pos);
    int read = in.read(b, off, (int) readLen);
    if (read < 0) {
      return -1;
    }
    pos += read;
    return read;
  }

  /**
   * Call the delegate's {@code skip(long)} method.
   * @param len the number of bytes to skip
   * @return the actual number of bytes skipped
   */
  @Override
  public long skip(final long len) throws IOException {
    long skipped = in.skip(Math.min(len, limit - pos));
    pos += skipped;
    return skipped;
  }

  /**
   * Call the delegate's {@code available()} method.
   * @return the delegate's available bytes if the current position is less than the limit,
   *   or 0 otherwise
   */
  @Override
  public int available() throws IOException {
    if (pos >= limit) {
      return 0;
    }
    int available = in.available();
    return (int) Math.min(available, limit - pos);
  }

}
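
A quick usage sketch of the new class, for readers of the diff (not part of the commit; the BoundedReadSketch class name and sample bytes are made up): reads through the wrapper stop at the configured limit even though the delegate still has data, and setDelegate() re-points the same instance at the next bounded range without allocating a new stream.

import java.io.ByteArrayInputStream;
import java.io.IOException;

import org.apache.hadoop.hbase.io.BoundedDelegatingInputStream;

public class BoundedReadSketch {
  public static void main(String[] args) throws IOException {
    // Two 4-byte "segments" concatenated in one underlying stream.
    byte[] data = { 1, 2, 3, 4, 5, 6, 7, 8 };
    ByteArrayInputStream underlying = new ByteArrayInputStream(data);

    // Bound reads to the first segment.
    BoundedDelegatingInputStream bounded = new BoundedDelegatingInputStream(underlying, 4);
    byte[] buf = new byte[8];
    int n = bounded.read(buf, 0, buf.length);          // returns 4: len is clamped to the limit
    System.out.println("first segment bytes read: " + n);
    System.out.println("read at the limit: " + bounded.read());  // -1, limit reached

    // Re-point the same wrapper at the second segment; the underlying stream was only
    // consumed up to the first limit, so reading resumes at byte 5.
    bounded.setDelegate(underlying, 4);
    n = bounded.read(buf, 0, buf.length);              // returns 4 again
    System.out.println("second segment bytes read: " + n);
  }
}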

hbase-common/src/main/java/org/apache/hadoop/hbase/io/DelegatingInputStream.java

Lines changed: 1 addition & 1 deletion
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java

Lines changed: 10 additions & 21 deletions
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.hbase.regionserver.wal;
 
-import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -27,10 +26,9 @@
 import java.lang.reflect.InvocationTargetException;
 import java.util.EnumMap;
 import java.util.Map;
-import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.io.DelegatingInputStream;
+import org.apache.hadoop.hbase.io.BoundedDelegatingInputStream;
 import org.apache.hadoop.hbase.io.TagCompressionContext;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.util.Dictionary;
@@ -71,7 +69,7 @@ static class ValueCompressor {
     static final int IO_BUFFER_SIZE = 4096;
 
     private final Compression.Algorithm algorithm;
-    private DelegatingInputStream lowerIn;
+    private BoundedDelegatingInputStream lowerIn;
     private ByteArrayOutputStream lowerOut;
     private InputStream compressedIn;
     private OutputStream compressedOut;
@@ -102,31 +100,22 @@ public byte[] compress(byte[] valueArray, int valueOffset, int valueLength)
     public int decompress(InputStream in, int inLength, byte[] outArray, int outOffset,
         int outLength) throws IOException {
 
-      // We handle input as a sequence of byte[] arrays (call them segments), with
-      // DelegatingInputStream providing a way to switch in a new segment, wrapped in a
-      // ByteArrayInputStream, when the old segment has been fully consumed.
-
-      // Originally I looked at using BoundedInputStream but you can't reuse/reset the
-      // BIS instance, and we can't just create new streams each time around because
-      // that would reset compression codec state, which must accumulate over all values
-      // in the file in order to build the dictionary in the same way as the compressor
-      // did.
-
-      // Read in all of the next segment of compressed bytes to process.
-      byte[] inBuffer = new byte[inLength];
-      IOUtils.readFully(in, inBuffer);
+      // Our input is a sequence of bounded byte ranges (call them segments), with
+      // BoundedDelegatingInputStream providing a way to switch in a new segment when the
+      // previous segment has been fully consumed.
 
      // Create the input streams here the first time around.
      if (compressedIn == null) {
-        lowerIn = new DelegatingInputStream(new ByteArrayInputStream(inBuffer));
+        lowerIn = new BoundedDelegatingInputStream(in, inLength);
        compressedIn = algorithm.createDecompressionStream(lowerIn, algorithm.getDecompressor(),
          IO_BUFFER_SIZE);
      } else {
-        lowerIn.setDelegate(new ByteArrayInputStream(inBuffer));
+        lowerIn.setDelegate(in, inLength);
      }
 
-      // Caller must handle short reads. With current Hadoop compression codecs all 'outLength'
-      // bytes are read in here, so not an issue now.
+      // Caller must handle short reads.
+      // With current Hadoop compression codecs all 'outLength' bytes are read in here, so not
+      // an issue for now.
      return compressedIn.read(outArray, outOffset, outLength);
    }
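
To see why decompress() keeps a single decompression stream alive and only re-points lowerIn at each segment, here is a self-contained sketch (not part of the commit). It stands in java.util.zip's Deflater and InflaterInputStream for the Hadoop codec streams the real code obtains from algorithm.createDecompressionStream(), and the names SegmentedDecompressSketch, walIn, seg1, and seg2 are invented for illustration. The compressor's history window carries over from one value to the next, so the decompressor must accumulate the same state; the bounded wrapper is what scopes each read to the current segment of the shared input stream.

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.Deflater;
import java.util.zip.InflaterInputStream;

import org.apache.hadoop.hbase.io.BoundedDelegatingInputStream;

public class SegmentedDecompressSketch {

  public static void main(String[] args) throws IOException {
    byte[] value1 = "row-0001/family:qualifier/some-repetitive-value-payload"
        .getBytes(StandardCharsets.UTF_8);
    byte[] value2 = "row-0002/family:qualifier/some-repetitive-value-payload"
        .getBytes(StandardCharsets.UTF_8);

    // Compress both values with a single deflater, sync-flushing after each one so that
    // every value's compressed bytes form a self-contained segment while the codec's
    // history window carries over from value1 to value2.
    Deflater deflater = new Deflater();
    byte[] seg1 = compressSegment(deflater, value1);
    byte[] seg2 = compressSegment(deflater, value2);

    // The "WAL stream": both segments back to back in one input stream.
    byte[] concatenated = new byte[seg1.length + seg2.length];
    System.arraycopy(seg1, 0, concatenated, 0, seg1.length);
    System.arraycopy(seg2, 0, concatenated, seg1.length, seg2.length);
    ByteArrayInputStream walIn = new ByteArrayInputStream(concatenated);

    // One bounded wrapper and ONE long-lived decompression stream, reused across segments
    // by re-pointing the wrapper, mirroring the decompress() logic above.
    BoundedDelegatingInputStream lowerIn = new BoundedDelegatingInputStream(walIn, seg1.length);
    InflaterInputStream compressedIn = new InflaterInputStream(lowerIn);

    byte[] out1 = readFully(compressedIn, value1.length);
    lowerIn.setDelegate(walIn, seg2.length);   // switch to the next segment, no copy
    byte[] out2 = readFully(compressedIn, value2.length);

    System.out.println(new String(out1, StandardCharsets.UTF_8));
    System.out.println(new String(out2, StandardCharsets.UTF_8));
  }

  // Compress one value and return the bytes emitted up to the sync flush point.
  private static byte[] compressSegment(Deflater deflater, byte[] value) {
    deflater.setInput(value);
    byte[] buf = new byte[4096];
    int n = deflater.deflate(buf, 0, buf.length, Deflater.SYNC_FLUSH);
    byte[] segment = new byte[n];
    System.arraycopy(buf, 0, segment, 0, n);
    return segment;
  }

  // Decompression streams may return short reads, so accumulate until 'len' bytes arrive.
  private static byte[] readFully(InflaterInputStream in, int len) throws IOException {
    byte[] out = new byte[len];
    int off = 0;
    while (off < len) {
      int n = in.read(out, off, len - off);
      if (n < 0) {
        throw new IOException("Unexpected end of stream");
      }
      off += n;
    }
    return out;
  }
}

Creating a fresh InflaterInputStream for seg2 instead would typically fail or mis-decode: seg2 carries no stream header of its own and may back-reference bytes of value1. That is the same constraint the removed comment described when it ruled out constructing new streams on every call.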
