module ETL #:nodoc:
  module Control #:nodoc:
    # Base class for destinations.
    #
    # A destination buffers rows handed to #write and pushes them out through
    # #flush, which subclasses must implement. When the configuration contains
    # an :scd section and a natural key, rows are additionally compared
    # against the existing dimension record so that Slowly Changing Dimension
    # (type 1 and type 2) handling can be applied before buffering.
    class Destination
      # Read-only accessor for the ETL::Control::Control instance
      attr_reader :control

      # Read-only accessor for the configuration Hash
      attr_reader :configuration

      # Read-only accessor for the destination mapping Hash
      attr_reader :mapping

      # Accessor to the buffer size
      attr_accessor :buffer_size

      # Unique flag. When set, it is treated by #row_allowed? as a list of
      # field names forming a compound key.
      attr_accessor :unique

      # A condition for writing
      attr_accessor :condition

      # An array of rows to append to the destination
      attr_accessor :append_rows

      class << self
        # Get the destination class for the specified name.
        #
        # For example if name is :database or 'database' then the
        # DatabaseDestination class is returned
        def class_for_name(name)
          ETL::Control.const_get("#{name.to_s.camelize}Destination")
        end
      end

      # Initialize the destination
      #
      # Arguments:
      # * <tt>control</tt>: The ETL::Control::Control instance
      # * <tt>configuration</tt>: The configuration Hash
      # * <tt>mapping</tt>: The mapping Hash
      #
      # Options:
      # * <tt>:buffer_size</tt>: The output buffer size (default 100 records).
      #   The default is also written back into the configuration Hash.
      # * <tt>:condition</tt>: A conditional proc that must return true for the
      #   row to be written
      # * <tt>:append_rows</tt>: An array of rows to append
      def initialize(control, configuration, mapping)
        @control = control
        @configuration = configuration
        @mapping = mapping
        @buffer_size = configuration[:buffer_size] ||= 100
        @condition = configuration[:condition]
        @append_rows = configuration[:append_rows]
      end

      # Get the current row number
      def current_row
        @current_row ||= 1
      end

      # Write the given row. The row is processed only when no condition is
      # configured or the condition returns a truthy value for it; the buffer
      # is flushed once it reaches buffer_size.
      def write(row)
        process_change(row) if @condition.nil? || @condition.call(row)
        flush if buffer.length >= buffer_size
      end

      # Abstract method
      def flush
        raise NotImplementedError, "flush method must be implemented by subclasses"
      end

      # Abstract method
      def close
        raise NotImplementedError, "close method must be implemented by subclasses"
      end

      # Lazily-created list of errors for this destination.
      def errors
        @errors ||= []
      end

      protected

      # Access the buffer
      def buffer
        @buffer ||= []
      end

      # Access the generators map
      def generators
        @generators ||= {}
      end

      # Get the order of elements from the source order. Hash entries in the
      # first source's definition contribute their :name; any other entry is
      # used as-is.
      def order_from_source
        control.sources.first.definition.collect do |item|
          item.is_a?(Hash) ? item[:name] : item
        end
      end

      # Return true if the row is allowed. The row will not be allowed if the
      # :unique option is specified in the configuration and the compound key
      # already exists
      def row_allowed?(row)
        if unique
          key = unique.collect { |k| row[k] }.join('|')
          return false if compound_key_constraints[key]
          compound_key_constraints[key] = 1
        end
        true
      end

      # Get a hash of compound key constraints. This is used to determine if a
      # row can be written when the unique option is specified
      def compound_key_constraints
        @compound_key_constraints ||= {}
      end

      # Return fields which are Slowly Changing Dimension fields.
      # Uses the scd_fields specified in the configuration. If that's
      # missing, uses all of the row's fields. The result is memoized, so the
      # first row seen decides the fallback list.
      def scd_fields(row)
        @scd_fields ||= configuration[:scd_fields] || row.keys
        ETL::Engine.logger.debug "@scd_fields is: #{@scd_fields.inspect}"
        @scd_fields
      end

      # returns the fields that are required to identify an SCD
      def scd_required_fields
        if scd?
          [scd_effective_date_field, scd_end_date_field, scd_latest_version_field]
        else
          []
        end
      end

      # Return the row fields that are neither natural key, SCD fields,
      # primary key nor SCD bookkeeping fields. Memoized from the first row.
      def non_scd_fields(row)
        @non_scd_fields ||= row.keys - natural_key - scd_fields(row) - [primary_key] - scd_required_fields
        ETL::Engine.logger.debug "@non_scd_fields is: #{@non_scd_fields.inspect}"
        @non_scd_fields
      end

      # Return the configured non-evolving fields plus the primary key,
      # without duplicates.
      def non_evolving_fields
        # Kernel#Array returns an Array argument itself, so appending with <<
        # would grow the configured list on every call. Build a new array.
        (Array(configuration[:scd][:non_evolving_fields]) + [primary_key]).uniq
      end

      # True when the configuration has an :scd section.
      def scd?
        !configuration[:scd].nil?
      end

      # The configured SCD type, or nil when SCD is not configured.
      def scd_type
        scd? ? configuration[:scd][:type] : nil
      end

      # Get the Slowly Changing Dimension effective date field. Defaults to
      # 'effective_date'. Returns nil when SCD is not configured.
      def scd_effective_date_field
        configuration[:scd][:effective_date_field] || :effective_date if scd?
      end

      # Get the Slowly Changing Dimension end date field. Defaults to
      # 'end_date'. Returns nil when SCD is not configured.
      def scd_end_date_field
        configuration[:scd][:end_date_field] || :end_date if scd?
      end

      # Get the Slowly Changing Dimension latest version field. Defaults to
      # 'latest_version'. Returns nil when SCD is not configured.
      def scd_latest_version_field
        configuration[:scd][:latest_version_field] || :latest_version if scd?
      end

      # Return the natural key field names, defaults to []
      def natural_key
        @natural_key ||= determine_natural_key
      end

      # Get the dimension table if specified. Raises ConfigurationError when
      # SCD is configured but no table can be resolved.
      def dimension_table
        @dimension_table ||= if scd?
          ETL::Engine.table(configuration[:scd][:dimension_table], dimension_target) or
            raise ConfigurationError, "dimension_table setting required"
        end
      end

      # Get the dimension target if specified. Raises ConfigurationError when
      # SCD is configured without a :dimension_target.
      def dimension_target
        @dimension_target ||= if scd?
          configuration[:scd][:dimension_target] or
            raise ConfigurationError, "dimension_target setting required"
        end
      end

      # Process a row to determine the change type
      def process_change(row)
        ETL::Engine.logger.debug "Processing row: #{row.inspect}"
        return unless row

        # Change processing can only occur if the natural key exists in the row
        ETL::Engine.logger.debug "Checking for natural key existence"
        unless has_natural_key?(row)
          buffer << row
          return
        end

        timestamp_setting = configuration[:scd][:timestamp]
        @timestamp = case timestamp_setting
          when Time, Date then timestamp_setting
          when Symbol then row[timestamp_setting]
          when nil then Time.now
          else raise "Unknown timestamp: #{timestamp_setting.inspect}. Use Time or Date for a specific time, a symbol for a value from each row, or nil for the current time"
        end

        # See if the scd_fields of the current record have changed
        # from the last time this record was loaded into the data
        # warehouse. If they match then throw away this row (no need
        # to process). If they do not match then the record is an
        # 'update'. If the record doesn't exist then it is an 'insert'
        ETL::Engine.logger.debug "Checking record for SCD change"
        if @existing_row = preexisting_row(row)
          if has_scd_field_changes?(row)
            process_scd_change(row)
          else
            process_scd_match(row)
          end
        else
          schedule_new_record(row)
        end
      end

      # Add any virtual fields to the row. Virtual rows will get their value
      # from one of the following:
      # * If the mapping is a Class, then an object which implements the next
      #   method
      # * If the mapping is a Symbol, then the XGenerator where X is the
      #   classified symbol
      # * If the mapping is a Proc, then it will be called with the row
      # * Otherwise the value itself will be assigned to the field
      def add_virtuals!(row)
        return unless mapping[:virtual]

        mapping[:virtual].each do |key, value|
          # If the row already has the virtual set, assume that's correct
          next if row[key]
          # Engine.logger.debug "Mapping virtual #{key}/#{value} for row #{row}"
          case value
          when Class
            generator = generators[key] ||= value.new
            row[key] = generator.next
          when Symbol
            # NOTE(review): `options` is not defined in this class; presumably
            # it is supplied by subclasses - confirm.
            generator = generators[key] ||= ETL::Generator::Generator.class_for_name(value).new(options)
            row[key] = generator.next
          when Proc
            row[key] = value.call(row)
          else
            if value.is_a?(ETL::Generator::Generator)
              row[key] = value.next
            else
              row[key] = value
            end
          end
        end
      end

      private

      # Determine the natural key. This method will always return an array
      # of symbols. The default value is [].
      def determine_natural_key
        Array(configuration[:natural_key]).collect(&:to_sym)
      end

      # Check whether a natural key has been defined, and if so, whether
      # this row has enough information to do searches based on that natural
      # key.
      #
      # TODO: This should be factored out into
      # ETL::Row#has_all_fields?(field_array) But that's not possible
      # until *all* sources cast to ETL::Row, instead of sometimes
      # using Hash
      def has_natural_key?(row)
        natural_key.any? && natural_key.all? { |key| row.has_key?(key) }
      end

      # Helper for generating the SQL where clause that allows searching
      # by a natural key. Nil values are matched with IS NULL; everything
      # else is bound through sanitize_sql_array.
      def natural_key_equality_for_row(row)
        clauses = []
        values = []
        natural_key.each do |nk|
          if row[nk].nil?
            clauses << "#{nk} IS NULL"
          else
            clauses << "#{nk} = ?"
            values << row[nk]
          end
        end
        ETL::Execution::Base.send(:sanitize_sql_array, [clauses.join(" AND "), *values])
      end

      # Do all the steps required when a SCD *has* changed. Exact steps
      # depend on what type of SCD we're handling.
      def process_scd_change(row)
        ETL::Engine.logger.debug "SCD fields do not match"

        if scd_type == 2
          # SCD Type 2: new row should be added and old row should be updated
          ETL::Engine.logger.debug "type 2 SCD"

          # To update the old row, we delete the version in the database
          # and insert a new expired version

          # If there is no truncate then the row will exist twice in the database
          delete_outdated_record

          ETL::Engine.logger.debug "expiring original record"
          @existing_row[scd_end_date_field] = @timestamp
          @existing_row[scd_latest_version_field] = false

          if configuration[:scd][:merge_nils]
            scd_fields(row).each do |f|
              # Only nil means "not supplied". An explicit false is a real
              # value and must not be replaced by the previous version's.
              row[f] = @existing_row[f] if row[f].nil?
            end
          end

          buffer << @existing_row
        elsif scd_type == 1
          # SCD Type 1: only the new row should be added
          ETL::Engine.logger.debug "type 1 SCD"

          # Copy primary key, and other non-evolving fields over from
          # original version of record
          non_evolving_fields.each do |non_evolving_field|
            row[non_evolving_field] = @existing_row[non_evolving_field]
          end

          # If there is no truncate then the row will exist twice in the database
          delete_outdated_record
        else
          # SCD Type 3: not supported
          ETL::Engine.logger.debug "SCD type #{scd_type} not supported"
        end

        # In all cases, the latest, greatest version of the record
        # should go into the load
        schedule_new_record(row)
      end

      # Do all the steps required when a SCD has *not* changed. Exact
      # steps depend on what type of SCD we're handling.
      def process_scd_match(row)
        ETL::Engine.logger.debug "SCD fields match"

        if scd_type == 2 && has_non_scd_field_changes?(row)
          ETL::Engine.logger.debug "Non-SCD field changes"
          # Copy important data over from original version of record
          row[primary_key] = @existing_row[primary_key]
          row[scd_end_date_field] = @existing_row[scd_end_date_field]
          row[scd_effective_date_field] = @existing_row[scd_effective_date_field]
          row[scd_latest_version_field] = @existing_row[scd_latest_version_field]

          # If there is no truncate then the row will exist twice in the database
          delete_outdated_record

          buffer << row
        else
          # The record is totally the same, so skip it
        end
      end

      # Find the version of this row that already exists in the datawarehouse.
      # Returns an ETL::Row, or nil when no record matches.
      def preexisting_row(row)
        q = "SELECT * FROM #{dimension_table} WHERE #{natural_key_equality_for_row(row)}"
        q << " AND #{scd_latest_version_field}" if scd_type == 2

        ETL::Engine.logger.debug "looking for original record"
        result = connection.select_one(q)

        ETL::Engine.logger.debug "Result: #{result.inspect}"

        result ? ETL::Row[result.symbolize_keys!] : nil
      end

      # Check whether scd fields have changed since the last
      # load of this record. Values are compared by their string form. With
      # :merge_nils enabled, a nil in the incoming row never counts as a
      # change.
      def has_scd_field_changes?(row)
        fields = scd_fields(row)
        ETL::Engine.logger.debug " Row: %s" % row.slice(*fields).inspect
        ETL::Engine.logger.debug "Existing Row: %s" % @existing_row.slice(*fields).inspect

        fields.any? { |csd_field|
          mismatch = configuration[:scd][:merge_nils] ? !row[csd_field].nil? : true
          mismatch = mismatch && (row[csd_field].to_s != @existing_row[csd_field].to_s)
          ETL::Engine.logger.debug "#{csd_field}: " + (mismatch ? row[csd_field].to_s + " != " + @existing_row[csd_field].to_s : @existing_row[csd_field].to_s)
          mismatch
        }
      end

      # Check whether non-scd fields have changed since the last
      # load of this record. Values are compared by their string form.
      def has_non_scd_field_changes?(row)
        non_scd_fields(row).any? { |non_csd_field| row[non_csd_field].to_s != @existing_row[non_csd_field].to_s }
      end

      # Grab, or re-use, a database connection for running queries directly
      # during the destination processing.
      def connection
        @conn ||= ETL::Engine.connection(dimension_target)
      end

      # Utility for removing a row that has outdated information. Note
      # that this deletes directly from the database, even if this is a file
      # destination. It needs to do this because you can't do deletes in a
      # bulk load.
      def delete_outdated_record
        ETL::Engine.logger.debug "deleting old row"

        # Bind the key value instead of interpolating it raw, the same way
        # natural_key_equality_for_row does, so non-numeric keys are quoted.
        where_clause = ETL::Execution::Base.send(
          :sanitize_sql_array, ["#{primary_key} = ?", @existing_row[primary_key]]
        )
        connection.delete("DELETE FROM #{dimension_table} WHERE #{where_clause}")
      end

      # Schedule the latest, greatest version of the row for insertion
      # into the database
      def schedule_new_record(row)
        ETL::Engine.logger.debug "writing new record"
        if scd_type == 2
          row[scd_effective_date_field] = @timestamp
          row[scd_end_date_field] = '9999-12-31 00:00:00'
          row[scd_latest_version_field] = true
        end
        buffer << row
      end

      # Get the name of the primary key for this table. Asks the dimension
      # model class for this information, but if that class hasn't been
      # defined, just defaults to :id.
      def primary_key
        return @primary_key if @primary_key
        @primary_key = dimension_table.to_s.camelize.constantize.primary_key.to_sym
      rescue NameError
        ETL::Engine.logger.debug "couldn't get primary_key from dimension model class, using default :id"
        @primary_key = :id
      end
    end
  end
end

Dir[File.dirname(__FILE__) + "/destination/*.rb"].each { |file| require(file) }