Sha256: c42f02d831e8272cf09c80c0784d49e857012ba5e374e8135a0150e511046900

Contents?: true

Size: 1.67 KB

Versions: 1

Compression:

Stored size: 1.67 KB

Contents

module ETL
  module Processor
    class DatabaseJoinProcessor < ETL::Processor::RowProcessor
      attr_reader :target
      attr_reader :query
      attr_reader :fields

      # Initialize the procesor.
      #
      # Arguments:
      # * <tt>control</tt>: The ETL::Control::Control instance
      # * <tt>configuration</tt>: The configuration Hash
      # * <tt>definition</tt>: The source definition
      #
      # Required configuration options:
      # * <tt>:target</tt>: The target connection
      # * <tt>:query</tt>: The join query
      # * <tt>:fields</tt>: The fields to add to the row
      def initialize(control, configuration)
        super
        @target = configuration[:target]
        @query = configuration[:query]
        @fields = configuration[:fields]
      end
      
      # Get a String identifier for the source
      def to_s
        "#{host}/#{database}"
      end
      
      def process(row)
        return nil if row.nil?

        q = @query
        begin
          q = eval('"' + @query + '"')
        rescue
        end

        ETL::Engine.logger.debug("Executing select: #{q}")
        res = connection.execute(q)

        res.each_hash do |r|
          @fields.each do |field|
            row[field.to_sym] = r[field]
          end
        end

        return row
      end

      private
      # Get the database connection to use
      def connection
        ETL::Engine.connection(target)
      end
      
      # Get the host, defaults to 'localhost'
      def host
        ETL::Base.configurations[target.to_s]['host'] || 'localhost'
      end
      
      def database
        ETL::Base.configurations[target.to_s]['database']
      end
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
activewarehouse-etl-0.9.5.rc1 lib/etl/processor/database_join_processor.rb