lib/metacrunch/job.rb in metacrunch-3.0.1 vs lib/metacrunch/job.rb in metacrunch-3.0.2
- old
+ new
@@ -108,40 +108,34 @@
end
def run_transformations
sources.each do |source|
# sources are expected to respond to `each`
- source.each do |row|
- _run_transformations(row)
+ source.each do |data|
+ run_transformations_and_write_destinations(data)
end
# Run all transformations a last time to flush possible buffers
- _run_transformations(nil, flush_buffers: true)
+ run_transformations_and_write_destinations(nil, flush_buffers: true)
end
# destination implementations are expected to respond to `close`
destinations.each(&:close)
end
- def _run_transformations(row, flush_buffers: false)
+ def run_transformations_and_write_destinations(data, flush_buffers: false)
transformations.each do |transformation|
- row = if transformation.is_a?(Buffer)
- if flush_buffers
- transformation.flush
- else
- transformation.buffer(row)
- end
+ if transformation.is_a?(Buffer)
+ data = transformation.buffer(data) if data.present?
+ data = transformation.flush if flush_buffers
else
- transformation.call(row) if row
+ data = transformation.call(data) if data.present?
end
-
- break unless row
end
- if row
+ if data.present?
destinations.each do |destination|
- # destinations are expected to respond to `write(row)`
- destination.write(row)
+ destination.write(data) # destinations are expected to respond to `write(data)`
end
end
end
end