lib/etl/control/destination.rb in activewarehouse-etl-0.7.2 vs lib/etl/control/destination.rb in activewarehouse-etl-0.8.0

- old
+ new

@@ -160,22 +160,28 @@ configuration[:scd][:dimension_table] if scd? end # Process a row to determine the change type def process_change(row) + ETL::Engine.logger.debug "Processing row: #{row.inspect}" return unless row # Change processing can only occur if the natural key exists in the row - supports_change = true + ETL::Engine.logger.debug "Checking for natural key existence" + if natural_key.length == 0 + buffer << row + return + end + natural_key.each do |key| unless row.has_key?(key) buffer << row return end end - ETL::Engine.logger.debug "checking scd fields" + ETL::Engine.logger.debug "Checking for SCD fields" s = String.new if scd_fields scd_fields.each { |f| s << row[f].to_s } else row.each { |key,value| s << value.to_s } @@ -191,11 +197,11 @@ crc = Zlib.crc32(s) record = ETL::Execution::Record.find_by_control_file_and_natural_key(control.file, nk) timestamp = Time.now - ETL::Engine.logger.debug "checking record change type" + ETL::Engine.logger.debug "Checking record change type" if record if record.crc != crc.to_s # SCD Type 1: only the new row should be added # SCD Type 2: both an old and new row should be added # SCD Type 3: not supported @@ -204,11 +210,11 @@ if scd_type == 2 ETL::Engine.logger.debug "type 2 SCD" q = "SELECT * FROM #{dimension_table} WHERE " q << natural_key.collect { |nk| "#{nk} = '#{row[nk]}'" }.join(" AND ") #puts "looking for original record" - result = ActiveRecord::Base.connection.select_one(q) + result = ETL::ActiveRecord::Base.connection.select_one(q) if result #puts "Result: #{result.inspect}" original_record = ETL::Row[result.symbolize_keys!] original_record[scd_end_date_field] = timestamp ETL::Engine.logger.debug "writing original record" @@ -228,13 +234,16 @@ else ETL::Engine.logger.debug "CRC matches, skipping" q = "SELECT * FROM #{dimension_table} WHERE " q << natural_key.collect { |nk| "#{nk} = '#{row[nk]}'" }.join(" AND ") - result = ActiveRecord::Base.connection.select_one(q) + result = ETL::ActiveRecord::Base.connection.select_one(q) if result - buffer << ETL::Row[result.symbolize_keys!] + # This was necessary when truncating and then loading, however I + # am getting reluctant to having the ETL process do the truncation + # as part of the bulk load, favoring using a preprocessor instead. + # buffer << ETL::Row[result.symbolize_keys!] else # The record never made it into the database, so add the effective and end date # and add it into the bulk load file row[scd_effective_date_field] = timestamp row[scd_end_date_field] = '9999-12-31 00:00:00' @@ -260,12 +269,10 @@ :crc => crc, :job_id => ETL::Engine.job.id ) end end - rescue => e - puts e end # Add any virtual fields to the row. Virtual rows will get their value # from one of the following: # * If the mapping is a Class, then an object which implements the next @@ -286,11 +293,15 @@ generator = generators[key] ||= ETL::Generator::Generator.class_for_name(value).new row[key] = generator.next when Proc row[key] = value.call(row) else - row[key] = value + if value.is_a?(ETL::Generator::Generator) + row[key] = value.next + else + row[key] = value + end end end end end @@ -302,10 +313,10 @@ when Array configuration[:natural_key].collect(&:to_sym) when String, Symbol [configuration[:natural_key].to_sym] else - [:id] + [] # no natural key defined end end end end end \ No newline at end of file