module ETL #:nodoc: module Control #:nodoc: # Destination which writes directly to a database. This is useful when you are dealing with # a small amount of data. For larger amounts of data you should probably use the bulk # loader if it is supported with your target database as it will use a much faster load # method. class DatabaseDestination < Destination # The target connection attr_reader :target # The table attr_reader :table # Specify the order from the source attr_reader :order # Set to true to truncate the destination table first attr_reader :truncate # Initialize the database destination # # * control: The ETL::Control::Control instance # * configuration: The configuration Hash # * mapping: The mapping # # Configuration options: # * :database: The database name (REQUIRED) # * :target: The target connection (REQUIRED) # * :table: The table to write to (REQUIRED) # * :truncate: Set to true to truncate before writing (defaults to false) # * :unique: Set to true to only insert unique records (defaults to false) # * :append_rows: Array of rows to append # # Mapping options: # * :order: The order of fields to write (REQUIRED) def initialize(control, configuration, mapping={}) super @target = configuration[:target] @table = configuration[:table] @truncate = configuration[:truncate] ||= false @unique = configuration[:unique] @order = mapping[:order] || order_from_source raise ControlError, "Order required in mapping" unless @order raise ControlError, "Table required" unless @table raise ControlError, "Target required" unless @target end # Flush the currently buffered data def flush conn.transaction do buffer.flatten.each do |row| # check to see if this row's compound key constraint already exists # note that the compound key constraint may not utilize virtual fields next unless row_allowed?(row) # add any virtual fields add_virtuals!(row) names = [] values = [] order.each do |name| names << name values << conn.quote(row[name]) # TODO: this is probably not database agnostic end q = "INSERT INTO #{table_name} (#{names.join(',')}) VALUES (#{values.join(',')})" ETL::Engine.logger.debug("Executing insert: #{q}") conn.insert(q, "Insert row #{current_row}") @current_row += 1 end buffer.clear end end # Close the connection def close buffer << append_rows if append_rows flush end private def conn @conn ||= begin conn = ETL::Engine.connection(target) conn.truncate(table_name) if truncate conn end end def table_name ETL::Engine.table(table, ETL::Engine.connection(target)) end end end end