ext/polars/src/dataframe.rs in polars-df-0.1.4 vs ext/polars/src/dataframe.rs in polars-df-0.1.5
- old
+ new
@@ -1,17 +1,23 @@
use magnus::{r_hash::ForEach, RArray, RHash, RString, Value};
+use polars::io::avro::AvroCompression;
use polars::io::mmap::ReaderBytes;
use polars::io::RowCount;
+use polars::prelude::pivot::{pivot, pivot_stable};
use polars::prelude::*;
use std::cell::RefCell;
use std::io::{BufWriter, Cursor};
use std::ops::Deref;
+use crate::apply::dataframe::{
+ apply_lambda_unknown, apply_lambda_with_bool_out_type, apply_lambda_with_primitive_out_type,
+ apply_lambda_with_utf8_out_type,
+};
use crate::conversion::*;
use crate::file::{get_file_like, get_mmap_bytes_reader};
use crate::series::{to_rbseries_collection, to_series_collection};
-use crate::{series, RbLazyFrame, RbPolarsErr, RbResult, RbSeries};
+use crate::{series, RbExpr, RbLazyFrame, RbPolarsErr, RbResult, RbSeries};
#[magnus::wrap(class = "Polars::RbDataFrame")]
pub struct RbDataFrame {
pub df: RefCell<DataFrame>,
}
@@ -177,10 +183,52 @@
.finish()
.map_err(RbPolarsErr::from)?;
Ok(RbDataFrame::new(df))
}
+ pub fn read_avro(
+ rb_f: Value,
+ columns: Option<Vec<String>>,
+ projection: Option<Vec<usize>>,
+ n_rows: Option<usize>,
+ ) -> RbResult<Self> {
+ use polars::io::avro::AvroReader;
+
+ let file = get_file_like(rb_f, false)?;
+ let df = AvroReader::new(file)
+ .with_projection(projection)
+ .with_columns(columns)
+ .with_n_rows(n_rows)
+ .finish()
+ .map_err(RbPolarsErr::from)?;
+ Ok(RbDataFrame::new(df))
+ }
+
+ pub fn write_avro(
+ &self,
+ rb_f: Value,
+ compression: Wrap<Option<AvroCompression>>,
+ ) -> RbResult<()> {
+ use polars::io::avro::AvroWriter;
+
+ if let Ok(s) = rb_f.try_convert::<String>() {
+ let f = std::fs::File::create(&s).unwrap();
+ AvroWriter::new(f)
+ .with_compression(compression.0)
+ .finish(&mut self.df.borrow_mut())
+ .map_err(RbPolarsErr::from)?;
+ } else {
+ let mut buf = get_file_like(rb_f, true)?;
+ AvroWriter::new(&mut buf)
+ .with_compression(compression.0)
+ .finish(&mut self.df.borrow_mut())
+ .map_err(RbPolarsErr::from)?;
+ }
+
+ Ok(())
+ }
+
pub fn read_json(rb_f: Value) -> RbResult<Self> {
// memmap the file first
let mmap_bytes_r = get_mmap_bytes_reader(rb_f)?;
let mmap_read: ReaderBytes = (&mmap_bytes_r).into();
let bytes = mmap_read.deref();
@@ -236,10 +284,18 @@
r.map_err(|e| RbPolarsErr::other(format!("{:?}", e)))?;
Ok(())
}
+ pub fn read_hashes(
+ _dicts: Value,
+ _infer_schema_length: Option<usize>,
+ _schema_overwrite: Option<Wrap<Schema>>,
+ ) -> RbResult<Self> {
+ Err(RbPolarsErr::todo())
+ }
+
pub fn read_hash(data: RHash) -> RbResult<Self> {
let mut cols: Vec<Series> = Vec::new();
data.foreach(|name: String, values: Value| {
let obj: Value = series().funcall("new", (name, values))?;
let rbseries = obj.funcall::<_, _, &RbSeries>("_s", ())?;
@@ -749,10 +805,35 @@
let df = self.df.borrow().melt2(args).map_err(RbPolarsErr::from)?;
Ok(RbDataFrame::new(df))
}
+ pub fn pivot_expr(
+ &self,
+ values: Vec<String>,
+ index: Vec<String>,
+ columns: Vec<String>,
+ aggregate_expr: &RbExpr,
+ maintain_order: bool,
+ sort_columns: bool,
+ ) -> RbResult<Self> {
+ let fun = match maintain_order {
+ true => pivot_stable,
+ false => pivot,
+ };
+ let df = fun(
+ &self.df.borrow(),
+ values,
+ index,
+ columns,
+ aggregate_expr.inner.clone(),
+ sort_columns,
+ )
+ .map_err(RbPolarsErr::from)?;
+ Ok(RbDataFrame::new(df))
+ }
+
pub fn partition_by(&self, groups: Vec<String>, stable: bool) -> RbResult<Vec<Self>> {
let out = if stable {
self.df.borrow().partition_by_stable(groups)
} else {
self.df.borrow().partition_by(groups)
@@ -868,11 +949,75 @@
pub fn null_count(&self) -> Self {
let df = self.df.borrow().null_count();
df.into()
}
+ pub fn apply(
+ &self,
+ lambda: Value,
+ output_type: Option<Wrap<DataType>>,
+ inference_size: usize,
+ ) -> RbResult<(Value, bool)> {
+ let df = &self.df.borrow();
+
+ let output_type = output_type.map(|dt| dt.0);
+ let out = match output_type {
+ Some(DataType::Int32) => {
+ apply_lambda_with_primitive_out_type::<Int32Type>(df, lambda, 0, None).into_series()
+ }
+ Some(DataType::Int64) => {
+ apply_lambda_with_primitive_out_type::<Int64Type>(df, lambda, 0, None).into_series()
+ }
+ Some(DataType::UInt32) => {
+ apply_lambda_with_primitive_out_type::<UInt32Type>(df, lambda, 0, None)
+ .into_series()
+ }
+ Some(DataType::UInt64) => {
+ apply_lambda_with_primitive_out_type::<UInt64Type>(df, lambda, 0, None)
+ .into_series()
+ }
+ Some(DataType::Float32) => {
+ apply_lambda_with_primitive_out_type::<Float32Type>(df, lambda, 0, None)
+ .into_series()
+ }
+ Some(DataType::Float64) => {
+ apply_lambda_with_primitive_out_type::<Float64Type>(df, lambda, 0, None)
+ .into_series()
+ }
+ Some(DataType::Boolean) => {
+ apply_lambda_with_bool_out_type(df, lambda, 0, None).into_series()
+ }
+ Some(DataType::Date) => {
+ apply_lambda_with_primitive_out_type::<Int32Type>(df, lambda, 0, None)
+ .into_date()
+ .into_series()
+ }
+ Some(DataType::Datetime(tu, tz)) => {
+ apply_lambda_with_primitive_out_type::<Int64Type>(df, lambda, 0, None)
+ .into_datetime(tu, tz)
+ .into_series()
+ }
+ Some(DataType::Utf8) => {
+ apply_lambda_with_utf8_out_type(df, lambda, 0, None).into_series()
+ }
+ _ => return apply_lambda_unknown(df, lambda, inference_size),
+ };
+
+ Ok((RbSeries::from(out).into(), false))
+ }
+
pub fn shrink_to_fit(&self) {
self.df.borrow_mut().shrink_to_fit();
+ }
+
+ pub fn hash_rows(&self, k0: u64, k1: u64, k2: u64, k3: u64) -> RbResult<RbSeries> {
+ let hb = ahash::RandomState::with_seeds(k0, k1, k2, k3);
+ let hash = self
+ .df
+ .borrow_mut()
+ .hash_rows(Some(hb))
+ .map_err(RbPolarsErr::from)?;
+ Ok(hash.into_series().into())
}
pub fn transpose(&self, include_header: bool, names: String) -> RbResult<Self> {
let mut df = self.df.borrow().transpose().map_err(RbPolarsErr::from)?;
if include_header {