module ETL #:nodoc:
module Control #:nodoc:
# Base class for destinations.
class Destination
# Read-only accessor for the ETL::Control::Control instance
attr_reader :control
# Read-only accessor for the configuration Hash
attr_reader :configuration
# Read-only accessor for the destination mapping Hash
attr_reader :mapping
# Accessor to the buffer size
attr_accessor :buffer_size
# Unique flag.
attr_accessor :unique
# A condition for writing
attr_accessor :condition
# An array of rows to append to the destination
attr_accessor :append_rows
class << self
# Get the destination class for the specified name.
#
# For example if name is :database or 'database' then the
# DatabaseDestination class is returned
def class_for_name(name)
ETL::Control.const_get("#{name.to_s.classify}Destination")
end
end
# Initialize the destination
#
# Arguments:
# * control: The ETL::Control::Control instance
# * configuration: The configuration Hash
# * mapping: The mapping Hash
#
# Options:
# * :buffer_size: The output buffer size (default 1000 records)
# * :condition: A conditional proc that must return true for the
# row to be written
# * :append_rows: An array of rows to append
def initialize(control, configuration, mapping)
@control = control
@configuration = configuration
@mapping = mapping
@buffer_size = configuration[:buffer_size] ||= 100
@condition = configuration[:condition]
@append_rows = configuration[:append_rows]
end
# Get the current row number
def current_row
@current_row ||= 1
end
# Write the given row
def write(row)
if @condition.nil? || @condition.call(row)
process_change(row)
end
flush if buffer.length >= buffer_size
end
# Abstract method
def flush
raise NotImplementedError, "flush method must be implemented by subclasses"
end
# Abstract method
def close
raise NotImplementedError, "close method must be implemented by subclasses"
end
def errors
@errors ||= []
end
protected
# Access the buffer
def buffer
@buffer ||= []
end
# Access the generators map
def generators
@generators ||= {}
end
# Get the order of elements from the source order
def order_from_source
order = []
control.sources.first.definition.each do |item|
case item
when Hash
order << item[:name]
else
order << item
end
end
order
end
# Return true if the row is allowed. The row will not be allowed if the
# :unique option is specified in the configuration and the compound key
# already exists
def row_allowed?(row)
if unique
key = (unique.collect { |k| row[k] }).join('|')
return false if compound_key_constraints[key]
compound_key_constraints[key] = 1
end
return true
end
# Get a hash of compound key contraints. This is used to determine if a
# row can be written when the unique option is specified
def compound_key_constraints
@compound_key_constraints ||= {}
end
# Return fields which are Slowly Changing Dimension fields. Return nil
# by default.
def scd_fields
@scd_fields ||= configuration[:scd_fields]
end
def scd?
!configuration[:scd].nil?
end
def scd_type
scd? ? configuration[:scd][:type] : nil
end
# Get the Slowly Changing Dimension effective date field. Defaults to
# 'effective_date'.
def scd_effective_date_field
configuration[:scd][:effective_date_field] || :effective_date if scd?
end
# Get the Slowly Changing Dimension end date field. Defaults to
# 'end_date'.
def scd_end_date_field
configuration[:scd][:end_date_field] || :end_date if scd?
end
# Return the natural key field name, defaults to :id
def natural_key
@natural_key ||= determine_natural_key
end
# Get the dimension table if specified
def dimension_table
ETL::Engine.table(configuration[:scd][:dimension_table], dimension_target) if scd?
end
# Get the dimension target if specified
def dimension_target
configuration[:scd][:dimension_target] if scd?
end
# Process a row to determine the change type
def process_change(row)
ETL::Engine.logger.debug "Processing row: #{row.inspect}"
return unless row
# Change processing can only occur if the natural key exists in the row
ETL::Engine.logger.debug "Checking for natural key existence"
if natural_key.length == 0
buffer << row
return
end
natural_key.each do |key|
unless row.has_key?(key)
buffer << row
return
end
end
ETL::Engine.logger.debug "Checking for SCD fields"
s = String.new
if scd_fields
scd_fields.each { |f| s << row[f].to_s }
else
row.each { |key,value| s << value.to_s }
end
# apply the CRC to 's' and see if it matches the last
# ETL::Execution::Record with the samenatural key. If they match then
# throw away this row (no need to process). If they do not match then
# the record is an 'update'. If the record doesn't exist then it is an
# 'insert'
nk = natural_key.collect{|k|row[k]}.join('|')
require 'zlib'
crc = Zlib.crc32(s)
record = ETL::Execution::Record.find_by_control_file_and_natural_key(control.file, nk)
timestamp = Time.now
ETL::Engine.logger.debug "Checking record change type"
if record
if record.crc != crc.to_s
# SCD Type 1: only the new row should be added
# SCD Type 2: both an old and new row should be added
# SCD Type 3: not supported
ETL::Engine.logger.debug "CRC does not match"
if scd_type == 2
ETL::Engine.logger.debug "type 2 SCD"
raise ConfigurationError, "dimension_table setting required" unless dimension_table
raise ConfigurationError, "dimension_target setting required" unless dimension_target
conn = ETL::Engine.connection(dimension_target)
q = "SELECT * FROM #{dimension_table} WHERE "
q << natural_key.collect { |nk| "#{nk} = '#{row[nk]}'" }.join(" AND ")
#puts "looking for original record"
result = conn.select_one(q)
if result
#puts "Result: #{result.inspect}"
original_record = ETL::Row[result.symbolize_keys!]
original_record[scd_end_date_field] = timestamp
ETL::Engine.logger.debug "writing original record"
# if there is no truncate then the row will exist twice in the database
# need to figure out how to delete that old record before inserting the
# updated version of the record
q = "DELETE FROM #{dimension_table} WHERE "
q << natural_key.collect { |nk| "#{nk} = '#{row[nk]}'" }.join(" AND ")
num_rows_affected = conn.delete(q)
ETL::Engine.logger.debug "deleted old row"
# do this?
#raise "Should have deleted a single record" if num_rows_affected != 1
buffer << original_record
end
row[scd_effective_date_field] = timestamp
row[scd_end_date_field] = '9999-12-31 00:00:00'
elsif scd_type == 1
ETL::Engine.logger.debug "type 1 SCD"
else
ETL::Engine.logger.debug "SCD not specified"
end
ETL::Engine.logger.debug "writing new record"
buffer << row
else
ETL::Engine.logger.debug "CRC matches, skipping"
raise ConfigurationError, "dimension_table setting required" unless dimension_table
raise ConfigurationError, "dimension_target setting required" unless dimension_target
conn = ETL::Engine.connection(dimension_target)
q = "SELECT * FROM #{dimension_table} WHERE "
q << natural_key.collect { |nk| "#{nk} = '#{row[nk]}'" }.join(" AND ")
result = conn.select_one(q)
if result
# This was necessary when truncating and then loading, however I
# am getting reluctant to having the ETL process do the truncation
# as part of the bulk load, favoring using a preprocessor instead.
# buffer << ETL::Row[result.symbolize_keys!]
else
# The record never made it into the database, so add the effective and end date
# and add it into the bulk load file
row[scd_effective_date_field] = timestamp
row[scd_end_date_field] = '9999-12-31 00:00:00'
buffer << row
end
end
else
ETL::Engine.logger.debug "record never loaded"
# Set the effective and end date fields
if scd_type == 2
row[scd_effective_date_field] = timestamp
row[scd_end_date_field] = '9999-12-31 00:00:00'
end
# Write the row
buffer << row
# Record the record
if ETL::Engine.job # only record the execution if there is a job
ETL::Execution::Record.time_spent += Benchmark.realtime do
ETL::Execution::Record.create!(
:control_file => control.file,
:natural_key => nk,
:crc => crc,
:job_id => ETL::Engine.job.id
)
end
end
end
end
# Add any virtual fields to the row. Virtual rows will get their value
# from one of the following:
# * If the mapping is a Class, then an object which implements the next
# method
# * If the mapping is a Symbol, then the XGenerator where X is the
# classified symbol
# * If the mapping is a Proc, then it will be called with the row
# * Otherwise the value itself will be assigned to the field
def add_virtuals!(row)
if mapping[:virtual]
mapping[:virtual].each do |key,value|
# Engine.logger.debug "Mapping virtual #{key}/#{value} for row #{row}"
case value
when Class
generator = generators[key] ||= value.new
row[key] = generator.next
when Symbol
generator = generators[key] ||= ETL::Generator::Generator.class_for_name(value).new(options)
row[key] = generator.next
when Proc
row[key] = value.call(row)
else
if value.is_a?(ETL::Generator::Generator)
row[key] = value.next
else
row[key] = value
end
end
end
end
end
private
# Determine the natural key. This method will always return an array
# of symbols. The default value is [:id].
def determine_natural_key
case configuration[:natural_key]
when Array
configuration[:natural_key].collect(&:to_sym)
when String, Symbol
[configuration[:natural_key].to_sym]
else
[] # no natural key defined
end
end
end
end
end
Dir[File.dirname(__FILE__) + "/destination/*.rb"].each { |file| require(file) }