ext/polars/src/dataframe.rs in polars-df-0.1.1 vs ext/polars/src/dataframe.rs in polars-df-0.1.2
- old
+ new
@@ -1,7 +1,8 @@
use magnus::{r_hash::ForEach, Error, RArray, RHash, RString, Value};
use polars::io::mmap::ReaderBytes;
+use polars::io::RowCount;
use polars::prelude::*;
use std::cell::RefCell;
use std::fs::File;
use std::io::{BufReader, BufWriter, Cursor};
use std::ops::Deref;
@@ -41,14 +42,98 @@
    pub fn estimated_size(&self) -> usize {
        self.df.borrow().estimated_size()
    }
-    pub fn read_csv(rb_f: Value, has_header: bool) -> RbResult<Self> {
+    pub fn read_csv(arguments: &[Value]) -> RbResult<Self> {
+        // start arguments
+        // this pattern is needed for more than 16 arguments
+        let rb_f: Value = arguments[0].try_convert()?;
+        let infer_schema_length: Option<usize> = arguments[1].try_convert()?;
+        let chunk_size: usize = arguments[2].try_convert()?;
+        let has_header: bool = arguments[3].try_convert()?;
+        let ignore_errors: bool = arguments[4].try_convert()?;
+        let n_rows: Option<usize> = arguments[5].try_convert()?;
+        let skip_rows: usize = arguments[6].try_convert()?;
+        let projection: Option<Vec<usize>> = arguments[7].try_convert()?;
+        let sep: String = arguments[8].try_convert()?;
+        let rechunk: bool = arguments[9].try_convert()?;
+        let columns: Option<Vec<String>> = arguments[10].try_convert()?;
+        let encoding: Wrap<CsvEncoding> = arguments[11].try_convert()?;
+        let n_threads: Option<usize> = arguments[12].try_convert()?;
+        let path: Option<String> = arguments[13].try_convert()?;
+        let overwrite_dtype: Option<Vec<(String, Wrap<DataType>)>> = arguments[14].try_convert()?;
+        // TODO fix
+        let overwrite_dtype_slice: Option<Vec<Wrap<DataType>>> = None; // arguments[15].try_convert()?;
+        let low_memory: bool = arguments[16].try_convert()?;
+        let comment_char: Option<String> = arguments[17].try_convert()?;
+        let quote_char: Option<String> = arguments[18].try_convert()?;
+        let null_values: Option<Wrap<NullValues>> = arguments[19].try_convert()?;
+        let parse_dates: bool = arguments[20].try_convert()?;
+        let skip_rows_after_header: usize = arguments[21].try_convert()?;
+        let row_count: Option<(String, IdxSize)> = arguments[22].try_convert()?;
+        let sample_size: usize = arguments[23].try_convert()?;
+        let eol_char: String = arguments[24].try_convert()?;
+        // end arguments
+
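+        // unwrap the NullValues wrapper and reduce single-character strings to their first byte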
+        let null_values = null_values.map(|w| w.0);
+        let comment_char = comment_char.map(|s| s.as_bytes()[0]);
+        let eol_char = eol_char.as_bytes()[0];
+
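+        // an optional (name, offset) pair becomes a polars RowCount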
+        let row_count = row_count.map(|(name, offset)| RowCount { name, offset });
+
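+        // an empty quote string disables quoting; otherwise its first byte is used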
+        let quote_char = if let Some(s) = quote_char {
+            if s.is_empty() {
+                None
+            } else {
+                Some(s.as_bytes()[0])
+            }
+        } else {
+            None
+        };
+
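+        // named dtype overrides are collected into a Schema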
+        let overwrite_dtype = overwrite_dtype.map(|overwrite_dtype| {
+            let fields = overwrite_dtype.iter().map(|(name, dtype)| {
+                let dtype = dtype.0.clone();
+                Field::new(name, dtype)
+            });
+            Schema::from(fields)
+        });
+
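+        // positional dtype overrides are unwrapped into a plain Vec<DataType>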
+        let overwrite_dtype_slice = overwrite_dtype_slice.map(|overwrite_dtype| {
+            overwrite_dtype
+                .iter()
+                .map(|dt| dt.0.clone())
+                .collect::<Vec<_>>()
+        });
+
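+        // build the CsvReader over the memory-mapped input and apply every option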
        let mmap_bytes_r = get_mmap_bytes_reader(rb_f)?;
        let df = CsvReader::new(mmap_bytes_r)
+            .infer_schema(infer_schema_length)
            .has_header(has_header)
+            .with_n_rows(n_rows)
+            .with_delimiter(sep.as_bytes()[0])
+            .with_skip_rows(skip_rows)
+            .with_ignore_parser_errors(ignore_errors)
+            .with_projection(projection)
+            .with_rechunk(rechunk)
+            .with_chunk_size(chunk_size)
+            .with_encoding(encoding.0)
+            .with_columns(columns)
+            .with_n_threads(n_threads)
+            .with_path(path)
+            .with_dtypes(overwrite_dtype.as_ref())
+            .with_dtypes_slice(overwrite_dtype_slice.as_deref())
+            .low_memory(low_memory)
+            .with_comment_char(comment_char)
+            .with_null_values(null_values)
+            .with_parse_dates(parse_dates)
+            .with_quote_char(quote_char)
+            .with_end_of_line_char(eol_char)
+            .with_skip_rows_after_header(skip_rows_after_header)
+            .with_row_count(row_count)
+            .sample_size(sample_size)
            .finish()
            .map_err(RbPolarsErr::from)?;
        Ok(df.into())
    }
@@ -59,10 +144,31 @@
            .finish()
            .map_err(RbPolarsErr::from)
            .map(|v| v.into())
    }
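+    // read an Arrow IPC file (path or IO-like object) into a DataFrame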
+    pub fn read_ipc(
+        rb_f: Value,
+        columns: Option<Vec<String>>,
+        projection: Option<Vec<usize>>,
+        n_rows: Option<usize>,
+        row_count: Option<(String, IdxSize)>,
+        memory_map: bool,
+    ) -> RbResult<Self> {
+        let row_count = row_count.map(|(name, offset)| RowCount { name, offset });
+        let mmap_bytes_r = get_mmap_bytes_reader(rb_f)?;
+        let df = IpcReader::new(mmap_bytes_r)
+            .with_projection(projection)
+            .with_columns(columns)
+            .with_n_rows(n_rows)
+            .with_row_count(row_count)
+            .memory_mapped(memory_map)
+            .finish()
+            .map_err(RbPolarsErr::from)?;
+        Ok(RbDataFrame::new(df))
+    }
+
    pub fn read_json(rb_f: Value) -> RbResult<Self> {
        // memmap the file first
        let mmap_bytes_r = get_mmap_bytes_reader(rb_f)?;
        let mmap_read: ReaderBytes = (&mmap_bytes_r).into();
        let bytes = mmap_read.deref();
@@ -183,10 +289,32 @@
        }
        Ok(())
    }
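+    // write the DataFrame in Arrow IPC format, with optional compression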
+    pub fn write_ipc(
+        &self,
+        rb_f: Value,
+        compression: Wrap<Option<IpcCompression>>,
+    ) -> RbResult<()> {
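+        // a String is treated as a file path; any other object is wrapped as a file-like handle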
+        if let Ok(s) = rb_f.try_convert::<String>() {
+            let f = std::fs::File::create(&s).unwrap();
+            IpcWriter::new(f)
+                .with_compression(compression.0)
+                .finish(&mut self.df.borrow_mut())
+                .map_err(RbPolarsErr::from)?;
+        } else {
+            let mut buf = get_file_like(rb_f, true)?;
+
+            IpcWriter::new(&mut buf)
+                .with_compression(compression.0)
+                .finish(&mut self.df.borrow_mut())
+                .map_err(RbPolarsErr::from)?;
+        }
+        Ok(())
+    }
+
    pub fn write_parquet(
        &self,
        rb_f: Value,
        compression: String,
        compression_level: Option<i32>,
@@ -238,15 +366,15 @@
            .set_column_names(&names)
            .map_err(RbPolarsErr::from)?;
        Ok(())
    }
-    pub fn dtypes(&self) -> Vec<String> {
+    pub fn dtypes(&self) -> Vec<Value> {
        self.df
            .borrow()
            .iter()
-            .map(|s| s.dtype().to_string())
+            .map(|s| Wrap(s.dtype().clone()).into())
            .collect()
    }
    pub fn n_chunks(&self) -> RbResult<usize> {
        let n = self.df.borrow().n_chunks().map_err(RbPolarsErr::from)?;
@@ -416,10 +544,10 @@
            self.df.borrow().partition_by_stable(groups)
        } else {
            self.df.borrow().partition_by(groups)
        }
        .map_err(RbPolarsErr::from)?;
-        Ok(out.into_iter().map(|v| RbDataFrame::new(v)).collect())
+        Ok(out.into_iter().map(RbDataFrame::new).collect())
    }
    pub fn shift(&self, periods: i64) -> Self {
        self.df.borrow().shift(periods).into()
    }