module ETL #:nodoc:
  module Control #:nodoc:
    # Destination which writes directly to a database. This is useful when you are dealing with
    # a small amount of data. For larger amounts of data you should probably use the bulk
    # loader if it is supported with your target database, as it will use a much faster load
    # method.
    class DatabaseDestination < Destination
      # Specify the order from the source
      attr_reader :order

      # Set to true to truncate the destination table first
      attr_reader :truncate

      # Initialize the database destination
      #
      # * control: The ETL::Control::Control instance
      # * configuration: The configuration Hash
      # * mapping: The mapping
      #
      # Configuration options:
      # * :database: The database name (REQUIRED)
      # * :table: The table to write to (REQUIRED)
      # * :truncate: Set to true to truncate before writing (defaults to false)
      # * :unique: Set to true to only insert unique records (defaults to false)
      # * :adapter: The adapter to use (defaults to :mysql)
      # * :username: The database username (defaults to 'root')
      # * :password: The password to the database (defaults to nothing)
      # * :host: The host for the database (defaults to 'localhost')
      #
      # Mapping options:
      # * :order: The order of fields to write (REQUIRED)
      #
      # Raises ControlError when no field order is given in the mapping and none can be
      # derived via order_from_source. Opens the database connection as a side effect.
      def initialize(control, configuration, mapping)
        super
        # Read the flag without writing a default back into the caller's configuration Hash.
        @truncate = configuration[:truncate] || false
        # Tracks whether the one-time truncation has already been committed.
        @truncated = false
        @unique = configuration[:unique]
        @order = mapping[:order] || order_from_source
        raise ControlError, "Order required in mapping" unless @order
        connect
      end

      # Flush the currently buffered data.
      #
      # All buffered rows are inserted inside one transaction, after which the buffer is
      # cleared. Rows rejected by row_allowed? are skipped; virtual fields are added via
      # add_virtuals! before the row is written (both presumably provided by Destination).
      #
      # When +truncate+ is set, the table is truncated only on the first successful flush,
      # so rows written by earlier flushes are not wiped by later ones.
      def flush
        conn = ETL::ActiveRecord::Base.connection
        conn.transaction do
          conn.truncate(configuration[:table]) if truncate && !@truncated
          buffer.each do |row|
            # check to see if this row's compound key constraint already exists
            # note that the compound key constraint may not utilize virtual fields
            next unless row_allowed?(row)

            # add any virtual fields
            add_virtuals!(row)

            q = insert_statement(conn, row)
            # ETL::Engine.logger.debug("Query: #{q}")
            conn.insert(q, "Insert row #{current_row}")
            @current_row += 1
          end
          buffer.clear
        end
        # Only reached when the transaction completed without raising.
        @truncated = true
      end

      # Close the connection, flushing any remaining buffered rows first.
      def close
        flush
        ETL::ActiveRecord::Base.connection.disconnect!
      end

      private

      # Build the INSERT statement for +row+, writing the fields named in +order+.
      #
      # Column names and values are quoted by the connection adapter, so values containing
      # quote characters cannot break (or inject into) the statement, nil is written as
      # NULL, and numbers/dates/booleans use the target database's literal format.
      def insert_statement(conn, row)
        names = order.map { |name| conn.quote_column_name(name.to_s) }
        values = order.map { |name| conn.quote(row[name]) }
        "INSERT INTO #{configuration[:table]} (#{names.join(',')}) VALUES (#{values.join(',')})"
      end

      # Connect to the database.
      #
      # Required options:
      # * :database: The database name
      #
      # Options:
      # * :adapter: The adapter to use (defaults to :mysql)
      # * :username: The database username (defaults to 'root')
      # * :password: The password to the database (defaults to nothing)
      # * :host: The host for the database (defaults to 'localhost')
      def connect
        ETL::ActiveRecord::Base.establish_connection(
          :adapter => (configuration[:adapter] || :mysql),
          :username => (configuration[:username] || 'root'),
          :host => (configuration[:host] || 'localhost'),
          :password => configuration[:password],
          :database => configuration[:database]
        )
      end
    end
  end
end