module ETL #:nodoc:
  module ActiveRecord #:nodoc:
    # Base class which is used for ActiveRecord connections. This is necessary
    # since AR connections are tied to the class, and using ActiveRecord::Base
    # directly can cause problems if the connection is closed.
    class Base < ::ActiveRecord::Base
    end
  end

  # The main ETL engine class
  class Engine
    class << self
      # Initialization that is run when a job is executed.
      def init(options={})
        unless @initialized
          @limit = options[:limit]
          @offset = options[:offset]
          @log_write_mode = 'w' if options[:newlog]
          @skip_bulk_import = options[:skip_bulk_import]
          @read_locally = options[:read_locally]

          options[:config] ||= 'database.yml'
          database_configuration = YAML::load(ERB.new(IO.read(options[:config])).result + "\n")
          ETL::ActiveRecord::Base.configurations = database_configuration
          ActiveRecord::Base.configurations.merge!(ETL::ActiveRecord::Base.configurations)

          require 'etl/execution'
          ETL::Execution::Base.establish_connection :etl_execution
          ETL::Execution::Execution.migrate

          @initialized = true
        end
      end

      # Process the specified control file. Acceptable values for control_file are:
      # * Path to a file
      # * File object
      # * ETL::Control::Control instance
      def process(control_file)
        new().process(control_file)
      end

      attr_accessor :timestamped_log

      # Accessor for the log write mode. Default is 'a' for append.
      attr_accessor :log_write_mode
      def log_write_mode
        @log_write_mode ||= 'a'
      end

      # A logger for the engine
      attr_accessor :logger

      def logger #:nodoc:
        unless @logger
          if timestamped_log
            @logger = Logger.new("etl_#{timestamp}.log")
          else
            @logger = Logger.new(File.open('etl.log', log_write_mode))
          end
          @logger.level = Logger::ERROR
          @logger.formatter = Logger::Formatter.new
        end
        @logger
      end

      # Get a timestamp value as a string
      def timestamp
        Time.now.strftime("%Y%m%d%H%M%S")
      end

      # The current source
      attr_accessor :current_source

      # The current source row
      attr_accessor :current_source_row

      # The current destination
      attr_accessor :current_destination

      # Set to true to activate realtime activity. This will cause certain
      # information messages to be printed to STDOUT
      attr_accessor :realtime_activity

      # Accessor for the total number of rows read from sources
      attr_accessor :rows_read
      def rows_read
        @rows_read ||= 0
      end

      # Accessor for the total number of rows processed
      attr_accessor :rows_written
      def rows_written
        @rows_written ||= 0
      end

      # Access the current ETL::Execution::Job instance
      attr_accessor :job

      # The limit on rows to load from the source, useful for testing the ETL
      # process prior to executing the entire batch. Default value is nil and
      # indicates that there is no limit
      attr_accessor :limit
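
      # Illustrative sketch only: the option keys below are the ones handled by
      # +init+ above; the control file name is hypothetical.
      #
      #   ETL::Engine.init(
      #     :config => 'database.yml',   # must define an :etl_execution connection
      #     :limit => 100,               # limit on rows to load from the source
      #     :offset => 10,               # offset for the source to begin at
      #     :newlog => true,             # truncate rather than append to etl.log
      #     :skip_bulk_import => true,   # skip all bulk importing
      #     :read_locally => true        # read from the last source cache files
      #   )
      #   ETL::Engine.process('my_job.ctl')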
      # The offset for the source to begin at, useful for testing the ETL
      # process prior to executing the entire batch. Default value is nil and
      # indicates that there is no offset
      attr_accessor :offset

      # Set to true to skip all bulk importing
      attr_accessor :skip_bulk_import

      # Set to true to read locally from the last source cache files
      attr_accessor :read_locally

      # Accessor for the average rows per second processed
      attr_accessor :average_rows_per_second
    end

    # Say the specified message, with a newline
    def say(message)
      say_without_newline(message + "\n")
    end

    # Say the specified message without a newline
    def say_without_newline(message)
      if Engine.realtime_activity
        $stdout.print message
        $stdout.flush
      end
    end

    # Say the message on its own line
    def say_on_own_line(message)
      say("\n" + message)
    end

    # Array of errors encountered during execution of the ETL process
    def errors
      @errors ||= []
    end

    # Get a Hash of benchmark values where each value represents the total
    # amount of time in seconds spent processing in that portion of the ETL
    # pipeline. Keys include:
    # * :transforms
    # * :after_reads
    # * :before_writes
    # * :writes
    def benchmarks
      @benchmarks ||= {
        :transforms => 0,
        :after_reads => 0,
        :before_writes => 0,
        :writes => 0,
      }
    end

    # Process a control file or object. Acceptable values for control are:
    # * Path to a file
    # * File object
    # * ETL::Control::Control instance
    def process(control)
      control = ETL::Control::Control.resolve(control)

      ETL::Engine.job = ETL::Execution::Job.create!(
        :control_file => control.file,
        :status => 'executing'
      )

      execute_dependencies(control)

      start_time = Time.now

      Engine.logger.debug "Pre-processing #{control.file}"
      pre_process(control)
      Engine.logger.debug "Pre-processing complete"

      sources = control.sources
      destinations = control.destinations

      say "Skipping bulk import" if Engine.skip_bulk_import

      sources.each do |source|
        Engine.current_source = source
        Engine.logger.debug "Processing source #{source}"
        say "Source: #{source}"
        say "Limiting enabled: #{Engine.limit}" if Engine.limit != nil
        say "Offset enabled: #{Engine.offset}" if Engine.offset != nil

        source.each_with_index do |row, index|
          # Break out of the row loop if the +Engine.limit+ is specified and
          # the number of rows read exceeds that value.
          if Engine.limit != nil && Engine.rows_read >= Engine.limit
            puts "Reached limit of #{Engine.limit}"
            break
          end

          Engine.logger.debug "Row #{index}: #{row.inspect}"
          Engine.rows_read += 1
          Engine.current_source_row = index + 1
          if Engine.realtime_activity && index > 0 && index % 1000 == 0
            say_without_newline "."
          end

          # At this point a single row may be turned into multiple rows via row
          # processors. All code after this line should work with the array of
          # rows rather than the single row.
          rows = [row]

          t = Benchmark.realtime do
            begin
              Engine.logger.debug "Processing after read"
              control.after_read_processors.each do |processor|
                processed_rows = []
                rows.each do |row|
                  processed_rows << processor.process(row)
                end
                rows = processed_rows.flatten
              end
            rescue => e
              msg = "Error processing rows after read from #{Engine.current_source} on line #{Engine.current_source_row}: #{e}"
              errors << msg
              Engine.logger.error(msg)
              exceeded_error_threshold?(control) ? break : next
            end
          end
          benchmarks[:after_reads] += t unless t.nil?
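
          # Illustrative sketch, not part of the engine: an after_read processor
          # receives a single row Hash and may return either one row or an Array
          # of rows; the flatten call above is what lets a single source row fan
          # out into several rows. A hypothetical processor:
          #
          #   class SplitNamesProcessor
          #     def process(row)
          #       row[:name].to_s.split(';').collect { |n| row.merge(:name => n) }
          #     end
          #   end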
          t = Benchmark.realtime do
            begin
              # execute transforms
              Engine.logger.debug "Executing transforms"
              rows.each do |row|
                control.transforms.each do |transform|
                  name = transform.name.to_sym
                  row[name] = transform.transform(name, row[name], row)
                end
              end
            rescue => e
              msg = "Error transforming from #{Engine.current_source} on line #{Engine.current_source_row}: #{e}"
              errors << msg
              Engine.logger.error(msg)
              e.backtrace.each { |line| Engine.logger.error(line) }
              begin
                exceeded_error_threshold?(control) ? break : next
              rescue => inner_error
                puts inner_error
              end
            end
          end
          benchmarks[:transforms] += t unless t.nil?

          t = Benchmark.realtime do
            begin
              # execute row-level "before write" processing
              Engine.logger.debug "Processing before write"
              control.before_write_processors.each do |processor|
                processed_rows = []
                rows.each do |row|
                  processed_rows << processor.process(row)
                end
                rows = processed_rows.flatten.compact
              end
            rescue => e
              msg = "Error processing rows before write from #{Engine.current_source} on line #{Engine.current_source_row}: #{e}"
              errors << msg
              Engine.logger.error(msg)
              e.backtrace.each { |line| Engine.logger.error(line) }
              exceeded_error_threshold?(control) ? break : next
            end
          end
          benchmarks[:before_writes] += t unless t.nil?

          t = Benchmark.realtime do
            begin
              # write the row to the destination
              destinations.each_with_index do |destination, index|
                Engine.current_destination = destination
                rows.each do |row|
                  destination.write(row)
                  Engine.rows_written += 1 if index == 0
                end
              end
            rescue => e
              msg = "Error writing to #{Engine.current_destination}: #{e}"
              errors << msg
              Engine.logger.error msg
              e.backtrace.each { |line| Engine.logger.error(line) }
              exceeded_error_threshold?(control) ? break : next
            end
          end
          benchmarks[:writes] += t unless t.nil?
        end

        if exceeded_error_threshold?(control)
          say_on_own_line "Exiting due to exceeding error threshold: #{control.error_threshold}"
          return
        end
      end

      destinations.each do |destination|
        destination.close
      end

      say_on_own_line "Executing post processes"
      Engine.logger.debug "Post-processing #{control.file}"
      post_process(control)
      Engine.logger.debug "Post-processing complete"
      say "Post-processing complete"

      if sources.length > 0
        say_on_own_line "Read #{Engine.rows_read} lines from sources"
      end
      if destinations.length > 0
        say "Wrote #{Engine.rows_written} lines to destinations"
      end

      say "Completed #{control.file} in #{distance_of_time_in_words(start_time)} with #{errors.length} errors."
      say "Processing average: #{Engine.average_rows_per_second} rows/sec"

      say "Avg after_reads: #{Engine.rows_read/benchmarks[:after_reads]} rows/sec" if benchmarks[:after_reads] > 0
      say "Avg before_writes: #{Engine.rows_read/benchmarks[:before_writes]} rows/sec" if benchmarks[:before_writes] > 0
      say "Avg transforms: #{Engine.rows_read/benchmarks[:transforms]} rows/sec" if benchmarks[:transforms] > 0
      say "Avg writes: #{Engine.rows_read/benchmarks[:writes]} rows/sec" if benchmarks[:writes] > 0

      say "Avg time writing execution records: #{ETL::Execution::Record.average_time_spent}"

      # ETL::Transform::Transform.benchmarks.each do |klass, t|
      #   say "Avg #{klass}: #{Engine.rows_read/t} rows/sec"
      # end

      ETL::Engine.job.completed_at = Time.now
      ETL::Engine.job.status = (errors.length > 0 ? 'completed with errors' : 'completed')
      ETL::Engine.job.save!
    end
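
    # Descriptive note on the row pipeline driven by +process+ above: transform
    # objects are expected to respond to +name+ and to
    # +transform(name, value, row)+, which is how they are invoked in the
    # transform stage, and before_write processors follow the same
    # +process(row)+ contract as after_read processors. A minimal hypothetical
    # transform, for illustration only:
    #
    #   class UpcaseTransform
    #     attr_reader :name
    #     def initialize(name)
    #       @name = name
    #     end
    #     def transform(name, value, row)
    #       value.to_s.upcase
    #     end
    #   end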

    private

    # Return true if the error threshold is exceeded
    def exceeded_error_threshold?(control)
      errors.length > control.error_threshold
    end

    # Execute all preprocessors
    def pre_process(control)
      control.pre_processors.each do |processor|
        processor.process
      end
    end

    # Execute all postprocessors
    def post_process(control)
      control.post_processors.each do |processor|
        processor.process
      end
    end

    # Execute all dependencies
    def execute_dependencies(control)
      Engine.logger.debug "Executing dependencies"
      control.dependencies.flatten.each do |dependency|
        case dependency
        when Symbol
          f = dependency.to_s + '.ctl'
          Engine.logger.debug "Executing dependency: #{f}"
          say "Executing dependency: #{f}"
          process(f)
        when String
          Engine.logger.debug "Executing dependency: #{dependency}"
          say "Executing dependency: #{dependency}"
          process(dependency)
        else
          raise "Invalid dependency type: #{dependency.class}"
        end
      end
    end

    # Return the distance of time in words from the given from_time to the
    # specified to_time. If to_time is not specified then Time.now is used.
    def distance_of_time_in_words(from_time, to_time=Time.now)
      from_time = from_time.to_time if from_time.respond_to?(:to_time)
      to_time = to_time.to_time if to_time.respond_to?(:to_time)

      seconds = (to_time - from_time).round
      distance_in_days = (seconds/(60*60*24)).round
      seconds = seconds % (60*60*24)
      distance_in_hours = (seconds/(60*60)).round
      seconds = seconds % (60*60)
      distance_in_minutes = (seconds/60).round
      seconds = seconds % 60
      distance_in_seconds = seconds

      s = ''
      s << "#{distance_in_days} days, " if distance_in_days > 0
      s << "#{distance_in_hours} hours, " if distance_in_hours > 0
      s << "#{distance_in_minutes} minutes, " if distance_in_minutes > 0
      s << "#{distance_in_seconds} seconds"
      s
    end

    # Get the approximate distance of time in words from the given from_time
    # to the given to_time. If to_time is not specified then it is set
    # to Time.now. By default seconds are included...set the include_seconds
    # argument to false to disable the seconds.
    def approximate_distance_of_time_in_words(from_time, to_time=Time.now, include_seconds=true)
      from_time = from_time.to_time if from_time.respond_to?(:to_time)
      to_time = to_time.to_time if to_time.respond_to?(:to_time)
      distance_in_minutes = (((to_time - from_time).abs)/60).round
      distance_in_seconds = ((to_time - from_time).abs).round

      case distance_in_minutes
      when 0..1
        return (distance_in_minutes == 0) ? 'less than a minute' : '1 minute' unless include_seconds
        case distance_in_seconds
        when 0..4   then 'less than 5 seconds'
        when 5..9   then 'less than 10 seconds'
        when 10..19 then 'less than 20 seconds'
        when 20..39 then 'half a minute'
        when 40..59 then 'less than a minute'
        else             '1 minute'
        end
      when 2..44           then "#{distance_in_minutes} minutes"
      when 45..89          then 'about 1 hour'
      when 90..1439        then "about #{(distance_in_minutes.to_f / 60.0).round} hours"
      when 1440..2879      then '1 day'
      when 2880..43199     then "#{(distance_in_minutes / 1440).round} days"
      when 43200..86399    then 'about 1 month'
      when 86400..525959   then "#{(distance_in_minutes / 43200).round} months"
      when 525960..1051919 then 'about 1 year'
      else                      "over #{(distance_in_minutes / 525960).round} years"
      end
    end
  end
end
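
# Worked example of the private time helpers above (illustrative only; they are
# private instance methods of ETL::Engine, shown here just to document the
# strings they produce for an elapsed time of 3725 seconds):
#
#   distance_of_time_in_words(Time.now - 3725)
#   # => "1 hours, 2 minutes, 5 seconds"
#
#   approximate_distance_of_time_in_words(Time.now - 3725)
#   # => "about 1 hour"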