lib/metacrunch/job.rb in metacrunch-3.0.3 vs lib/metacrunch/job.rb in metacrunch-3.1.0
- old
+ new
@@ -4,18 +4,20 @@
require_relative "job/buffer"
attr_reader :builder, :args
class << self
- def define(file_content = nil, filename: nil, args: nil, &block)
- self.new(file_content, filename: filename, args: args, &block)
+ def define(file_content = nil, filename: nil, args: nil, number_of_processes: 1, process_index: 0, &block)
+ self.new(file_content, filename: filename, args: args, number_of_processes: number_of_processes, process_index: process_index, &block)
end
end
- def initialize(file_content = nil, filename: nil, args: nil, &block)
+ def initialize(file_content = nil, filename: nil, args: nil, number_of_processes: 1, process_index: 0, &block)
@builder = Dsl.new(self)
@args = args
+ @number_of_processes = number_of_processes
+ @process_index = process_index
if file_content
@builder.instance_eval(file_content, filename || "")
elsif block_given?
@builder.instance_eval(&block)
@@ -107,9 +109,21 @@
post_processes.each(&:call)
end
def run_transformations
sources.each do |source|
+ # Setup parallel processing
+ if @number_of_processes > 1
+ if source.class.included_modules.include?(Metacrunch::ParallelProcessableReader)
+ source.set_parallel_process_options(
+ number_of_processes: @number_of_processes,
+ process_index: @process_index
+ )
+ else
+ raise RuntimeError, "source does't support parallel processing"
+ end
+ end
+
# sources are expected to respond to `each`
source.each do |data|
run_transformations_and_write_destinations(data)
end