diff --git a/crates/async-compression/tests/utils/test_cases.rs b/crates/async-compression/tests/utils/test_cases.rs index f32b0a82..af79c4b1 100644 --- a/crates/async-compression/tests/utils/test_cases.rs +++ b/crates/async-compression/tests/utils/test_cases.rs @@ -231,6 +231,26 @@ macro_rules! io_test_cases { assert_eq!(output, &[1, 2, 3, 4, 5, 6, 6, 5, 4, 3, 2, 1][..]); } + + #[test] + #[ntest::timeout(1000)] + fn truncated() { + let compressed = sync::compress(&[1, 2, 3, 4, 5, 6]); + + // Truncate the compressed data (remove last 20 bytes or half, whichever is less) + let truncate_amount = std::cmp::min(20, compressed.len() / 2); + let truncated = &compressed[..compressed.len() - truncate_amount]; + + let input = InputStream::new(vec![truncated.to_vec()]); + + // Try to decompress - should get an error for incomplete stream + // The error manifests as a panic when read::to_vec calls unwrap() + let result = + std::panic::catch_unwind(|| bufread::decompress(bufread::from(&input))); + + // Should fail for truncated stream + assert!(result.is_err(), "Expected error for truncated stream"); + } } } diff --git a/crates/compression-codecs/src/bzip2/decoder.rs b/crates/compression-codecs/src/bzip2/decoder.rs index 537549e9..26fa52b1 100644 --- a/crates/compression-codecs/src/bzip2/decoder.rs +++ b/crates/compression-codecs/src/bzip2/decoder.rs @@ -5,6 +5,7 @@ use std::{fmt, io}; pub struct BzDecoder { decompress: Decompress, + stream_ended: bool, } impl fmt::Debug for BzDecoder { @@ -22,6 +23,7 @@ impl Default for BzDecoder { fn default() -> Self { Self { decompress: Decompress::new(false), + stream_ended: false, } } } @@ -49,6 +51,11 @@ impl BzDecoder { input.advance((self.decompress.total_in() - prior_in) as usize); output.advance((self.decompress.total_out() - prior_out) as usize); + // Track when stream has properly ended + if status == Status::StreamEnd { + self.stream_ended = true; + } + Ok(status) } } @@ -56,6 +63,7 @@ impl BzDecoder { impl DecodeV2 for BzDecoder { fn reinit(&mut self) -> io::Result<()> { self.decompress = Decompress::new(false); + self.stream_ended = false; Ok(()) } @@ -101,6 +109,13 @@ impl DecodeV2 for BzDecoder { } fn finish(&mut self, _output: &mut WriteBuffer<'_>) -> io::Result { - Ok(true) + if self.stream_ended { + Ok(true) + } else { + Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "bzip2 stream did not finish", + )) + } } } diff --git a/crates/compression-codecs/src/lz4/decoder.rs b/crates/compression-codecs/src/lz4/decoder.rs index aa7c71d1..66f3fc86 100644 --- a/crates/compression-codecs/src/lz4/decoder.rs +++ b/crates/compression-codecs/src/lz4/decoder.rs @@ -17,6 +17,7 @@ struct DecoderContext { #[derive(Debug)] pub struct Lz4Decoder { ctx: Unshared, + stream_ended: bool, } impl DecoderContext { @@ -37,6 +38,7 @@ impl Default for Lz4Decoder { fn default() -> Self { Self { ctx: Unshared::new(DecoderContext::new().unwrap()), + stream_ended: false, } } } @@ -50,6 +52,7 @@ impl Lz4Decoder { impl DecodeV2 for Lz4Decoder { fn reinit(&mut self) -> Result<()> { unsafe { LZ4F_resetDecompressionContext(self.ctx.get_mut().ctx) }; + self.stream_ended = false; Ok(()) } @@ -74,7 +77,12 @@ impl DecodeV2 for Lz4Decoder { }; input.advance(input_size); output.advance(output_size); - Ok(remaining == 0) + + let finished = remaining == 0; + if finished { + self.stream_ended = true; + } + Ok(finished) } fn flush(&mut self, output: &mut WriteBuffer<'_>) -> Result { @@ -92,6 +100,15 @@ impl DecodeV2 for Lz4Decoder { } fn finish(&mut self, output: &mut WriteBuffer<'_>) -> Result { - self.flush(output) + self.flush(output)?; + + if self.stream_ended { + Ok(true) + } else { + Err(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + "lz4 stream did not finish", + )) + } } } diff --git a/crates/compression-codecs/src/zstd/decoder.rs b/crates/compression-codecs/src/zstd/decoder.rs index 44e72ef2..e6cfca21 100644 --- a/crates/compression-codecs/src/zstd/decoder.rs +++ b/crates/compression-codecs/src/zstd/decoder.rs @@ -13,12 +13,14 @@ use zstd_safe::get_error_name; #[derive(Debug)] pub struct ZstdDecoder { decoder: Unshared>, + stream_ended: bool, } impl Default for ZstdDecoder { fn default() -> Self { Self { decoder: Unshared::new(Decoder::new().unwrap()), + stream_ended: false, } } } @@ -35,6 +37,7 @@ impl ZstdDecoder { } Self { decoder: Unshared::new(decoder), + stream_ended: false, } } @@ -42,6 +45,7 @@ impl ZstdDecoder { let decoder = Decoder::with_dictionary(dictionary)?; Ok(Self { decoder: Unshared::new(decoder), + stream_ended: false, }) } @@ -64,6 +68,7 @@ impl ZstdDecoder { impl DecodeV2 for ZstdDecoder { fn reinit(&mut self) -> Result<()> { self.decoder.get_mut().reinit()?; + self.stream_ended = false; Ok(()) } @@ -80,15 +85,32 @@ impl DecodeV2 for ZstdDecoder { .run_on_buffers(input.unwritten(), output.unwritten_initialized_mut())?; input.advance(status.bytes_read); output.advance(status.bytes_written); - Ok(status.remaining == 0) + + let finished = status.remaining == 0; + if finished { + self.stream_ended = true; + } + Ok(finished) } fn flush(&mut self, output: &mut WriteBuffer<'_>) -> Result { + // Note: stream_ended is not updated here because zstd's flush only flushes + // buffered output and doesn't indicate stream completion. Stream completion + // is detected in decode() when status.remaining == 0. self.call_fn_on_out_buffer(output, |decoder, out_buf| decoder.flush(out_buf)) } fn finish(&mut self, output: &mut WriteBuffer<'_>) -> Result { - self.call_fn_on_out_buffer(output, |decoder, out_buf| decoder.finish(out_buf, true)) + self.call_fn_on_out_buffer(output, |decoder, out_buf| decoder.finish(out_buf, true))?; + + if self.stream_ended { + Ok(true) + } else { + Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "zstd stream did not finish", + )) + } } }