ext/polars/src/dataframe.rs in polars-df-0.1.1 vs ext/polars/src/dataframe.rs in polars-df-0.1.2

- old
+ new

@@ -1,7 +1,8 @@
 use magnus::{r_hash::ForEach, Error, RArray, RHash, RString, Value};
 use polars::io::mmap::ReaderBytes;
+use polars::io::RowCount;
 use polars::prelude::*;
 use std::cell::RefCell;
 use std::fs::File;
 use std::io::{BufReader, BufWriter, Cursor};
 use std::ops::Deref;
@@ -41,14 +42,98 @@
     pub fn estimated_size(&self) -> usize {
         self.df.borrow().estimated_size()
     }

-    pub fn read_csv(rb_f: Value, has_header: bool) -> RbResult<Self> {
+    pub fn read_csv(arguments: &[Value]) -> RbResult<Self> {
+        // start arguments
+        // this pattern is needed for more than 16
+        let rb_f: Value = arguments[0].try_convert()?;
+        let infer_schema_length: Option<usize> = arguments[1].try_convert()?;
+        let chunk_size: usize = arguments[2].try_convert()?;
+        let has_header: bool = arguments[3].try_convert()?;
+        let ignore_errors: bool = arguments[4].try_convert()?;
+        let n_rows: Option<usize> = arguments[5].try_convert()?;
+        let skip_rows: usize = arguments[6].try_convert()?;
+        let projection: Option<Vec<usize>> = arguments[7].try_convert()?;
+        let sep: String = arguments[8].try_convert()?;
+        let rechunk: bool = arguments[9].try_convert()?;
+        let columns: Option<Vec<String>> = arguments[10].try_convert()?;
+        let encoding: Wrap<CsvEncoding> = arguments[11].try_convert()?;
+        let n_threads: Option<usize> = arguments[12].try_convert()?;
+        let path: Option<String> = arguments[13].try_convert()?;
+        let overwrite_dtype: Option<Vec<(String, Wrap<DataType>)>> = arguments[14].try_convert()?;
+        // TODO fix
+        let overwrite_dtype_slice: Option<Vec<Wrap<DataType>>> = None; // arguments[15].try_convert()?;
+        let low_memory: bool = arguments[16].try_convert()?;
+        let comment_char: Option<String> = arguments[17].try_convert()?;
+        let quote_char: Option<String> = arguments[18].try_convert()?;
+        let null_values: Option<Wrap<NullValues>> = arguments[19].try_convert()?;
+        let parse_dates: bool = arguments[20].try_convert()?;
+        let skip_rows_after_header: usize = arguments[21].try_convert()?;
+        let row_count: Option<(String, IdxSize)> = arguments[22].try_convert()?;
+        let sample_size: usize = arguments[23].try_convert()?;
+        let eol_char: String = arguments[24].try_convert()?;
+        // end arguments
+
+        let null_values = null_values.map(|w| w.0);
+        let comment_char = comment_char.map(|s| s.as_bytes()[0]);
+        let eol_char = eol_char.as_bytes()[0];
+
+        let row_count = row_count.map(|(name, offset)| RowCount { name, offset });
+
+        let quote_char = if let Some(s) = quote_char {
+            if s.is_empty() {
+                None
+            } else {
+                Some(s.as_bytes()[0])
+            }
+        } else {
+            None
+        };
+
+        let overwrite_dtype = overwrite_dtype.map(|overwrite_dtype| {
+            let fields = overwrite_dtype.iter().map(|(name, dtype)| {
+                let dtype = dtype.0.clone();
+                Field::new(name, dtype)
+            });
+            Schema::from(fields)
+        });
+
+        let overwrite_dtype_slice = overwrite_dtype_slice.map(|overwrite_dtype| {
+            overwrite_dtype
+                .iter()
+                .map(|dt| dt.0.clone())
+                .collect::<Vec<_>>()
+        });
+
         let mmap_bytes_r = get_mmap_bytes_reader(rb_f)?;
         let df = CsvReader::new(mmap_bytes_r)
+            .infer_schema(infer_schema_length)
             .has_header(has_header)
+            .with_n_rows(n_rows)
+            .with_delimiter(sep.as_bytes()[0])
+            .with_skip_rows(skip_rows)
+            .with_ignore_parser_errors(ignore_errors)
+            .with_projection(projection)
+            .with_rechunk(rechunk)
+            .with_chunk_size(chunk_size)
+            .with_encoding(encoding.0)
+            .with_columns(columns)
+            .with_n_threads(n_threads)
+            .with_path(path)
+            .with_dtypes(overwrite_dtype.as_ref())
+            .with_dtypes_slice(overwrite_dtype_slice.as_deref())
+            .low_memory(low_memory)
+            .with_comment_char(comment_char)
+            .with_null_values(null_values)
+            .with_parse_dates(parse_dates)
+            .with_quote_char(quote_char)
+            .with_end_of_line_char(eol_char)
+            .with_skip_rows_after_header(skip_rows_after_header)
+            .with_row_count(row_count)
+            .sample_size(sample_size)
             .finish()
             .map_err(RbPolarsErr::from)?;
         Ok(df.into())
     }
@@ -59,10 +144,31 @@
             .finish()
             .map_err(RbPolarsErr::from)
             .map(|v| v.into())
     }

+    pub fn read_ipc(
+        rb_f: Value,
+        columns: Option<Vec<String>>,
+        projection: Option<Vec<usize>>,
+        n_rows: Option<usize>,
+        row_count: Option<(String, IdxSize)>,
+        memory_map: bool,
+    ) -> RbResult<Self> {
+        let row_count = row_count.map(|(name, offset)| RowCount { name, offset });
+        let mmap_bytes_r = get_mmap_bytes_reader(rb_f)?;
+        let df = IpcReader::new(mmap_bytes_r)
+            .with_projection(projection)
+            .with_columns(columns)
+            .with_n_rows(n_rows)
+            .with_row_count(row_count)
+            .memory_mapped(memory_map)
+            .finish()
+            .map_err(RbPolarsErr::from)?;
+        Ok(RbDataFrame::new(df))
+    }
+
     pub fn read_json(rb_f: Value) -> RbResult<Self> {
         // memmap the file first
         let mmap_bytes_r = get_mmap_bytes_reader(rb_f)?;
         let mmap_read: ReaderBytes = (&mmap_bytes_r).into();
         let bytes = mmap_read.deref();
@@ -183,10 +289,32 @@
         }
         Ok(())
     }

+    pub fn write_ipc(
+        &self,
+        rb_f: Value,
+        compression: Wrap<Option<IpcCompression>>,
+    ) -> RbResult<()> {
+        if let Ok(s) = rb_f.try_convert::<String>() {
+            let f = std::fs::File::create(&s).unwrap();
+            IpcWriter::new(f)
+                .with_compression(compression.0)
+                .finish(&mut self.df.borrow_mut())
+                .map_err(RbPolarsErr::from)?;
+        } else {
+            let mut buf = get_file_like(rb_f, true)?;
+
+            IpcWriter::new(&mut buf)
+                .with_compression(compression.0)
+                .finish(&mut self.df.borrow_mut())
+                .map_err(RbPolarsErr::from)?;
+        }
+        Ok(())
+    }
+
     pub fn write_parquet(
         &self,
         rb_f: Value,
         compression: String,
         compression_level: Option<i32>,
@@ -238,15 +366,15 @@
             .set_column_names(&names)
             .map_err(RbPolarsErr::from)?;
         Ok(())
     }

-    pub fn dtypes(&self) -> Vec<String> {
+    pub fn dtypes(&self) -> Vec<Value> {
         self.df
             .borrow()
             .iter()
-            .map(|s| s.dtype().to_string())
+            .map(|s| Wrap(s.dtype().clone()).into())
             .collect()
     }

     pub fn n_chunks(&self) -> RbResult<usize> {
         let n = self.df.borrow().n_chunks().map_err(RbPolarsErr::from)?;
@@ -416,10 +544,10 @@
             self.df.borrow().partition_by_stable(groups)
         } else {
             self.df.borrow().partition_by(groups)
         }
         .map_err(RbPolarsErr::from)?;
-        Ok(out.into_iter().map(|v| RbDataFrame::new(v)).collect())
+        Ok(out.into_iter().map(RbDataFrame::new).collect())
     }

     pub fn shift(&self, periods: i64) -> Self {
         self.df.borrow().shift(periods).into()
     }
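
Note on the read_csv change: magnus only generates strongly typed bindings for up to 16 fixed arguments, which is why the new signature takes a single `&[Value]` slice and unpacks it positionally with `try_convert`. The sketch below illustrates how such a method could be registered on the Ruby side using magnus's `function!` macro with arity -1; the registration code is not part of this diff, and the Ruby-visible name `_read_csv` is only an illustrative assumption.

// Sketch only: registering a &[Value]-based method with magnus.
// Arity -1 delivers every Ruby argument as one slice, which read_csv
// then converts positionally with try_convert as shown in the diff above.
use magnus::{class, define_module, function, Error, Module, Object};

fn init() -> Result<(), Error> {
    let module = define_module("Polars")?;
    let class = module.define_class("RbDataFrame", class::object())?;
    // "_read_csv" is a hypothetical Ruby-side name for this sketch.
    class.define_singleton_method("_read_csv", function!(RbDataFrame::read_csv, -1))?;
    Ok(())
}

With this pattern the argument order is fixed by convention between the Ruby wrapper and the Rust side, which is why the diff brackets the conversions with "start arguments" / "end arguments" comments.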