lib/etl/control/destination.rb in activewarehouse-etl-0.7.2 vs lib/etl/control/destination.rb in activewarehouse-etl-0.8.0
- old
+ new
@@ -160,22 +160,28 @@
configuration[:scd][:dimension_table] if scd?
end
# Process a row to determine the change type
def process_change(row)
+ ETL::Engine.logger.debug "Processing row: #{row.inspect}"
return unless row
# Change processing can only occur if the natural key exists in the row
- supports_change = true
+ ETL::Engine.logger.debug "Checking for natural key existence"
+ if natural_key.length == 0
+ buffer << row
+ return
+ end
+
natural_key.each do |key|
unless row.has_key?(key)
buffer << row
return
end
end
- ETL::Engine.logger.debug "checking scd fields"
+ ETL::Engine.logger.debug "Checking for SCD fields"
s = String.new
if scd_fields
scd_fields.each { |f| s << row[f].to_s }
else
row.each { |key,value| s << value.to_s }
@@ -191,11 +197,11 @@
crc = Zlib.crc32(s)
record = ETL::Execution::Record.find_by_control_file_and_natural_key(control.file, nk)
timestamp = Time.now
- ETL::Engine.logger.debug "checking record change type"
+ ETL::Engine.logger.debug "Checking record change type"
if record
if record.crc != crc.to_s
# SCD Type 1: only the new row should be added
# SCD Type 2: both an old and new row should be added
# SCD Type 3: not supported
@@ -204,11 +210,11 @@
if scd_type == 2
ETL::Engine.logger.debug "type 2 SCD"
q = "SELECT * FROM #{dimension_table} WHERE "
q << natural_key.collect { |nk| "#{nk} = '#{row[nk]}'" }.join(" AND ")
#puts "looking for original record"
- result = ActiveRecord::Base.connection.select_one(q)
+ result = ETL::ActiveRecord::Base.connection.select_one(q)
if result
#puts "Result: #{result.inspect}"
original_record = ETL::Row[result.symbolize_keys!]
original_record[scd_end_date_field] = timestamp
ETL::Engine.logger.debug "writing original record"
@@ -228,13 +234,16 @@
else
ETL::Engine.logger.debug "CRC matches, skipping"
q = "SELECT * FROM #{dimension_table} WHERE "
q << natural_key.collect { |nk| "#{nk} = '#{row[nk]}'" }.join(" AND ")
- result = ActiveRecord::Base.connection.select_one(q)
+ result = ETL::ActiveRecord::Base.connection.select_one(q)
if result
- buffer << ETL::Row[result.symbolize_keys!]
+ # This was necessary when truncating and then loading, however I
+ # am getting reluctant to having the ETL process do the truncation
+ # as part of the bulk load, favoring using a preprocessor instead.
+ # buffer << ETL::Row[result.symbolize_keys!]
else
# The record never made it into the database, so add the effective and end date
# and add it into the bulk load file
row[scd_effective_date_field] = timestamp
row[scd_end_date_field] = '9999-12-31 00:00:00'
@@ -260,12 +269,10 @@
:crc => crc,
:job_id => ETL::Engine.job.id
)
end
end
- rescue => e
- puts e
end
# Add any virtual fields to the row. Virtual rows will get their value
# from one of the following:
# * If the mapping is a Class, then an object which implements the next
@@ -286,11 +293,15 @@
generator = generators[key] ||= ETL::Generator::Generator.class_for_name(value).new
row[key] = generator.next
when Proc
row[key] = value.call(row)
else
- row[key] = value
+ if value.is_a?(ETL::Generator::Generator)
+ row[key] = value.next
+ else
+ row[key] = value
+ end
end
end
end
end
@@ -302,10 +313,10 @@
when Array
configuration[:natural_key].collect(&:to_sym)
when String, Symbol
[configuration[:natural_key].to_sym]
else
- [:id]
+ [] # no natural key defined
end
end
end
end
end
\ No newline at end of file