lib/kraps/job.rb in kraps-0.5.0 vs lib/kraps/job.rb in kraps-0.6.0

- old
+ new

@@ -43,10 +43,28 @@ ) end end end + def map_partitions(partitions: nil, partitioner: nil, worker: @worker, before: nil, &block) + fresh.tap do |job| + job.instance_eval do + @partitions = partitions if partitions + @partitioner = partitioner if partitioner + + @steps << Step.new( + action: Actions::MAP_PARTITIONS, + partitions: @partitions, + partitioner: @partitioner, + worker: worker, + before: before, + block: block + ) + end + end + end + def reduce(worker: @worker, before: nil, &block) fresh.tap do |job| job.instance_eval do @steps << Step.new( action: Actions::REDUCE, @@ -58,10 +76,27 @@ ) end end end + def combine(other_job, worker: @worker, before: nil, &block) + fresh.tap do |job| + job.instance_eval do + @steps << Step.new( + action: Actions::COMBINE, + partitions: @partitions, + partitioner: @partitioner, + worker: worker, + before: before, + block: block, + dependency: other_job, + options: { combine_step_index: other_job.steps.size - 1 } + ) + end + end + end + def each_partition(worker: @worker, before: nil, &block) fresh.tap do |job| job.instance_eval do @steps << Step.new( action: Actions::EACH_PARTITION, @@ -76,9 +111,48 @@ end def repartition(partitions:, partitioner: nil, worker: @worker, before: nil) map(partitions: partitions, partitioner: partitioner, worker: worker, before: before) do |key, value, collector| collector.call(key, value) + end + end + + def dump(prefix:, worker: @worker) + each_partition(worker: worker) do |partition, pairs| + tempfile = Tempfile.new + + pairs.each do |pair| + tempfile.puts(JSON.generate(pair)) + end + + Kraps.driver.store(File.join(prefix, partition.to_s, "chunk.json"), tempfile.tap(&:rewind)) + ensure + tempfile&.close(true) + end + end + + def load(prefix:, partitions:, partitioner:, worker: @worker) + job = parallelize(partitions: partitions, partitioner: proc { |key, _| key }, worker: worker) do |collector| + (0...partitions).each do |partition| + collector.call(partition) + end + end + + job.map_partitions(partitioner: partitioner, worker: worker) do |partition, _, collector| + tempfile = Tempfile.new + + path = File.join(prefix, partition.to_s, "chunk.json") + next unless Kraps.driver.exists?(path) + + Kraps.driver.download(path, tempfile.path) + + tempfile.each_line do |line| + key, value = JSON.parse(line) + + collector.call(key, value) + end + ensure + tempfile&.close(true) end end def fresh dup.tap do |job|