lib/kraps/job.rb in kraps-0.5.0 vs lib/kraps/job.rb in kraps-0.6.0
- old
+ new
@@ -43,10 +43,28 @@
)
end
end
end
# Returns the job produced by `fresh` with one MAP_PARTITIONS step appended
# to its @steps.
#
# partitions:  when given, replaces @partitions on the returned job before
#              the step is built; otherwise the job's current value is used.
# partitioner: same override-or-keep rule, applied to @partitioner.
# worker:      stored on the step; defaults to the receiver's @worker.
# before:      stored on the step unchanged (its consumer is not visible here).
# block:       stored on the step. `load` in this file passes a block taking
#              |partition, _, collector| — presumably the calling convention
#              used when the step is executed; confirm against the runner.
+ def map_partitions(partitions: nil, partitioner: nil, worker: @worker, before: nil, &block)
+ fresh.tap do |job|
+ job.instance_eval do # self is `job` inside this block, so the @ivars below are the returned job's
+ @partitions = partitions if partitions
+ @partitioner = partitioner if partitioner
+
+ @steps << Step.new(
+ action: Actions::MAP_PARTITIONS,
+ partitions: @partitions,
+ partitioner: @partitioner,
+ worker: worker,
+ before: before,
+ block: block
+ )
+ end
+ end
+ end
+
def reduce(worker: @worker, before: nil, &block)
fresh.tap do |job|
job.instance_eval do
@steps << Step.new(
action: Actions::REDUCE,
@@ -58,10 +76,27 @@
)
end
end
end
# Returns the job produced by `fresh` with one COMBINE step appended to its
# @steps. The step records `other_job` as its dependency and reuses the
# job's current @partitions and @partitioner (this method has no way to
# override them).
#
# other_job: must respond to `steps`; it is stored as the step's dependency.
# worker:    stored on the step; defaults to the receiver's @worker.
# before:    stored on the step unchanged (its consumer is not visible here).
# block:     stored on the step; its arguments are decided by whatever
#            executes COMBINE steps, which is not visible here.
#
# NOTE(review): presumably both jobs need matching partitions/partitioner for
# the combination to line up — nothing in this method checks that; confirm.
+ def combine(other_job, worker: @worker, before: nil, &block)
+ fresh.tap do |job|
+ job.instance_eval do # self is `job` inside this block; other_job, worker, before, block are closed-over locals
+ @steps << Step.new(
+ action: Actions::COMBINE,
+ partitions: @partitions,
+ partitioner: @partitioner,
+ worker: worker,
+ before: before,
+ block: block,
+ dependency: other_job,
+ options: { combine_step_index: other_job.steps.size - 1 } # index of other_job's last step at call time; -1 if it has no steps
+ )
+ end
+ end
+ end
+
def each_partition(worker: @worker, before: nil, &block)
fresh.tap do |job|
job.instance_eval do
@steps << Step.new(
action: Actions::EACH_PARTITION,
@@ -76,9 +111,48 @@
end
# Delegates to `map` with the given partitions, partitioner, worker and
# before, using a block that passes every key/value pair to the collector
# unchanged. Returns whatever `map` returns.
#
# partitions:  required; forwarded to `map`.
# partitioner: optional; forwarded to `map` (nil by default).
#
# NOTE(review): presumably the effect is to redistribute the same pairs
# across the new partition layout — depends on `map`, which is not visible
# here; confirm.
def repartition(partitions:, partitioner: nil, worker: @worker, before: nil)
map(partitions: partitions, partitioner: partitioner, worker: worker, before: before) do |key, value, collector|
collector.call(key, value)
+ end
+ end
+
# Adds an `each_partition` step that writes a partition's pairs to storage
# and returns whatever `each_partition` returns.
#
# For each partition the block writes one JSON document per line (one line
# per pair) into a Tempfile and hands that file to `Kraps.driver.store`
# under the key "<prefix>/<partition>/chunk.json" — the same path layout
# that `load` in this file reads back.
#
# prefix: leading path component of the storage key.
# worker: forwarded to `each_partition`; defaults to the receiver's @worker.
+ def dump(prefix:, worker: @worker)
+ each_partition(worker: worker) do |partition, pairs|
+ tempfile = Tempfile.new
+
+ pairs.each do |pair|
+ tempfile.puts(JSON.generate(pair))
+ end
+
+ Kraps.driver.store(File.join(prefix, partition.to_s, "chunk.json"), tempfile.tap(&:rewind)) # rewind first so the driver reads from the start of the file
+ ensure
+ tempfile&.close(true) # close and unlink; `&.` covers the case where Tempfile.new itself raised
+ end
+ end
+
# Builds a two-step job that reads back chunks stored under
# "<prefix>/<partition>/chunk.json" (the path layout `dump` in this file
# writes) and returns the result of the final `map_partitions` call.
#
# Step 1 (`parallelize`): emits the integers 0...partitions through the
# collector, with a partitioner proc that returns the key itself.
# NOTE(review): presumably this places each integer in the partition with
# the same number so that step 2 runs once per partition — confirm against
# how partitioner results are interpreted.
#
# Step 2 (`map_partitions`): for its partition, downloads the chunk file if
# it exists, parses every line as a JSON [key, value] pair and emits it
# through the collector; the caller's `partitioner` is applied to this step.
# Keys and values come back as whatever JSON.parse produces for them.
#
# prefix:      leading path component of the storage key.
# partitions:  number of partitions to enumerate (used as the range bound).
# partitioner: required; forwarded to `map_partitions`.
# worker:      forwarded to both steps; defaults to the receiver's @worker.
+ def load(prefix:, partitions:, partitioner:, worker: @worker)
+ job = parallelize(partitions: partitions, partitioner: proc { |key, _| key }, worker: worker) do |collector|
+ (0...partitions).each do |partition|
+ collector.call(partition)
+ end
+ end
+
+ job.map_partitions(partitioner: partitioner, worker: worker) do |partition, _, collector|
+ tempfile = Tempfile.new
+
+ path = File.join(prefix, partition.to_s, "chunk.json")
+ next unless Kraps.driver.exists?(path) # no chunk stored for this partition: emit nothing (the ensure below still removes the tempfile)
+
+ Kraps.driver.download(path, tempfile.path)
+
+ tempfile.each_line do |line|
+ key, value = JSON.parse(line) # each line is expected to hold a two-element JSON array
+
+ collector.call(key, value)
+ end
+ ensure
+ tempfile&.close(true) # close and unlink; `&.` covers the case where Tempfile.new itself raised
end
end
def fresh
dup.tap do |job|