Sha256: 74e402c80f8aa3a17c62848b16c44780b9c37823932f3bf299f2ddcd09e56fd8
Contents?: true
Size: 1.85 KB
Versions: 2
Compression:
Stored size: 1.85 KB
Contents
module Kraps class Job attr_reader :steps def initialize(worker:) @worker = worker @steps = [] @partitions = 0 @partitioner = HashPartitioner.new end def parallelize(partitions:, partitioner: HashPartitioner.new, worker: @worker, &block) fresh.tap do |job| job.instance_eval do @partitions = partitions @partitioner = partitioner @steps << Step.new(action: Actions::PARALLELIZE, args: { partitions: @partitions, partitioner: @partitioner, worker: worker }, block: block) end end end def map(partitions: nil, partitioner: nil, worker: @worker, &block) fresh.tap do |job| job.instance_eval do @partitions = partitions if partitions @partitioner = partitioner if partitioner @steps << Step.new(action: Actions::MAP, args: { partitions: @partitions, partitioner: @partitioner, worker: worker }, block: block) end end end def reduce(worker: @worker, &block) fresh.tap do |job| job.instance_eval do @steps << Step.new(action: Actions::REDUCE, args: { partitions: @partitions, partitioner: @partitioner, worker: worker }, block: block) end end end def each_partition(worker: @worker, &block) fresh.tap do |job| job.instance_eval do @steps << Step.new(action: Actions::EACH_PARTITION, args: { partitions: @partitions, partitioner: @partitioner, worker: worker }, block: block) end end end def repartition(partitions:, partitioner: nil, worker: @worker) map(partitions: partitions, partitioner: partitioner, worker: worker) do |key, value, collector| collector.call(key, value) end end def fresh dup.tap do |job| job.instance_variable_set(:@steps, @steps.dup) end end end end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
kraps-0.4.0 | lib/kraps/job.rb |
kraps-0.3.0 | lib/kraps/job.rb |