module ETL #:nodoc:
  module ActiveRecord #:nodoc:
    # Base class which is used for ActiveRecord connections. This is necessary
    # since AR connections are tied to the class, and using ActiveRecord::Base
    # directly can cause problems if the connection is closed.
    class Base < ::ActiveRecord::Base
    end
  end

  # The main ETL engine class.
  class Engine
    class << self
      # Process the specified control file. Acceptable values for control_file are
      # * Path to a file
      # * File object
      # * ETL::Control::Control instance
      def process(control_file)
        new.process(control_file)
      end

      # Assign the logger used by the engine.
      attr_writer :logger

      # A logger for the engine. When none has been assigned, a logger that
      # writes to 'etl.log' at WARN level is created on first access.
      def logger #:nodoc:
        @logger ||= begin
          log = Logger.new('etl.log')
          log.level = Logger::WARN
          log
        end
      end

      # The source currently being read by #process.
      attr_accessor :current_source

      # The 1-based number of the row currently being processed.
      attr_accessor :current_source_row

      # The destination currently being written to by #process.
      attr_accessor :current_destination

      # When truthy, progress messages are printed to $stdout.
      attr_accessor :realtime_activity
    end

    # Print the message followed by a newline (only when
    # Engine.realtime_activity is set).
    def say(message)
      say_without_newline(message + "\n")
    end

    # Print the message to $stdout and flush, but only when
    # Engine.realtime_activity is set.
    def say_without_newline(message)
      if Engine.realtime_activity
        $stdout.print message
        $stdout.flush
      end
    end

    # Print the message on a line of its own (preceded by a newline).
    def say_on_own_line(message)
      say("\n" + message)
    end

    # Process a control file or object. Acceptable values for control are:
    # * Path to a file
    # * File object
    # * ETL::Control::Control instance
    #
    # Runs the pre-processors, then for every source transforms each row and
    # writes it to every destination, then runs the post-processors. Transform
    # and write failures are logged and recorded on the source/destination
    # +errors+ collection rather than aborting the run.
    def process(control)
      start_time = Time.now
      control = ETL::Control::Control.resolve(control)

      Engine.logger.debug "Pre-processing #{control.file}"
      pre_process(control)
      Engine.logger.debug "Pre-processing complete"

      sources = control.sources
      destinations = control.destinations

      sources.each do |source|
        Engine.current_source = source
        # Reset the counter so an empty source reports 0 rows instead of the
        # count left over from a previous source (or nil).
        Engine.current_source_row = 0
        Engine.logger.debug "Processing source #{source}"
        say "Source: #{source}"

        source.each_with_index do |row, index|
          Engine.current_source_row = index + 1
          # Emit a progress dot every 1000 rows.
          say_without_newline "." if Engine.realtime_activity && index % 1000 == 0

          transform_row(row, source, index, control)
          write_row(row, destinations, index)
        end

        say_on_own_line "Processed #{Engine.current_source_row} rows in #{distance_of_time_in_words(start_time)}"

        # NOTE(review): destinations are closed after *each* source, so with
        # more than one source later writes go to already-closed destinations.
        # Confirm whether close should happen once after all sources instead.
        destinations.each do |destination|
          destination.close
        end
      end

      Engine.logger.debug "Post-processing #{control.file}"
      post_process(control)
      Engine.logger.debug "Post-processing complete"
    end

    private

    # Apply the control's transforms to every field of the row, in place.
    # A failure is logged and appended to source.errors; the row is left
    # partially transformed.
    def transform_row(row, source, index, control)
      row.each do |name, value|
        row[name] = ETL::Transform::Transform.transform(name, value, control.transform(name))
      end
    rescue => e
      msg = "Error transforming from #{source} on line #{index}: #{e}"
      source.errors << msg
      Engine.logger.error msg
    end

    # Write the row to each destination in order. The first failure is logged
    # and appended to the failing destination's errors; remaining destinations
    # are skipped for this row.
    def write_row(row, destinations, index)
      # Tracked in a local because the block parameter is not visible in the
      # rescue clause below.
      failing = nil
      destinations.each do |destination|
        failing = destination
        Engine.current_destination = destination
        destination.write(row)
      end
    rescue => e
      msg = "Error writing to #{failing} on line #{index}: #{e}"
      failing.errors << msg if failing
      Engine.logger.error msg
    end

    # Execute all preprocessors
    def pre_process(control)
      control.pre_processors.each { |processor| processor.process }
    end

    # Execute all postprocessors
    def post_process(control)
      control.post_processors.each { |processor| processor.process }
    end

    # Return the exact distance of time in words from the given from_time to
    # the specified to_time, e.g. "1 days, 2 hours, 3 minutes, 4 seconds".
    # If to_time is not specified then Time.now is used. Day, hour and minute
    # components are omitted when zero; seconds are always included.
    def distance_of_time_in_words(from_time, to_time=Time.now)
      from_time = from_time.to_time if from_time.respond_to?(:to_time)
      to_time = to_time.to_time if to_time.respond_to?(:to_time)

      seconds = (to_time - from_time).round
      days, seconds = seconds.divmod(60 * 60 * 24)
      hours, seconds = seconds.divmod(60 * 60)
      minutes, seconds = seconds.divmod(60)

      parts = []
      parts << "#{days} days" if days > 0
      parts << "#{hours} hours" if hours > 0
      parts << "#{minutes} minutes" if minutes > 0
      parts << "#{seconds} seconds"
      parts.join(', ')
    end

    # Return an approximate, human-friendly distance of time in words from
    # from_time to to_time (e.g. "about 1 hour"). If to_time is not specified
    # then Time.now is used. By default seconds are included for distances
    # under two minutes...set the include_seconds argument to false to disable
    # the seconds.
    def approximate_distance_of_time_in_words(from_time, to_time=Time.now, include_seconds=true)
      from_time = from_time.to_time if from_time.respond_to?(:to_time)
      to_time = to_time.to_time if to_time.respond_to?(:to_time)
      distance_in_minutes = (((to_time - from_time).abs)/60).round
      distance_in_seconds = ((to_time - from_time).abs).round

      case distance_in_minutes
      when 0..1
        return (distance_in_minutes == 0) ? 'less than a minute' : '1 minute' unless include_seconds
        case distance_in_seconds
        when 0..4   then 'less than 5 seconds'
        when 5..9   then 'less than 10 seconds'
        when 10..19 then 'less than 20 seconds'
        when 20..39 then 'half a minute'
        when 40..59 then 'less than a minute'
        else             '1 minute'
        end
      when 2..44           then "#{distance_in_minutes} minutes"
      when 45..89          then 'about 1 hour'
      when 90..1439        then "about #{(distance_in_minutes.to_f / 60.0).round} hours"
      when 1440..2879      then '1 day'
      when 2880..43199     then "#{(distance_in_minutes / 1440).round} days"
      when 43200..86399    then 'about 1 month'
      when 86400..525959   then "#{(distance_in_minutes / 43200).round} months"
      when 525960..1051919 then 'about 1 year'
      else                      "over #{(distance_in_minutes / 525960).round} years"
      end
    end
  end
end