lib/etl/control/destination.rb in activewarehouse-etl-0.6.1 vs lib/etl/control/destination.rb in activewarehouse-etl-0.7.0
- old
+ new
@@ -24,12 +24,12 @@
attr_accessor :append_rows
class << self
# Get the destination class for the specified name.
#
- # For example if name is :database or 'database' then the DatabaseDestination class
- # is returned
+ # For example if name is :database or 'database' then the
+ # DatabaseDestination class is returned
def class_for_name(name)
  # Accepts a Symbol or String; e.g. :database resolves to the constant
  # ETL::Control::DatabaseDestination.
  destination_class_name = "#{name.to_s.classify}Destination"
  ETL::Control.const_get(destination_class_name)
end
end
@@ -40,17 +40,18 @@
# * <tt>configuration</tt>: The configuration Hash
# * <tt>mapping</tt>: The mapping Hash
#
# Options:
# * <tt>:buffer_size</tt>: The output buffer size (default 100 records; 1000 before 0.7.0)
- # * <tt>:condition</tt>: A conditional proc that must return true for the row to be written
+ # * <tt>:condition</tt>: A conditional proc that must return true for the
+ # row to be written
# * <tt>:append_rows</tt>: An array of rows to append
def initialize(control, configuration, mapping)
# Keep references to the control, the configuration Hash and the mapping Hash.
@control = control
@configuration = configuration
@mapping = mapping
# `||=` on the hash element also stores the default back into the
# configuration hash when :buffer_size is unset.
- @buffer_size = configuration[:buffer_size] ||= 1000
+ @buffer_size = configuration[:buffer_size] ||= 100
# Both of these are optional and stay nil when not configured.
@condition = configuration[:condition]
@append_rows = configuration[:append_rows]
end
# Get the current row number
@@ -59,11 +60,11 @@
end
# Write the given row
def write(row)
# A row is accepted when no :condition is configured or when the condition
# returns a truthy value for it; rejected rows are dropped.
if @condition.nil? || @condition.call(row)
- buffer << row
+ process_change(row)
end
# The flush check runs on every call, whether or not the row was accepted.
flush if buffer.length >= buffer_size
end
# Abstract method
@@ -103,30 +104,176 @@
end
end
order
end
- # Return true if the row is allowed. The row will not be allowed if the :unique option is specified
- # in the configuration and the compound key already exists
+ # Return true if the row is allowed. The row will not be allowed if the
+ # :unique option is specified in the configuration and the compound key
+ # already exists
def row_allowed?(row)
  # Without a :unique configuration every row is allowed.
  return true unless unique

  # Key on the array of stringified values rather than a '|'-joined string:
  # joining made distinct compound keys such as ['x|y', 'z'] and
  # ['x', 'y|z'] collide, so valid rows were rejected as duplicates.
  key = unique.collect { |k| row[k].to_s }
  return false if compound_key_constraints[key]

  # First time this compound key is seen: remember it and allow the row.
  compound_key_constraints[key] = 1
  true
end
- # Get a hash of compound key contraints. This is used to determine if a row can be written when the
- # unique option is specified
+ # Get a hash of compound key constraints. This is used to determine if a
+ # row can be written when the unique option is specified
def compound_key_constraints
  # Lazily create the hash on first access and reuse it afterwards.
  @compound_key_constraints = {} unless @compound_key_constraints
  @compound_key_constraints
end
- # Add any virtual fields to the row. Virtual rows will get their value from one of the following:
- # * If the mapping is a Class, then an object which implements the next method
- # * If the mapping is a Symbol, then the XGenerator where X is the classified symbol
+ # Return fields which are Slowly Changing Dimension fields. Return nil
+ # by default.
+ def scd_fields
+ @scd_fields ||= configuration[:scd_fields]
+ end
+
+ def scd?
+ !configuration[:scd].nil?
+ end
+
+ def scd_type
+ scd? ? configuration[:scd][:type] : nil
+ end
+
+ # Get the Slowly Changing Dimension effective date field. Defaults to
+ # :effective_date; returns nil when no :scd configuration is present.
+ def scd_effective_date_field
+ configuration[:scd][:effective_date_field] || :effective_date if scd?
+ end
+
+ # Get the Slowly Changing Dimension end date field. Defaults to
+ # :end_date; returns nil when no :scd configuration is present.
+ def scd_end_date_field
+ configuration[:scd][:end_date_field] || :end_date if scd?
+ end
+
+ # Return the natural key as an array of field names (symbols); defaults to [:id]
+ def natural_key
+ @natural_key ||= determine_natural_key
+ end
+
+ # Get the dimension table if specified
+ def dimension_table
+ configuration[:scd][:dimension_table] if scd?
+ end
+
+ # Process a row to determine the change type
+ def process_change(row)
+ return unless row
+
+ # Change processing can only occur if the natural key exists in the row
+ supports_change = true
+ natural_key.each do |key|
+ unless row.has_key?(key)
+ buffer << row
+ return
+ end
+ end
+
+ ETL::Engine.logger.debug "checking scd fields"
+ s = String.new
+ if scd_fields
+ scd_fields.each { |f| s << row[f].to_s }
+ else
+ row.each { |key,value| s << value.to_s }
+ end
+
+ # apply the CRC to 's' and see if it matches the last
+ # ETL::Execution::Record with the samenatural key. If they match then
+ # throw away this row (no need to process). If they do not match then
+ # the record is an 'update'. If the record doesn't exist then it is an
+ # 'insert'
+ nk = natural_key.collect{|k|row[k]}.join('|')
+ require 'zlib'
+ crc = Zlib.crc32(s)
+ record = ETL::Execution::Record.find_by_control_file_and_natural_key(control.file, nk)
+
+ timestamp = Time.now
+
+ ETL::Engine.logger.debug "checking record change type"
+ if record
+ if record.crc != crc.to_s
+ # SCD Type 1: only the new row should be added
+ # SCD Type 2: both an old and new row should be added
+ # SCD Type 3: not supported
+ ETL::Engine.logger.debug "CRC does not match"
+
+ if scd_type == 2
+ ETL::Engine.logger.debug "type 2 SCD"
+ q = "SELECT * FROM #{dimension_table} WHERE "
+ q << natural_key.collect { |nk| "#{nk} = '#{row[nk]}'" }.join(" AND ")
+ #puts "looking for original record"
+ result = ActiveRecord::Base.connection.select_one(q)
+ if result
+ #puts "Result: #{result.inspect}"
+ original_record = ETL::Row[result.symbolize_keys!]
+ original_record[scd_end_date_field] = timestamp
+ ETL::Engine.logger.debug "writing original record"
+ buffer << original_record
+ end
+
+ row[scd_effective_date_field] = timestamp
+ row[scd_end_date_field] = '9999-12-31 00:00:00'
+ elsif scd_type == 1
+ ETL::Engine.logger.debug "type 1 SCD"
+ else
+ ETL::Engine.logger.debug "SCD not specified"
+ end
+
+ ETL::Engine.logger.debug "writing new record"
+ buffer << row
+ else
+ ETL::Engine.logger.debug "CRC matches, skipping"
+
+ q = "SELECT * FROM #{dimension_table} WHERE "
+ q << natural_key.collect { |nk| "#{nk} = '#{row[nk]}'" }.join(" AND ")
+ result = ActiveRecord::Base.connection.select_one(q)
+ if result
+ buffer << ETL::Row[result.symbolize_keys!]
+ else
+ # The record never made it into the database, so add the effective and end date
+ # and add it into the bulk load file
+ row[scd_effective_date_field] = timestamp
+ row[scd_end_date_field] = '9999-12-31 00:00:00'
+ buffer << row
+ end
+ end
+ else
+ ETL::Engine.logger.debug "record never loaded"
+ # Set the effective and end date fields
+ if scd_type == 2
+ row[scd_effective_date_field] = timestamp
+ row[scd_end_date_field] = '9999-12-31 00:00:00'
+ end
+
+ # Write the row
+ buffer << row
+
+ # Record the record
+ if ETL::Engine.job # only record the execution if there is a job
+ ETL::Execution::Record.create!(
+ :control_file => control.file,
+ :natural_key => nk,
+ :crc => crc,
+ :job_id => ETL::Engine.job.id
+ )
+ end
+ end
+ rescue => e
+ puts e
+ end
+
+ # Add any virtual fields to the row. Virtual rows will get their value
+ # from one of the following:
+ # * If the mapping is a Class, then an object which implements the next
+ # method
+ # * If the mapping is a Symbol, then the XGenerator where X is the
+ # classified symbol
# * If the mapping is a Proc, then it will be called with the row
# * Otherwise the value itself will be assigned to the field
def add_virtuals!(row)
if mapping[:virtual]
mapping[:virtual].each do |key,value|
@@ -142,9 +289,23 @@
row[key] = value.call(row)
else
row[key] = value
end
end
+ end
+ end
+
+ private
+ # Determine the natural key. This method will always return an array
+ # of symbols. The default value is [:id].
+ def determine_natural_key
+ case configuration[:natural_key]
+ when Array
+ configuration[:natural_key].collect(&:to_sym)
+ when String, Symbol
+ [configuration[:natural_key].to_sym]
+ else
+ [:id]
end
end
end
end
end
\ No newline at end of file