module ETL #:nodoc:
  module Processor #:nodoc:
    # Processor which is used to bulk import data into a target database. The
    # underlying database driver from ActiveRecord must support the
    # +bulk_load+ method.
    class BulkImportProcessor < ETL::Processor::Processor
      # The file to load from
      attr_reader :file
      # The target database
      attr_reader :target
      # The table name
      attr_reader :table
      # Set to true to truncate
      attr_reader :truncate
      # Array of symbols representing the column load order
      attr_reader :columns
      # The field separator (defaults to a comma)
      attr_accessor :field_separator
      # The field enclosure (defaults to nil)
      attr_accessor :field_enclosure
      # The line separator (defaults to a newline)
      attr_accessor :line_separator

      # Initialize the processor.
      #
      # Configuration options:
      # * <tt>:file</tt>: The file to load data from, resolved relative to
      #   the directory of the control file
      # * <tt>:target</tt>: The target database (required)
      # * <tt>:table</tt>: The table name (required)
      # * <tt>:truncate</tt>: Set to true to truncate before loading.
      #   Defaults to false
      # * <tt>:columns</tt>: The columns to load in the order they appear in
      #   the bulk data file
      # * <tt>:field_separator</tt>: The field separator. Defaults to a comma
      # * <tt>:line_separator</tt>: The line separator. Defaults to a newline
      # * <tt>:field_enclosure</tt>: The field enclosure characters
      #
      # Raises ControlError when <tt>:target</tt> or <tt>:table</tt> is
      # missing from the configuration.
      def initialize(control, configuration)
        super
        @file = File.join(File.dirname(control.file), configuration[:file])
        @target = configuration[:target]
        @table = configuration[:table]
        # Plain || (not ||=) so the default is not written back into the
        # caller's configuration hash.
        @truncate = configuration[:truncate] || false
        @columns = configuration[:columns]
        @field_separator = (configuration[:field_separator] || ',')
        @line_separator = (configuration[:line_separator] || "\n")
        @field_enclosure = configuration[:field_enclosure]

        raise ControlError, "Target must be specified" unless @target
        raise ControlError, "Table must be specified" unless @table
      end

      # Execute the processor.
      #
      # Does nothing when ETL::Engine.skip_bulk_import is set or when the
      # source file is empty. Otherwise the optional truncate and the bulk
      # load are issued inside a single transaction on the target connection.
      def process
        return if ETL::Engine.skip_bulk_import
        return if File.size(file).zero?

        conn = ETL::Engine.connection(target)
        conn.transaction do
          conn.truncate(table_name) if truncate
          conn.bulk_load(file, table_name, bulk_load_options)
        end
      end

      # The table name as resolved by ETL::Engine.table for the target
      # connection.
      def table_name
        ETL::Engine.table(table, ETL::Engine.connection(target))
      end

      private

      # Build the options hash handed to +bulk_load+.
      #
      # <tt>:columns</tt> is always present. <tt>:fields</tt> is included
      # when at least one of the field separator, field enclosure or line
      # separator is set, so a line separator is still passed along when the
      # other two have been cleared through their accessors.
      def bulk_load_options
        fields = {}
        fields[:delimited_by] = field_separator if field_separator
        fields[:enclosed_by] = field_enclosure if field_enclosure
        fields[:terminated_by] = line_separator if line_separator

        options = { :columns => columns }
        options[:fields] = fields unless fields.empty?
        options
      end
    end
  end
end