module ETL #:nodoc:
module Control #:nodoc:
# Destination which updates existing rows in a database table, issuing one UPDATE
# statement per buffered row. This is useful when you are dealing with a small amount
# of data. For larger amounts of data you should probably use the bulk loader if it is
# supported with your target database as it will use a much faster load method.
class UpdateDatabaseDestination < Destination
# The target connection
attr_reader :target
# The table
attr_reader :table
# The ordered list of field names written by each UPDATE (the SET clause)
attr_reader :order
# The conditions (each with :field, :comp and :value) that form the WHERE clause
attr_reader :conditions
# Initialize the database destination
#
# * control: The ETL::Control::Control instance
# * configuration: The configuration Hash
# * mapping: The mapping
#
# Configuration options:
# * :database: The database name (documented as REQUIRED, but not read by this
#   initializer)
# * :target: The target connection (REQUIRED)
# * :table: The table to write to (REQUIRED)
# * :unique: Optional Array; when given, the SCD effective date field is appended
#   and duplicates are removed. nil or false leaves it unset.
# * :append_rows: Array of rows to append
#
# Mapping options:
# * :order: The order of fields to write (falls back to order_from_source when omitted)
# * :conditions: The conditions on the fields to update (REQUIRED)
#
# Raises ControlError when the conditions, order, table or target are missing.
def initialize(control, configuration, mapping={})
  super
  @target = configuration[:target]
  @table = configuration[:table]

  # Only a truthy :unique is treated as an Array. A falsy value (nil or false) is
  # stored as given rather than being sent uniq!, which raised NoMethodError for false.
  @unique = configuration[:unique]
  @unique = (@unique + [scd_effective_date_field]).uniq if @unique

  @order = mapping[:order] ? mapping[:order] + scd_required_fields : order_from_source
  @order.uniq! if @order

  # NOTE(review): flush reads every condition as cond[:field], cond[:comp] and
  # cond[:value]; confirm that whatever scd_required_fields returns has that shape.
  @conditions = mapping[:conditions] ? mapping[:conditions] + scd_required_fields : nil
  @conditions.uniq! if @conditions

  # Validation runs only after every attribute has been assigned.
  raise ControlError, "Conditions required in mapping" unless @conditions
  raise ControlError, "Order required in mapping" unless @order
  raise ControlError, "Table required" unless @table
  raise ControlError, "Target required" unless @target
end
# Flush the currently buffered data.
#
# Runs inside a single transaction on the target connection. Each buffered row that
# passes row_allowed? has its virtual fields added and is then written with one
# UPDATE statement: the SET clause assigns every field named in +order+ (values are
# quoted by the connection) and the WHERE clause ANDs together one
# "field comp value" fragment per entry in +conditions+. The buffer is cleared after
# the last row.
#
# Each WHERE fragment is evaluated as a double-quoted Ruby string, so an interpolation
# placeholder in a condition (for example one that reads +row+) is expanded for every
# row. When that evaluation fails, including for a fragment that is not valid inside a
# double-quoted string, the raw fragment is used unchanged.
#
# SECURITY: the fragments are evaluated with +eval+ and spliced into the SQL without
# quoting, so conditions must only ever come from trusted control files.
def flush
  conn.transaction do
    buffer.flatten.each do |row|
      # check to see if this row's compound key constraint already exists
      # note that the compound key constraint may not utilize virtual fields
      next unless row_allowed?(row)

      # add any virtual fields
      add_virtuals!(row)

      where_clauses = conditions.map do |cond|
        fragment = " #{cond[:field]} #{cond[:comp]} #{cond[:value]} "
        begin
          # Expand interpolation placeholders against the current row.
          eval('"' + fragment + '"')
        rescue StandardError, SyntaxError => e
          # SyntaxError is not a StandardError, so it has to be listed explicitly for
          # the raw-fragment fallback to cover fragments that contain a double quote.
          ETL::Engine.logger.debug("Could not interpolate condition #{fragment.inspect}: #{e.message}")
          fragment
        end
      end

      assignments = order.map do |name|
        "#{conn.quote_column_name(name)} = #{conn.quote(row[name])}"
      end

      q = "UPDATE #{conn.quote_table_name(table_name)} SET #{assignments.join(',')} WHERE #{where_clauses.join(' AND ')}"
      ETL::Engine.logger.debug("Executing update: #{q}")
      conn.update(q, "Update row #{current_row}")
      @current_row += 1
    end
    buffer.clear
  end
end
# Finish writing: when append_rows is set, push it onto the buffer, then flush
# everything that is still buffered. Returns whatever flush returns.
def close
  if append_rows
    buffer << append_rows
  end
  flush
end
private
# Lazily obtain the connection for +target+ from ETL::Engine and memoize it in @conn.
# Any StandardError raised while obtaining it is replaced by a RuntimeError.
def conn
  return @conn if @conn

  begin
    @conn = ETL::Engine.connection(target)
  rescue
    raise RuntimeError, "Problem to connect to db"
  end
end
# Resolve the name of +table+ through ETL::Engine.table, passing it the connection
# that ETL::Engine returns for +target+.
def table_name
  name = table
  connection = ETL::Engine.connection(target)
  ETL::Engine.table(name, connection)
end
end
end
end