module ETL #:nodoc:
  module Control #:nodoc:
    # Destination which writes directly to a database, one row at a time. For
    # every buffered row it looks for an existing record matching the primary
    # key fields and issues an UPDATE when one is found, or an INSERT otherwise.
    #
    # This is useful when you are dealing with a small amount of data. For
    # larger amounts of data you should probably use the bulk loader if it is
    # supported with your target database as it will use a much faster load
    # method.
    class InsertUpdateDatabaseDestination < Destination
      # The target connection
      attr_reader :target

      # The table
      attr_reader :table

      # The ordered list of field names written by each INSERT/UPDATE
      attr_reader :order

      # The field names used to decide between insert and update
      attr_reader :primarykey

      # Set to true to truncate the destination table first
      attr_reader :truncate

      # Initialize the database destination
      #
      # * <tt>control</tt>: The ETL::Control::Control instance
      # * <tt>configuration</tt>: The configuration Hash
      # * <tt>mapping</tt>: The mapping
      #
      # Configuration options:
      # * <tt>:database</tt>: The database name. NOTE(review): documented as
      #   required, but it is not read anywhere in this class -- confirm.
      # * <tt>:target</tt>: The target connection (REQUIRED)
      # * <tt>:table</tt>: The table to write to (REQUIRED)
      # * <tt>:truncate</tt>: Set to true to truncate before writing (defaults to false)
      # * <tt>:unique</tt>: Array of field names; +scd_effective_date_field+ is
      #   appended to it. Leave unset (or false) to disable. NOTE(review):
      #   presumably consumed by +row_allowed?+ in the parent class -- confirm.
      # * <tt>:append_rows</tt>: Array of rows to append
      #
      # Mapping options:
      # * <tt>:order</tt>: The order of fields to write. When omitted the value
      #   of +order_from_source+ is used instead.
      # * <tt>:primarykey</tt>: The primary key fields used to select insert or
      #   update (REQUIRED, must not be empty)
      #
      # Raises ControlError when the primary key, order, table or target is
      # missing.
      def initialize(control, configuration, mapping={})
        super
        @target = configuration[:target]
        @table = configuration[:table]
        # Plain `||` (not `||=`) so the caller's configuration Hash is left untouched.
        @truncate = configuration[:truncate] || false

        # `:unique => false` used to blow up on `false.uniq!`; only build the
        # list when a value was actually supplied.
        unique_fields = configuration[:unique]
        @unique = unique_fields ? (unique_fields + [scd_effective_date_field]).uniq : unique_fields

        @order = mapping[:order] ? mapping[:order] + scd_required_fields : order_from_source
        @order.uniq! if @order

        @primarykey = mapping[:primarykey] ? (mapping[:primarykey] + scd_required_fields).uniq : nil

        # An empty key list would generate an invalid "WHERE " clause later on,
        # so it is rejected here together with a missing one.
        raise ControlError, "Primarykey required in mapping" if @primarykey.nil? || @primarykey.empty?
        raise ControlError, "Order required in mapping" unless @order
        raise ControlError, "Table required" unless @table
        raise ControlError, "Target required" unless @target
      end

      # Flush the currently buffered data.
      #
      # All buffered rows are written inside a single transaction. Each row
      # accepted by +row_allowed?+ is updated when a record with the same
      # primary key values already exists and inserted otherwise. The buffer
      # is cleared afterwards.
      def flush
        conn.transaction do
          buffer.flatten.each do |row|
            # check to see if this row's compound key constraint already exists
            # note that the compound key constraint may not utilize virtual fields
            next unless row_allowed?(row)

            # add any virtual fields
            add_virtuals!(row)

            where_clause = primarykey_where_clause(row)
            if row_exists_in_table?(where_clause)
              update_row_in_table(row, where_clause)
            else
              insert_row_into_table(row)
            end
            # The helpers above call current_row before this increment runs.
            @current_row += 1
          end
          buffer.clear
        end
      end

      # Close the destination: queue the configured append rows (if any) and
      # flush whatever is buffered.
      def close
        buffer << append_rows if append_rows
        flush
      end

      private

      # Memoized target connection. The table is truncated the first time the
      # connection is obtained when +truncate+ is set.
      #
      # Raises RuntimeError (carrying the underlying error message) when the
      # connection cannot be obtained or the truncate fails.
      def conn
        @conn ||= begin
          connection = ETL::Engine.connection(target)
          connection.truncate(table_name) if truncate
          connection
        rescue => e
          raise RuntimeError, "Problem to connect to db: #{e.message}"
        end
      end

      # The table name as resolved by ETL::Engine.table for the target connection.
      def table_name
        ETL::Engine.table(table, ETL::Engine.connection(target))
      end

      # Build the SQL condition matching the row's primary key values.
      # A nil value is matched with IS NULL because "column = NULL" never
      # matches in SQL.
      def primarykey_where_clause(row)
        conditions = primarykey.map do |name|
          column = conn.quote_column_name(name)
          value = row[name]
          value.nil? ? "#{column} IS NULL" : "#{column} = #{conn.quote(value)}"
        end
        conditions.join(' AND ')
      end

      # Returns true when at least one record matches +where_clause+.
      # The raw result object differs per adapter, hence the dispatch on the
      # adapter class name; unknown adapters raise a RuntimeError.
      def row_exists_in_table?(where_clause)
        q = "SELECT * FROM #{conn.quote_table_name(table_name)} WHERE #{where_clause}"
        ETL::Engine.logger.debug("Executing select: #{q}")
        res = conn.execute(q, "Select row #{current_row}")

        found = false
        case conn.class.name
        when "ActiveRecord::ConnectionAdapters::PostgreSQLAdapter",
             "ActiveRecord::ConnectionAdapters::Mysql2Adapter"
          res.each { found = true }
        when "ActiveRecord::ConnectionAdapters::MysqlAdapter"
          res.each_hash { found = true }
          res.free
        else
          raise "Unsupported adapter #{conn.class} for this destination"
        end
        found
      end

      # Insert +row+ using the fields listed in +order+.
      def insert_row_into_table(row)
        names = order.map { |name| conn.quote_column_name(name) }
        values = order.map { |name| conn.quote(row[name]) }
        q = "INSERT INTO #{conn.quote_table_name(table_name)} (#{names.join(',')}) VALUES (#{values.join(',')})"
        ETL::Engine.logger.debug("Executing insert: #{q}")
        conn.insert(q, "Insert row #{current_row}")
      end

      # Update the record(s) matching +where_clause+ with the fields listed in
      # +order+.
      def update_row_in_table(row, where_clause)
        assignments = order.map do |name|
          "#{conn.quote_column_name(name)} = #{conn.quote(row[name])}"
        end
        q = "UPDATE #{conn.quote_table_name(table_name)} SET #{assignments.join(',')} WHERE #{where_clause}"
        ETL::Engine.logger.debug("Executing update: #{q}")
        conn.update(q, "Update row #{current_row}")
      end
    end
  end
end