module ETL module Processor class DatabaseJoinProcessor < ETL::Processor::RowProcessor attr_reader :target attr_reader :query attr_reader :fields # Initialize the procesor. # # Arguments: # * control: The ETL::Control::Control instance # * configuration: The configuration Hash # * definition: The source definition # # Required configuration options: # * :target: The target connection # * :query: The join query # * :fields: The fields to add to the row def initialize(control, configuration) super @target = configuration[:target] @query = configuration[:query] @fields = configuration[:fields] raise ControlError, ":target must be specified" unless @target raise ControlError, ":query must be specified" unless @query raise ControlError, ":fields must be specified" unless @fields end # Get a String identifier for the source def to_s "#{host}/#{database}" end def process(row) return nil if row.nil? q = @query begin q = eval('"' + @query + '"') rescue end ETL::Engine.logger.debug("Executing select: #{q}") res = connection.execute(q) # TODO - refactor this and move it (and similar code around) to adapter_extensions case connection.class.name when "ActiveRecord::ConnectionAdapters::PostgreSQLAdapter"; res.each do |r| @fields.each do |field| row[field.to_sym] = r[field.to_s] end end when "ActiveRecord::ConnectionAdapters::Mysql2Adapter"; res.each(:as => :hash) do |r| @fields.each do |field| row[field.to_sym] = r[field.to_s] end end when "ActiveRecord::ConnectionAdapters::MysqlAdapter"; res.each_hash do |r| @fields.each do |field| row[field.to_sym] = r[field.to_s] end end res.free else raise "Unsupported adapter #{connection.class} for this destination" end return row end private # Get the database connection to use def connection ETL::Engine.connection(target) end # Get the host, defaults to 'localhost' def host ETL::Base.configurations[target.to_s]['host'] || 'localhost' end def database ETL::Base.configurations[target.to_s]['database'] end end end end