lib/kraps/job.rb in kraps-0.4.0 vs lib/kraps/job.rb in kraps-0.5.0

- old
+ new

@@ -7,49 +7,77 @@ @steps = [] @partitions = 0 @partitioner = HashPartitioner.new end - def parallelize(partitions:, partitioner: HashPartitioner.new, worker: @worker, &block) + def parallelize(partitions:, partitioner: HashPartitioner.new, worker: @worker, before: nil, &block) fresh.tap do |job| job.instance_eval do @partitions = partitions @partitioner = partitioner - @steps << Step.new(action: Actions::PARALLELIZE, args: { partitions: @partitions, partitioner: @partitioner, worker: worker }, block: block) + @steps << Step.new( + action: Actions::PARALLELIZE, + partitions: @partitions, + partitioner: @partitioner, + worker: worker, + before: before, + block: block + ) end end end - def map(partitions: nil, partitioner: nil, worker: @worker, &block) + def map(partitions: nil, partitioner: nil, worker: @worker, before: nil, &block) fresh.tap do |job| job.instance_eval do @partitions = partitions if partitions @partitioner = partitioner if partitioner - @steps << Step.new(action: Actions::MAP, args: { partitions: @partitions, partitioner: @partitioner, worker: worker }, block: block) + @steps << Step.new( + action: Actions::MAP, + partitions: @partitions, + partitioner: @partitioner, + worker: worker, + before: before, + block: block + ) end end end - def reduce(worker: @worker, &block) + def reduce(worker: @worker, before: nil, &block) fresh.tap do |job| job.instance_eval do - @steps << Step.new(action: Actions::REDUCE, args: { partitions: @partitions, partitioner: @partitioner, worker: worker }, block: block) + @steps << Step.new( + action: Actions::REDUCE, + partitions: @partitions, + partitioner: @partitioner, + worker: worker, + before: before, + block: block + ) end end end - def each_partition(worker: @worker, &block) + def each_partition(worker: @worker, before: nil, &block) fresh.tap do |job| job.instance_eval do - @steps << Step.new(action: Actions::EACH_PARTITION, args: { partitions: @partitions, partitioner: @partitioner, worker: worker }, block: block) + @steps << Step.new( + action: Actions::EACH_PARTITION, + partitions: @partitions, + partitioner: @partitioner, + worker: worker, + before: before, + block: block + ) end end end - def repartition(partitions:, partitioner: nil, worker: @worker) - map(partitions: partitions, partitioner: partitioner, worker: worker) do |key, value, collector| + def repartition(partitions:, partitioner: nil, worker: @worker, before: nil) + map(partitions: partitions, partitioner: partitioner, worker: worker, before: before) do |key, value, collector| collector.call(key, value) end end def fresh