lib/etl/control/destination.rb in activewarehouse-etl-0.9.1 vs lib/etl/control/destination.rb in activewarehouse-etl-0.9.5.rc1

- old
+ new

@@ -127,15 +127,27 @@ # Return fields which are Slowly Changing Dimension fields. # Uses the scd_fields specified in the configuration. If that's # missing, uses all of the row's fields. def scd_fields(row) @scd_fields ||= configuration[:scd_fields] || row.keys + ETL::Engine.logger.debug "@scd_fields is: #{@scd_fields.inspect}" + @scd_fields end + + # returns the fields that are required to identify an SCD + def scd_required_fields + if scd? + [scd_effective_date_field, scd_end_date_field, scd_latest_version_field] + else + [] + end + end def non_scd_fields(row) - @non_csd_fields ||= row.keys - natural_key - scd_fields(row) - - [primary_key, scd_effective_date_field, scd_end_date_field, scd_latest_version_field] + @non_scd_fields ||= row.keys - natural_key - scd_fields(row) - [primary_key] - scd_required_fields + ETL::Engine.logger.debug "@non_scd_fields is: #{@non_scd_fields.inspect}" + @non_scd_fields end def non_evolving_fields (Array(configuration[:scd][:non_evolving_fields]) << primary_key).uniq end @@ -278,11 +290,12 @@ natural_key.each do |nk| statement << "#{nk} = ?" values << row[nk] end statement = statement.join(" AND ") - ActiveRecord::Base.send(:sanitize_sql, [statement, *values]) + x=ActiveRecord::Base.send(:sanitize_sql_array, [statement, *values]) + return x end # Do all the steps required when a SCD *has* changed. Exact steps # depend on what type of SCD we're handling. def process_scd_change(row) @@ -351,22 +364,29 @@ # Find the version of this row that already exists in the datawarehouse. def preexisting_row(row) q = "SELECT * FROM #{dimension_table} WHERE #{natural_key_equality_for_row(row)}" q << " AND #{scd_latest_version_field}" if scd_type == 2 - #puts "looking for original record" + ETL::Engine.logger.debug "looking for original record" result = connection.select_one(q) - #puts "Result: #{result.inspect}" + ETL::Engine.logger.debug "Result: #{result.inspect}" result ? ETL::Row[result.symbolize_keys!] : nil end # Check whether non-scd fields have changed since the last # load of this record. def has_scd_field_changes?(row) - scd_fields(row).any? { |csd_field| row[csd_field].to_s != @existing_row[csd_field].to_s } + scd_fields(row).any? { |csd_field| + ETL::Engine.logger.debug "Row: #{row.inspect}" + ETL::Engine.logger.debug "Existing Row: #{@existing_row.inspect}" + ETL::Engine.logger.debug "comparing: #{row[csd_field].to_s} != #{@existing_row[csd_field].to_s}" + x=row[csd_field].to_s != @existing_row[csd_field].to_s + ETL::Engine.logger.debug x + x + } end # Check whether non-scd fields have changed since the last # load of this record. def has_non_scd_field_changes?(row) @@ -415,6 +435,6 @@ end end end -Dir[File.dirname(__FILE__) + "/destination/*.rb"].each { |file| require(file) } \ No newline at end of file +Dir[File.dirname(__FILE__) + "/destination/*.rb"].each { |file| require(file) }