module ETL #:nodoc: module Control #:nodoc: class Destination attr_reader :control, :configuration, :mapping attr_accessor :buffer_size, :current_row, :unique class << self def class_for_name(name) ETL::Control.const_get("#{name.to_s.classify}Destination") end end def initialize(control, configuration, mapping) @control = control @configuration = configuration @mapping = mapping @buffer_size = configuration[:buffer_size] ||= 1000 end def current_row @current_row ||= 1 end def write(row) buffer << row flush if buffer.length >= buffer_size end # Abstract method def flush raise NotImplementedError, "flush method must be implemented by subclasses" end # Abstract method def close raise NotImplementedError, "close method must be implemented by subclasses" end protected def buffer @buffer ||= [] end def generators @generators ||= {} end # Get the order of elements from the source order def order_from_source order = [] control.sources.first.definition.each do |item| case item when Hash order << item[:name] else order << item end end order end # Return true if the row is allowed. The row will not be allowed if the :unique option is specified # in the configuration and the compound key already exists def row_allowed?(row) if unique key = (unique.collect { |k| row[k] }).join('|') return false if compound_key_constraints[key] compound_key_constraints[key] = 1 end return true end # Get a hash of compound key contraints. This is used to determine if a row can be written when the # unique option is specified def compound_key_constraints @compound_key_constraints ||= {} end # Add any virtual fields to the row def add_virtuals!(row) if mapping[:virtual] mapping[:virtual].each do |key,value| # Engine.logger.debug "Mapping virtual #{key}/#{value} for row #{row}" case value when Symbol generators[key] ||= ETL::Generator::Generator.class_for_name(value).new row[key] = generators[key].next when Proc row[key] = value.call(row) else row[key] = value end end end end end end end Dir[File.dirname(__FILE__) + "/destination/*.rb"].each { |file| require(file) }