Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ private void addPair(T compressor, E decompressor, String name) {
builder.add(new TesterPair<T, E>(name, compressor, decompressor));
}

public void test() throws InstantiationException, IllegalAccessException {
public void test() throws Exception {
pairs = builder.build();
pairs = assertionDelegate.filterOnAssumeWhat(pairs);

Expand Down Expand Up @@ -287,47 +287,45 @@ private boolean checkSetInputArrayIndexOutOfBoundsException(

@Override
public void assertCompression(String name, Compressor compressor,
Decompressor decompressor, byte[] rawData) {
Decompressor decompressor, byte[] rawData) throws Exception {

int cSize = 0;
int decompressedSize = 0;
byte[] compressedResult = new byte[rawData.length];
// Snappy compression can increase data size
int maxCompressedLength = 32 + rawData.length + rawData.length/6;
byte[] compressedResult = new byte[maxCompressedLength];
byte[] decompressedBytes = new byte[rawData.length];
try {
assertTrue(
joiner.join(name, "compressor.needsInput before error !!!"),
compressor.needsInput());
assertTrue(
assertTrue(
joiner.join(name, "compressor.needsInput before error !!!"),
compressor.needsInput());
assertEquals(
joiner.join(name, "compressor.getBytesWritten before error !!!"),
compressor.getBytesWritten() == 0);
compressor.setInput(rawData, 0, rawData.length);
compressor.finish();
while (!compressor.finished()) {
cSize += compressor.compress(compressedResult, 0,
compressedResult.length);
}
compressor.reset();

assertTrue(
joiner.join(name, "decompressor.needsInput() before error !!!"),
decompressor.needsInput());
decompressor.setInput(compressedResult, 0, cSize);
assertFalse(
joiner.join(name, "decompressor.needsInput() after error !!!"),
decompressor.needsInput());
while (!decompressor.finished()) {
decompressedSize = decompressor.decompress(decompressedBytes, 0,
decompressedBytes.length);
}
decompressor.reset();
assertTrue(joiner.join(name, " byte size not equals error !!!"),
decompressedSize == rawData.length);
assertArrayEquals(
joiner.join(name, " byte arrays not equals error !!!"), rawData,
decompressedBytes);
} catch (Exception ex) {
fail(joiner.join(name, ex.getMessage()));
0, compressor.getBytesWritten());
compressor.setInput(rawData, 0, rawData.length);
compressor.finish();
while (!compressor.finished()) {
cSize += compressor.compress(compressedResult, 0,
compressedResult.length);
}
compressor.reset();

assertTrue(
joiner.join(name, "decompressor.needsInput() before error !!!"),
decompressor.needsInput());
decompressor.setInput(compressedResult, 0, cSize);
assertFalse(
joiner.join(name, "decompressor.needsInput() after error !!!"),
decompressor.needsInput());
while (!decompressor.finished()) {
decompressedSize = decompressor.decompress(decompressedBytes, 0,
decompressedBytes.length);
}
decompressor.reset();
assertEquals(joiner.join(name, " byte size not equals error !!!"),
rawData.length, decompressedSize);
assertArrayEquals(
joiner.join(name, " byte arrays not equals error !!!"), rawData,
decompressedBytes);
}
}),

Expand Down Expand Up @@ -519,6 +517,6 @@ abstract static class TesterCompressionStrategy {
protected final Logger logger = Logger.getLogger(getClass());

abstract void assertCompression(String name, Compressor compressor,
Decompressor decompressor, byte[] originalRawData);
Decompressor decompressor, byte[] originalRawData) throws Exception;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.io.compress.snappy;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
Expand Down Expand Up @@ -44,11 +45,16 @@
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.junit.Assume.*;

public class TestSnappyCompressorDecompressor {

public static final Logger LOG =
LoggerFactory.getLogger(TestSnappyCompressorDecompressor.class);

@Before
public void before() {
assumeTrue(SnappyCodec.isNativeCodeLoaded());
Expand Down Expand Up @@ -167,40 +173,41 @@ public void testSnappyDecompressorCompressAIOBException() {
}

@Test
public void testSnappyCompressDecompress() {
public void testSnappyCompressDecompress() throws Exception {
int BYTE_SIZE = 1024 * 54;
byte[] bytes = BytesGenerator.get(BYTE_SIZE);
SnappyCompressor compressor = new SnappyCompressor();
try {
compressor.setInput(bytes, 0, bytes.length);
assertTrue("SnappyCompressDecompress getBytesRead error !!!",
compressor.getBytesRead() > 0);
assertTrue(
"SnappyCompressDecompress getBytesWritten before compress error !!!",
compressor.getBytesWritten() == 0);

byte[] compressed = new byte[BYTE_SIZE];
int cSize = compressor.compress(compressed, 0, compressed.length);
assertTrue(
"SnappyCompressDecompress getBytesWritten after compress error !!!",
compressor.getBytesWritten() > 0);

SnappyDecompressor decompressor = new SnappyDecompressor(BYTE_SIZE);
// set as input for decompressor only compressed data indicated with cSize
decompressor.setInput(compressed, 0, cSize);
byte[] decompressed = new byte[BYTE_SIZE];
decompressor.decompress(decompressed, 0, decompressed.length);

assertTrue("testSnappyCompressDecompress finished error !!!",
decompressor.finished());
Assert.assertArrayEquals(bytes, decompressed);
compressor.reset();
decompressor.reset();
assertTrue("decompressor getRemaining error !!!",
decompressor.getRemaining() == 0);
} catch (Exception e) {
fail("testSnappyCompressDecompress ex error!!!");
}
compressor.setInput(bytes, 0, bytes.length);
assertTrue("SnappyCompressDecompress getBytesRead error !!!",
compressor.getBytesRead() > 0);
assertEquals(
"SnappyCompressDecompress getBytesWritten before compress error !!!",
0, compressor.getBytesWritten());

// snappy compression may increase data size.
// This calculation comes from "Snappy::MaxCompressedLength(size_t)"
int maxSize = 32 + BYTE_SIZE + BYTE_SIZE / 6;
byte[] compressed = new byte[maxSize];
int cSize = compressor.compress(compressed, 0, compressed.length);
LOG.info("input size: {}", BYTE_SIZE);
LOG.info("compressed size: {}", cSize);
assertTrue(
"SnappyCompressDecompress getBytesWritten after compress error !!!",
compressor.getBytesWritten() > 0);

SnappyDecompressor decompressor = new SnappyDecompressor();
// set as input for decompressor only compressed data indicated with cSize
decompressor.setInput(compressed, 0, cSize);
byte[] decompressed = new byte[BYTE_SIZE];
decompressor.decompress(decompressed, 0, decompressed.length);

assertTrue("testSnappyCompressDecompress finished error !!!",
decompressor.finished());
Assert.assertArrayEquals(bytes, decompressed);
compressor.reset();
decompressor.reset();
assertEquals("decompressor getRemaining error !!!",
0, decompressor.getRemaining());
}

@Test
Expand Down Expand Up @@ -278,7 +285,38 @@ public void testSnappyBlockCompression() {
fail("testSnappyBlockCompression ex error !!!");
}
}


@Test
// The buffer size is smaller than the input.
public void testSnappyCompressDecompressWithSmallBuffer() throws Exception {
int inputSize = 1024 * 50;
int bufferSize = 512;
ByteArrayOutputStream out = new ByteArrayOutputStream();
byte[] buffer = new byte[bufferSize];
byte[] input = BytesGenerator.get(inputSize);

SnappyCompressor compressor = new SnappyCompressor();
compressor.setInput(input, 0, inputSize);
compressor.finish();
while (!compressor.finished()) {
int len = compressor.compress(buffer, 0, buffer.length);
out.write(buffer, 0, len);
}
byte[] compressed = out.toByteArray();
assertThat(compressed).hasSizeGreaterThan(0);
out.reset();

SnappyDecompressor decompressor = new SnappyDecompressor();
decompressor.setInput(compressed, 0, compressed.length);
while (!decompressor.finished()) {
int len = decompressor.decompress(buffer, 0, buffer.length);
out.write(buffer, 0, len);
}
byte[] decompressed = out.toByteArray();

assertThat(decompressed).isEqualTo(input);
}

private void compressDecompressLoop(int rawDataSize) throws IOException {
byte[] rawData = BytesGenerator.get(rawDataSize);
byte[] compressedResult = new byte[rawDataSize+20];
Expand Down