Sha256: 50040823dfe2fab24dbbfb5c910e46f48d4e6cd6bd45aad45ec329d99dff58aa
Contents?: true
Size: 1.92 KB
Versions: 1
Compression:
Stored size: 1.92 KB
Contents
module ETL
  module Control
    # Destination that writes buffered rows into a database table through the
    # ActiveRecord connection.
    #
    # Configuration keys read by this class:
    #   :table    - name of the table rows are inserted into
    #   :truncate - when true, the table is emptied before rows are written
    #               (defaults to false)
    #   :unique   - stored in @unique; not otherwise used in this class
    #   :adapter, :username, :host, :password, :database - connection settings
    #               (see #connect for the defaults)
    #
    # Relies on helpers that are not defined in this file and are presumably
    # supplied by Destination: buffer, configuration, current_row,
    # row_allowed?, add_virtuals! and order_from_source.
    class DatabaseDestination < Destination
      # order    - the list of field names, in the order they are written
      # truncate - whether the table is emptied before rows are written
      attr_reader :order, :truncate

      # Set up the destination and open the database connection.
      #
      # control       - passed through to Destination#initialize
      # configuration - Hash of settings (see the class comment)
      # mapping       - Hash; mapping[:order] gives the field order, falling
      #                 back to order_from_source when absent
      #
      # Raises ControlError when no field order can be determined.
      def initialize(control, configuration, mapping)
        super
        # The ||= also stores the default back into the configuration hash.
        @truncate = configuration[:truncate] ||= false
        @truncated = false
        @unique = configuration[:unique]
        @order = mapping[:order] || order_from_source
        raise ControlError, "Order required in mapping" unless @order
        connect
      end

      # Write every buffered row to the table inside a single transaction,
      # then empty the buffer.
      def flush
        conn = ActiveRecord::Base.connection
        conn.transaction do
          truncate_table_once(conn)
          buffer.each do |row|
            # check to see if this row's compound key constraint already exists
            # note that the compound key constraint may not utilize virtual fields
            next unless row_allowed?(row)

            # add any virtual fields
            add_virtuals!(row)

            conn.execute(insert_statement(conn, row), "Insert row #{current_row}")
            @current_row += 1
          end
          buffer.clear
        end
      end

      # Flush any remaining rows and close the connection. The connection is
      # released even when the final flush raises.
      def close
        flush
      ensure
        ActiveRecord::Base.connection.disconnect!
      end

      private

      # Empty the destination table if truncation was requested, at most once
      # per destination instance.
      # NOTE(review): presumably flush can run more than once per load (for
      # example whenever the buffer fills); truncating on each call would
      # discard rows written by earlier flushes -- confirm against Destination.
      def truncate_table_once(conn)
        return unless truncate && !@truncated

        conn.truncate(configuration[:table])
        @truncated = true
      end

      # Build the INSERT statement for one row, listing the fields in `order`.
      # Every value is written as a string literal; the connection's own
      # quoting escapes embedded quote characters for the active adapter, so
      # a value cannot terminate the literal early.
      def insert_statement(conn, row)
        names = order.join(',')
        values = order.map { |name| conn.quote(row[name].to_s) }.join(',')
        "INSERT INTO #{configuration[:table]} (#{names}) VALUES (#{values})"
      end

      # Establish the ActiveRecord connection from the configuration, using
      # the :mysql adapter, user 'root' and host 'localhost' when those
      # settings are not given.
      def connect
        ActiveRecord::Base.establish_connection(
          :adapter => (configuration[:adapter] || :mysql),
          :username => (configuration[:username] || 'root'),
          :host => (configuration[:host] || 'localhost'),
          :password => configuration[:password],
          :database => configuration[:database]
        )
      end
    end
  end
end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
activewarehouse-etl-0.4.0 | lib/etl/control/destination/database_destination.rb |