Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions crates/async-compression/tests/utils/test_cases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,26 @@ macro_rules! io_test_cases {

assert_eq!(output, &[1, 2, 3, 4, 5, 6, 6, 5, 4, 3, 2, 1][..]);
}

#[test]
#[ntest::timeout(1000)]
fn truncated() {
let compressed = sync::compress(&[1, 2, 3, 4, 5, 6]);

// Truncate the compressed data (remove last 20 bytes or half, whichever is less)
let truncate_amount = std::cmp::min(20, compressed.len() / 2);
let truncated = &compressed[..compressed.len() - truncate_amount];

let input = InputStream::new(vec![truncated.to_vec()]);

// Try to decompress - should get an error for incomplete stream
// The error manifests as a panic when read::to_vec calls unwrap()
let result =
std::panic::catch_unwind(|| bufread::decompress(bufread::from(&input)));

// Should fail for truncated stream
assert!(result.is_err(), "Expected error for truncated stream");
}
}
}

Expand Down
17 changes: 16 additions & 1 deletion crates/compression-codecs/src/bzip2/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::{fmt, io};

pub struct BzDecoder {
decompress: Decompress,
stream_ended: bool,
}

impl fmt::Debug for BzDecoder {
Expand All @@ -22,6 +23,7 @@ impl Default for BzDecoder {
fn default() -> Self {
Self {
decompress: Decompress::new(false),
stream_ended: false,
}
}
}
Expand Down Expand Up @@ -49,13 +51,19 @@ impl BzDecoder {
input.advance((self.decompress.total_in() - prior_in) as usize);
output.advance((self.decompress.total_out() - prior_out) as usize);

// Track when stream has properly ended
if status == Status::StreamEnd {
self.stream_ended = true;
}

Ok(status)
}
}

impl DecodeV2 for BzDecoder {
fn reinit(&mut self) -> io::Result<()> {
self.decompress = Decompress::new(false);
self.stream_ended = false;
Ok(())
}

Expand Down Expand Up @@ -101,6 +109,13 @@ impl DecodeV2 for BzDecoder {
}

fn finish(&mut self, _output: &mut WriteBuffer<'_>) -> io::Result<bool> {
Ok(true)
if self.stream_ended {
Ok(true)
} else {
Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"bzip2 stream did not finish",
))
}
}
}
21 changes: 19 additions & 2 deletions crates/compression-codecs/src/lz4/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ struct DecoderContext {
#[derive(Debug)]
pub struct Lz4Decoder {
ctx: Unshared<DecoderContext>,
stream_ended: bool,
}

impl DecoderContext {
Expand All @@ -37,6 +38,7 @@ impl Default for Lz4Decoder {
fn default() -> Self {
Self {
ctx: Unshared::new(DecoderContext::new().unwrap()),
stream_ended: false,
}
}
}
Expand All @@ -50,6 +52,7 @@ impl Lz4Decoder {
impl DecodeV2 for Lz4Decoder {
fn reinit(&mut self) -> Result<()> {
unsafe { LZ4F_resetDecompressionContext(self.ctx.get_mut().ctx) };
self.stream_ended = false;
Ok(())
}

Expand All @@ -74,7 +77,12 @@ impl DecodeV2 for Lz4Decoder {
};
input.advance(input_size);
output.advance(output_size);
Ok(remaining == 0)

let finished = remaining == 0;
if finished {
self.stream_ended = true;
}
Ok(finished)
}

fn flush(&mut self, output: &mut WriteBuffer<'_>) -> Result<bool> {
Expand All @@ -92,6 +100,15 @@ impl DecodeV2 for Lz4Decoder {
}

fn finish(&mut self, output: &mut WriteBuffer<'_>) -> Result<bool> {
self.flush(output)
self.flush(output)?;

if self.stream_ended {
Ok(true)
} else {
Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"lz4 stream did not finish",
))
}
}
}
26 changes: 24 additions & 2 deletions crates/compression-codecs/src/zstd/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ use zstd_safe::get_error_name;
#[derive(Debug)]
pub struct ZstdDecoder {
decoder: Unshared<Decoder<'static>>,
stream_ended: bool,
}

impl Default for ZstdDecoder {
fn default() -> Self {
Self {
decoder: Unshared::new(Decoder::new().unwrap()),
stream_ended: false,
}
}
}
Expand All @@ -35,13 +37,15 @@ impl ZstdDecoder {
}
Self {
decoder: Unshared::new(decoder),
stream_ended: false,
}
}

pub fn new_with_dict(dictionary: &[u8]) -> io::Result<Self> {
let decoder = Decoder::with_dictionary(dictionary)?;
Ok(Self {
decoder: Unshared::new(decoder),
stream_ended: false,
})
}

Expand All @@ -64,6 +68,7 @@ impl ZstdDecoder {
impl DecodeV2 for ZstdDecoder {
fn reinit(&mut self) -> Result<()> {
self.decoder.get_mut().reinit()?;
self.stream_ended = false;
Ok(())
}

Expand All @@ -80,15 +85,32 @@ impl DecodeV2 for ZstdDecoder {
.run_on_buffers(input.unwritten(), output.unwritten_initialized_mut())?;
input.advance(status.bytes_read);
output.advance(status.bytes_written);
Ok(status.remaining == 0)

let finished = status.remaining == 0;
if finished {
self.stream_ended = true;
}
Ok(finished)
}

fn flush(&mut self, output: &mut WriteBuffer<'_>) -> Result<bool> {
// Note: stream_ended is not updated here because zstd's flush only flushes
// buffered output and doesn't indicate stream completion. Stream completion
// is detected in decode() when status.remaining == 0.
self.call_fn_on_out_buffer(output, |decoder, out_buf| decoder.flush(out_buf))
}

fn finish(&mut self, output: &mut WriteBuffer<'_>) -> Result<bool> {
self.call_fn_on_out_buffer(output, |decoder, out_buf| decoder.finish(out_buf, true))
self.call_fn_on_out_buffer(output, |decoder, out_buf| decoder.finish(out_buf, true))?;

if self.stream_ended {
Ok(true)
} else {
Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"zstd stream did not finish",
))
}
}
}

Expand Down