lib/kraps/job.rb in kraps-0.7.0 vs lib/kraps/job.rb in kraps-0.8.0

- old
+ new

@@ -28,16 +28,18 @@ end def map(partitions: nil, partitioner: nil, jobs: nil, worker: @worker, before: nil, &block) fresh.tap do |job| job.instance_eval do + jobs = [jobs, @partitions].compact.min + @partitions = partitions if partitions @partitioner = partitioner if partitioner @steps << Step.new( action: Actions::MAP, - jobs: [jobs, @partitions].compact.min, + jobs: jobs, partitions: @partitions, partitioner: @partitioner, worker: worker, before: before, block: block @@ -47,16 +49,18 @@ end def map_partitions(partitions: nil, partitioner: nil, jobs: nil, worker: @worker, before: nil, &block) fresh.tap do |job| job.instance_eval do + jobs = [jobs, @partitions].compact.min + @partitions = partitions if partitions @partitioner = partitioner if partitioner @steps << Step.new( action: Actions::MAP_PARTITIONS, - jobs: [jobs, @partitions].compact.min, + jobs: jobs, partitions: @partitions, partitioner: @partitioner, worker: worker, before: before, block: block @@ -133,31 +137,30 @@ ensure tempfile&.close(true) end end - def load(prefix:, partitions:, partitioner:, worker: @worker) + def load(prefix:, partitions:, partitioner:, concurrency:, worker: @worker) job = parallelize(partitions: partitions, partitioner: proc { |key, _| key }, worker: worker) do |collector| (0...partitions).each do |partition| collector.call(partition) end end job.map_partitions(partitioner: partitioner, worker: worker) do |partition, _, collector| - tempfile = Tempfile.new + temp_paths = Downloader.download_all(prefix: File.join(prefix, partition.to_s, "/"), concurrency: concurrency) - path = File.join(prefix, partition.to_s, "chunk.json") - next unless Kraps.driver.exists?(path) + temp_paths.each do |temp_path| + File.open(temp_path.path) do |stream| + stream.each_line do |line| + key, value = JSON.parse(line) - Kraps.driver.download(path, tempfile.path) - - tempfile.each_line do |line| - key, value = JSON.parse(line) - - collector.call(key, value) + collector.call(key, value) + end + end end ensure - tempfile&.close(true) + temp_paths&.delete end end def fresh dup.tap do |job|