ext/polars/src/dataframe.rs in polars-df-0.1.2 vs ext/polars/src/dataframe.rs in polars-df-0.1.3
- old
+ new
@@ -1,18 +1,16 @@
-use magnus::{r_hash::ForEach, Error, RArray, RHash, RString, Value};
+use magnus::{r_hash::ForEach, RArray, RHash, RString, Value};
use polars::io::mmap::ReaderBytes;
use polars::io::RowCount;
use polars::prelude::*;
use std::cell::RefCell;
-use std::fs::File;
-use std::io::{BufReader, BufWriter, Cursor};
+use std::io::{BufWriter, Cursor};
use std::ops::Deref;
-use std::path::PathBuf;
use crate::conversion::*;
use crate::file::{get_file_like, get_mmap_bytes_reader};
-use crate::series::to_rbseries_collection;
+use crate::series::{to_rbseries_collection, to_series_collection};
use crate::{series, RbLazyFrame, RbPolarsErr, RbResult, RbSeries};
#[magnus::wrap(class = "Polars::RbDataFrame")]
pub struct RbDataFrame {
pub df: RefCell<DataFrame>,
@@ -135,17 +133,31 @@
.finish()
.map_err(RbPolarsErr::from)?;
Ok(df.into())
}
- pub fn read_parquet(path: PathBuf) -> RbResult<Self> {
- let f = File::open(&path).map_err(|e| Error::runtime_error(e.to_string()))?;
- let reader = BufReader::new(f);
- ParquetReader::new(reader)
+ /// Reads a Parquet source into a new `RbDataFrame`.
+ ///
+ /// `rb_f` is turned into a reader by `get_mmap_bytes_reader`; every other
+ /// argument is forwarded to the `ParquetReader` builder option of the same
+ /// name. `row_count`, when given, is unpacked into a `RowCount { name, offset }`.
+ /// A failure from the reader is converted through `RbPolarsErr`.
+ pub fn read_parquet(
+     rb_f: Value,
+     columns: Option<Vec<String>>,
+     projection: Option<Vec<usize>>,
+     n_rows: Option<usize>,
+     parallel: Wrap<ParallelStrategy>,
+     row_count: Option<(String, IdxSize)>,
+     low_memory: bool,
+ ) -> RbResult<Self> {
+     let reader = get_mmap_bytes_reader(rb_f)?;
+     let row_count = row_count.map(|(name, offset)| RowCount { name, offset });
+     let frame = ParquetReader::new(reader)
+         .with_projection(projection)
+         .with_columns(columns)
+         .read_parallel(parallel.0)
+         .with_n_rows(n_rows)
+         .with_row_count(row_count)
+         .set_low_memory(low_memory)
.finish()
- .map_err(RbPolarsErr::from)
- .map(|v| v.into())
+         .map_err(RbPolarsErr::from)?;
+     Ok(RbDataFrame::new(frame))
}
pub fn read_ipc(
rb_f: Value,
columns: Option<Vec<String>>,
@@ -311,10 +323,59 @@
.map_err(RbPolarsErr::from)?;
}
Ok(())
}
+ /// Builds a Ruby array holding the value of every column of `df` at row `idx`.
+ ///
+ /// Columns whose dtype is `DataType::Object` are read through `get_object`;
+ /// all other columns go through `Series::get` and the `Wrap` conversion.
+ /// When `get_object` yields `None` the slot becomes Ruby `nil` rather than
+ /// panicking on `unwrap`.
+ /// NOTE(review): presumably `get_object` returns `None` for null entries —
+ /// confirm against the polars version in use.
+ fn row_to_rarray(df: &DataFrame, idx: usize) -> RArray {
+     RArray::from_vec(
+         df.get_columns()
+             .iter()
+             .map(|s| match s.dtype() {
+                 DataType::Object(_) => {
+                     let obj: Option<&ObjectValue> = s.get_object(idx).map(|any| any.into());
+                     obj.map(|o| o.to_object())
+                         .unwrap_or_else(|| magnus::QNIL.into())
+                 }
+                 _ => Wrap(s.get(idx)).into(),
+             })
+             .collect(),
+     )
+ }
+
+ /// Returns the row at `idx` as a Ruby array with one element per column.
+ ///
+ /// A negative `idx` is offset by the frame height, so `-1` addresses the
+ /// last row.
+ /// NOTE(review): the resulting index is not range-checked here; presumably
+ /// the Ruby caller validates it first — confirm.
+ pub fn row_tuple(&self, idx: i64) -> Value {
+     let df = self.df.borrow();
+     let idx = if idx < 0 {
+         (df.height() as i64 + idx) as usize
+     } else {
+         idx as usize
+     };
+     Self::row_to_rarray(&df, idx).into()
+ }
+
+ /// Returns every row of the frame as a Ruby array of per-row Ruby arrays.
+ ///
+ /// The frame is borrowed once for the whole conversion instead of once per
+ /// row.
+ pub fn row_tuples(&self) -> Value {
+     let df = self.df.borrow();
+     RArray::from_vec(
+         (0..df.height())
+             .map(|idx| Self::row_to_rarray(&df, idx))
+             .collect(),
+     )
+     .into()
+ }
+
pub fn write_parquet(
&self,
rb_f: Value,
compression: String,
compression_level: Option<i32>,
@@ -336,10 +397,90 @@
}
Ok(())
}
+ /// Applies `+` between this frame and the series `s`, returning a new frame.
+ ///
+ /// An error from the operation is converted through `RbPolarsErr`.
+ pub fn add(&self, s: &RbSeries) -> RbResult<Self> {
+     let lhs = self.df.borrow();
+     let rhs = s.series.borrow();
+     let out = (&*lhs + &*rhs).map_err(RbPolarsErr::from)?;
+     Ok(out.into())
+ }
+
+ /// Applies `-` between this frame and the series `s`, returning a new frame.
+ ///
+ /// An error from the operation is converted through `RbPolarsErr`.
+ pub fn sub(&self, s: &RbSeries) -> RbResult<Self> {
+     let lhs = self.df.borrow();
+     let rhs = s.series.borrow();
+     let out = (&*lhs - &*rhs).map_err(RbPolarsErr::from)?;
+     Ok(out.into())
+ }
+
+ /// Applies `/` between this frame and the series `s`, returning a new frame.
+ ///
+ /// An error from the operation is converted through `RbPolarsErr`.
+ pub fn div(&self, s: &RbSeries) -> RbResult<Self> {
+     let lhs = self.df.borrow();
+     let rhs = s.series.borrow();
+     let out = (&*lhs / &*rhs).map_err(RbPolarsErr::from)?;
+     Ok(out.into())
+ }
+
+ /// Applies `*` between this frame and the series `s`, returning a new frame.
+ ///
+ /// An error from the operation is converted through `RbPolarsErr`.
+ pub fn mul(&self, s: &RbSeries) -> RbResult<Self> {
+     let lhs = self.df.borrow();
+     let rhs = s.series.borrow();
+     let out = (&*lhs * &*rhs).map_err(RbPolarsErr::from)?;
+     Ok(out.into())
+ }
+
+ /// Applies `%` between this frame and the series `s`, returning a new frame.
+ ///
+ /// An error from the operation is converted through `RbPolarsErr`.
+ pub fn rem(&self, s: &RbSeries) -> RbResult<Self> {
+     let lhs = self.df.borrow();
+     let rhs = s.series.borrow();
+     let out = (&*lhs % &*rhs).map_err(RbPolarsErr::from)?;
+     Ok(out.into())
+ }
+
+ /// Applies `+` between this frame and the frame `s`, returning a new frame.
+ ///
+ /// An error from the operation is converted through `RbPolarsErr`.
+ pub fn add_df(&self, s: &Self) -> RbResult<Self> {
+     let lhs = self.df.borrow();
+     let rhs = s.df.borrow();
+     let out = (&*lhs + &*rhs).map_err(RbPolarsErr::from)?;
+     Ok(out.into())
+ }
+
+ /// Applies `-` between this frame and the frame `s`, returning a new frame.
+ ///
+ /// An error from the operation is converted through `RbPolarsErr`.
+ pub fn sub_df(&self, s: &Self) -> RbResult<Self> {
+     let lhs = self.df.borrow();
+     let rhs = s.df.borrow();
+     let out = (&*lhs - &*rhs).map_err(RbPolarsErr::from)?;
+     Ok(out.into())
+ }
+
+ /// Applies `/` between this frame and the frame `s`, returning a new frame.
+ ///
+ /// An error from the operation is converted through `RbPolarsErr`.
+ pub fn div_df(&self, s: &Self) -> RbResult<Self> {
+     let lhs = self.df.borrow();
+     let rhs = s.df.borrow();
+     let out = (&*lhs / &*rhs).map_err(RbPolarsErr::from)?;
+     Ok(out.into())
+ }
+
+ /// Applies `*` between this frame and the frame `s`, returning a new frame.
+ ///
+ /// An error from the operation is converted through `RbPolarsErr`.
+ pub fn mul_df(&self, s: &Self) -> RbResult<Self> {
+     let lhs = self.df.borrow();
+     let rhs = s.df.borrow();
+     let out = (&*lhs * &*rhs).map_err(RbPolarsErr::from)?;
+     Ok(out.into())
+ }
+
+ /// Applies `%` between this frame and the frame `s`, returning a new frame.
+ ///
+ /// An error from the operation is converted through `RbPolarsErr`.
+ pub fn rem_df(&self, s: &Self) -> RbResult<Self> {
+     let lhs = self.df.borrow();
+     let rhs = s.df.borrow();
+     let out = (&*lhs % &*rhs).map_err(RbPolarsErr::from)?;
+     Ok(out.into())
+ }
+
+ /// Forwards to the frame's `sample_n` with the given arguments and wraps
+ /// the sampled frame.
+ ///
+ /// An error from the call is converted through `RbPolarsErr`.
+ pub fn sample_n(
+     &self,
+     n: usize,
+     with_replacement: bool,
+     shuffle: bool,
+     seed: Option<u64>,
+ ) -> RbResult<Self> {
+     let frame = self.df.borrow();
+     let sampled = frame
+         .sample_n(n, with_replacement, shuffle, seed)
+         .map_err(RbPolarsErr::from)?;
+     Ok(sampled.into())
+ }
+
+ /// Forwards to the frame's `sample_frac` with the given arguments and wraps
+ /// the sampled frame.
+ ///
+ /// An error from the call is converted through `RbPolarsErr`.
+ pub fn sample_frac(
+     &self,
+     frac: f64,
+     with_replacement: bool,
+     shuffle: bool,
+     seed: Option<u64>,
+ ) -> RbResult<Self> {
+     let frame = self.df.borrow();
+     let sampled = frame
+         .sample_frac(frac, with_replacement, shuffle, seed)
+         .map_err(RbPolarsErr::from)?;
+     Ok(sampled.into())
+ }
+
/// Returns a new frame built from the result of `agg_chunks` on this frame.
pub fn rechunk(&self) -> Self {
    let merged = self.df.borrow().agg_chunks();
    merged.into()
}
pub fn to_s(&self) -> String {
@@ -391,17 +532,88 @@
/// Returns the value reported by the wrapped frame's `width()`.
pub fn width(&self) -> usize {
    let frame = self.df.borrow();
    frame.width()
}
+ /// Converts `columns` with `to_series_collection` and calls `hstack_mut`
+ /// on the mutably borrowed frame, modifying it in place.
+ ///
+ /// Conversion and stacking errors are returned to the caller; the latter
+ /// is converted through `RbPolarsErr`.
+ pub fn hstack_mut(&self, columns: RArray) -> RbResult<()> {
+     let new_columns = to_series_collection(columns)?;
+     let mut frame = self.df.borrow_mut();
+     frame.hstack_mut(&new_columns).map_err(RbPolarsErr::from)?;
+     Ok(())
+ }
+
+ /// Converts `columns` with `to_series_collection`, calls `hstack` on the
+ /// borrowed frame and wraps the returned frame.
+ ///
+ /// Conversion and stacking errors are returned to the caller; the latter
+ /// is converted through `RbPolarsErr`.
+ pub fn hstack(&self, columns: RArray) -> RbResult<Self> {
+     let new_columns = to_series_collection(columns)?;
+     let frame = self.df.borrow();
+     let stacked = frame.hstack(&new_columns).map_err(RbPolarsErr::from)?;
+     Ok(stacked.into())
+ }
+
+ /// Calls `extend` on this frame with `df`'s frame, modifying this frame in
+ /// place.
+ ///
+ /// When `df` is this very object, the frame is cloned first: taking a
+ /// shared `RefCell` borrow while the mutable borrow is live would panic.
+ /// An error from `extend` is converted through `RbPolarsErr`.
+ pub fn extend(&self, df: &RbDataFrame) -> RbResult<()> {
+     if std::ptr::eq(self, df) {
+         // Self-extension: snapshot before borrowing mutably.
+         let snapshot = self.df.borrow().clone();
+         self.df
+             .borrow_mut()
+             .extend(&snapshot)
+             .map_err(RbPolarsErr::from)?;
+     } else {
+         self.df
+             .borrow_mut()
+             .extend(&df.df.borrow())
+             .map_err(RbPolarsErr::from)?;
+     }
+     Ok(())
+ }
+
+ /// Calls `vstack_mut` on this frame with `df`'s frame, modifying this
+ /// frame in place.
+ ///
+ /// When `df` is this very object, the frame is cloned first: taking a
+ /// shared `RefCell` borrow while the mutable borrow is live would panic.
+ /// An error from `vstack_mut` is converted through `RbPolarsErr`.
+ pub fn vstack_mut(&self, df: &RbDataFrame) -> RbResult<()> {
+     if std::ptr::eq(self, df) {
+         // Self-stacking: snapshot before borrowing mutably.
+         let snapshot = self.df.borrow().clone();
+         self.df
+             .borrow_mut()
+             .vstack_mut(&snapshot)
+             .map_err(RbPolarsErr::from)?;
+     } else {
+         self.df
+             .borrow_mut()
+             .vstack_mut(&df.df.borrow())
+             .map_err(RbPolarsErr::from)?;
+     }
+     Ok(())
+ }
+
+ /// Calls `vstack` on this frame with `df`'s frame and wraps the returned
+ /// frame; neither input is modified.
+ ///
+ /// An error from `vstack` is converted through `RbPolarsErr`.
+ pub fn vstack(&self, df: &RbDataFrame) -> RbResult<Self> {
+     let this = self.df.borrow();
+     let other = df.df.borrow();
+     let stacked = this.vstack(&other).map_err(RbPolarsErr::from)?;
+     Ok(stacked.into())
+ }
+
+ /// Calls `drop_in_place(name)` on the mutably borrowed frame and returns
+ /// the series it yields, wrapped in an `RbSeries`.
+ ///
+ /// An error from the call is converted through `RbPolarsErr`.
+ pub fn drop_in_place(&self, name: String) -> RbResult<RbSeries> {
+     let mut frame = self.df.borrow_mut();
+     let removed = frame.drop_in_place(&name).map_err(RbPolarsErr::from)?;
+     Ok(RbSeries::new(removed))
+ }
+
+ /// Calls `drop_nulls` on the borrowed frame, passing `subset` as an
+ /// optional slice, and wraps the returned frame.
+ ///
+ /// An error from the call is converted through `RbPolarsErr`.
+ pub fn drop_nulls(&self, subset: Option<Vec<String>>) -> RbResult<Self> {
+     let frame = self.df.borrow();
+     let cleaned = frame
+         .drop_nulls(subset.as_deref())
+         .map_err(RbPolarsErr::from)?;
+     Ok(cleaned.into())
+ }
+
+ /// Calls `drop(name)` on the borrowed frame and wraps the returned frame
+ /// in a new `RbDataFrame`.
+ ///
+ /// An error from the call is converted through `RbPolarsErr`.
+ pub fn drop(&self, name: String) -> RbResult<Self> {
+     let remaining = self
+         .df
+         .borrow()
+         .drop(&name)
+         .map_err(RbPolarsErr::from)?;
+     Ok(RbDataFrame::new(remaining))
+ }
+
/// Looks up the column at position `idx` via the frame's `select_at_idx`
/// and returns a clone of it wrapped in an `RbSeries`, or `None` when the
/// lookup yields nothing.
pub fn select_at_idx(&self, idx: usize) -> Option<RbSeries> {
    let frame = self.df.borrow();
    let picked = frame.select_at_idx(idx).cloned();
    picked.map(RbSeries::new)
}
+ /// Returns whatever the wrapped frame's `find_idx_by_name` reports for
+ /// `name`.
+ pub fn find_idx_by_name(&self, name: String) -> Option<usize> {
+     let frame = self.df.borrow();
+     let position = frame.find_idx_by_name(&name);
+     position
+ }
+
// TODO remove clone
pub fn column(&self, name: String) -> RbResult<RbSeries> {
self.df
.borrow()
.column(&name)
@@ -698,9 +910,14 @@
Duration::parse(&offset),
)
};
let out = out.map_err(RbPolarsErr::from)?;
Ok(out.into())
+ }
+
+ /// Clones the wrapped frame, converts the clone with `into_struct(name)`
+ /// and returns the result as an `RbSeries`.
+ ///
+ /// The clone is needed because `into_struct` is called on an owned frame
+ /// while `self` only holds a shared borrow.
+ pub fn to_struct(&self, name: String) -> RbSeries {
+     let frame = self.df.borrow().clone();
+     let as_struct = frame.into_struct(&name);
+     as_struct.into_series().into()
}
pub fn unnest(&self, names: Vec<String>) -> RbResult<Self> {
let df = self.df.borrow().unnest(names).map_err(RbPolarsErr::from)?;
Ok(df.into())