lib/kraps/job.rb in kraps-0.4.0 vs lib/kraps/job.rb in kraps-0.5.0
- old
+ new
@@ -7,49 +7,77 @@
@steps = []
@partitions = 0
@partitioner = HashPartitioner.new
end
- def parallelize(partitions:, partitioner: HashPartitioner.new, worker: @worker, &block)
+ def parallelize(partitions:, partitioner: HashPartitioner.new, worker: @worker, before: nil, &block)
fresh.tap do |job|
job.instance_eval do
@partitions = partitions
@partitioner = partitioner
- @steps << Step.new(action: Actions::PARALLELIZE, args: { partitions: @partitions, partitioner: @partitioner, worker: worker }, block: block)
+ @steps << Step.new(
+ action: Actions::PARALLELIZE,
+ partitions: @partitions,
+ partitioner: @partitioner,
+ worker: worker,
+ before: before,
+ block: block
+ )
end
end
end
- def map(partitions: nil, partitioner: nil, worker: @worker, &block)
+ def map(partitions: nil, partitioner: nil, worker: @worker, before: nil, &block)
fresh.tap do |job|
job.instance_eval do
@partitions = partitions if partitions
@partitioner = partitioner if partitioner
- @steps << Step.new(action: Actions::MAP, args: { partitions: @partitions, partitioner: @partitioner, worker: worker }, block: block)
+ @steps << Step.new(
+ action: Actions::MAP,
+ partitions: @partitions,
+ partitioner: @partitioner,
+ worker: worker,
+ before: before,
+ block: block
+ )
end
end
end
- def reduce(worker: @worker, &block)
+ def reduce(worker: @worker, before: nil, &block)
fresh.tap do |job|
job.instance_eval do
- @steps << Step.new(action: Actions::REDUCE, args: { partitions: @partitions, partitioner: @partitioner, worker: worker }, block: block)
+ @steps << Step.new(
+ action: Actions::REDUCE,
+ partitions: @partitions,
+ partitioner: @partitioner,
+ worker: worker,
+ before: before,
+ block: block
+ )
end
end
end
- def each_partition(worker: @worker, &block)
+ def each_partition(worker: @worker, before: nil, &block)
fresh.tap do |job|
job.instance_eval do
- @steps << Step.new(action: Actions::EACH_PARTITION, args: { partitions: @partitions, partitioner: @partitioner, worker: worker }, block: block)
+ @steps << Step.new(
+ action: Actions::EACH_PARTITION,
+ partitions: @partitions,
+ partitioner: @partitioner,
+ worker: worker,
+ before: before,
+ block: block
+ )
end
end
end
- def repartition(partitions:, partitioner: nil, worker: @worker)
- map(partitions: partitions, partitioner: partitioner, worker: worker) do |key, value, collector|
+ def repartition(partitions:, partitioner: nil, worker: @worker, before: nil)
+ map(partitions: partitions, partitioner: partitioner, worker: worker, before: before) do |key, value, collector|
collector.call(key, value)
end
end
def fresh