ext/polars/src/dataframe.rs in polars-df-0.1.2 vs ext/polars/src/dataframe.rs in polars-df-0.1.3

- old
+ new

@@ -1,18 +1,16 @@ -use magnus::{r_hash::ForEach, Error, RArray, RHash, RString, Value}; +use magnus::{r_hash::ForEach, RArray, RHash, RString, Value}; use polars::io::mmap::ReaderBytes; use polars::io::RowCount; use polars::prelude::*; use std::cell::RefCell; -use std::fs::File; -use std::io::{BufReader, BufWriter, Cursor}; +use std::io::{BufWriter, Cursor}; use std::ops::Deref; -use std::path::PathBuf; use crate::conversion::*; use crate::file::{get_file_like, get_mmap_bytes_reader}; -use crate::series::to_rbseries_collection; +use crate::series::{to_rbseries_collection, to_series_collection}; use crate::{series, RbLazyFrame, RbPolarsErr, RbResult, RbSeries}; #[magnus::wrap(class = "Polars::RbDataFrame")] pub struct RbDataFrame { pub df: RefCell<DataFrame>, @@ -135,17 +133,31 @@ .finish() .map_err(RbPolarsErr::from)?; Ok(df.into()) } - pub fn read_parquet(path: PathBuf) -> RbResult<Self> { - let f = File::open(&path).map_err(|e| Error::runtime_error(e.to_string()))?; - let reader = BufReader::new(f); - ParquetReader::new(reader) + pub fn read_parquet( + rb_f: Value, + columns: Option<Vec<String>>, + projection: Option<Vec<usize>>, + n_rows: Option<usize>, + parallel: Wrap<ParallelStrategy>, + row_count: Option<(String, IdxSize)>, + low_memory: bool, + ) -> RbResult<Self> { + let row_count = row_count.map(|(name, offset)| RowCount { name, offset }); + let mmap_bytes_r = get_mmap_bytes_reader(rb_f)?; + let df = ParquetReader::new(mmap_bytes_r) + .with_projection(projection) + .with_columns(columns) + .read_parallel(parallel.0) + .with_n_rows(n_rows) + .with_row_count(row_count) + .set_low_memory(low_memory) .finish() - .map_err(RbPolarsErr::from) - .map(|v| v.into()) + .map_err(RbPolarsErr::from)?; + Ok(RbDataFrame::new(df)) } pub fn read_ipc( rb_f: Value, columns: Option<Vec<String>>, @@ -311,10 +323,59 @@ .map_err(RbPolarsErr::from)?; } Ok(()) } + pub fn row_tuple(&self, idx: i64) -> Value { + let idx = if idx < 0 { + (self.df.borrow().height() as i64 + idx) as usize + } else { + idx as usize + }; + RArray::from_vec( + self.df + .borrow() + .get_columns() + .iter() + .map(|s| match s.dtype() { + DataType::Object(_) => { + let obj: Option<&ObjectValue> = s.get_object(idx).map(|any| any.into()); + obj.unwrap().to_object() + } + _ => Wrap(s.get(idx)).into(), + }) + .collect(), + ) + .into() + } + + pub fn row_tuples(&self) -> Value { + let df = &self.df; + RArray::from_vec( + (0..df.borrow().height()) + .map(|idx| { + RArray::from_vec( + self.df + .borrow() + .get_columns() + .iter() + .map(|s| match s.dtype() { + DataType::Object(_) => { + let obj: Option<&ObjectValue> = + s.get_object(idx).map(|any| any.into()); + obj.unwrap().to_object() + } + _ => Wrap(s.get(idx)).into(), + }) + .collect(), + ) + }) + .collect(), + ) + .into() + } + pub fn write_parquet( &self, rb_f: Value, compression: String, compression_level: Option<i32>, @@ -336,10 +397,90 @@ } Ok(()) } + pub fn add(&self, s: &RbSeries) -> RbResult<Self> { + let df = (&*self.df.borrow() + &*s.series.borrow()).map_err(RbPolarsErr::from)?; + Ok(df.into()) + } + + pub fn sub(&self, s: &RbSeries) -> RbResult<Self> { + let df = (&*self.df.borrow() - &*s.series.borrow()).map_err(RbPolarsErr::from)?; + Ok(df.into()) + } + + pub fn div(&self, s: &RbSeries) -> RbResult<Self> { + let df = (&*self.df.borrow() / &*s.series.borrow()).map_err(RbPolarsErr::from)?; + Ok(df.into()) + } + + pub fn mul(&self, s: &RbSeries) -> RbResult<Self> { + let df = (&*self.df.borrow() * &*s.series.borrow()).map_err(RbPolarsErr::from)?; + Ok(df.into()) + } + + pub fn rem(&self, s: &RbSeries) -> RbResult<Self> { + let df = (&*self.df.borrow() % &*s.series.borrow()).map_err(RbPolarsErr::from)?; + Ok(df.into()) + } + + pub fn add_df(&self, s: &Self) -> RbResult<Self> { + let df = (&*self.df.borrow() + &*s.df.borrow()).map_err(RbPolarsErr::from)?; + Ok(df.into()) + } + + pub fn sub_df(&self, s: &Self) -> RbResult<Self> { + let df = (&*self.df.borrow() - &*s.df.borrow()).map_err(RbPolarsErr::from)?; + Ok(df.into()) + } + + pub fn div_df(&self, s: &Self) -> RbResult<Self> { + let df = (&*self.df.borrow() / &*s.df.borrow()).map_err(RbPolarsErr::from)?; + Ok(df.into()) + } + + pub fn mul_df(&self, s: &Self) -> RbResult<Self> { + let df = (&*self.df.borrow() * &*s.df.borrow()).map_err(RbPolarsErr::from)?; + Ok(df.into()) + } + + pub fn rem_df(&self, s: &Self) -> RbResult<Self> { + let df = (&*self.df.borrow() % &*s.df.borrow()).map_err(RbPolarsErr::from)?; + Ok(df.into()) + } + + pub fn sample_n( + &self, + n: usize, + with_replacement: bool, + shuffle: bool, + seed: Option<u64>, + ) -> RbResult<Self> { + let df = self + .df + .borrow() + .sample_n(n, with_replacement, shuffle, seed) + .map_err(RbPolarsErr::from)?; + Ok(df.into()) + } + + pub fn sample_frac( + &self, + frac: f64, + with_replacement: bool, + shuffle: bool, + seed: Option<u64>, + ) -> RbResult<Self> { + let df = self + .df + .borrow() + .sample_frac(frac, with_replacement, shuffle, seed) + .map_err(RbPolarsErr::from)?; + Ok(df.into()) + } + pub fn rechunk(&self) -> Self { self.df.borrow().agg_chunks().into() } pub fn to_s(&self) -> String { @@ -391,17 +532,88 @@ pub fn width(&self) -> usize { self.df.borrow().width() } + pub fn hstack_mut(&self, columns: RArray) -> RbResult<()> { + let columns = to_series_collection(columns)?; + self.df + .borrow_mut() + .hstack_mut(&columns) + .map_err(RbPolarsErr::from)?; + Ok(()) + } + + pub fn hstack(&self, columns: RArray) -> RbResult<Self> { + let columns = to_series_collection(columns)?; + let df = self + .df + .borrow() + .hstack(&columns) + .map_err(RbPolarsErr::from)?; + Ok(df.into()) + } + + pub fn extend(&self, df: &RbDataFrame) -> RbResult<()> { + self.df + .borrow_mut() + .extend(&df.df.borrow()) + .map_err(RbPolarsErr::from)?; + Ok(()) + } + + pub fn vstack_mut(&self, df: &RbDataFrame) -> RbResult<()> { + self.df + .borrow_mut() + .vstack_mut(&df.df.borrow()) + .map_err(RbPolarsErr::from)?; + Ok(()) + } + + pub fn vstack(&self, df: &RbDataFrame) -> RbResult<Self> { + let df = self + .df + .borrow() + .vstack(&df.df.borrow()) + .map_err(RbPolarsErr::from)?; + Ok(df.into()) + } + + pub fn drop_in_place(&self, name: String) -> RbResult<RbSeries> { + let s = self + .df + .borrow_mut() + .drop_in_place(&name) + .map_err(RbPolarsErr::from)?; + Ok(RbSeries::new(s)) + } + + pub fn drop_nulls(&self, subset: Option<Vec<String>>) -> RbResult<Self> { + let df = self + .df + .borrow() + .drop_nulls(subset.as_ref().map(|s| s.as_ref())) + .map_err(RbPolarsErr::from)?; + Ok(df.into()) + } + + pub fn drop(&self, name: String) -> RbResult<Self> { + let df = self.df.borrow().drop(&name).map_err(RbPolarsErr::from)?; + Ok(RbDataFrame::new(df)) + } + pub fn select_at_idx(&self, idx: usize) -> Option<RbSeries> { self.df .borrow() .select_at_idx(idx) .map(|s| RbSeries::new(s.clone())) } + pub fn find_idx_by_name(&self, name: String) -> Option<usize> { + self.df.borrow().find_idx_by_name(&name) + } + // TODO remove clone pub fn column(&self, name: String) -> RbResult<RbSeries> { self.df .borrow() .column(&name) @@ -698,9 +910,14 @@ Duration::parse(&offset), ) }; let out = out.map_err(RbPolarsErr::from)?; Ok(out.into()) + } + + pub fn to_struct(&self, name: String) -> RbSeries { + let s = self.df.borrow().clone().into_struct(&name); + s.into_series().into() } pub fn unnest(&self, names: Vec<String>) -> RbResult<Self> { let df = self.df.borrow().unnest(names).map_err(RbPolarsErr::from)?; Ok(df.into())