lib/etl/control/destination.rb in activewarehouse-etl-0.9.1 vs lib/etl/control/destination.rb in activewarehouse-etl-0.9.5.rc1
- old
+ new
@@ -127,15 +127,27 @@
# Return fields which are Slowly Changing Dimension fields.
# Uses the scd_fields specified in the configuration. If that's
# missing, uses all of the row's fields.
def scd_fields(row)
@scd_fields ||= configuration[:scd_fields] || row.keys
+ ETL::Engine.logger.debug "@scd_fields is: #{@scd_fields.inspect}"
+ @scd_fields
end
+
+ # returns the fields that are required to identify an SCD
+ def scd_required_fields
+ if scd?
+ [scd_effective_date_field, scd_end_date_field, scd_latest_version_field]
+ else
+ []
+ end
+ end
def non_scd_fields(row)
- @non_csd_fields ||= row.keys - natural_key - scd_fields(row) -
- [primary_key, scd_effective_date_field, scd_end_date_field, scd_latest_version_field]
+ @non_scd_fields ||= row.keys - natural_key - scd_fields(row) - [primary_key] - scd_required_fields
+ ETL::Engine.logger.debug "@non_scd_fields is: #{@non_scd_fields.inspect}"
+ @non_scd_fields
end
def non_evolving_fields
(Array(configuration[:scd][:non_evolving_fields]) << primary_key).uniq
end
@@ -278,11 +290,12 @@
natural_key.each do |nk|
statement << "#{nk} = ?"
values << row[nk]
end
statement = statement.join(" AND ")
- ActiveRecord::Base.send(:sanitize_sql, [statement, *values])
+ x=ActiveRecord::Base.send(:sanitize_sql_array, [statement, *values])
+ return x
end
# Do all the steps required when a SCD *has* changed. Exact steps
# depend on what type of SCD we're handling.
def process_scd_change(row)
@@ -351,22 +364,29 @@
# Find the version of this row that already exists in the datawarehouse.
def preexisting_row(row)
q = "SELECT * FROM #{dimension_table} WHERE #{natural_key_equality_for_row(row)}"
q << " AND #{scd_latest_version_field}" if scd_type == 2
- #puts "looking for original record"
+ ETL::Engine.logger.debug "looking for original record"
result = connection.select_one(q)
- #puts "Result: #{result.inspect}"
+ ETL::Engine.logger.debug "Result: #{result.inspect}"
result ? ETL::Row[result.symbolize_keys!] : nil
end
# Check whether non-scd fields have changed since the last
# load of this record.
def has_scd_field_changes?(row)
- scd_fields(row).any? { |csd_field| row[csd_field].to_s != @existing_row[csd_field].to_s }
+ scd_fields(row).any? { |csd_field|
+ ETL::Engine.logger.debug "Row: #{row.inspect}"
+ ETL::Engine.logger.debug "Existing Row: #{@existing_row.inspect}"
+ ETL::Engine.logger.debug "comparing: #{row[csd_field].to_s} != #{@existing_row[csd_field].to_s}"
+ x=row[csd_field].to_s != @existing_row[csd_field].to_s
+ ETL::Engine.logger.debug x
+ x
+ }
end
# Check whether non-scd fields have changed since the last
# load of this record.
def has_non_scd_field_changes?(row)
@@ -415,6 +435,6 @@
end
end
end
-Dir[File.dirname(__FILE__) + "/destination/*.rb"].each { |file| require(file) }
\ No newline at end of file
+Dir[File.dirname(__FILE__) + "/destination/*.rb"].each { |file| require(file) }