use magnus::{RArray, Value}; use polars::io::mmap::MmapBytesReader; use polars::io::RowCount; use polars::prelude::read_impl::OwnedBatchedCsvReader; use polars::prelude::*; use std::cell::RefCell; use std::path::PathBuf; use crate::conversion::*; use crate::prelude::read_impl::OwnedBatchedCsvReaderMmap; use crate::{RbDataFrame, RbPolarsErr, RbResult}; pub enum BatchedReader { MMap(OwnedBatchedCsvReaderMmap), Read(OwnedBatchedCsvReader), } #[magnus::wrap(class = "Polars::RbBatchedCsv")] pub struct RbBatchedCsv { pub reader: RefCell, } impl RbBatchedCsv { pub fn new(arguments: &[Value]) -> RbResult { // start arguments // this pattern is needed for more than 16 let infer_schema_length: Option = arguments[0].try_convert()?; let chunk_size: usize = arguments[1].try_convert()?; let has_header: bool = arguments[2].try_convert()?; let ignore_errors: bool = arguments[3].try_convert()?; let n_rows: Option = arguments[4].try_convert()?; let skip_rows: usize = arguments[5].try_convert()?; let projection: Option> = arguments[6].try_convert()?; let sep: String = arguments[7].try_convert()?; let rechunk: bool = arguments[8].try_convert()?; let columns: Option> = arguments[9].try_convert()?; let encoding: Wrap = arguments[10].try_convert()?; let n_threads: Option = arguments[11].try_convert()?; let path: PathBuf = arguments[12].try_convert()?; let overwrite_dtype: Option)>> = arguments[13].try_convert()?; // TODO fix let overwrite_dtype_slice: Option>> = None; // arguments[14].try_convert()?; let low_memory: bool = arguments[15].try_convert()?; let comment_char: Option = arguments[16].try_convert()?; let quote_char: Option = arguments[17].try_convert()?; let null_values: Option> = arguments[18].try_convert()?; let try_parse_dates: bool = arguments[19].try_convert()?; let skip_rows_after_header: usize = arguments[20].try_convert()?; let row_count: Option<(String, IdxSize)> = arguments[21].try_convert()?; let sample_size: usize = arguments[22].try_convert()?; let eol_char: String = arguments[23].try_convert()?; // end arguments let null_values = null_values.map(|w| w.0); let comment_char = comment_char.map(|s| s.as_bytes()[0]); let eol_char = eol_char.as_bytes()[0]; let row_count = row_count.map(|(name, offset)| RowCount { name, offset }); let quote_char = if let Some(s) = quote_char { if s.is_empty() { None } else { Some(s.as_bytes()[0]) } } else { None }; let overwrite_dtype = overwrite_dtype.map(|overwrite_dtype| { overwrite_dtype .iter() .map(|(name, dtype)| { let dtype = dtype.0.clone(); Field::new(name, dtype) }) .collect::() }); let overwrite_dtype_slice = overwrite_dtype_slice.map(|overwrite_dtype| { overwrite_dtype .iter() .map(|dt| dt.0.clone()) .collect::>() }); let file = std::fs::File::open(path).map_err(RbPolarsErr::io)?; let reader = Box::new(file) as Box; let reader = CsvReader::new(reader) .infer_schema(infer_schema_length) .has_header(has_header) .with_n_rows(n_rows) .with_delimiter(sep.as_bytes()[0]) .with_skip_rows(skip_rows) .with_ignore_errors(ignore_errors) .with_projection(projection) .with_rechunk(rechunk) .with_chunk_size(chunk_size) .with_encoding(encoding.0) .with_columns(columns) .with_n_threads(n_threads) .with_dtypes_slice(overwrite_dtype_slice.as_deref()) .low_memory(low_memory) .with_comment_char(comment_char) .with_null_values(null_values) .with_try_parse_dates(try_parse_dates) .with_quote_char(quote_char) .with_end_of_line_char(eol_char) .with_skip_rows_after_header(skip_rows_after_header) .with_row_count(row_count) .sample_size(sample_size); let reader = if low_memory { let reader = reader .batched_read(overwrite_dtype.map(Arc::new)) .map_err(RbPolarsErr::from)?; BatchedReader::Read(reader) } else { let reader = reader .batched_mmap(overwrite_dtype.map(Arc::new)) .map_err(RbPolarsErr::from)?; BatchedReader::MMap(reader) }; Ok(RbBatchedCsv { reader: RefCell::new(reader), }) } pub fn next_batches(&self, n: usize) -> RbResult> { let batches = match &mut *self.reader.borrow_mut() { BatchedReader::MMap(reader) => reader.next_batches(n), BatchedReader::Read(reader) => reader.next_batches(n), } .map_err(RbPolarsErr::from)?; Ok(batches.map(|batches| RArray::from_iter(batches.into_iter().map(RbDataFrame::from)))) } }