Sha256: 74f42f37bb63102ec32ec45152c1358002933255218a515f1955255b2f32df5c

Contents?: true

Size: 1.62 KB

Versions: 5

Compression:

Stored size: 1.62 KB

Contents

module Rodimus

  class Step
    include Observable
    include Observing # Steps observe themselves for run hooks
    include RuntimeLogging

    # The incoming data stream.  Can be anything that quacks like an IO
    attr_accessor :incoming

    # The outgoing data stream.  Can be anything that quacks like an IO
    attr_accessor :outgoing

    # Shared user-data accessible across all running transformation steps.
    # This is initialized by the Transformation when the step begins to run.
    attr_accessor :shared_data

    def initialize
      observers << self
      observers << Benchmark.new if Rodimus.configuration.benchmarking
    end

    def close_descriptors
      [incoming, outgoing].reject(&:nil?).each do |descriptor|
        descriptor.close if descriptor.respond_to?(:close)
      end
    end

    # Override this for custom output handling functionality per-row.
    def handle_output(transformed_row)
      outgoing.puts(transformed_row)
    end

    # Override this for custom transformation functionality
    def process_row(row)
      row.to_s
    end

    def run
      notify(self, :before_run)
      @row_count = 1
      incoming.each do |row|
        notify(self, :before_row)
        transformed_row = process_row(row)
        handle_output(transformed_row)
        Rodimus.logger.info(self) { "#{@row_count} rows processed" } if @row_count % 50000 == 0
        @row_count += 1
        notify(self, :after_row)
      end
      notify(self, :after_run)
    ensure
      close_descriptors
    end

    def to_s
      "#{self.class} connected to input: #{incoming || 'nil'} and output: #{outgoing || 'nil'}"
    end
  end

end

Version data entries

5 entries across 5 versions & 1 rubygems

Version Path
rodimus-1.3.1 lib/rodimus/step.rb
rodimus-1.3.0 lib/rodimus/step.rb
rodimus-1.2.0 lib/rodimus/step.rb
rodimus-1.1.0 lib/rodimus/step.rb
rodimus-1.0.0 lib/rodimus/step.rb