ext/polars/src/dataframe.rs in polars-df-0.1.4 vs ext/polars/src/dataframe.rs in polars-df-0.1.5

- old
+ new

@@ -1,17 +1,23 @@ use magnus::{r_hash::ForEach, RArray, RHash, RString, Value}; +use polars::io::avro::AvroCompression; use polars::io::mmap::ReaderBytes; use polars::io::RowCount; +use polars::prelude::pivot::{pivot, pivot_stable}; use polars::prelude::*; use std::cell::RefCell; use std::io::{BufWriter, Cursor}; use std::ops::Deref; +use crate::apply::dataframe::{ + apply_lambda_unknown, apply_lambda_with_bool_out_type, apply_lambda_with_primitive_out_type, + apply_lambda_with_utf8_out_type, +}; use crate::conversion::*; use crate::file::{get_file_like, get_mmap_bytes_reader}; use crate::series::{to_rbseries_collection, to_series_collection}; -use crate::{series, RbLazyFrame, RbPolarsErr, RbResult, RbSeries}; +use crate::{series, RbExpr, RbLazyFrame, RbPolarsErr, RbResult, RbSeries}; #[magnus::wrap(class = "Polars::RbDataFrame")] pub struct RbDataFrame { pub df: RefCell<DataFrame>, } @@ -177,10 +183,52 @@ .finish() .map_err(RbPolarsErr::from)?; Ok(RbDataFrame::new(df)) } + pub fn read_avro( + rb_f: Value, + columns: Option<Vec<String>>, + projection: Option<Vec<usize>>, + n_rows: Option<usize>, + ) -> RbResult<Self> { + use polars::io::avro::AvroReader; + + let file = get_file_like(rb_f, false)?; + let df = AvroReader::new(file) + .with_projection(projection) + .with_columns(columns) + .with_n_rows(n_rows) + .finish() + .map_err(RbPolarsErr::from)?; + Ok(RbDataFrame::new(df)) + } + + pub fn write_avro( + &self, + rb_f: Value, + compression: Wrap<Option<AvroCompression>>, + ) -> RbResult<()> { + use polars::io::avro::AvroWriter; + + if let Ok(s) = rb_f.try_convert::<String>() { + let f = std::fs::File::create(&s).unwrap(); + AvroWriter::new(f) + .with_compression(compression.0) + .finish(&mut self.df.borrow_mut()) + .map_err(RbPolarsErr::from)?; + } else { + let mut buf = get_file_like(rb_f, true)?; + AvroWriter::new(&mut buf) + .with_compression(compression.0) + .finish(&mut self.df.borrow_mut()) + .map_err(RbPolarsErr::from)?; + } + + Ok(()) + } + pub fn read_json(rb_f: Value) -> RbResult<Self> { // memmap the file first let mmap_bytes_r = get_mmap_bytes_reader(rb_f)?; let mmap_read: ReaderBytes = (&mmap_bytes_r).into(); let bytes = mmap_read.deref(); @@ -236,10 +284,18 @@ r.map_err(|e| RbPolarsErr::other(format!("{:?}", e)))?; Ok(()) } + pub fn read_hashes( + _dicts: Value, + _infer_schema_length: Option<usize>, + _schema_overwrite: Option<Wrap<Schema>>, + ) -> RbResult<Self> { + Err(RbPolarsErr::todo()) + } + pub fn read_hash(data: RHash) -> RbResult<Self> { let mut cols: Vec<Series> = Vec::new(); data.foreach(|name: String, values: Value| { let obj: Value = series().funcall("new", (name, values))?; let rbseries = obj.funcall::<_, _, &RbSeries>("_s", ())?; @@ -749,10 +805,35 @@ let df = self.df.borrow().melt2(args).map_err(RbPolarsErr::from)?; Ok(RbDataFrame::new(df)) } + pub fn pivot_expr( + &self, + values: Vec<String>, + index: Vec<String>, + columns: Vec<String>, + aggregate_expr: &RbExpr, + maintain_order: bool, + sort_columns: bool, + ) -> RbResult<Self> { + let fun = match maintain_order { + true => pivot_stable, + false => pivot, + }; + let df = fun( + &self.df.borrow(), + values, + index, + columns, + aggregate_expr.inner.clone(), + sort_columns, + ) + .map_err(RbPolarsErr::from)?; + Ok(RbDataFrame::new(df)) + } + pub fn partition_by(&self, groups: Vec<String>, stable: bool) -> RbResult<Vec<Self>> { let out = if stable { self.df.borrow().partition_by_stable(groups) } else { self.df.borrow().partition_by(groups) @@ -868,11 +949,75 @@ pub fn null_count(&self) -> Self { let df = self.df.borrow().null_count(); df.into() } + pub fn apply( + &self, + lambda: Value, + output_type: Option<Wrap<DataType>>, + inference_size: usize, + ) -> RbResult<(Value, bool)> { + let df = &self.df.borrow(); + + let output_type = output_type.map(|dt| dt.0); + let out = match output_type { + Some(DataType::Int32) => { + apply_lambda_with_primitive_out_type::<Int32Type>(df, lambda, 0, None).into_series() + } + Some(DataType::Int64) => { + apply_lambda_with_primitive_out_type::<Int64Type>(df, lambda, 0, None).into_series() + } + Some(DataType::UInt32) => { + apply_lambda_with_primitive_out_type::<UInt32Type>(df, lambda, 0, None) + .into_series() + } + Some(DataType::UInt64) => { + apply_lambda_with_primitive_out_type::<UInt64Type>(df, lambda, 0, None) + .into_series() + } + Some(DataType::Float32) => { + apply_lambda_with_primitive_out_type::<Float32Type>(df, lambda, 0, None) + .into_series() + } + Some(DataType::Float64) => { + apply_lambda_with_primitive_out_type::<Float64Type>(df, lambda, 0, None) + .into_series() + } + Some(DataType::Boolean) => { + apply_lambda_with_bool_out_type(df, lambda, 0, None).into_series() + } + Some(DataType::Date) => { + apply_lambda_with_primitive_out_type::<Int32Type>(df, lambda, 0, None) + .into_date() + .into_series() + } + Some(DataType::Datetime(tu, tz)) => { + apply_lambda_with_primitive_out_type::<Int64Type>(df, lambda, 0, None) + .into_datetime(tu, tz) + .into_series() + } + Some(DataType::Utf8) => { + apply_lambda_with_utf8_out_type(df, lambda, 0, None).into_series() + } + _ => return apply_lambda_unknown(df, lambda, inference_size), + }; + + Ok((RbSeries::from(out).into(), false)) + } + pub fn shrink_to_fit(&self) { self.df.borrow_mut().shrink_to_fit(); + } + + pub fn hash_rows(&self, k0: u64, k1: u64, k2: u64, k3: u64) -> RbResult<RbSeries> { + let hb = ahash::RandomState::with_seeds(k0, k1, k2, k3); + let hash = self + .df + .borrow_mut() + .hash_rows(Some(hb)) + .map_err(RbPolarsErr::from)?; + Ok(hash.into_series().into()) } pub fn transpose(&self, include_header: bool, names: String) -> RbResult<Self> { let mut df = self.df.borrow().transpose().map_err(RbPolarsErr::from)?; if include_header {