lib/metacrunch/job.rb in metacrunch-4.1.1 vs lib/metacrunch/job.rb in metacrunch-4.2.0

- old
+ new

@@ -81,16 +81,16 @@ run_pre_process if source # Run transformation for each data object available in source source.each do |data| - data = run_transformations(data) + data = run_transformations(transformations, data) write_destination(data) end # Run all transformations a last time to flush existing buffers - data = run_transformations(nil, flush_buffers: true) + data = run_transformations(transformations, nil, flush_buffers: true) write_destination(data) # Close destination destination.close if destination end @@ -121,29 +121,28 @@ def run_post_process post_process.call if post_process end - def run_transformations(data, flush_buffers: false) - transformations.each do |transformation| + def run_transformations(transformations, data, flush_buffers: false) + transformations.each.with_index do |transformation, i| if transformation.is_a?(Buffer) - buffer = transformation - - if data - data = buffer.buffer(data) - data = buffer.flush if flush_buffers - else - data = buffer.flush - end + data = transformation.buffer(data) if data + data = transformation.flush if flush_buffers else data = transformation.call(data) if data + + if data&.is_a?(Enumerator) + data.each { |d| run_transformations(transformations.slice(i+1..-1), d, flush_buffers: flush_buffers) } + data = nil + end end end data end def write_destination(data) - destination.write(data) if destination && data + destination.write(data) if destination && data.present? end end