module ETL #:nodoc:
module Control #:nodoc:
# Base class for destinations.
class Destination
# Read-only accessor for the ETL::Control::Control instance
attr_reader :control
# Read-only accessor for the configuration Hash
attr_reader :configuration
# Read-only accessor for the destination mapping Hash
attr_reader :mapping
# Accessor to the buffer size
attr_accessor :buffer_size
# Unique flag.
attr_accessor :unique
class << self
# Get the destination class for the specified name.
#
# For example if name is :database or 'database' then the DatabaseDestination class
# is returned
def class_for_name(name)
ETL::Control.const_get("#{name.to_s.classify}Destination")
end
end
# Initialize the destination
#
# Arguments:
# * control: The ETL::Control::Control instance
# * configuration: The configuration Hash
# * mapping: The mapping Hash
#
# Options:
# * :buffer_size: The output buffer size (default 1000 records)
def initialize(control, configuration, mapping)
@control = control
@configuration = configuration
@mapping = mapping
@buffer_size = configuration[:buffer_size] ||= 1000
end
# Get the current row number
def current_row
@current_row ||= 1
end
# Write the given row
def write(row)
buffer << row
flush if buffer.length >= buffer_size
end
# Abstract method
def flush
raise NotImplementedError, "flush method must be implemented by subclasses"
end
# Abstract method
def close
raise NotImplementedError, "close method must be implemented by subclasses"
end
def errors
@errors ||= []
end
protected
# Access the buffer
def buffer
@buffer ||= []
end
# Access the generators map
def generators
@generators ||= {}
end
# Get the order of elements from the source order
def order_from_source
order = []
control.sources.first.definition.each do |item|
case item
when Hash
order << item[:name]
else
order << item
end
end
order
end
# Return true if the row is allowed. The row will not be allowed if the :unique option is specified
# in the configuration and the compound key already exists
def row_allowed?(row)
if unique
key = (unique.collect { |k| row[k] }).join('|')
return false if compound_key_constraints[key]
compound_key_constraints[key] = 1
end
return true
end
# Get a hash of compound key contraints. This is used to determine if a row can be written when the
# unique option is specified
def compound_key_constraints
@compound_key_constraints ||= {}
end
# Add any virtual fields to the row. Virtual rows will get their value from one of the following:
# * If the mapping is a Class, then an object which implements the next method
# * If the mapping is a Symbol, then the XGenerator where X is the classified symbol
# * If the mapping is a Proc, then it will be called with the row
# * Otherwise the value itself will be assigned to the field
def add_virtuals!(row)
if mapping[:virtual]
mapping[:virtual].each do |key,value|
# Engine.logger.debug "Mapping virtual #{key}/#{value} for row #{row}"
case value
when Class
generator = generators[key] ||= value.new
row[key] = generator.next
when Symbol
generator = generators[key] ||= ETL::Generator::Generator.class_for_name(value).new
row[key] = generator.next
when Proc
row[key] = value.call(row)
else
row[key] = value
end
end
end
end
end
end
end
Dir[File.dirname(__FILE__) + "/destination/*.rb"].each { |file| require(file) }