use magnus::{IntoValue, RArray, RHash, TryConvert, Value};
use polars::io::{HiveOptions, RowIndex};
use polars::lazy::frame::LazyFrame;
use polars::prelude::*;
use std::cell::RefCell;
use std::io::{BufWriter, Read};
use std::num::NonZeroUsize;
use std::path::PathBuf;

use crate::conversion::*;
use crate::expr::rb_exprs_to_exprs;
use crate::file::get_file_like;
use crate::{RbDataFrame, RbExpr, RbLazyGroupBy, RbPolarsErr, RbResult, RbValueError};

#[magnus::wrap(class = "Polars::RbLazyFrame")]
#[derive(Clone)]
pub struct RbLazyFrame {
    pub ldf: LazyFrame,
}

impl RbLazyFrame {
    fn get_schema(&self) -> RbResult<SchemaRef> {
        let schema = self.ldf.schema().map_err(RbPolarsErr::from)?;
        Ok(schema)
    }
}

impl From<LazyFrame> for RbLazyFrame {
    fn from(ldf: LazyFrame) -> Self {
        RbLazyFrame { ldf }
    }
}

impl RbLazyFrame {
    pub fn read_json(rb_f: Value) -> RbResult<Self> {
        // It is faster to first read to memory and then parse: https://github.com/serde-rs/json/issues/160
        // so don't bother with files.
        let mut json = String::new();
        let _ = get_file_like(rb_f, false)?
            .read_to_string(&mut json)
            .unwrap();

        // Safety:
        // We skipped the serializing/deserializing of the static lifetime in `DataType`,
        // so we actually don't have a lifetime at all when serializing.
        // &str still has a lifetime, but it's ok because we drop it immediately
        // in this scope.
        let json = unsafe { std::mem::transmute::<&'_ str, &'static str>(json.as_str()) };

        let lp = serde_json::from_str::<DslPlan>(json)
            .map_err(|err| RbValueError::new_err(format!("{:?}", err)))?;
        Ok(LazyFrame::from(lp).into())
    }

    pub fn new_from_ndjson(
        path: String,
        infer_schema_length: Option<usize>,
        batch_size: Option<Wrap<NonZeroUsize>>,
        n_rows: Option<usize>,
        low_memory: bool,
        rechunk: bool,
        row_index: Option<(String, IdxSize)>,
    ) -> RbResult<Self> {
        let batch_size = batch_size.map(|v| v.0);
        let row_index = row_index.map(|(name, offset)| RowIndex {
            name: Arc::from(name.as_str()),
            offset,
        });

        let lf = LazyJsonLineReader::new(path)
            .with_infer_schema_length(infer_schema_length)
            .with_batch_size(batch_size)
            .with_n_rows(n_rows)
            .low_memory(low_memory)
            .with_rechunk(rechunk)
            .with_row_index(row_index)
            .finish()
            .map_err(RbPolarsErr::from)?;
        Ok(lf.into())
    }
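    // The `(String, IdxSize)` pair received from Ruby is turned into a `RowIndex`
    // the same way in each of the scan constructors in this impl. A small helper
    // along these lines (an illustrative sketch, not part of the original bindings
    // and not called by the code below) captures that conversion in one place:
    #[allow(dead_code)]
    fn parse_row_index(row_index: Option<(String, IdxSize)>) -> Option<RowIndex> {
        row_index.map(|(name, offset)| RowIndex {
            name: Arc::from(name.as_str()),
            offset,
        })
    }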
    pub fn new_from_csv(arguments: &[Value]) -> RbResult<Self> {
        // start arguments
        // this pattern is needed for more than 16 arguments
        let path = String::try_convert(arguments[0])?;
        let separator = String::try_convert(arguments[1])?;
        let has_header = bool::try_convert(arguments[2])?;
        let ignore_errors = bool::try_convert(arguments[3])?;
        let skip_rows = usize::try_convert(arguments[4])?;
        let n_rows = Option::<usize>::try_convert(arguments[5])?;
        let cache = bool::try_convert(arguments[6])?;
        let overwrite_dtype = Option::<Vec<(String, Wrap<DataType>)>>::try_convert(arguments[7])?;
        let low_memory = bool::try_convert(arguments[8])?;
        let comment_prefix = Option::<String>::try_convert(arguments[9])?;
        let quote_char = Option::<String>::try_convert(arguments[10])?;
        let null_values = Option::<Wrap<NullValues>>::try_convert(arguments[11])?;
        let infer_schema_length = Option::<usize>::try_convert(arguments[12])?;
        let with_schema_modify = Option::<Value>::try_convert(arguments[13])?;
        let rechunk = bool::try_convert(arguments[14])?;
        let skip_rows_after_header = usize::try_convert(arguments[15])?;
        let encoding = Wrap::<CsvEncoding>::try_convert(arguments[16])?;
        let row_index = Option::<(String, IdxSize)>::try_convert(arguments[17])?;
        let try_parse_dates = bool::try_convert(arguments[18])?;
        let eol_char = String::try_convert(arguments[19])?;
        let truncate_ragged_lines = bool::try_convert(arguments[20])?;
        // end arguments

        let null_values = null_values.map(|w| w.0);
        let quote_char = quote_char.map(|s| s.as_bytes()[0]);
        let separator = separator.as_bytes()[0];
        let eol_char = eol_char.as_bytes()[0];
        let row_index = row_index.map(|(name, offset)| RowIndex {
            name: Arc::from(name.as_str()),
            offset,
        });

        let overwrite_dtype = overwrite_dtype.map(|overwrite_dtype| {
            overwrite_dtype
                .into_iter()
                .map(|(name, dtype)| Field::new(&name, dtype.0))
                .collect::<Schema>()
        });

        let r = LazyCsvReader::new(path)
            .with_infer_schema_length(infer_schema_length)
            .with_separator(separator)
            .with_has_header(has_header)
            .with_ignore_errors(ignore_errors)
            .with_skip_rows(skip_rows)
            .with_n_rows(n_rows)
            .with_cache(cache)
            .with_dtype_overwrite(overwrite_dtype.map(Arc::new))
            // TODO add with_schema
            .with_low_memory(low_memory)
            .with_comment_prefix(comment_prefix.as_deref())
            .with_quote_char(quote_char)
            .with_eol_char(eol_char)
            .with_rechunk(rechunk)
            .with_skip_rows_after_header(skip_rows_after_header)
            .with_encoding(encoding.0)
            .with_row_index(row_index)
            .with_try_parse_dates(try_parse_dates)
            .with_null_values(null_values)
            // TODO add with_missing_is_null
            .with_truncate_ragged_lines(truncate_ragged_lines);

        if let Some(_lambda) = with_schema_modify {
            todo!();
        }

        Ok(r.finish().map_err(RbPolarsErr::from)?.into())
    }

    #[allow(clippy::too_many_arguments)]
    pub fn new_from_parquet(
        path: Option<PathBuf>,
        paths: Vec<PathBuf>,
        n_rows: Option<usize>,
        cache: bool,
        parallel: Wrap<ParallelStrategy>,
        rechunk: bool,
        row_index: Option<(String, IdxSize)>,
        low_memory: bool,
        use_statistics: bool,
        hive_partitioning: bool,
        hive_schema: Option<Wrap<Schema>>,
        glob: bool,
    ) -> RbResult<Self> {
        let parallel = parallel.0;
        let hive_schema = hive_schema.map(|s| Arc::new(s.0));

        let first_path = if let Some(path) = &path {
            path
        } else {
            paths
                .first()
                .ok_or_else(|| RbValueError::new_err("expected a path argument".to_string()))?
        };

        let row_index = row_index.map(|(name, offset)| RowIndex {
            name: Arc::from(name.as_str()),
            offset,
        });
        let hive_options = HiveOptions {
            enabled: hive_partitioning,
            schema: hive_schema,
        };

        let args = ScanArgsParquet {
            n_rows,
            cache,
            parallel,
            rechunk,
            row_index,
            low_memory,
            cloud_options: None,
            use_statistics,
            hive_options,
            glob,
        };

        let lf = if path.is_some() {
            LazyFrame::scan_parquet(first_path, args)
        } else {
            LazyFrame::scan_parquet_files(Arc::from(paths), args)
        }
        .map_err(RbPolarsErr::from)?;
        Ok(lf.into())
    }

    pub fn new_from_ipc(
        path: String,
        n_rows: Option<usize>,
        cache: bool,
        rechunk: bool,
        row_index: Option<(String, IdxSize)>,
        memory_map: bool,
    ) -> RbResult<Self> {
        let row_index = row_index.map(|(name, offset)| RowIndex {
            name: Arc::from(name.as_str()),
            offset,
        });
        let args = ScanArgsIpc {
            n_rows,
            cache,
            rechunk,
            row_index,
            memory_map,
            cloud_options: None,
        };
        let lf = LazyFrame::scan_ipc(path, args).map_err(RbPolarsErr::from)?;
        Ok(lf.into())
    }

    pub fn write_json(&self, rb_f: Value) -> RbResult<()> {
        let file = BufWriter::new(get_file_like(rb_f, true)?);
        serde_json::to_writer(file, &self.ldf.logical_plan)
            .map_err(|err| RbValueError::new_err(format!("{:?}", err)))?;
        Ok(())
    }

    pub fn describe_plan(&self) -> RbResult<String> {
        self.ldf
            .describe_plan()
            .map_err(RbPolarsErr::from)
            .map_err(Into::into)
    }

    pub fn describe_optimized_plan(&self) -> RbResult<String> {
        let result = self
            .ldf
            .describe_optimized_plan()
            .map_err(RbPolarsErr::from)?;
        Ok(result)
    }
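    // Applies the given optimizer flags to a clone of the wrapped LazyFrame.
    // Each flag maps onto the `LazyFrame` builder method of the same name
    // (type coercion, predicate/projection pushdown, expression simplification,
    // slice pushdown, common-subplan/subexpression elimination, streaming).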
    #[allow(clippy::too_many_arguments)]
    pub fn optimization_toggle(
        &self,
        type_coercion: bool,
        predicate_pushdown: bool,
        projection_pushdown: bool,
        simplify_expr: bool,
        slice_pushdown: bool,
        comm_subplan_elim: bool,
        comm_subexpr_elim: bool,
        allow_streaming: bool,
        _eager: bool,
    ) -> RbLazyFrame {
        let ldf = self.ldf.clone();
        let mut ldf = ldf
            .with_type_coercion(type_coercion)
            .with_predicate_pushdown(predicate_pushdown)
            .with_simplify_expr(simplify_expr)
            .with_slice_pushdown(slice_pushdown)
            .with_streaming(allow_streaming)
            ._with_eager(_eager)
            .with_projection_pushdown(projection_pushdown);

        ldf = ldf.with_comm_subplan_elim(comm_subplan_elim);
        ldf = ldf.with_comm_subexpr_elim(comm_subexpr_elim);

        ldf.into()
    }

    pub fn sort(
        &self,
        by_column: String,
        descending: bool,
        nulls_last: bool,
        maintain_order: bool,
        multithreaded: bool,
    ) -> Self {
        let ldf = self.ldf.clone();
        ldf.sort(
            [&by_column],
            SortMultipleOptions {
                descending: vec![descending],
                nulls_last,
                multithreaded,
                maintain_order,
            },
        )
        .into()
    }

    pub fn sort_by_exprs(
        &self,
        by: RArray,
        descending: Vec<bool>,
        nulls_last: bool,
        maintain_order: bool,
        multithreaded: bool,
    ) -> RbResult<Self> {
        let ldf = self.ldf.clone();
        let exprs = rb_exprs_to_exprs(by)?;
        Ok(ldf
            .sort_by_exprs(
                exprs,
                SortMultipleOptions {
                    descending,
                    nulls_last,
                    maintain_order,
                    multithreaded,
                },
            )
            .into())
    }

    pub fn cache(&self) -> Self {
        let ldf = self.ldf.clone();
        ldf.cache().into()
    }

    pub fn collect(&self) -> RbResult<RbDataFrame> {
        let ldf = self.ldf.clone();
        let df = ldf.collect().map_err(RbPolarsErr::from)?;
        Ok(df.into())
    }

    #[allow(clippy::too_many_arguments)]
    pub fn sink_parquet(
        &self,
        path: PathBuf,
        compression: String,
        compression_level: Option<i32>,
        statistics: bool,
        row_group_size: Option<usize>,
        data_pagesize_limit: Option<usize>,
        maintain_order: bool,
    ) -> RbResult<()> {
        let compression = parse_parquet_compression(&compression, compression_level)?;

        let options = ParquetWriteOptions {
            compression,
            statistics,
            row_group_size,
            data_pagesize_limit,
            maintain_order,
        };

        let ldf = self.ldf.clone();
        ldf.sink_parquet(path, options).map_err(RbPolarsErr::from)?;
        Ok(())
    }

    pub fn sink_ipc(
        &self,
        path: PathBuf,
        compression: Option<Wrap<IpcCompression>>,
        maintain_order: bool,
    ) -> RbResult<()> {
        let options = IpcWriterOptions {
            compression: compression.map(|c| c.0),
            maintain_order,
        };

        let ldf = self.ldf.clone();
        ldf.sink_ipc(path, options).map_err(RbPolarsErr::from)?;
        Ok(())
    }

    #[allow(clippy::too_many_arguments)]
    pub fn sink_csv(
        &self,
        path: PathBuf,
        include_bom: bool,
        include_header: bool,
        separator: u8,
        line_terminator: String,
        quote_char: u8,
        batch_size: Wrap<NonZeroUsize>,
        datetime_format: Option<String>,
        date_format: Option<String>,
        time_format: Option<String>,
        float_precision: Option<usize>,
        null_value: Option<String>,
        quote_style: Option<Wrap<QuoteStyle>>,
        maintain_order: bool,
    ) -> RbResult<()> {
        let quote_style = quote_style.map_or(QuoteStyle::default(), |wrap| wrap.0);
        let null_value = null_value.unwrap_or(SerializeOptions::default().null);

        let serialize_options = SerializeOptions {
            date_format,
            time_format,
            datetime_format,
            float_precision,
            separator,
            quote_char,
            null: null_value,
            line_terminator,
            quote_style,
        };

        let options = CsvWriterOptions {
            include_bom,
            include_header,
            maintain_order,
            batch_size: batch_size.0,
            serialize_options,
        };

        let ldf = self.ldf.clone();
        ldf.sink_csv(path, options).map_err(RbPolarsErr::from)?;
        Ok(())
    }

    pub fn sink_json(&self, path: PathBuf, maintain_order: bool) -> RbResult<()> {
        let options = JsonWriterOptions { maintain_order };

        let ldf = self.ldf.clone();
        ldf.sink_json(path, options).map_err(RbPolarsErr::from)?;
        Ok(())
    }

    pub fn fetch(&self, n_rows: usize) -> RbResult<RbDataFrame> {
        let ldf = self.ldf.clone();
        let df = ldf.fetch(n_rows).map_err(RbPolarsErr::from)?;
        Ok(df.into())
    }

    pub fn filter(&self, predicate: &RbExpr) -> Self {
        let ldf = self.ldf.clone();
        ldf.filter(predicate.inner.clone()).into()
    }

    pub fn select(&self, exprs: RArray) -> RbResult<Self> {
        let ldf = self.ldf.clone();
        let exprs = rb_exprs_to_exprs(exprs)?;
        Ok(ldf.select(exprs).into())
    }
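    // Like `select`, but the underlying projection evaluates its expressions
    // sequentially rather than in parallel.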
    pub fn select_seq(&self, exprs: RArray) -> RbResult<Self> {
        let ldf = self.ldf.clone();
        let exprs = rb_exprs_to_exprs(exprs)?;
        Ok(ldf.select_seq(exprs).into())
    }

    pub fn group_by(&self, by: RArray, maintain_order: bool) -> RbResult<RbLazyGroupBy> {
        let ldf = self.ldf.clone();
        let by = rb_exprs_to_exprs(by)?;
        let lazy_gb = if maintain_order {
            ldf.group_by_stable(by)
        } else {
            ldf.group_by(by)
        };
        // The LazyGroupBy is stored behind `RefCell<Option<_>>` so the Ruby-facing
        // aggregation methods can take ownership of it through a shared reference.
        Ok(RbLazyGroupBy {
            lgb: RefCell::new(Some(lazy_gb)),
        })
    }

    pub fn rolling(
        &self,
        index_column: &RbExpr,
        period: String,
        offset: String,
        closed: Wrap<ClosedWindow>,
        by: RArray,
        check_sorted: bool,
    ) -> RbResult<RbLazyGroupBy> {
        let closed_window = closed.0;
        let ldf = self.ldf.clone();
        let by = rb_exprs_to_exprs(by)?;
        let lazy_gb = ldf.rolling(
            index_column.inner.clone(),
            by,
            RollingGroupOptions {
                index_column: "".into(),
                period: Duration::parse(&period),
                offset: Duration::parse(&offset),
                closed_window,
                check_sorted,
            },
        );

        Ok(RbLazyGroupBy {
            lgb: RefCell::new(Some(lazy_gb)),
        })
    }

    #[allow(clippy::too_many_arguments)]
    pub fn group_by_dynamic(
        &self,
        index_column: &RbExpr,
        every: String,
        period: String,
        offset: String,
        label: Wrap<Label>,