module EasyML
  module Adapters
    # Adapter that serves a Polars DataFrame persisted inside the
    # datasource's configuration hash (serialized under the "df" key).
    # Unlike file-backed adapters, all data lives in memory once loaded.
    class PolarsAdapter < BaseAdapter
      # Rehydrates the stored DataFrame from configuration immediately
      # on construction so subsequent reads are in-memory only.
      def initialize(datasource)
        super
        read_df_from_configuration
      end

      # Queries the in-memory DataFrame without mutating the cached copy.
      #
      # @param drop_cols [Array<String>] columns to drop; names not present
      #   in the frame are silently ignored (intersection below)
      # @param filter [Object, nil] Polars filter expression
      # @param limit [Integer, nil] maximum number of rows to return
      # @param select [Array, String, nil] columns to project
      # @param unique [Boolean, nil] drop duplicate rows when truthy
      # @param sort [Object, nil] column(s) to sort by
      # @param descending [Boolean] sort order, used only when +sort+ given
      # @return [Polars::DataFrame, nil] nil when no DataFrame is loaded
      def query(drop_cols: [], filter: nil, limit: nil, select: nil, unique: nil, sort: nil, descending: false)
        return if df.nil?

        # Clone so chained operations never mutate the cached frame.
        result = df.clone
        result = result.filter(filter) if filter
        result = result.select(select) if select.present?
        result = result.unique if unique
        # Only drop columns that actually exist, to avoid Polars errors
        # on unknown column names.
        cols_to_drop = drop_cols & result.columns
        result = result.drop(cols_to_drop) unless cols_to_drop.empty?
        result = result.sort(sort, reverse: descending) if sort
        result = result.limit(limit) if limit
        result
      end

      # Yields successive row slices of the DataFrame.
      #
      # @param of [Integer] batch size in rows
      # @yieldparam batch [Polars::DataFrame] a slice of at most +of+ rows
      # @return [nil]
      def in_batches(of: 10_000)
        # Guard added: mirrors query's nil handling — previously this
        # raised NoMethodError on nil when no "df" was configured.
        return if df.nil?

        total_rows = df.shape[0]
        (0...total_rows).step(of) do |start|
          # Final batch may be shorter than +of+.
          length = [of, total_rows - start].min
          yield df.slice(start, length)
        end
      end

      # This adapter is memory-backed; it owns no files.
      # @return [Array] always empty
      def files
        []
      end

      # @return [Time] the datasource record's last update time, used as
      #   a proxy for data freshness since there are no files to stat
      def last_updated_at
        datasource.updated_at
      end

      # @return [Polars::DataFrame, nil] the cached frame, if loaded
      def data
        df
      end

      private

      # Cached DataFrame rehydrated from configuration (may be nil).
      attr_accessor :df

      # Serializes the cached DataFrame into the datasource's
      # configuration hash under "df". No-op when nothing is loaded.
      # NOTE(review): does not call datasource.save — persistence is
      # presumably the caller's responsibility; confirm against callers.
      def store_df_in_configuration
        return unless df

        datasource.configuration = (datasource.configuration || {}).merge(
          "df" => JSON.parse(df.write_json),
        )
      end

      # Rebuilds @df from the JSON-serialized frame stored in
      # configuration["df"] (the write_json column format:
      # name / datatype / values per column). No-op when absent.
      def read_df_from_configuration
        return unless datasource.configuration&.key?("df")

        df_data = datasource.configuration["df"]
        columns = df_data["columns"].map do |col|
          dtype =
            case col["datatype"]
            when Hash
              if col["datatype"]["Datetime"]
                # NOTE(review): taking .class discards the time unit that
                # was just passed to the constructor — if polars-ruby
                # accepts dtype instances, the instance itself would
                # preserve the unit. Confirm before changing.
                Polars::Datetime.new(col["datatype"]["Datetime"][0].downcase.to_sym).class
              else
                # Unrecognized complex dtypes fall back to string.
                Polars::Utf8
              end
            else
              # Simple dtypes are stored by class name, e.g. "Int64".
              Polars.const_get(col["datatype"])
            end
          Polars::Series.new(col["name"], col["values"], dtype: dtype)
        end
        @df = Polars::DataFrame.new(columns)
      end
    end
  end
end