module ETL #:nodoc:
module Control #:nodoc:
# Destination which writes directly to a database. This is useful when you are dealing with
# a small amount of data. For larger amounts of data you should probably use the bulk
# loader if it is supported with your target database as it will use a much faster load
# method.
class DatabaseDestination < Destination
  # The ordered list of field names written for each row. Taken from the
  # mapping's :order entry, falling back to order_from_source.
  attr_reader :order

  # Set to true to truncate the destination table before the first write
  attr_reader :truncate

  # Initialize the database destination and connect to the database.
  #
  # * control: The ETL::Control::Control instance
  # * configuration: The configuration Hash
  # * mapping: The mapping
  #
  # Configuration options:
  # * :database: The database name (REQUIRED)
  # * :table: The table to write to (REQUIRED)
  # * :truncate: Set to true to truncate before writing (defaults to false)
  # * :unique: Set to true to only insert unique records (defaults to false)
  # * :adapter: The adapter to use (defaults to :mysql)
  # * :username: The database username (defaults to 'root')
  # * :password: The password to the database (defaults to nothing)
  # * :host: The host for the database (defaults to 'localhost')
  #
  # Mapping options:
  # * :order: The order of fields to write. When omitted, the value of
  #   order_from_source is used instead.
  #
  # Raises ControlError if no order can be determined from either place.
  def initialize(control, configuration, mapping)
    super
    # Note: ||= also stores the false default back into the configuration hash.
    @truncate = configuration[:truncate] ||= false
    # Tracks whether the one-time truncate has already been committed.
    @truncated = false
    @unique = configuration[:unique]
    @order = mapping[:order] || order_from_source
    raise ControlError, "Order required in mapping" unless @order
    connect
  end

  # Flush the currently buffered data.
  #
  # Every buffered row that passes row_allowed? is written with a single
  # INSERT statement; all inserts of one flush run inside one transaction and
  # the buffer is cleared afterwards.
  #
  # When truncation is requested, the table is truncated only until a flush
  # has completed successfully. Truncating on every flush would discard the
  # rows written by earlier flushes of the same run.
  def flush
    conn = ETL::ActiveRecord::Base.connection
    # The column list is identical for every row, so build it once.
    column_list = order.map { |name| conn.quote_column_name(name) }.join(',')
    result = conn.transaction do
      conn.truncate(configuration[:table]) if truncate && !@truncated
      buffer.each do |row|
        # check to see if this row's compound key constraint already exists
        # note that the compound key constraint may not utilize virtual fields
        next unless row_allowed?(row)
        # add any virtual fields
        add_virtuals!(row)
        # Let the adapter quote each value: embedded quotes are escaped and
        # nil is written as NULL instead of an empty string.
        value_list = order.map { |name| conn.quote(row[name]) }.join(',')
        q = "INSERT INTO #{configuration[:table]} (#{column_list}) VALUES (#{value_list})"
        # ETL::Engine.logger.debug("Query: #{q}")
        conn.insert(q, "Insert row #{current_row}")
        @current_row += 1
      end
      buffer.clear
    end
    # Only mark the truncate as done once the transaction has completed, so a
    # failed flush that is retried still starts from an empty table.
    @truncated = true
    result
  end

  # Flush any remaining buffered rows and close the connection. The
  # connection is disconnected even when the final flush raises.
  def close
    flush
  ensure
    ETL::ActiveRecord::Base.connection.disconnect!
  end

  private

  # Connect to the database.
  #
  # Required options:
  # * :database: The database name
  #
  # Options:
  # * :adapter: The adapter to use (defaults to :mysql)
  # * :username: The database username (defaults to 'root')
  # * :password: The password to the database (defaults to nothing)
  # * :host: The host for the database (defaults to 'localhost')
  def connect
    ETL::ActiveRecord::Base.establish_connection(
      :adapter => (configuration[:adapter] || :mysql),
      :username => (configuration[:username] || 'root'),
      :host => (configuration[:host] || 'localhost'),
      :password => configuration[:password],
      :database => configuration[:database]
    )
  end
end
end
end