module ETL #:nodoc:
module Control #:nodoc:
class CsvDestination < Destination
attr_reader :file, :append, :headers, :order
# Initialize the object.
# * control: The Control object
# * configuration: The configuration map
# * mapping: The output mapping
#
# Configuration options:
# * :file: The file to write to (REQUIRED)
# * :append: Set to true to append to the file (default is to overwrite)
# * :headers: Set to true to add the headers to the output also (default is true)
#
# Mapping options:
# * :order: The order array
def initialize(control, configuration, mapping={})
super
@file = File.join(File.dirname(control.file), configuration[:file])
@append = configuration[:append] ||= false
@headers = configuration[:headers] ||= true
@order = mapping[:order] || order_from_source
raise ControlError, "Order required in mapping" unless @order
end
# Close the destination. This will flush the buffer and close the underlying stream or connection.
def close
flush
f.close
end
# Flush the destination buffer
def flush
if write_header?
f << order
end
#puts "Flushing buffer (#{file}) with #{buffer.length} rows"
buffer.flatten.each do |row|
# check to see if this row's compound key constraint already exists
# note that the compound key constraint may not utilize virtual fields
next unless row_allowed?(row)
# add any virtual fields
add_virtuals!(row)
# collect all of the values using the order designated in the configuration
values = order.collect do |name|
value = row[name]
case value
when Date, Time, DateTime
value.to_s(:db)
else
value
# value.to_s
end
end
# write the values
f << values
end
f.flush
buffer.clear
end
private
# Get the open file stream
def f
@f ||= FasterCSV.open(file, mode)
end
# Get the appropriate mode to open the file stream
def mode
append ? 'a' : 'w'
end
def write_header?
if headers
@headers = false
return true
end
return false
end
end
end
end