lib/etl/control/destination.rb in activewarehouse-etl-0.8.4 vs lib/etl/control/destination.rb in activewarehouse-etl-0.9.0

- old
+ new

@@ -155,13 +155,18 @@ @natural_key ||= determine_natural_key end # Get the dimension table if specified def dimension_table - configuration[:scd][:dimension_table] if scd? + ETL::Engine.table(configuration[:scd][:dimension_table], dimension_target) if scd? end + # Get the dimension target if specified + def dimension_target + configuration[:scd][:dimension_target] if scd? + end + # Process a row to determine the change type def process_change(row) ETL::Engine.logger.debug "Processing row: #{row.inspect}" return unless row @@ -207,24 +212,39 @@ # SCD Type 3: not supported ETL::Engine.logger.debug "CRC does not match" if scd_type == 2 ETL::Engine.logger.debug "type 2 SCD" + + raise ConfigurationError, "dimension_table setting required" unless dimension_table + raise ConfigurationError, "dimension_target setting required" unless dimension_target + + conn = ETL::Engine.connection(dimension_target) + q = "SELECT * FROM #{dimension_table} WHERE " q << natural_key.collect { |nk| "#{nk} = '#{row[nk]}'" }.join(" AND ") #puts "looking for original record" - result = ETL::ActiveRecord::Base.connection.select_one(q) + result = conn.select_one(q) if result #puts "Result: #{result.inspect}" original_record = ETL::Row[result.symbolize_keys!] original_record[scd_end_date_field] = timestamp ETL::Engine.logger.debug "writing original record" # if there is no truncate then the row will exist twice in the database # need to figure out how to delete that old record before inserting the # updated version of the record + q = "DELETE FROM #{dimension_table} WHERE " + q << natural_key.collect { |nk| "#{nk} = '#{row[nk]}'" }.join(" AND ") + + num_rows_affected = conn.delete(q) + ETL::Engine.logger.debug "deleted old row" + + # do this? + #raise "Should have deleted a single record" if num_rows_affected != 1 + buffer << original_record end row[scd_effective_date_field] = timestamp row[scd_end_date_field] = '9999-12-31 00:00:00' @@ -237,13 +257,18 @@ ETL::Engine.logger.debug "writing new record" buffer << row else ETL::Engine.logger.debug "CRC matches, skipping" + raise ConfigurationError, "dimension_table setting required" unless dimension_table + raise ConfigurationError, "dimension_target setting required" unless dimension_target + + conn = ETL::Engine.connection(dimension_target) + q = "SELECT * FROM #{dimension_table} WHERE " q << natural_key.collect { |nk| "#{nk} = '#{row[nk]}'" }.join(" AND ") - result = ETL::ActiveRecord::Base.connection.select_one(q) + result = conn.select_one(q) if result # This was necessary when truncating and then loading, however I # am getting reluctant to having the ETL process do the truncation # as part of the bulk load, favoring using a preprocessor instead. # buffer << ETL::Row[result.symbolize_keys!] @@ -295,10 +320,10 @@ case value when Class generator = generators[key] ||= value.new row[key] = generator.next when Symbol - generator = generators[key] ||= ETL::Generator::Generator.class_for_name(value).new + generator = generators[key] ||= ETL::Generator::Generator.class_for_name(value).new(options) row[key] = generator.next when Proc row[key] = value.call(row) else if value.is_a?(ETL::Generator::Generator) \ No newline at end of file