lib/etl/control/destination.rb in activewarehouse-etl-0.8.4 vs lib/etl/control/destination.rb in activewarehouse-etl-0.9.0
- old
+ new
@@ -155,13 +155,18 @@
@natural_key ||= determine_natural_key
end
# Get the dimension table if specified
def dimension_table
- configuration[:scd][:dimension_table] if scd?
+ ETL::Engine.table(configuration[:scd][:dimension_table], dimension_target) if scd?
end
+ # Get the dimension target if specified
+ def dimension_target
+ configuration[:scd][:dimension_target] if scd?
+ end
+
# Process a row to determine the change type
def process_change(row)
ETL::Engine.logger.debug "Processing row: #{row.inspect}"
return unless row
@@ -207,24 +212,39 @@
# SCD Type 3: not supported
ETL::Engine.logger.debug "CRC does not match"
if scd_type == 2
ETL::Engine.logger.debug "type 2 SCD"
+
+ raise ConfigurationError, "dimension_table setting required" unless dimension_table
+ raise ConfigurationError, "dimension_target setting required" unless dimension_target
+
+ conn = ETL::Engine.connection(dimension_target)
+
q = "SELECT * FROM #{dimension_table} WHERE "
q << natural_key.collect { |nk| "#{nk} = '#{row[nk]}'" }.join(" AND ")
#puts "looking for original record"
- result = ETL::ActiveRecord::Base.connection.select_one(q)
+ result = conn.select_one(q)
if result
#puts "Result: #{result.inspect}"
original_record = ETL::Row[result.symbolize_keys!]
original_record[scd_end_date_field] = timestamp
ETL::Engine.logger.debug "writing original record"
# if there is no truncate then the row will exist twice in the database
# need to figure out how to delete that old record before inserting the
# updated version of the record
+ q = "DELETE FROM #{dimension_table} WHERE "
+ q << natural_key.collect { |nk| "#{nk} = '#{row[nk]}'" }.join(" AND ")
+
+ num_rows_affected = conn.delete(q)
+ ETL::Engine.logger.debug "deleted old row"
+
+ # do this?
+ #raise "Should have deleted a single record" if num_rows_affected != 1
+
buffer << original_record
end
row[scd_effective_date_field] = timestamp
row[scd_end_date_field] = '9999-12-31 00:00:00'
@@ -237,13 +257,18 @@
ETL::Engine.logger.debug "writing new record"
buffer << row
else
ETL::Engine.logger.debug "CRC matches, skipping"
+ raise ConfigurationError, "dimension_table setting required" unless dimension_table
+ raise ConfigurationError, "dimension_target setting required" unless dimension_target
+
+ conn = ETL::Engine.connection(dimension_target)
+
q = "SELECT * FROM #{dimension_table} WHERE "
q << natural_key.collect { |nk| "#{nk} = '#{row[nk]}'" }.join(" AND ")
- result = ETL::ActiveRecord::Base.connection.select_one(q)
+ result = conn.select_one(q)
if result
# This was necessary when truncating and then loading, however I
# am getting reluctant to having the ETL process do the truncation
# as part of the bulk load, favoring using a preprocessor instead.
# buffer << ETL::Row[result.symbolize_keys!]
@@ -295,10 +320,10 @@
case value
when Class
generator = generators[key] ||= value.new
row[key] = generator.next
when Symbol
- generator = generators[key] ||= ETL::Generator::Generator.class_for_name(value).new
+ generator = generators[key] ||= ETL::Generator::Generator.class_for_name(value).new(options)
row[key] = generator.next
when Proc
row[key] = value.call(row)
else
if value.is_a?(ETL::Generator::Generator)
\ No newline at end of file