lib/metacrunch/job.rb in metacrunch-4.1.1 vs lib/metacrunch/job.rb in metacrunch-4.2.0
- old
+ new
@@ -81,16 +81,16 @@
run_pre_process
if source
# Run transformation for each data object available in source
source.each do |data|
- data = run_transformations(data)
+ data = run_transformations(transformations, data)
write_destination(data)
end
# Run all transformations a last time to flush existing buffers
- data = run_transformations(nil, flush_buffers: true)
+ data = run_transformations(transformations, nil, flush_buffers: true)
write_destination(data)
# Close destination
destination.close if destination
end
@@ -121,29 +121,28 @@
def run_post_process
post_process.call if post_process
end
- def run_transformations(data, flush_buffers: false)
- transformations.each do |transformation|
+ def run_transformations(transformations, data, flush_buffers: false)
+ transformations.each.with_index do |transformation, i|
if transformation.is_a?(Buffer)
- buffer = transformation
-
- if data
- data = buffer.buffer(data)
- data = buffer.flush if flush_buffers
- else
- data = buffer.flush
- end
+ data = transformation.buffer(data) if data
+ data = transformation.flush if flush_buffers
else
data = transformation.call(data) if data
+
+ if data&.is_a?(Enumerator)
+ data.each { |d| run_transformations(transformations.slice(i+1..-1), d, flush_buffers: flush_buffers) }
+ data = nil
+ end
end
end
data
end
def write_destination(data)
- destination.write(data) if destination && data
+ destination.write(data) if destination && data.present?
end
end