lib/etl/control/destination.rb in activewarehouse-etl-0.9.0 vs lib/etl/control/destination.rb in activewarehouse-etl-0.9.1

- old
+ new

@@ -27,11 +27,11 @@ # Get the destination class for the specified name. # # For example if name is :database or 'database' then the # DatabaseDestination class is returned def class_for_name(name) - ETL::Control.const_get("#{name.to_s.classify}Destination") + ETL::Control.const_get("#{name.to_s.camelize}Destination") end end # Initialize the destination # @@ -122,16 +122,26 @@ # row can be written when the unique option is specified def compound_key_constraints @compound_key_constraints ||= {} end - # Return fields which are Slowly Changing Dimension fields. Return nil - # by default. - def scd_fields - @scd_fields ||= configuration[:scd_fields] + # Return fields which are Slowly Changing Dimension fields. + # Uses the scd_fields specified in the configuration. If that's + # missing, uses all of the row's fields. + def scd_fields(row) + @scd_fields ||= configuration[:scd_fields] || row.keys end + def non_scd_fields(row) + @non_csd_fields ||= row.keys - natural_key - scd_fields(row) - + [primary_key, scd_effective_date_field, scd_end_date_field, scd_latest_version_field] + end + + def non_evolving_fields + (Array(configuration[:scd][:non_evolving_fields]) << primary_key).uniq + end + def scd? !configuration[:scd].nil? end def scd_type @@ -148,162 +158,63 @@ # 'end_date'. def scd_end_date_field configuration[:scd][:end_date_field] || :end_date if scd? end - # Return the natural key field name, defaults to :id + # Get the Slowly Changing Dimension latest version field. Defaults to + # 'latest_version'. + def scd_latest_version_field + configuration[:scd][:latest_version_field] || :latest_version if scd? + end + + # Return the natural key field names, defaults to [] def natural_key @natural_key ||= determine_natural_key end # Get the dimension table if specified def dimension_table - ETL::Engine.table(configuration[:scd][:dimension_table], dimension_target) if scd? + @dimension_table ||= if scd? 
+ ETL::Engine.table(configuration[:scd][:dimension_table], dimension_target) or raise ConfigurationError, "dimension_table setting required" + end end # Get the dimension target if specified def dimension_target - configuration[:scd][:dimension_target] if scd? + @dimension_target ||= if scd? + configuration[:scd][:dimension_target] or raise ConfigurationError, "dimension_target setting required" + end end # Process a row to determine the change type def process_change(row) ETL::Engine.logger.debug "Processing row: #{row.inspect}" return unless row # Change processing can only occur if the natural key exists in the row ETL::Engine.logger.debug "Checking for natural key existence" - if natural_key.length == 0 + unless has_natural_key?(row) buffer << row return end - natural_key.each do |key| - unless row.has_key?(key) - buffer << row - return - end - end - - ETL::Engine.logger.debug "Checking for SCD fields" - s = String.new - if scd_fields - scd_fields.each { |f| s << row[f].to_s } - else - row.each { |key,value| s << value.to_s } - end - - # apply the CRC to 's' and see if it matches the last - # ETL::Execution::Record with the samenatural key. If they match then - # throw away this row (no need to process). If they do not match then - # the record is an 'update'. 
If the record doesn't exist then it is an - # 'insert' - nk = natural_key.collect{|k|row[k]}.join('|') - require 'zlib' - crc = Zlib.crc32(s) - record = ETL::Execution::Record.find_by_control_file_and_natural_key(control.file, nk) - - timestamp = Time.now - - ETL::Engine.logger.debug "Checking record change type" - if record - if record.crc != crc.to_s - # SCD Type 1: only the new row should be added - # SCD Type 2: both an old and new row should be added - # SCD Type 3: not supported - ETL::Engine.logger.debug "CRC does not match" - - if scd_type == 2 - ETL::Engine.logger.debug "type 2 SCD" - - raise ConfigurationError, "dimension_table setting required" unless dimension_table - raise ConfigurationError, "dimension_target setting required" unless dimension_target - - conn = ETL::Engine.connection(dimension_target) - - q = "SELECT * FROM #{dimension_table} WHERE " - q << natural_key.collect { |nk| "#{nk} = '#{row[nk]}'" }.join(" AND ") - #puts "looking for original record" - result = conn.select_one(q) - if result - #puts "Result: #{result.inspect}" - original_record = ETL::Row[result.symbolize_keys!] - original_record[scd_end_date_field] = timestamp - ETL::Engine.logger.debug "writing original record" - - # if there is no truncate then the row will exist twice in the database - # need to figure out how to delete that old record before inserting the - # updated version of the record - - q = "DELETE FROM #{dimension_table} WHERE " - q << natural_key.collect { |nk| "#{nk} = '#{row[nk]}'" }.join(" AND ") - - num_rows_affected = conn.delete(q) - ETL::Engine.logger.debug "deleted old row" - - # do this? 
- #raise "Should have deleted a single record" if num_rows_affected != 1 - - buffer << original_record - end - - row[scd_effective_date_field] = timestamp - row[scd_end_date_field] = '9999-12-31 00:00:00' - elsif scd_type == 1 - ETL::Engine.logger.debug "type 1 SCD" - else - ETL::Engine.logger.debug "SCD not specified" - end - - ETL::Engine.logger.debug "writing new record" - buffer << row + @timestamp = Time.now + + # See if the scd_fields of the current record have changed + # from the last time this record was loaded into the data + # warehouse. If they match then throw away this row (no need + # to process). If they do not match then the record is an + # 'update'. If the record doesn't exist then it is an 'insert' + ETL::Engine.logger.debug "Checking record for SCD change" + if @existing_row = preexisting_row(row) + if has_scd_field_changes?(row) + process_scd_change(row) else - ETL::Engine.logger.debug "CRC matches, skipping" - - raise ConfigurationError, "dimension_table setting required" unless dimension_table - raise ConfigurationError, "dimension_target setting required" unless dimension_target - - conn = ETL::Engine.connection(dimension_target) - - q = "SELECT * FROM #{dimension_table} WHERE " - q << natural_key.collect { |nk| "#{nk} = '#{row[nk]}'" }.join(" AND ") - result = conn.select_one(q) - if result - # This was necessary when truncating and then loading, however I - # am getting reluctant to having the ETL process do the truncation - # as part of the bulk load, favoring using a preprocessor instead. - # buffer << ETL::Row[result.symbolize_keys!] 
- else - # The record never made it into the database, so add the effective and end date - # and add it into the bulk load file - row[scd_effective_date_field] = timestamp - row[scd_end_date_field] = '9999-12-31 00:00:00' - buffer << row - end + process_scd_match(row) end else - ETL::Engine.logger.debug "record never loaded" - # Set the effective and end date fields - if scd_type == 2 - row[scd_effective_date_field] = timestamp - row[scd_end_date_field] = '9999-12-31 00:00:00' - end - - # Write the row - buffer << row - - # Record the record - if ETL::Engine.job # only record the execution if there is a job - ETL::Execution::Record.time_spent += Benchmark.realtime do - ETL::Execution::Record.create!( - :control_file => control.file, - :natural_key => nk, - :crc => crc, - :job_id => ETL::Engine.job.id - ) - end - end + schedule_new_record(row) end end # Add any virtual fields to the row. Virtual rows will get their value # from one of the following: @@ -314,10 +225,12 @@ # * If the mapping is a Proc, then it will be called with the row # * Otherwise the value itself will be assigned to the field def add_virtuals!(row) if mapping[:virtual] mapping[:virtual].each do |key,value| + # If the row already has the virtual set, assume that's correct + next if row[key] # Engine.logger.debug "Mapping virtual #{key}/#{value} for row #{row}" case value when Class generator = generators[key] ||= value.new row[key] = generator.next @@ -336,21 +249,171 @@ end end end private + # Determine the natural key. This method will always return an array - # of symbols. The default value is [:id]. + # of symbols. The default value is []. 
def determine_natural_key - case configuration[:natural_key] - when Array - configuration[:natural_key].collect(&:to_sym) - when String, Symbol - [configuration[:natural_key].to_sym] + Array(configuration[:natural_key]).collect(&:to_sym) + end + + # Check whether a natural key has been defined, and if so, whether + # this row has enough information to do searches based on that natural + # key. + # + # TODO: This should be factored out into + # ETL::Row#has_all_fields?(field_array) But that's not possible + # until *all* sources cast to ETL::Row, instead of sometimes + # using Hash + def has_natural_key?(row) + natural_key.any? && natural_key.all? { |key| row.has_key?(key) } + end + + # Helper for generating the SQL where clause that allows searching + # by a natural key + def natural_key_equality_for_row(row) + statement = [] + values = [] + natural_key.each do |nk| + statement << "#{nk} = ?" + values << row[nk] + end + statement = statement.join(" AND ") + ActiveRecord::Base.send(:sanitize_sql, [statement, *values]) + end + + # Do all the steps required when a SCD *has* changed. Exact steps + # depend on what type of SCD we're handling. 
+ def process_scd_change(row) + ETL::Engine.logger.debug "SCD fields do not match" + + if scd_type == 2 + # SCD Type 2: new row should be added and old row should be updated + ETL::Engine.logger.debug "type 2 SCD" + + # To update the old row, we delete the version in the database + # and insert a new expired version + + # If there is no truncate then the row will exist twice in the database + delete_outdated_record + + ETL::Engine.logger.debug "expiring original record" + @existing_row[scd_end_date_field] = @timestamp + @existing_row[scd_latest_version_field] = false + + buffer << @existing_row + + elsif scd_type == 1 + # SCD Type 1: only the new row should be added + ETL::Engine.logger.debug "type 1 SCD" + + # Copy primary key, and other non-evolving fields over from + # original version of record + non_evolving_fields.each do |non_evolving_field| + row[non_evolving_field] = @existing_row[non_evolving_field] + end + + # If there is no truncate then the row will exist twice in the database + delete_outdated_record else - [] # no natural key defined + # SCD Type 3: not supported + ETL::Engine.logger.debug "SCD type #{scd_type} not supported" end + + # In all cases, the latest, greatest version of the record + # should go into the load + schedule_new_record(row) end + + # Do all the steps required when a SCD has *not* changed. Exact + # steps depend on what type of SCD we're handling. 
+ def process_scd_match(row) + ETL::Engine.logger.debug "SCD fields match" + + if scd_type == 2 && has_non_scd_field_changes?(row) + ETL::Engine.logger.debug "Non-SCD field changes" + # Copy important data over from original version of record + row[primary_key] = @existing_row[primary_key] + row[scd_end_date_field] = @existing_row[scd_end_date_field] + row[scd_effective_date_field] = @existing_row[scd_effective_date_field] + row[scd_latest_version_field] = @existing_row[scd_latest_version_field] + + # If there is no truncate then the row will exist twice in the database + delete_outdated_record + + buffer << row + else + # The record is totally the same, so skip it + end + end + + # Find the version of this row that already exists in the data warehouse. + def preexisting_row(row) + q = "SELECT * FROM #{dimension_table} WHERE #{natural_key_equality_for_row(row)}" + q << " AND #{scd_latest_version_field}" if scd_type == 2 + + #puts "looking for original record" + result = connection.select_one(q) + + #puts "Result: #{result.inspect}" + + result ? ETL::Row[result.symbolize_keys!] : nil + end + + # Check whether scd fields have changed since the last + # load of this record. + def has_scd_field_changes?(row) + scd_fields(row).any? { |csd_field| row[csd_field].to_s != @existing_row[csd_field].to_s } + end + + # Check whether non-scd fields have changed since the last + # load of this record. + def has_non_scd_field_changes?(row) + non_scd_fields(row).any? { |non_csd_field| row[non_csd_field].to_s != @existing_row[non_csd_field].to_s } + end + + # Grab, or re-use, a database connection for running queries directly + # during the destination processing. + def connection + @conn ||= ETL::Engine.connection(dimension_target) + end + + # Utility for removing a row that has outdated information. Note + # that this deletes directly from the database, even if this is a file + # destination. It needs to do this because you can't do deletes in a + # bulk load. 
+ def delete_outdated_record + ETL::Engine.logger.debug "deleting old row" + + q = "DELETE FROM #{dimension_table} WHERE #{primary_key} = #{@existing_row[primary_key]}" + connection.delete(q) + end + + # Schedule the latest, greatest version of the row for insertion + # into the database + def schedule_new_record(row) + ETL::Engine.logger.debug "writing new record" + if scd_type == 2 + row[scd_effective_date_field] = @timestamp + row[scd_end_date_field] = '9999-12-31 00:00:00' + row[scd_latest_version_field] = true + end + buffer << row + end + + # Get the name of the primary key for this table. Asks the dimension + # model class for this information, but if that class hasn't been + # defined, just defaults to :id. + def primary_key + return @primary_key if @primary_key + @primary_key = dimension_table.to_s.camelize.constantize.primary_key.to_sym + rescue NameError => e + ETL::Engine.logger.debug "couldn't get primary_key from dimension model class, using default :id" + @primary_key = :id + end + end end end Dir[File.dirname(__FILE__) + "/destination/*.rb"].each { |file| require(file) } \ No newline at end of file