Sha256: cdaa49540388839d49585901e5801bd5d65d90c135ac1cbe709653d6f09799dc

Contents?: true

Size: 1 KB

Versions: 7

Compression:

Stored size: 1 KB

Contents

require_relative '../all_jobs_shared'

# Job used to exercise Remi::Transform::Partitioner.
#
# Sources:
#   source_data        - records to be assigned a group (declares only an :id field).
#   distribution       - one row per group: :group name and its :weight.
#   current_population - one row per group: :group name and its existing :count.
# Target:
#   target_data        - receives the :group value produced by the partitioner.
class PartitionerJob
  include AllJobsShared

  define_source :source_data, Remi::DataSource::DataFrame,
    fields: {
      id: {}
    }

  define_source :distribution, Remi::DataSource::DataFrame,
    fields: {
      group:  {},
      weight: {}
    }

  define_source :current_population, Remi::DataSource::DataFrame,
    fields: {
      group: {},
      count: {}
    }

  define_target :target_data, Remi::DataTarget::DataFrame

  # Maps every source_data row to a :group in target_data via the partitioner.
  # distribution and current_population are read by this transform as well, so
  # they are declared as sources alongside source_data.
  define_transform :main,
                   sources: %i[source_data distribution current_population],
                   targets: :target_data do
    # group => weight. to_f keeps the original lenient coercion (nil/"" become 0.0).
    distribution_hash = distribution.df.map(:row) { |row| [row[:group], row[:weight].to_f] }.to_h
    # group => existing count. to_i keeps the original lenient coercion (nil/"" become 0).
    current_population_hash = current_population.df.map(:row) { |row| [row[:group], row[:count].to_i] }.to_h

    partitioner = Remi::Transform::Partitioner.new(
      buckets:            distribution_hash,
      initial_population: current_population_hash
    )

    # No source field is mapped (source(nil)); the partitioner's output is written to :group.
    Remi::SourceToTargetMap.apply(source_data.df, target_data.df) do
      map source(nil).target(:group)
        .transform(partitioner)
    end
  end
end

Version data entries

7 entries across 7 versions & 1 rubygem

Version Path
remi-0.2.37 jobs/transforms/partitioner_job.rb
remi-0.2.36 jobs/transforms/partitioner_job.rb
remi-0.2.35 jobs/transforms/partitioner_job.rb
remi-0.2.34 jobs/transforms/partitioner_job.rb
remi-0.2.33 jobs/transforms/partitioner_job.rb
remi-0.2.32 jobs/transforms/partitioner_job.rb
remi-0.2.31 jobs/transforms/partitioner_job.rb