use std::io::{self, BufRead, Read}; use crate::stream::raw::{InBuffer, Operation, OutBuffer}; // [ reader -> zstd ] -> output /// Implements the [`Read`] API around an [`Operation`]. /// /// This can be used to wrap a raw in-memory operation in a read-focused API. /// /// It can wrap either a compression or decompression operation, and pulls /// input data from a wrapped `Read`. pub struct Reader { reader: R, operation: D, state: State, single_frame: bool, finished_frame: bool, } enum State { // Still actively reading from the inner `Read` Reading, // We reached EOF from the inner `Read`, now flushing. PastEof, // We are fully done, nothing can be read. Finished, } impl Reader { /// Creates a new `Reader`. /// /// `reader` will be used to pull input data for the given operation. pub fn new(reader: R, operation: D) -> Self { Reader { reader, operation, state: State::Reading, single_frame: false, finished_frame: false, } } /// Sets `self` to stop after the first decoded frame. pub fn set_single_frame(&mut self) { self.single_frame = true; } /// Returns a mutable reference to the underlying operation. pub fn operation_mut(&mut self) -> &mut D { &mut self.operation } /// Returns a mutable reference to the underlying reader. pub fn reader_mut(&mut self) -> &mut R { &mut self.reader } /// Returns a reference to the underlying reader. pub fn reader(&self) -> &R { &self.reader } /// Returns the inner reader. pub fn into_inner(self) -> R { self.reader } /// Flush any internal buffer. /// /// For encoders, this ensures all input consumed so far is compressed. pub fn flush(&mut self, output: &mut [u8]) -> io::Result where D: Operation, { self.operation.flush(&mut OutBuffer::around(output)) } } // Read and retry on Interrupted errors. fn fill_buf(reader: &mut R) -> io::Result<&[u8]> where R: BufRead, { // This doesn't work right now because of the borrow-checker. // When it can be made to compile, it would allow Reader to automatically // retry on `Interrupted` error. /* loop { match reader.fill_buf() { Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} otherwise => return otherwise, } } */ // Workaround for now let res = reader.fill_buf()?; // eprintln!("Filled buffer: {:?}", res); Ok(res) } impl Read for Reader where R: BufRead, D: Operation, { fn read(&mut self, buf: &mut [u8]) -> io::Result { // Keep trying until _something_ has been written. let mut first = true; loop { match self.state { State::Reading => { let (bytes_read, bytes_written) = { // Start with a fresh pool of un-processed data. // This is the only line that can return an interruption error. let input = if first { // eprintln!("First run, no input coming."); b"" } else { fill_buf(&mut self.reader)? }; // eprintln!("Input = {:?}", input); // It's possible we don't have any new data to read. // (In this case we may still have zstd's own buffer to clear.) if !first && input.is_empty() { self.state = State::PastEof; continue; } first = false; let mut src = InBuffer::around(input); let mut dst = OutBuffer::around(buf); // We don't want empty input (from first=true) to cause a frame // re-initialization. if self.finished_frame && !input.is_empty() { // eprintln!("!! Reigniting !!"); self.operation.reinit()?; self.finished_frame = false; } // Phase 1: feed input to the operation let hint = self.operation.run(&mut src, &mut dst)?; // eprintln!( // "Hint={} Just run our operation:\n In={:?}\n Out={:?}", // hint, src, dst // ); if hint == 0 { // In practice this only happens when decoding, when we just finished // reading a frame. self.finished_frame = true; if self.single_frame { self.state = State::Finished; } } // eprintln!("Output: {:?}", dst); (src.pos(), dst.pos()) }; self.reader.consume(bytes_read); if bytes_written > 0 { return Ok(bytes_written); } // We need more data! Try again! } State::PastEof => { let mut dst = OutBuffer::around(buf); // We already sent all the input we could get to zstd. Time to flush out the // buffer and be done with it. // Phase 2: flush out the operation's buffer // Keep calling `finish()` until the buffer is empty. let hint = self .operation .finish(&mut dst, self.finished_frame)?; // eprintln!("Hint: {} ; Output: {:?}", hint, dst); if hint == 0 { // This indicates that the footer is complete. // This is the only way to terminate the stream cleanly. self.state = State::Finished; } return Ok(dst.pos()); } State::Finished => { return Ok(0); } } } } } #[cfg(test)] mod tests { use super::Reader; use std::io::{Cursor, Read}; #[test] fn test_noop() { use crate::stream::raw::NoOp; let input = b"AbcdefghAbcdefgh."; // Test reader let mut output = Vec::new(); { let mut reader = Reader::new(Cursor::new(input), NoOp); reader.read_to_end(&mut output).unwrap(); } assert_eq!(&output, input); } #[test] fn test_compress() { use crate::stream::raw::Encoder; let input = b"AbcdefghAbcdefgh."; // Test reader let mut output = Vec::new(); { let mut reader = Reader::new(Cursor::new(input), Encoder::new(1).unwrap()); reader.read_to_end(&mut output).unwrap(); } // eprintln!("{:?}", output); let decoded = crate::decode_all(&output[..]).unwrap(); assert_eq!(&decoded, input); } }