module ETL #:nodoc: module Control #:nodoc: # Base class for destinations. class Destination # Read-only accessor for the ETL::Control::Control instance attr_reader :control # Read-only accessor for the configuration Hash attr_reader :configuration # Read-only accessor for the destination mapping Hash attr_reader :mapping # Accessor to the buffer size attr_accessor :buffer_size # Unique flag. attr_accessor :unique class << self # Get the destination class for the specified name. # # For example if name is :database or 'database' then the DatabaseDestination class # is returned def class_for_name(name) ETL::Control.const_get("#{name.to_s.classify}Destination") end end # Initialize the destination # # Arguments: # * control: The ETL::Control::Control instance # * configuration: The configuration Hash # * mapping: The mapping Hash # # Options: # * :buffer_size: The output buffer size (default 1000 records) def initialize(control, configuration, mapping) @control = control @configuration = configuration @mapping = mapping @buffer_size = configuration[:buffer_size] ||= 1000 end # Get the current row number def current_row @current_row ||= 1 end # Write the given row def write(row) buffer << row flush if buffer.length >= buffer_size end # Abstract method def flush raise NotImplementedError, "flush method must be implemented by subclasses" end # Abstract method def close raise NotImplementedError, "close method must be implemented by subclasses" end def errors @errors ||= [] end protected # Access the buffer def buffer @buffer ||= [] end # Access the generators map def generators @generators ||= {} end # Get the order of elements from the source order def order_from_source order = [] control.sources.first.definition.each do |item| case item when Hash order << item[:name] else order << item end end order end # Return true if the row is allowed. The row will not be allowed if the :unique option is specified # in the configuration and the compound key already exists def row_allowed?(row) if unique key = (unique.collect { |k| row[k] }).join('|') return false if compound_key_constraints[key] compound_key_constraints[key] = 1 end return true end # Get a hash of compound key contraints. This is used to determine if a row can be written when the # unique option is specified def compound_key_constraints @compound_key_constraints ||= {} end # Add any virtual fields to the row. Virtual rows will get their value from one of the following: # * If the mapping is a Class, then an object which implements the next method # * If the mapping is a Symbol, then the XGenerator where X is the classified symbol # * If the mapping is a Proc, then it will be called with the row # * Otherwise the value itself will be assigned to the field def add_virtuals!(row) if mapping[:virtual] mapping[:virtual].each do |key,value| # Engine.logger.debug "Mapping virtual #{key}/#{value} for row #{row}" case value when Class generator = generators[key] ||= value.new row[key] = generator.next when Symbol generator = generators[key] ||= ETL::Generator::Generator.class_for_name(value).new row[key] = generator.next when Proc row[key] = value.call(row) else row[key] = value end end end end end end end Dir[File.dirname(__FILE__) + "/destination/*.rb"].each { |file| require(file) }