lib/metacrunch/job.rb in metacrunch-3.0.1 vs lib/metacrunch/job.rb in metacrunch-3.0.2

- old
+ new

@@ -108,40 +108,34 @@ end def run_transformations sources.each do |source| # sources are expected to respond to `each` - source.each do |row| - _run_transformations(row) + source.each do |data| + run_transformations_and_write_destinations(data) end # Run all transformations a last time to flush possible buffers - _run_transformations(nil, flush_buffers: true) + run_transformations_and_write_destinations(nil, flush_buffers: true) end # destination implementations are expected to respond to `close` destinations.each(&:close) end - def _run_transformations(row, flush_buffers: false) + def run_transformations_and_write_destinations(data, flush_buffers: false) transformations.each do |transformation| - row = if transformation.is_a?(Buffer) - if flush_buffers - transformation.flush - else - transformation.buffer(row) - end + if transformation.is_a?(Buffer) + data = transformation.buffer(data) if data.present? + data = transformation.flush if flush_buffers else - transformation.call(row) if row + data = transformation.call(data) if data.present? end - - break unless row end - if row + if data.present? destinations.each do |destination| - # destinations are expected to respond to `write(row)` - destination.write(row) + destination.write(data) # destinations are expected to respond to `write(data)` end end end end