lib/rodimus/step.rb in rodimus-0.1.2 vs lib/rodimus/step.rb in rodimus-1.0.0
- old
+ new
@@ -1,27 +1,33 @@
module Rodimus
- module Step
+ class Step
+ include Observable
+ include Observing # Steps observe themselves for run hooks
+ include RuntimeLogging
+
# The incoming data stream. Can be anything that quacks like an IO
attr_accessor :incoming
# The outgoing data stream. Can be anything that quacks like an IO
attr_accessor :outgoing
# Shared user-data accessible across all running transformation steps.
# This is initialized by the Transformation when the step begins to run.
attr_accessor :shared_data
+ def initialize
+ observers << self
+ observers << Benchmark.new if Rodimus.configuration.benchmarking
+ end
+
def close_descriptors
[incoming, outgoing].reject(&:nil?).each do |descriptor|
descriptor.close if descriptor.respond_to?(:close)
end
end
- # Override this for custom cleanup functionality.
- def finalize; end
-
# Override this for custom output handling functionality per-row.
def handle_output(transformed_row)
outgoing.puts(transformed_row)
end
@@ -29,19 +35,20 @@
def process_row(row)
row.to_s
end
def run
- Rodimus.logger.info "Running #{self}"
+ notify(self, :before_run)
@row_count = 1
incoming.each do |row|
+ notify(self, :before_row)
transformed_row = process_row(row)
handle_output(transformed_row)
Rodimus.logger.info(self) { "#{@row_count} rows processed" } if @row_count % 50000 == 0
@row_count += 1
+ notify(self, :after_row)
end
- finalize
- Rodimus.logger.info "Finished #{self}"
+ notify(self, :after_run)
ensure
close_descriptors
end
def to_s