# This source file contains the ETL::Control::FileDestination module ETL #:nodoc: module Control #:nodoc: # File as the final destination. class FileDestination < Destination # The File to write to attr_reader :file # The output order attr_reader :order # Flag which indicates to append (default is to overwrite) attr_accessor :append # The separator attr_accessor :separator # The end of line marker attr_accessor :eol # The enclosure character attr_accessor :enclose # Initialize the object. # * control: The Control object # * configuration: The configuration map # * mapping: The output mapping # # Configuration options: # * :file: The file to write to (REQUIRED) # * :append: Set to true to append to the file (default is to overwrite) # * :separator: Record separator (default is a comma) # * :eol: End of line marker (default is \n) # * :enclose: Enclosure character (default is none) # * :unique: Set to true to only write unique records # * :append_rows: Array of rows to append # # Mapping options: # * :order: The order array def initialize(control, configuration, mapping={}) super path = Pathname.new(configuration[:file]) @file = path.absolute? ? path : Pathname.new(File.dirname(File.expand_path(control.file))) + path @append = configuration[:append] ||= false @separator = configuration[:separator] ||= ',' @eol = configuration[:eol] ||= "\n" @enclose = configuration[:enclose] @unique = configuration[:unique] ? configuration[:unique] + scd_required_fields : configuration[:unique] @unique.uniq! unless @unique.nil? @order = mapping[:order] ? mapping[:order] + scd_required_fields : order_from_source @order.uniq! unless @order.nil? raise ControlError, "Order required in mapping" unless @order end # Close the destination. This will flush the buffer and close the underlying stream or connection. def close buffer << append_rows if append_rows flush f.close end # Flush the destination buffer def flush #puts "Flushing buffer (#{file}) with #{buffer.length} rows" buffer.flatten.each do |row| #puts "row change type: #{row.change_type}" # check to see if this row's compound key constraint already exists # note that the compound key constraint may not utilize virtual fields next unless row_allowed?(row) # add any virtual fields add_virtuals!(row) # collect all of the values using the order designated in the configuration values = order.collect do |name| value = row[name] case value when Date, Time, DateTime value.to_s(:db) else value.to_s end end values.collect! { |v| v.gsub(/\\/, '\\\\\\\\')} values.collect! { |v| v.gsub(separator, "\\#{separator}")} values.collect! { |v| v.gsub(/\n|\r/, '')} # enclose the value if required if !enclose.nil? values.collect! { |v| enclose + v.gsub(/(#{enclose})/, '\\\\\1') + enclose } end # write the values joined by the separator defined in the configuration f.write(values.join(separator)) # write the end-of-line f.write(eol) end f.flush buffer.clear #puts "After flush there are #{buffer.length} rows" end private # Get the open file stream def f @f ||= open(file, mode) end def options @options ||= { :col_sep => separator, :row_sep => eol, :force_quotes => !enclose.nil? } end # Get the appropriate mode to open the file stream def mode append ? 'a' : 'w' end end end end