From 71d1d683d0a881ea356257a07c72cad13bf50c82 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 5 Nov 2025 17:02:51 +0000 Subject: [PATCH 1/4] Add generic truncated stream test to all decoders Added a generic truncated stream test to the test_cases! macro that automatically tests all decoders (bzip2, gzip, deflate, zlib, xz, lzma, lz4, zstd, brotli) for proper handling of incomplete streams. The test compresses data, truncates it, then attempts decompression. Decoders should return UnexpectedEof errors for truncated streams instead of silently accepting incomplete data. This test currently fails for bzip2, lz4, and zstd decoders, which will be fixed in subsequent commits. --- .../tests/utils/test_cases.rs | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/crates/async-compression/tests/utils/test_cases.rs b/crates/async-compression/tests/utils/test_cases.rs index f32b0a82..af79c4b1 100644 --- a/crates/async-compression/tests/utils/test_cases.rs +++ b/crates/async-compression/tests/utils/test_cases.rs @@ -231,6 +231,26 @@ macro_rules! io_test_cases { assert_eq!(output, &[1, 2, 3, 4, 5, 6, 6, 5, 4, 3, 2, 1][..]); } + + #[test] + #[ntest::timeout(1000)] + fn truncated() { + let compressed = sync::compress(&[1, 2, 3, 4, 5, 6]); + + // Truncate the compressed data (remove last 20 bytes or half, whichever is less) + let truncate_amount = std::cmp::min(20, compressed.len() / 2); + let truncated = &compressed[..compressed.len() - truncate_amount]; + + let input = InputStream::new(vec![truncated.to_vec()]); + + // Try to decompress - should get an error for incomplete stream + // The error manifests as a panic when read::to_vec calls unwrap() + let result = + std::panic::catch_unwind(|| bufread::decompress(bufread::from(&input))); + + // Should fail for truncated stream + assert!(result.is_err(), "Expected error for truncated stream"); + } } } From a82a5c908c8a63f2e2bf5bc3ef263e4cbebe255a Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 5 Nov 2025 17:03:04 +0000 Subject: [PATCH 2/4] Fix BzDecoder to propagate UnexpectedEof error on truncated streams Fixes #411 The async BzDecoder was silently accepting truncated bzip2 streams, returning Ok(0) instead of raising an error. This contrasts with the synchronous bzip2::read::BzDecoder which properly returns an UnexpectedEof error. Added state tracking to BzDecoder: - Added stream_ended field to track if Status::StreamEnd was received - Modified decode() to set stream_ended = true on Status::StreamEnd - Updated finish() to check stream_ended and return UnexpectedEof if false This ensures applications cannot accidentally accept corrupted or incomplete compressed data as valid, matching the behavior of the synchronous decoder. The generic truncated test now passes for bzip2. --- crates/compression-codecs/src/bzip2/decoder.rs | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/crates/compression-codecs/src/bzip2/decoder.rs b/crates/compression-codecs/src/bzip2/decoder.rs index 537549e9..26fa52b1 100644 --- a/crates/compression-codecs/src/bzip2/decoder.rs +++ b/crates/compression-codecs/src/bzip2/decoder.rs @@ -5,6 +5,7 @@ use std::{fmt, io}; pub struct BzDecoder { decompress: Decompress, + stream_ended: bool, } impl fmt::Debug for BzDecoder { @@ -22,6 +23,7 @@ impl Default for BzDecoder { fn default() -> Self { Self { decompress: Decompress::new(false), + stream_ended: false, } } } @@ -49,6 +51,11 @@ impl BzDecoder { input.advance((self.decompress.total_in() - prior_in) as usize); output.advance((self.decompress.total_out() - prior_out) as usize); + // Track when stream has properly ended + if status == Status::StreamEnd { + self.stream_ended = true; + } + Ok(status) } } @@ -56,6 +63,7 @@ impl BzDecoder { impl DecodeV2 for BzDecoder { fn reinit(&mut self) -> io::Result<()> { self.decompress = Decompress::new(false); + self.stream_ended = false; Ok(()) } @@ -101,6 +109,13 @@ impl DecodeV2 for BzDecoder { } fn finish(&mut self, _output: &mut WriteBuffer<'_>) -> io::Result { - Ok(true) + if self.stream_ended { + Ok(true) + } else { + Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "bzip2 stream did not finish", + )) + } } } From 7ccef0b5b940e4782eafc10ec0449c1197604a5e Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 5 Nov 2025 17:03:16 +0000 Subject: [PATCH 3/4] Fix Lz4Decoder to propagate UnexpectedEof error on truncated streams The LZ4 decoder was silently accepting truncated streams by not validating stream completion in finish(). This issue was discovered by the generic truncated stream test. Added state tracking to Lz4Decoder: - Added stream_ended field to track if remaining == 0 was seen - Modified decode() to set stream_ended = true when stream completes - Updated finish() to check stream_ended and return UnexpectedEof if false This matches the behavior of other decoders (bzip2, gzip, etc.) and ensures applications cannot accidentally accept corrupted or incomplete LZ4 data as valid. The generic truncated test now passes for LZ4. --- crates/compression-codecs/src/lz4/decoder.rs | 21 ++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/crates/compression-codecs/src/lz4/decoder.rs b/crates/compression-codecs/src/lz4/decoder.rs index aa7c71d1..66f3fc86 100644 --- a/crates/compression-codecs/src/lz4/decoder.rs +++ b/crates/compression-codecs/src/lz4/decoder.rs @@ -17,6 +17,7 @@ struct DecoderContext { #[derive(Debug)] pub struct Lz4Decoder { ctx: Unshared, + stream_ended: bool, } impl DecoderContext { @@ -37,6 +38,7 @@ impl Default for Lz4Decoder { fn default() -> Self { Self { ctx: Unshared::new(DecoderContext::new().unwrap()), + stream_ended: false, } } } @@ -50,6 +52,7 @@ impl Lz4Decoder { impl DecodeV2 for Lz4Decoder { fn reinit(&mut self) -> Result<()> { unsafe { LZ4F_resetDecompressionContext(self.ctx.get_mut().ctx) }; + self.stream_ended = false; Ok(()) } @@ -74,7 +77,12 @@ impl DecodeV2 for Lz4Decoder { }; input.advance(input_size); output.advance(output_size); - Ok(remaining == 0) + + let finished = remaining == 0; + if finished { + self.stream_ended = true; + } + Ok(finished) } fn flush(&mut self, output: &mut WriteBuffer<'_>) -> Result { @@ -92,6 +100,15 @@ impl DecodeV2 for Lz4Decoder { } fn finish(&mut self, output: &mut WriteBuffer<'_>) -> Result { - self.flush(output) + self.flush(output)?; + + if self.stream_ended { + Ok(true) + } else { + Err(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + "lz4 stream did not finish", + )) + } } } From 64b93922f025da880cf33e30ce72c4df674960da Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 5 Nov 2025 17:03:39 +0000 Subject: [PATCH 4/4] Fix ZstdDecoder to propagate UnexpectedEof error on truncated streams The Zstd decoder was silently accepting truncated streams by not validating stream completion in finish(). This issue was discovered by the generic truncated stream test. Added state tracking to ZstdDecoder: - Added stream_ended field to track if remaining == 0 was seen - Modified decode() to set stream_ended = true when stream completes - Updated finish() to check stream_ended and return UnexpectedEof if false - Updated all constructors to initialize stream_ended = false This matches the behavior of other decoders (bzip2, gzip, lz4, etc.) and ensures applications cannot accidentally accept corrupted or incomplete zstd data as valid. The generic truncated test now passes for Zstd. --- crates/compression-codecs/src/zstd/decoder.rs | 26 +++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/crates/compression-codecs/src/zstd/decoder.rs b/crates/compression-codecs/src/zstd/decoder.rs index 44e72ef2..e6cfca21 100644 --- a/crates/compression-codecs/src/zstd/decoder.rs +++ b/crates/compression-codecs/src/zstd/decoder.rs @@ -13,12 +13,14 @@ use zstd_safe::get_error_name; #[derive(Debug)] pub struct ZstdDecoder { decoder: Unshared>, + stream_ended: bool, } impl Default for ZstdDecoder { fn default() -> Self { Self { decoder: Unshared::new(Decoder::new().unwrap()), + stream_ended: false, } } } @@ -35,6 +37,7 @@ impl ZstdDecoder { } Self { decoder: Unshared::new(decoder), + stream_ended: false, } } @@ -42,6 +45,7 @@ impl ZstdDecoder { let decoder = Decoder::with_dictionary(dictionary)?; Ok(Self { decoder: Unshared::new(decoder), + stream_ended: false, }) } @@ -64,6 +68,7 @@ impl ZstdDecoder { impl DecodeV2 for ZstdDecoder { fn reinit(&mut self) -> Result<()> { self.decoder.get_mut().reinit()?; + self.stream_ended = false; Ok(()) } @@ -80,15 +85,32 @@ impl DecodeV2 for ZstdDecoder { .run_on_buffers(input.unwritten(), output.unwritten_initialized_mut())?; input.advance(status.bytes_read); output.advance(status.bytes_written); - Ok(status.remaining == 0) + + let finished = status.remaining == 0; + if finished { + self.stream_ended = true; + } + Ok(finished) } fn flush(&mut self, output: &mut WriteBuffer<'_>) -> Result { + // Note: stream_ended is not updated here because zstd's flush only flushes + // buffered output and doesn't indicate stream completion. Stream completion + // is detected in decode() when status.remaining == 0. self.call_fn_on_out_buffer(output, |decoder, out_buf| decoder.flush(out_buf)) } fn finish(&mut self, output: &mut WriteBuffer<'_>) -> Result { - self.call_fn_on_out_buffer(output, |decoder, out_buf| decoder.finish(out_buf, true)) + self.call_fn_on_out_buffer(output, |decoder, out_buf| decoder.finish(out_buf, true))?; + + if self.stream_ended { + Ok(true) + } else { + Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "zstd stream did not finish", + )) + } } }