use std::io::{self, Write}; use crate::stream::raw::{InBuffer, Operation, OutBuffer}; // input -> [ zstd -> buffer -> writer ] /// Implements the [`Write`] API around an [`Operation`]. /// /// This can be used to wrap a raw in-memory operation in a write-focused API. /// /// It can be used with either compression or decompression, and forwards the /// output to a wrapped `Write`. pub struct Writer { writer: W, operation: D, offset: usize, buffer: Vec, // When `true`, indicates that nothing should be added to the buffer. // All that's left if to empty the buffer. finished: bool, finished_frame: bool, } impl Writer where W: Write, D: Operation, { /// Creates a new `Writer`. /// /// All output from the given operation will be forwarded to `writer`. pub fn new(writer: W, operation: D) -> Self { Writer { writer, operation, offset: 0, // 32KB buffer? That's what flate2 uses buffer: Vec::with_capacity(32 * 1024), finished: false, finished_frame: false, } } /// Ends the stream. /// /// This *must* be called after all data has been written to finish the /// stream. /// /// If you forget to call this and just drop the `Writer`, you *will* have /// an incomplete output. /// /// Keep calling it until it returns `Ok(())`, then don't call it again. pub fn finish(&mut self) -> io::Result<()> { loop { // Keep trying until we're really done. self.write_from_offset()?; // At this point the buffer has been fully written out. if self.finished { return Ok(()); } // Let's fill this buffer again! let finished_frame = self.finished_frame; let hint = self.with_buffer(|dst, op| op.finish(dst, finished_frame)); self.offset = 0; // println!("Hint: {:?}\nOut:{:?}", hint, &self.buffer); // We return here if zstd had a problem. // Could happen with invalid data, ... let hint = hint?; if hint != 0 && self.buffer.is_empty() { // This happens if we are decoding an incomplete frame. return Err(io::Error::new( io::ErrorKind::UnexpectedEof, "incomplete frame", )); } // println!("Finishing {}, {}", bytes_written, hint); self.finished = hint == 0; } } /// Run the given closure on `self.buffer`. /// /// The buffer will be cleared, and made available wrapped in an `OutBuffer`. fn with_buffer(&mut self, f: F) -> T where F: FnOnce(&mut OutBuffer<'_, Vec>, &mut D) -> T, { self.buffer.clear(); let mut output = OutBuffer::around(&mut self.buffer); // eprintln!("Output: {:?}", output); f(&mut output, &mut self.operation) } /// Attempt to write `self.buffer` to the wrapped writer. /// /// Returns `Ok(())` once all the buffer has been written. fn write_from_offset(&mut self) -> io::Result<()> { // The code looks a lot like `write_all`, but keeps track of what has // been written in case we're interrupted. while self.offset < self.buffer.len() { match self.writer.write(&self.buffer[self.offset..]) { Ok(0) => { return Err(io::Error::new( io::ErrorKind::WriteZero, "writer will not accept any more data", )) } Ok(n) => self.offset += n, Err(ref e) if e.kind() == io::ErrorKind::Interrupted => (), Err(e) => return Err(e), } } Ok(()) } /// Return the wrapped `Writer` and `Operation`. /// /// Careful: if you call this before calling [`Writer::finish()`], the /// output may be incomplete. pub fn into_inner(self) -> (W, D) { (self.writer, self.operation) } /// Gives a reference to the inner writer. pub fn writer(&self) -> &W { &self.writer } /// Gives a mutable reference to the inner writer. pub fn writer_mut(&mut self) -> &mut W { &mut self.writer } /// Gives a reference to the inner operation. pub fn operation(&self) -> &D { &self.operation } /// Gives a mutable reference to the inner operation. pub fn operation_mut(&mut self) -> &mut D { &mut self.operation } /// Returns the offset in the current buffer. Only useful for debugging. #[cfg(test)] pub fn offset(&self) -> usize { self.offset } /// Returns the current buffer. Only useful for debugging. #[cfg(test)] pub fn buffer(&self) -> &[u8] { &self.buffer } } impl Write for Writer where W: Write, D: Operation, { fn write(&mut self, buf: &[u8]) -> io::Result { // Keep trying until _something_ has been consumed. // As soon as some input has been taken, we cannot afford // to take any chance: if an error occurs, the user couldn't know // that some data _was_ successfully written. loop { // First, write any pending data from `self.buffer`. self.write_from_offset()?; // At this point `self.buffer` can safely be discarded. // Support writing concatenated frames by re-initializing the // context. if self.finished_frame { self.operation.reinit()?; self.finished_frame = false; } let mut src = InBuffer::around(buf); let hint = self.with_buffer(|dst, op| op.run(&mut src, dst)); let bytes_read = src.pos; // eprintln!( // "Write Hint: {:?}\n src: {:?}\n dst: {:?}", // hint, src, self.buffer // ); self.offset = 0; let hint = hint?; if hint == 0 { self.finished_frame = true; } // As we said, as soon as we've consumed something, return. if bytes_read > 0 || buf.is_empty() { // println!("Returning {}", bytes_read); return Ok(bytes_read); } } } fn flush(&mut self) -> io::Result<()> { let mut finished = self.finished; loop { // If the output is blocked or has an error, return now. self.write_from_offset()?; if finished { break; } let hint = self.with_buffer(|dst, op| op.flush(dst)); self.offset = 0; let hint = hint?; finished = hint == 0; } self.writer.flush() } } #[cfg(test)] mod tests { use super::Writer; use std::io::Write; #[test] fn test_noop() { use crate::stream::raw::NoOp; let input = b"AbcdefghAbcdefgh."; // Test writer let mut output = Vec::new(); { let mut writer = Writer::new(&mut output, NoOp); writer.write_all(input).unwrap(); writer.finish().unwrap(); } assert_eq!(&output, input); } #[test] fn test_compress() { use crate::stream::raw::Encoder; let input = b"AbcdefghAbcdefgh."; // Test writer let mut output = Vec::new(); { let mut writer = Writer::new(&mut output, Encoder::new(1).unwrap()); writer.write_all(input).unwrap(); writer.finish().unwrap(); } // println!("Output: {:?}", output); let decoded = crate::decode_all(&output[..]).unwrap(); assert_eq!(&decoded, input); } #[test] fn test_decompress() { use crate::stream::raw::Decoder; let input = b"AbcdefghAbcdefgh."; let compressed = crate::encode_all(&input[..], 1).unwrap(); // Test writer let mut output = Vec::new(); { let mut writer = Writer::new(&mut output, Decoder::new().unwrap()); writer.write_all(&compressed).unwrap(); writer.finish().unwrap(); } // println!("Output: {:?}", output); assert_eq!(&output, input); } }