lib/etl/control/destination.rb in activewarehouse-etl-0.9.0 vs lib/etl/control/destination.rb in activewarehouse-etl-0.9.1
- old
+ new
@@ -27,11 +27,11 @@
# Get the destination class for the specified name.
#
# For example if name is :database or 'database' then the
# DatabaseDestination class is returned
def class_for_name(name)
- ETL::Control.const_get("#{name.to_s.classify}Destination")
+ ETL::Control.const_get("#{name.to_s.camelize}Destination")
end
end
# Initialize the destination
#
@@ -122,16 +122,26 @@
# row can be written when the unique option is specified
def compound_key_constraints
@compound_key_constraints ||= {}
end
- # Return fields which are Slowly Changing Dimension fields. Return nil
- # by default.
- def scd_fields
- @scd_fields ||= configuration[:scd_fields]
+ # Return fields which are Slowly Changing Dimension fields.
+ # Uses the scd_fields specified in the configuration. If that's
+ # missing, uses all of the row's fields.
+ def scd_fields(row)
+ @scd_fields ||= configuration[:scd_fields] || row.keys
end
+ def non_scd_fields(row)
+ @non_scd_fields ||= row.keys - natural_key - scd_fields(row) -
+ [primary_key, scd_effective_date_field, scd_end_date_field, scd_latest_version_field]
+ end
+
+ def non_evolving_fields
+ (Array(configuration[:scd][:non_evolving_fields]) << primary_key).uniq
+ end
+
def scd?
!configuration[:scd].nil?
end
def scd_type
@@ -148,162 +158,63 @@
# 'end_date'.
def scd_end_date_field
configuration[:scd][:end_date_field] || :end_date if scd?
end
- # Return the natural key field name, defaults to :id
+ # Get the Slowly Changing Dimension latest version field. Defaults to
+ # 'latest_version'.
+ def scd_latest_version_field
+ configuration[:scd][:latest_version_field] || :latest_version if scd?
+ end
+
+ # Return the natural key field names, defaults to []
def natural_key
@natural_key ||= determine_natural_key
end
# Get the dimension table if specified
def dimension_table
- ETL::Engine.table(configuration[:scd][:dimension_table], dimension_target) if scd?
+ @dimension_table ||= if scd?
+ ETL::Engine.table(configuration[:scd][:dimension_table], dimension_target) or raise ConfigurationError, "dimension_table setting required"
+ end
end
# Get the dimension target if specified
def dimension_target
- configuration[:scd][:dimension_target] if scd?
+ @dimension_target ||= if scd?
+ configuration[:scd][:dimension_target] or raise ConfigurationError, "dimension_target setting required"
+ end
end
# Process a row to determine the change type
def process_change(row)
ETL::Engine.logger.debug "Processing row: #{row.inspect}"
return unless row
# Change processing can only occur if the natural key exists in the row
ETL::Engine.logger.debug "Checking for natural key existence"
- if natural_key.length == 0
+ unless has_natural_key?(row)
buffer << row
return
end
- natural_key.each do |key|
- unless row.has_key?(key)
- buffer << row
- return
- end
- end
-
- ETL::Engine.logger.debug "Checking for SCD fields"
- s = String.new
- if scd_fields
- scd_fields.each { |f| s << row[f].to_s }
- else
- row.each { |key,value| s << value.to_s }
- end
-
- # apply the CRC to 's' and see if it matches the last
- # ETL::Execution::Record with the samenatural key. If they match then
- # throw away this row (no need to process). If they do not match then
- # the record is an 'update'. If the record doesn't exist then it is an
- # 'insert'
- nk = natural_key.collect{|k|row[k]}.join('|')
- require 'zlib'
- crc = Zlib.crc32(s)
- record = ETL::Execution::Record.find_by_control_file_and_natural_key(control.file, nk)
-
- timestamp = Time.now
-
- ETL::Engine.logger.debug "Checking record change type"
- if record
- if record.crc != crc.to_s
- # SCD Type 1: only the new row should be added
- # SCD Type 2: both an old and new row should be added
- # SCD Type 3: not supported
- ETL::Engine.logger.debug "CRC does not match"
-
- if scd_type == 2
- ETL::Engine.logger.debug "type 2 SCD"
-
- raise ConfigurationError, "dimension_table setting required" unless dimension_table
- raise ConfigurationError, "dimension_target setting required" unless dimension_target
-
- conn = ETL::Engine.connection(dimension_target)
-
- q = "SELECT * FROM #{dimension_table} WHERE "
- q << natural_key.collect { |nk| "#{nk} = '#{row[nk]}'" }.join(" AND ")
- #puts "looking for original record"
- result = conn.select_one(q)
- if result
- #puts "Result: #{result.inspect}"
- original_record = ETL::Row[result.symbolize_keys!]
- original_record[scd_end_date_field] = timestamp
- ETL::Engine.logger.debug "writing original record"
-
- # if there is no truncate then the row will exist twice in the database
- # need to figure out how to delete that old record before inserting the
- # updated version of the record
-
- q = "DELETE FROM #{dimension_table} WHERE "
- q << natural_key.collect { |nk| "#{nk} = '#{row[nk]}'" }.join(" AND ")
-
- num_rows_affected = conn.delete(q)
- ETL::Engine.logger.debug "deleted old row"
-
- # do this?
- #raise "Should have deleted a single record" if num_rows_affected != 1
-
- buffer << original_record
- end
-
- row[scd_effective_date_field] = timestamp
- row[scd_end_date_field] = '9999-12-31 00:00:00'
- elsif scd_type == 1
- ETL::Engine.logger.debug "type 1 SCD"
- else
- ETL::Engine.logger.debug "SCD not specified"
- end
-
- ETL::Engine.logger.debug "writing new record"
- buffer << row
+ @timestamp = Time.now
+
+ # See if the scd_fields of the current record have changed
+ # from the last time this record was loaded into the data
+ # warehouse. If they match then throw away this row (no need
+ # to process). If they do not match then the record is an
+ # 'update'. If the record doesn't exist then it is an 'insert'
+ ETL::Engine.logger.debug "Checking record for SCD change"
+ if @existing_row = preexisting_row(row)
+ if has_scd_field_changes?(row)
+ process_scd_change(row)
else
- ETL::Engine.logger.debug "CRC matches, skipping"
-
- raise ConfigurationError, "dimension_table setting required" unless dimension_table
- raise ConfigurationError, "dimension_target setting required" unless dimension_target
-
- conn = ETL::Engine.connection(dimension_target)
-
- q = "SELECT * FROM #{dimension_table} WHERE "
- q << natural_key.collect { |nk| "#{nk} = '#{row[nk]}'" }.join(" AND ")
- result = conn.select_one(q)
- if result
- # This was necessary when truncating and then loading, however I
- # am getting reluctant to having the ETL process do the truncation
- # as part of the bulk load, favoring using a preprocessor instead.
- # buffer << ETL::Row[result.symbolize_keys!]
- else
- # The record never made it into the database, so add the effective and end date
- # and add it into the bulk load file
- row[scd_effective_date_field] = timestamp
- row[scd_end_date_field] = '9999-12-31 00:00:00'
- buffer << row
- end
+ process_scd_match(row)
end
else
- ETL::Engine.logger.debug "record never loaded"
- # Set the effective and end date fields
- if scd_type == 2
- row[scd_effective_date_field] = timestamp
- row[scd_end_date_field] = '9999-12-31 00:00:00'
- end
-
- # Write the row
- buffer << row
-
- # Record the record
- if ETL::Engine.job # only record the execution if there is a job
- ETL::Execution::Record.time_spent += Benchmark.realtime do
- ETL::Execution::Record.create!(
- :control_file => control.file,
- :natural_key => nk,
- :crc => crc,
- :job_id => ETL::Engine.job.id
- )
- end
- end
+ schedule_new_record(row)
end
end
# Add any virtual fields to the row. Virtual rows will get their value
# from one of the following:
@@ -314,10 +225,12 @@
# * If the mapping is a Proc, then it will be called with the row
# * Otherwise the value itself will be assigned to the field
def add_virtuals!(row)
if mapping[:virtual]
mapping[:virtual].each do |key,value|
+ # If the row already has the virtual set, assume that's correct
+ next if row[key]
# Engine.logger.debug "Mapping virtual #{key}/#{value} for row #{row}"
case value
when Class
generator = generators[key] ||= value.new
row[key] = generator.next
@@ -336,21 +249,171 @@
end
end
end
private
+
# Determine the natural key. This method will always return an array
- # of symbols. The default value is [:id].
+ # of symbols. The default value is [].
def determine_natural_key
- case configuration[:natural_key]
- when Array
- configuration[:natural_key].collect(&:to_sym)
- when String, Symbol
- [configuration[:natural_key].to_sym]
+ Array(configuration[:natural_key]).collect(&:to_sym)
+ end
+
+ # Check whether a natural key has been defined, and if so, whether
+ # this row has enough information to do searches based on that natural
+ # key.
+ #
+ # TODO: This should be factored out into
+ # ETL::Row#has_all_fields?(field_array) But that's not possible
+ # until *all* sources cast to ETL::Row, instead of sometimes
+ # using Hash
+ def has_natural_key?(row)
+ natural_key.any? && natural_key.all? { |key| row.has_key?(key) }
+ end
+
+ # Helper for generating the SQL where clause that allows searching
+ # by a natural key
+ def natural_key_equality_for_row(row)
+ statement = []
+ values = []
+ natural_key.each do |nk|
+ statement << "#{nk} = ?"
+ values << row[nk]
+ end
+ statement = statement.join(" AND ")
+ ActiveRecord::Base.send(:sanitize_sql, [statement, *values])
+ end
+
+ # Do all the steps required when a SCD *has* changed. Exact steps
+ # depend on what type of SCD we're handling.
+ def process_scd_change(row)
+ ETL::Engine.logger.debug "SCD fields do not match"
+
+ if scd_type == 2
+ # SCD Type 2: new row should be added and old row should be updated
+ ETL::Engine.logger.debug "type 2 SCD"
+
+ # To update the old row, we delete the version in the database
+ # and insert a new expired version
+
+ # If there is no truncate then the row will exist twice in the database
+ delete_outdated_record
+
+ ETL::Engine.logger.debug "expiring original record"
+ @existing_row[scd_end_date_field] = @timestamp
+ @existing_row[scd_latest_version_field] = false
+
+ buffer << @existing_row
+
+ elsif scd_type == 1
+ # SCD Type 1: only the new row should be added
+ ETL::Engine.logger.debug "type 1 SCD"
+
+ # Copy primary key, and other non-evolving fields over from
+ # original version of record
+ non_evolving_fields.each do |non_evolving_field|
+ row[non_evolving_field] = @existing_row[non_evolving_field]
+ end
+
+ # If there is no truncate then the row will exist twice in the database
+ delete_outdated_record
else
- [] # no natural key defined
+ # SCD Type 3: not supported
+ ETL::Engine.logger.debug "SCD type #{scd_type} not supported"
end
+
+ # In all cases, the latest, greatest version of the record
+ # should go into the load
+ schedule_new_record(row)
end
+
+ # Do all the steps required when a SCD has *not* changed. Exact
+ # steps depend on what type of SCD we're handling.
+ def process_scd_match(row)
+ ETL::Engine.logger.debug "SCD fields match"
+
+ if scd_type == 2 && has_non_scd_field_changes?(row)
+ ETL::Engine.logger.debug "Non-SCD field changes"
+ # Copy important data over from original version of record
+ row[primary_key] = @existing_row[primary_key]
+ row[scd_end_date_field] = @existing_row[scd_end_date_field]
+ row[scd_effective_date_field] = @existing_row[scd_effective_date_field]
+ row[scd_latest_version_field] = @existing_row[scd_latest_version_field]
+
+ # If there is no truncate then the row will exist twice in the database
+ delete_outdated_record
+
+ buffer << row
+ else
+ # The record is totally the same, so skip it
+ end
+ end
+
+ # Find the version of this row that already exists in the datawarehouse.
+ def preexisting_row(row)
+ q = "SELECT * FROM #{dimension_table} WHERE #{natural_key_equality_for_row(row)}"
+ q << " AND #{scd_latest_version_field}" if scd_type == 2
+
+ #puts "looking for original record"
+ result = connection.select_one(q)
+
+ #puts "Result: #{result.inspect}"
+
+ result ? ETL::Row[result.symbolize_keys!] : nil
+ end
+
+ # Check whether scd fields have changed since the last
+ # load of this record.
+ def has_scd_field_changes?(row)
+ scd_fields(row).any? { |scd_field| row[scd_field].to_s != @existing_row[scd_field].to_s }
+ end
+
+ # Check whether non-scd fields have changed since the last
+ # load of this record.
+ def has_non_scd_field_changes?(row)
+ non_scd_fields(row).any? { |non_scd_field| row[non_scd_field].to_s != @existing_row[non_scd_field].to_s }
+ end
+
+ # Grab, or re-use, a database connection for running queries directly
+ # during the destination processing.
+ def connection
+ @conn ||= ETL::Engine.connection(dimension_target)
+ end
+
+ # Utility for removing a row that has outdated information. Note
+ # that this deletes directly from the database, even if this is a file
+ # destination. It needs to do this because you can't do deletes in a
+ # bulk load.
+ def delete_outdated_record
+ ETL::Engine.logger.debug "deleting old row"
+
+ q = "DELETE FROM #{dimension_table} WHERE #{primary_key} = #{@existing_row[primary_key]}"
+ connection.delete(q)
+ end
+
+ # Schedule the latest, greatest version of the row for insertion
+ # into the database
+ def schedule_new_record(row)
+ ETL::Engine.logger.debug "writing new record"
+ if scd_type == 2
+ row[scd_effective_date_field] = @timestamp
+ row[scd_end_date_field] = '9999-12-31 00:00:00'
+ row[scd_latest_version_field] = true
+ end
+ buffer << row
+ end
+
+ # Get the name of the primary key for this table. Asks the dimension
+ # model class for this information, but if that class hasn't been
+ # defined, just defaults to :id.
+ def primary_key
+ return @primary_key if @primary_key
+ @primary_key = dimension_table.to_s.camelize.constantize.primary_key.to_sym
+ rescue NameError => e
+ ETL::Engine.logger.debug "couldn't get primary_key from dimension model class, using default :id"
+ @primary_key = :id
+ end
+
end
end
end
Dir[File.dirname(__FILE__) + "/destination/*.rb"].each { |file| require(file) }
\ No newline at end of file