pub(crate) mod anyvalue; mod chunked_array; use std::fmt::{Debug, Display, Formatter}; use std::hash::{Hash, Hasher}; use std::num::NonZeroUsize; use magnus::{ class, exception, prelude::*, r_hash::ForEach, value::Opaque, IntoValue, Module, RArray, RHash, Ruby, Symbol, TryConvert, Value, }; use polars::chunked_array::object::PolarsObjectSafe; use polars::chunked_array::ops::{FillNullLimit, FillNullStrategy}; use polars::datatypes::AnyValue; use polars::frame::row::Row; use polars::frame::NullStrategy; use polars::io::avro::AvroCompression; use polars::prelude::*; use polars::series::ops::NullBehavior; use polars_core::utils::arrow::array::Array; use polars_utils::total_ord::{TotalEq, TotalHash}; use smartstring::alias::String as SmartString; use crate::object::OBJECT_NAME; use crate::rb_modules::series; use crate::{RbDataFrame, RbLazyFrame, RbPolarsErr, RbResult, RbSeries, RbTypeError, RbValueError}; pub(crate) fn slice_to_wrapped(slice: &[T]) -> &[Wrap] { // Safety: // Wrap is transparent. unsafe { std::mem::transmute(slice) } } pub(crate) fn slice_extract_wrapped(slice: &[Wrap]) -> &[T] { // Safety: // Wrap is transparent. unsafe { std::mem::transmute(slice) } } pub(crate) fn vec_extract_wrapped(buf: Vec>) -> Vec { // Safety: // Wrap is transparent. unsafe { std::mem::transmute(buf) } } #[repr(transparent)] pub struct Wrap(pub T); impl Clone for Wrap where T: Clone, { fn clone(&self) -> Self { Wrap(self.0.clone()) } } impl From for Wrap { fn from(t: T) -> Self { Wrap(t) } } pub(crate) fn get_rbseq(obj: Value) -> RbResult<(RArray, usize)> { let seq = RArray::try_convert(obj)?; let len = seq.len(); Ok((seq, len)) } pub(crate) fn get_df(obj: Value) -> RbResult { let rbdf = obj.funcall::<_, _, &RbDataFrame>("_df", ())?; Ok(rbdf.df.borrow().clone()) } pub(crate) fn get_lf(obj: Value) -> RbResult { let rbdf = obj.funcall::<_, _, &RbLazyFrame>("_ldf", ())?; Ok(rbdf.ldf.clone()) } pub(crate) fn get_series(obj: Value) -> RbResult { let rbs = obj.funcall::<_, _, &RbSeries>("_s", ())?; Ok(rbs.series.borrow().clone()) } pub(crate) fn to_series(s: RbSeries) -> Value { let series = series(); series .funcall::<_, _, Value>("_from_rbseries", (s,)) .unwrap() } impl TryConvert for Wrap { fn try_convert(ob: Value) -> RbResult { if let Ok(s) = String::try_convert(ob) { Ok(Wrap(NullValues::AllColumnsSingle(s))) } else if let Ok(s) = Vec::::try_convert(ob) { Ok(Wrap(NullValues::AllColumns(s))) } else if let Ok(s) = Vec::<(String, String)>::try_convert(ob) { Ok(Wrap(NullValues::Named(s))) } else { Err(RbPolarsErr::other( "could not extract value from null_values argument".into(), )) } } } fn struct_dict<'a>(vals: impl Iterator>, flds: &[Field]) -> Value { let dict = RHash::new(); for (fld, val) in flds.iter().zip(vals) { dict.aset(fld.name().as_str(), Wrap(val)).unwrap() } dict.into_value() } impl IntoValue for Wrap { fn into_value_with(self, _: &Ruby) -> Value { let pl = crate::rb_modules::polars(); match self.0 { DataType::Int8 => { let class = pl.const_get::<_, Value>("Int8").unwrap(); class.funcall("new", ()).unwrap() } DataType::Int16 => { let class = pl.const_get::<_, Value>("Int16").unwrap(); class.funcall("new", ()).unwrap() } DataType::Int32 => { let class = pl.const_get::<_, Value>("Int32").unwrap(); class.funcall("new", ()).unwrap() } DataType::Int64 => { let class = pl.const_get::<_, Value>("Int64").unwrap(); class.funcall("new", ()).unwrap() } DataType::UInt8 => { let class = pl.const_get::<_, Value>("UInt8").unwrap(); class.funcall("new", ()).unwrap() } DataType::UInt16 => { let class = pl.const_get::<_, Value>("UInt16").unwrap(); class.funcall("new", ()).unwrap() } DataType::UInt32 => { let class = pl.const_get::<_, Value>("UInt32").unwrap(); class.funcall("new", ()).unwrap() } DataType::UInt64 => { let class = pl.const_get::<_, Value>("UInt64").unwrap(); class.funcall("new", ()).unwrap() } DataType::Float32 => { let class = pl.const_get::<_, Value>("Float32").unwrap(); class.funcall("new", ()).unwrap() } DataType::Float64 => { let class = pl.const_get::<_, Value>("Float64").unwrap(); class.funcall("new", ()).unwrap() } DataType::Decimal(precision, scale) => { let class = pl.const_get::<_, Value>("Decimal").unwrap(); class .funcall::<_, _, Value>("new", (precision, scale)) .unwrap() } DataType::Boolean => { let class = pl.const_get::<_, Value>("Boolean").unwrap(); class.funcall("new", ()).unwrap() } DataType::String => { let class = pl.const_get::<_, Value>("String").unwrap(); class.funcall("new", ()).unwrap() } DataType::Binary => { let class = pl.const_get::<_, Value>("Binary").unwrap(); class.funcall("new", ()).unwrap() } DataType::Array(inner, size) => { let class = pl.const_get::<_, Value>("Array").unwrap(); let inner = Wrap(*inner); let args = (inner, size); class.funcall::<_, _, Value>("new", args).unwrap() } DataType::List(inner) => { let class = pl.const_get::<_, Value>("List").unwrap(); let inner = Wrap(*inner); class.funcall::<_, _, Value>("new", (inner,)).unwrap() } DataType::Date => { let class = pl.const_get::<_, Value>("Date").unwrap(); class.funcall("new", ()).unwrap() } DataType::Datetime(tu, tz) => { let datetime_class = pl.const_get::<_, Value>("Datetime").unwrap(); datetime_class .funcall::<_, _, Value>("new", (tu.to_ascii(), tz)) .unwrap() } DataType::Duration(tu) => { let duration_class = pl.const_get::<_, Value>("Duration").unwrap(); duration_class .funcall::<_, _, Value>("new", (tu.to_ascii(),)) .unwrap() } DataType::Object(_, _) => { let class = pl.const_get::<_, Value>("Object").unwrap(); class.funcall("new", ()).unwrap() } DataType::Categorical(_, ordering) => { let class = pl.const_get::<_, Value>("Categorical").unwrap(); class.funcall("new", (Wrap(ordering),)).unwrap() } DataType::Enum(rev_map, _) => { // we should always have an initialized rev_map coming from rust let categories = rev_map.as_ref().unwrap().get_categories(); let class = pl.const_get::<_, Value>("Enum").unwrap(); let s = Series::from_arrow("category", categories.to_boxed()).unwrap(); let series = to_series(s.into()); class.funcall::<_, _, Value>("new", (series,)).unwrap() } DataType::Time => { let class = pl.const_get::<_, Value>("Time").unwrap(); class.funcall("new", ()).unwrap() } DataType::Struct(fields) => { let field_class = pl.const_get::<_, Value>("Field").unwrap(); let iter = fields.iter().map(|fld| { let name = fld.name().as_str(); let dtype = Wrap(fld.data_type().clone()); field_class .funcall::<_, _, Value>("new", (name, dtype)) .unwrap() }); let fields = RArray::from_iter(iter); let struct_class = pl.const_get::<_, Value>("Struct").unwrap(); struct_class .funcall::<_, _, Value>("new", (fields,)) .unwrap() } DataType::Null => { let class = pl.const_get::<_, Value>("Null").unwrap(); class.funcall("new", ()).unwrap() } DataType::Unknown => { let class = pl.const_get::<_, Value>("Unknown").unwrap(); class.funcall("new", ()).unwrap() } DataType::BinaryOffset => { unimplemented!() } } } } impl IntoValue for Wrap { fn into_value_with(self, _: &Ruby) -> Value { let ordering = match self.0 { CategoricalOrdering::Physical => "physical", CategoricalOrdering::Lexical => "lexical", }; ordering.into_value() } } impl IntoValue for Wrap { fn into_value_with(self, _: &Ruby) -> Value { let tu = match self.0 { TimeUnit::Nanoseconds => "ns", TimeUnit::Microseconds => "us", TimeUnit::Milliseconds => "ms", }; tu.into_value() } } impl TryConvert for Wrap { fn try_convert(ob: Value) -> RbResult { let name: String = ob.funcall("name", ())?; let dtype: Wrap = ob.funcall("dtype", ())?; Ok(Wrap(Field::new(&name, dtype.0))) } } impl TryConvert for Wrap { fn try_convert(ob: Value) -> RbResult { let dtype = if ob.is_kind_of(class::class()) { let name = ob.funcall::<_, _, String>("name", ())?; match name.as_str() { "Polars::UInt8" => DataType::UInt8, "Polars::UInt16" => DataType::UInt16, "Polars::UInt32" => DataType::UInt32, "Polars::UInt64" => DataType::UInt64, "Polars::Int8" => DataType::Int8, "Polars::Int16" => DataType::Int16, "Polars::Int32" => DataType::Int32, "Polars::Int64" => DataType::Int64, "Polars::String" => DataType::String, "Polars::Binary" => DataType::Binary, "Polars::Boolean" => DataType::Boolean, "Polars::Categorical" => DataType::Categorical(None, Default::default()), "Polars::Enum" => DataType::Enum(None, Default::default()), "Polars::Date" => DataType::Date, "Polars::Datetime" => DataType::Datetime(TimeUnit::Microseconds, None), "Polars::Time" => DataType::Time, "Polars::Duration" => DataType::Duration(TimeUnit::Microseconds), "Polars::Decimal" => DataType::Decimal(None, None), "Polars::Float32" => DataType::Float32, "Polars::Float64" => DataType::Float64, "Polars::Object" => DataType::Object(OBJECT_NAME, None), "Polars::List" => DataType::List(Box::new(DataType::Null)), "Polars::Null" => DataType::Null, "Polars::Unknown" => DataType::Unknown, dt => { return Err(RbValueError::new_err(format!( "{dt} is not a correct polars DataType.", ))) } } // TODO improve } else if String::try_convert(ob).is_err() { let name = unsafe { ob.class().name() }.into_owned(); match name.as_str() { "Polars::Int8" => DataType::Int8, "Polars::Int16" => DataType::Int16, "Polars::Int32" => DataType::Int32, "Polars::Int64" => DataType::Int64, "Polars::UInt8" => DataType::UInt8, "Polars::UInt16" => DataType::UInt16, "Polars::UInt32" => DataType::UInt32, "Polars::UInt64" => DataType::UInt64, "Polars::String" => DataType::String, "Polars::Binary" => DataType::Binary, "Polars::Boolean" => DataType::Boolean, "Polars::Categorical" => { let ordering = ob .funcall::<_, _, Wrap>("ordering", ())? .0; DataType::Categorical(None, ordering) } "Polars::Enum" => { let categories = ob.funcall("categories", ()).unwrap(); let s = get_series(categories)?; let ca = s.str().map_err(RbPolarsErr::from)?; let categories = ca.downcast_iter().next().unwrap().clone(); create_enum_data_type(categories) } "Polars::Date" => DataType::Date, "Polars::Time" => DataType::Time, "Polars::Float32" => DataType::Float32, "Polars::Float64" => DataType::Float64, "Polars::Null" => DataType::Null, "Polars::Unknown" => DataType::Unknown, "Polars::Duration" => { let time_unit: Value = ob.funcall("time_unit", ()).unwrap(); let time_unit = Wrap::::try_convert(time_unit)?.0; DataType::Duration(time_unit) } "Polars::Datetime" => { let time_unit: Value = ob.funcall("time_unit", ()).unwrap(); let time_unit = Wrap::::try_convert(time_unit)?.0; let time_zone = ob.funcall("time_zone", ())?; DataType::Datetime(time_unit, time_zone) } "Polars::Decimal" => { let precision = ob.funcall("precision", ())?; let scale = ob.funcall("scale", ())?; DataType::Decimal(precision, Some(scale)) } "Polars::List" => { let inner: Value = ob.funcall("inner", ()).unwrap(); let inner = Wrap::::try_convert(inner)?; DataType::List(Box::new(inner.0)) } "Polars::Struct" => { let arr: RArray = ob.funcall("fields", ())?; let mut fields = Vec::with_capacity(arr.len()); for v in arr.each() { fields.push(Wrap::::try_convert(v?)?.0); } DataType::Struct(fields) } dt => { return Err(RbTypeError::new_err(format!( "A {dt} object is not a correct polars DataType. \ Hint: use the class without instantiating it.", ))) } } } else { match String::try_convert(ob)?.as_str() { "u8" => DataType::UInt8, "u16" => DataType::UInt16, "u32" => DataType::UInt32, "u64" => DataType::UInt64, "i8" => DataType::Int8, "i16" => DataType::Int16, "i32" => DataType::Int32, "i64" => DataType::Int64, "str" => DataType::String, "bin" => DataType::Binary, "bool" => DataType::Boolean, "cat" => DataType::Categorical(None, Default::default()), "date" => DataType::Date, "datetime" => DataType::Datetime(TimeUnit::Microseconds, None), "f32" => DataType::Float32, "time" => DataType::Time, "dur" => DataType::Duration(TimeUnit::Microseconds), "f64" => DataType::Float64, "obj" => DataType::Object(OBJECT_NAME, None), "list" => DataType::List(Box::new(DataType::Boolean)), "null" => DataType::Null, "unk" => DataType::Unknown, _ => { return Err(RbValueError::new_err(format!( "{} is not a supported DataType.", ob ))) } } }; Ok(Wrap(dtype)) } } impl<'s> TryConvert for Wrap> { fn try_convert(ob: Value) -> RbResult { let mut vals: Vec>> = Vec::new(); for item in RArray::try_convert(ob)?.each() { vals.push(Wrap::>::try_convert(item?)?); } let vals: Vec = unsafe { std::mem::transmute(vals) }; Ok(Wrap(Row(vals))) } } impl TryConvert for Wrap { fn try_convert(ob: Value) -> RbResult { let dict = RHash::try_convert(ob)?; let mut schema = Vec::new(); dict.foreach(|key: String, val: Wrap| { schema.push(Ok(Field::new(&key, val.0))); Ok(ForEach::Continue) }) .unwrap(); Ok(Wrap(schema.into_iter().collect::>()?)) } } #[derive(Clone)] pub struct ObjectValue { pub inner: Opaque, } impl Debug for ObjectValue { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.debug_struct("ObjectValue") .field("inner", &self.to_object()) .finish() } } impl Hash for ObjectValue { fn hash(&self, state: &mut H) { let h = self .to_object() .funcall::<_, _, isize>("hash", ()) .expect("should be hashable"); state.write_isize(h) } } impl Eq for ObjectValue {} impl PartialEq for ObjectValue { fn eq(&self, other: &Self) -> bool { self.to_object().eql(other.to_object()).unwrap_or(false) } } impl TotalEq for ObjectValue { fn tot_eq(&self, other: &Self) -> bool { self == other } } impl TotalHash for ObjectValue { fn tot_hash(&self, state: &mut H) where H: Hasher, { self.hash(state); } } impl Display for ObjectValue { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!(f, "{}", self.to_object()) } } impl PolarsObject for ObjectValue { fn type_name() -> &'static str { "object" } } impl From for ObjectValue { fn from(v: Value) -> Self { Self { inner: v.into() } } } impl TryConvert for ObjectValue { fn try_convert(ob: Value) -> RbResult { Ok(ObjectValue { inner: ob.into() }) } } impl From<&dyn PolarsObjectSafe> for &ObjectValue { fn from(val: &dyn PolarsObjectSafe) -> Self { unsafe { &*(val as *const dyn PolarsObjectSafe as *const ObjectValue) } } } // TODO remove impl ObjectValue { pub fn to_object(&self) -> Value { Ruby::get().unwrap().get_inner(self.inner) } } impl IntoValue for ObjectValue { fn into_value_with(self, _: &Ruby) -> Value { self.to_object() } } impl Default for ObjectValue { fn default() -> Self { ObjectValue { inner: Ruby::get().unwrap().qnil().as_value().into(), } } } pub(crate) fn dicts_to_rows( records: &Value, infer_schema_len: Option, schema_columns: PlIndexSet, ) -> RbResult<(Vec, Vec)> { let infer_schema_len = infer_schema_len.map(|n| std::cmp::max(1, n)); let (dicts, len) = get_rbseq(*records)?; let key_names = { if !schema_columns.is_empty() { schema_columns } else { let mut inferred_keys = PlIndexSet::new(); for d in dicts.each().take(infer_schema_len.unwrap_or(usize::MAX)) { let d = d?; let d = RHash::try_convert(d)?; d.foreach(|name: Value, _value: Value| { if let Some(v) = Symbol::from_value(name) { inferred_keys.insert(v.name()?.into()); } else { inferred_keys.insert(String::try_convert(name)?); }; Ok(ForEach::Continue) })?; } inferred_keys } }; let mut rows = Vec::with_capacity(len); for d in dicts.each() { let d = d?; let d = RHash::try_convert(d)?; let mut row = Vec::with_capacity(key_names.len()); for k in key_names.iter() { // TODO improve performance let val = match d.get(k.clone()).or_else(|| d.get(Symbol::new(k))) { None => AnyValue::Null, Some(val) => Wrap::::try_convert(val)?.0, }; row.push(val) } rows.push(Row(row)) } Ok((rows, key_names.into_iter().collect())) } impl TryConvert for Wrap { fn try_convert(ob: Value) -> RbResult { let parsed = match String::try_convert(ob)?.as_str() { "backward" => AsofStrategy::Backward, "forward" => AsofStrategy::Forward, v => { return Err(RbValueError::new_err(format!( "strategy must be one of {{'backward', 'forward'}}, got {}", v ))) } }; Ok(Wrap(parsed)) } } impl TryConvert for Wrap { fn try_convert(ob: Value) -> RbResult { let parsed = match String::try_convert(ob)?.as_str() { "linear" => InterpolationMethod::Linear, "nearest" => InterpolationMethod::Nearest, v => { return Err(RbValueError::new_err(format!( "method must be one of {{'linear', 'nearest'}}, got {v}", ))) } }; Ok(Wrap(parsed)) } } impl TryConvert for Wrap> { fn try_convert(ob: Value) -> RbResult { let parsed = match String::try_convert(ob)?.as_str() { "uncompressed" => None, "snappy" => Some(AvroCompression::Snappy), "deflate" => Some(AvroCompression::Deflate), v => { return Err(RbValueError::new_err(format!( "compression must be one of {{'uncompressed', 'snappy', 'deflate'}}, got {}", v ))) } }; Ok(Wrap(parsed)) } } impl TryConvert for Wrap { fn try_convert(ob: Value) -> RbResult { let parsed = match String::try_convert(ob)?.as_str() { "physical" => CategoricalOrdering::Physical, "lexical" => CategoricalOrdering::Lexical, v => { return Err(RbValueError::new_err(format!( "ordering must be one of {{'physical', 'lexical'}}, got {}", v ))) } }; Ok(Wrap(parsed)) } } impl TryConvert for Wrap { fn try_convert(ob: Value) -> RbResult { let parsed = match String::try_convert(ob)?.as_str() { "window" => StartBy::WindowBound, "datapoint" => StartBy::DataPoint, "monday" => StartBy::Monday, v => { return Err(RbValueError::new_err(format!( "closed must be one of {{'window', 'datapoint', 'monday'}}, got {v}", ))) } }; Ok(Wrap(parsed)) } } impl TryConvert for Wrap { fn try_convert(ob: Value) -> RbResult { let parsed = match String::try_convert(ob)?.as_str() { "left" => ClosedWindow::Left, "right" => ClosedWindow::Right, "both" => ClosedWindow::Both, "none" => ClosedWindow::None, v => { return Err(RbValueError::new_err(format!( "closed must be one of {{'left', 'right', 'both', 'none'}}, got {}", v ))) } }; Ok(Wrap(parsed)) } } impl TryConvert for Wrap { fn try_convert(ob: Value) -> RbResult { let parsed = match String::try_convert(ob)?.as_str() { "utf8" => CsvEncoding::Utf8, "utf8-lossy" => CsvEncoding::LossyUtf8, v => { return Err(RbValueError::new_err(format!( "encoding must be one of {{'utf8', 'utf8-lossy'}}, got {}", v ))) } }; Ok(Wrap(parsed)) } } impl TryConvert for Wrap> { fn try_convert(ob: Value) -> RbResult { let parsed = match String::try_convert(ob)?.as_str() { "uncompressed" => None, "lz4" => Some(IpcCompression::LZ4), "zstd" => Some(IpcCompression::ZSTD), v => { return Err(RbValueError::new_err(format!( "compression must be one of {{'uncompressed', 'lz4', 'zstd'}}, got {}", v ))) } }; Ok(Wrap(parsed)) } } impl TryConvert for Wrap { fn try_convert(ob: Value) -> RbResult { let parsed = match String::try_convert(ob)?.as_str() { "inner" => JoinType::Inner, "left" => JoinType::Left, "outer" => JoinType::Outer { coalesce: false }, "outer_coalesce" => JoinType::Outer { coalesce: true }, "semi" => JoinType::Semi, "anti" => JoinType::Anti, "cross" => JoinType::Cross, v => { return Err(RbValueError::new_err(format!( "how must be one of {{'inner', 'left', 'outer', 'semi', 'anti', 'cross'}}, got {}", v ))) } }; Ok(Wrap(parsed)) } } impl TryConvert for Wrap