lib/kraps/job.rb in kraps-0.7.0 vs lib/kraps/job.rb in kraps-0.8.0
- old
+ new
@@ -28,16 +28,18 @@
end
def map(partitions: nil, partitioner: nil, jobs: nil, worker: @worker, before: nil, &block)
fresh.tap do |job|
job.instance_eval do
+ jobs = [jobs, @partitions].compact.min
+
@partitions = partitions if partitions
@partitioner = partitioner if partitioner
@steps << Step.new(
action: Actions::MAP,
- jobs: [jobs, @partitions].compact.min,
+ jobs: jobs,
partitions: @partitions,
partitioner: @partitioner,
worker: worker,
before: before,
block: block
@@ -47,16 +49,18 @@
end
def map_partitions(partitions: nil, partitioner: nil, jobs: nil, worker: @worker, before: nil, &block)
fresh.tap do |job|
job.instance_eval do
+ jobs = [jobs, @partitions].compact.min
+
@partitions = partitions if partitions
@partitioner = partitioner if partitioner
@steps << Step.new(
action: Actions::MAP_PARTITIONS,
- jobs: [jobs, @partitions].compact.min,
+ jobs: jobs,
partitions: @partitions,
partitioner: @partitioner,
worker: worker,
before: before,
block: block
@@ -133,31 +137,30 @@
ensure
tempfile&.close(true)
end
end
- def load(prefix:, partitions:, partitioner:, worker: @worker)
+ def load(prefix:, partitions:, partitioner:, concurrency:, worker: @worker)
job = parallelize(partitions: partitions, partitioner: proc { |key, _| key }, worker: worker) do |collector|
(0...partitions).each do |partition|
collector.call(partition)
end
end
job.map_partitions(partitioner: partitioner, worker: worker) do |partition, _, collector|
- tempfile = Tempfile.new
+ temp_paths = Downloader.download_all(prefix: File.join(prefix, partition.to_s, "/"), concurrency: concurrency)
- path = File.join(prefix, partition.to_s, "chunk.json")
- next unless Kraps.driver.exists?(path)
+ temp_paths.each do |temp_path|
+ File.open(temp_path.path) do |stream|
+ stream.each_line do |line|
+ key, value = JSON.parse(line)
- Kraps.driver.download(path, tempfile.path)
-
- tempfile.each_line do |line|
- key, value = JSON.parse(line)
-
- collector.call(key, value)
+ collector.call(key, value)
+ end
+ end
end
ensure
- tempfile&.close(true)
+ temp_paths&.delete
end
end
def fresh
dup.tap do |job|