module ETL #:nodoc:
module Processor #:nodoc:
# A row-level processor that checks if the row already exists in the
# target table
class CheckExistProcessor < ETL::Processor::RowProcessor
# A symbol or array of symbols representing keys that should be skipped
attr_accessor :skip
# The target database
attr_accessor :target
# The name of the table to check against
attr_accessor :table
# An array of columns representing the natural key
attr_accessor :columns
# Is set to true if the processor should execute the check. If there are
# no rows in the target table then this should return false.
attr_accessor :should_check
# Initialize the processor
# Configuration options:
# * :skip: A symbol or array of column names that should not
# be checked
# * :table: The table name
# * :columns: An array of columns which represent the natural
# key
def initialize(control, configuration)
super
@skip = configuration[:skip] || []
@target = configuration[:target] || raise(ETL::ControlError, "target must be specified")
@table = configuration[:table] || raise(ETL::ControlError, "table must be specified")
@columns = configuration[:columns]
q = "SELECT COUNT(*) FROM #{table_name}"
@should_check = ETL::Engine.connection(target).select_value(q).to_i > 0
end
# Return true if the given key should be skipped
def skip?(key)
case skip
when Array
skip.include?(key)
else
skip.to_sym == key.to_sym
end
end
# Return true if the row should be checked
def should_check?
@should_check ? true : false
end
# Process the row
def process(row)
return row unless should_check?
conn = ETL::Engine.connection(target)
q = "SELECT * FROM #{table_name} WHERE "
conditions = []
row.each do |k,v|
if columns.nil? || columns.include?(k.to_sym)
conditions << "'#{k}' = #{conn.quote(v)}" unless (skip?(k.to_sym) || v.blank?)
end
end
q << conditions.join(" AND ")
q << " LIMIT 1"
#puts "query: #{q}"
result = conn.select_one(q)
return row if result.nil?
end
private
def table_name
ETL::Engine.table(table, ETL::Engine.connection(target))
end
end
end
end