Sha256: c407b1c45b3bd384077d7cb21ecfe9cad5c275398c97b42fdd9eef14ce756d22
Contents?: true
Size: 1.96 KB
Versions: 2
Compression:
Stored size: 1.96 KB
Contents
module Kraps
  # Builder that accumulates an ordered list of steps (see #steps).
  #
  # Every step-adding method works on a copy of the receiver (see #fresh) and
  # returns that copy, so the job it was called on keeps its own step list
  # untouched. Each step records the partition count, partitioner and worker
  # that were current when the step was added.
  class Job
    # @return [Array<Step>] the steps added so far, in insertion order
    attr_reader :steps

    # @param worker [Object] default worker recorded in every step's args
    #   unless a step-adding call overrides it; it is only stored here
    def initialize(worker:)
      @worker = worker
      @steps = []
      # Initial values; #parallelize (always) and #map (optionally) replace them.
      @partitions = 0
      @partitioner = MapReduce::HashPartitioner.new(@partitions)
    end

    # Returns a copy of this job with a PARALLELIZE step appended and the
    # partitioning for subsequent steps set.
    #
    # @param partitions [Integer] partition count recorded for this and later steps
    # @param partitioner [Object, nil] partitioner to record; an explicit nil
    #   falls back to a HashPartitioner over +partitions+, matching #map
    # @param worker [Object] worker recorded in the step's args
    # @param block [Proc] stored in the step as-is
    # @return [Job] the new job
    def parallelize(partitions:, partitioner: MapReduce::HashPartitioner.new(partitions), worker: @worker, &block)
      fresh.tap do |job|
        job.instance_eval do
          @partitions = partitions
          # Callers that forward an optional partitioner may pass nil explicitly;
          # never store nil, use the same default #map uses.
          @partitioner = partitioner || MapReduce::HashPartitioner.new(partitions)
          add_step(Actions::PARALLELIZE, worker, block)
        end
      end
    end

    # Returns a copy of this job with a MAP step appended.
    #
    # When +partitions+ and/or +partitioner+ are given, the copy's partitioning
    # is updated before the step is recorded: a given partitioner wins,
    # otherwise a HashPartitioner over +partitions+ is created. When neither is
    # given the current partitioning is kept.
    #
    # @param partitions [Integer, nil] new partition count, or nil to keep the current one
    # @param partitioner [Object, nil] new partitioner, or nil
    # @param worker [Object] worker recorded in the step's args
    # @param block [Proc] stored in the step as-is
    # @return [Job] the new job
    def map(partitions: nil, partitioner: nil, worker: @worker, &block)
      fresh.tap do |job|
        job.instance_eval do
          @partitions = partitions if partitions

          if partitioner || partitions
            @partitioner = partitioner || MapReduce::HashPartitioner.new(partitions)
          end

          add_step(Actions::MAP, worker, block)
        end
      end
    end

    # Returns a copy of this job with a REDUCE step appended, using the
    # current partitioning.
    #
    # @param worker [Object] worker recorded in the step's args
    # @param block [Proc] stored in the step as-is
    # @return [Job] the new job
    def reduce(worker: @worker, &block)
      fresh.tap do |job|
        job.instance_eval { add_step(Actions::REDUCE, worker, block) }
      end
    end

    # Returns a copy of this job with an EACH_PARTITION step appended, using
    # the current partitioning.
    #
    # @param worker [Object] worker recorded in the step's args
    # @param block [Proc] stored in the step as-is
    # @return [Job] the new job
    def each_partition(worker: @worker, &block)
      fresh.tap do |job|
        job.instance_eval { add_step(Actions::EACH_PARTITION, worker, block) }
      end
    end

    # Returns a copy of this job with a MAP step appended whose block passes
    # every key/value pair unchanged to the collector, under the given
    # partitioning.
    #
    # @param partitions [Integer] new partition count
    # @param partitioner [Object, nil] new partitioner; nil selects #map's default
    # @param worker [Object] worker recorded in the step's args
    # @return [Job] the new job
    def repartition(partitions:, partitioner: nil, worker: @worker)
      map(partitions: partitions, partitioner: partitioner, worker: worker) do |key, value, collector|
        collector.call(key, value)
      end
    end

    # Returns a shallow copy of this job that owns its own steps array, so
    # appending to the copy does not affect the receiver.
    #
    # @return [Job]
    def fresh
      dup.tap do |job|
        job.instance_variable_set(:@steps, @steps.dup)
      end
    end

    private

    # Appends a step that captures the receiver's current partitioning.
    # Mutates the receiver's step list, so it is only called on a copy
    # obtained from #fresh.
    def add_step(action, worker, block)
      @steps << Step.new(
        action: action,
        args: { partitions: @partitions, partitioner: @partitioner, worker: worker },
        block: block
      )
    end
  end
end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
kraps-0.2.0 | lib/kraps/job.rb |
kraps-0.1.0 | lib/kraps/job.rb |