lib/etl/control/destination.rb in activewarehouse-etl-0.6.1 vs lib/etl/control/destination.rb in activewarehouse-etl-0.7.0

- old
+ new

@@ -24,12 +24,12 @@ attr_accessor :append_rows class << self # Get the destination class for the specified name. # - # For example if name is :database or 'database' then the DatabaseDestination class - # is returned + # For example if name is :database or 'database' then the + # DatabaseDestination class is returned def class_for_name(name) ETL::Control.const_get("#{name.to_s.classify}Destination") end end @@ -40,17 +40,18 @@ # * <tt>configuration</tt>: The configuration Hash # * <tt>mapping</tt>: The mapping Hash # # Options: # * <tt>:buffer_size</tt>: The output buffer size (default 1000 records) - # * <tt>:condition</tt>: A conditional proc that must return true for the row to be written + # * <tt>:condition</tt>: A conditional proc that must return true for the + # row to be written # * <tt>:append_rows</tt>: An array of rows to append def initialize(control, configuration, mapping) @control = control @configuration = configuration @mapping = mapping - @buffer_size = configuration[:buffer_size] ||= 1000 + @buffer_size = configuration[:buffer_size] ||= 100 @condition = configuration[:condition] @append_rows = configuration[:append_rows] end # Get the current row number @@ -59,11 +60,11 @@ end # Write the given row def write(row) if @condition.nil? || @condition.call(row) - buffer << row + process_change(row) end flush if buffer.length >= buffer_size end # Abstract method @@ -103,30 +104,176 @@ end end order end - # Return true if the row is allowed. The row will not be allowed if the :unique option is specified - # in the configuration and the compound key already exists + # Return true if the row is allowed. The row will not be allowed if the + # :unique option is specified in the configuration and the compound key + # already exists def row_allowed?(row) if unique key = (unique.collect { |k| row[k] }).join('|') return false if compound_key_constraints[key] compound_key_constraints[key] = 1 end return true end - # Get a hash of compound key contraints. This is used to determine if a row can be written when the - # unique option is specified + # Get a hash of compound key contraints. This is used to determine if a + # row can be written when the unique option is specified def compound_key_constraints @compound_key_constraints ||= {} end - # Add any virtual fields to the row. Virtual rows will get their value from one of the following: - # * If the mapping is a Class, then an object which implements the next method - # * If the mapping is a Symbol, then the XGenerator where X is the classified symbol + # Return fields which are Slowly Changing Dimension fields. Return nil + # by default. + def scd_fields + @scd_fields ||= configuration[:scd_fields] + end + + def scd? + !configuration[:scd].nil? + end + + def scd_type + scd? ? configuration[:scd][:type] : nil + end + + # Get the Slowly Changing Dimension effective date field. Defaults to + # 'effective_date'. + def scd_effective_date_field + configuration[:scd][:effective_date_field] || :effective_date if scd? + end + + # Get the Slowly Changing Dimension end date field. Defaults to + # 'end_date'. + def scd_end_date_field + configuration[:scd][:end_date_field] || :end_date if scd? + end + + # Return the natural key field name, defaults to :id + def natural_key + @natural_key ||= determine_natural_key + end + + # Get the dimension table if specified + def dimension_table + configuration[:scd][:dimension_table] if scd? + end + + # Process a row to determine the change type + def process_change(row) + return unless row + + # Change processing can only occur if the natural key exists in the row + supports_change = true + natural_key.each do |key| + unless row.has_key?(key) + buffer << row + return + end + end + + ETL::Engine.logger.debug "checking scd fields" + s = String.new + if scd_fields + scd_fields.each { |f| s << row[f].to_s } + else + row.each { |key,value| s << value.to_s } + end + + # apply the CRC to 's' and see if it matches the last + # ETL::Execution::Record with the samenatural key. If they match then + # throw away this row (no need to process). If they do not match then + # the record is an 'update'. If the record doesn't exist then it is an + # 'insert' + nk = natural_key.collect{|k|row[k]}.join('|') + require 'zlib' + crc = Zlib.crc32(s) + record = ETL::Execution::Record.find_by_control_file_and_natural_key(control.file, nk) + + timestamp = Time.now + + ETL::Engine.logger.debug "checking record change type" + if record + if record.crc != crc.to_s + # SCD Type 1: only the new row should be added + # SCD Type 2: both an old and new row should be added + # SCD Type 3: not supported + ETL::Engine.logger.debug "CRC does not match" + + if scd_type == 2 + ETL::Engine.logger.debug "type 2 SCD" + q = "SELECT * FROM #{dimension_table} WHERE " + q << natural_key.collect { |nk| "#{nk} = '#{row[nk]}'" }.join(" AND ") + #puts "looking for original record" + result = ActiveRecord::Base.connection.select_one(q) + if result + #puts "Result: #{result.inspect}" + original_record = ETL::Row[result.symbolize_keys!] + original_record[scd_end_date_field] = timestamp + ETL::Engine.logger.debug "writing original record" + buffer << original_record + end + + row[scd_effective_date_field] = timestamp + row[scd_end_date_field] = '9999-12-31 00:00:00' + elsif scd_type == 1 + ETL::Engine.logger.debug "type 1 SCD" + else + ETL::Engine.logger.debug "SCD not specified" + end + + ETL::Engine.logger.debug "writing new record" + buffer << row + else + ETL::Engine.logger.debug "CRC matches, skipping" + + q = "SELECT * FROM #{dimension_table} WHERE " + q << natural_key.collect { |nk| "#{nk} = '#{row[nk]}'" }.join(" AND ") + result = ActiveRecord::Base.connection.select_one(q) + if result + buffer << ETL::Row[result.symbolize_keys!] + else + # The record never made it into the database, so add the effective and end date + # and add it into the bulk load file + row[scd_effective_date_field] = timestamp + row[scd_end_date_field] = '9999-12-31 00:00:00' + buffer << row + end + end + else + ETL::Engine.logger.debug "record never loaded" + # Set the effective and end date fields + if scd_type == 2 + row[scd_effective_date_field] = timestamp + row[scd_end_date_field] = '9999-12-31 00:00:00' + end + + # Write the row + buffer << row + + # Record the record + if ETL::Engine.job # only record the execution if there is a job + ETL::Execution::Record.create!( + :control_file => control.file, + :natural_key => nk, + :crc => crc, + :job_id => ETL::Engine.job.id + ) + end + end + rescue => e + puts e + end + + # Add any virtual fields to the row. Virtual rows will get their value + # from one of the following: + # * If the mapping is a Class, then an object which implements the next + # method + # * If the mapping is a Symbol, then the XGenerator where X is the + # classified symbol # * If the mapping is a Proc, then it will be called with the row # * Otherwise the value itself will be assigned to the field def add_virtuals!(row) if mapping[:virtual] mapping[:virtual].each do |key,value| @@ -142,9 +289,23 @@ row[key] = value.call(row) else row[key] = value end end + end + end + + private + # Determine the natural key. This method will always return an array + # of symbols. The default value is [:id]. + def determine_natural_key + case configuration[:natural_key] + when Array + configuration[:natural_key].collect(&:to_sym) + when String, Symbol + [configuration[:natural_key].to_sym] + else + [:id] end end end end end \ No newline at end of file