module ETL #:nodoc:
class Base < ActiveRecord::Base
end
# The main ETL engine class
class Engine
include ETL::Util
class << self
# Initialization that is run when a job is executed.
#
# Options:
# * :limit: Limit the number of records returned from sources
# * :offset: Specify the number of rows for sources to skip before reading
# * :newlog: Set to true to start a new log ('w' mode); otherwise output is appended to the existing log
# * :config: Path to the database configuration file; defaults to database.yml, falling back to config/database.yml
# * :skip_bulk_import: Set to true to skip bulk import
# * :read_locally: Set to true to read from the local cache
# * :rails_root: Set to the rails root to boot rails
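#
# A minimal usage sketch; the option values below are illustrative, not
# required:
#
#   ETL::Engine.init(
#     :config       => 'config/database.yml', # connection definitions
#     :limit        => 100,                    # read at most 100 rows per source
#     :read_locally => true                    # reuse the last local source cache
#   )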
def init(options={})
unless @initialized
puts "initializing ETL engine\n\n"
@limit = options[:limit]
@offset = options[:offset]
@log_write_mode = 'w' if options[:newlog]
@skip_bulk_import = options[:skip_bulk_import]
@read_locally = options[:read_locally]
@rails_root = options[:rails_root]
require File.join(@rails_root, 'config/environment') if @rails_root
options[:config] ||= 'database.yml'
options[:config] = 'config/database.yml' unless File.exist?(options[:config])
database_configuration = YAML::load(ERB.new(IO.read(options[:config])).result + "\n")
ActiveRecord::Base.configurations.merge!(database_configuration)
ETL::Base.configurations = database_configuration
#puts "configurations in init: #{ActiveRecord::Base.configurations.inspect}"
require 'etl/execution'
ETL::Execution::Base.establish_connection :etl_execution
ETL::Execution::Execution.migrate
@initialized = true
end
end
# Process the specified file. Acceptable values for file are:
# * Path to a file
# * File object
# * ETL::Control::Control instance
# * ETL::Batch::Batch instance
#
# The process command will accept either a .ctl Control file or a .ebf
# ETL Batch File.
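#
# For example (the control file name here is hypothetical):
#
#   ETL::Engine.init
#   ETL::Engine.process('extract_customers.ctl')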
def process(file)
new().process(file)
end
attr_accessor :timestamped_log
# Accessor for the log write mode. Default is 'a' for append.
attr_accessor :log_write_mode
def log_write_mode
@log_write_mode ||= 'a'
end
# A logger for the engine
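# Set +timestamped_log+ to true before the first call to +logger+ to write to
# a per-run file (etl_YYYYMMDDHHMMSS.log) instead of appending to etl.log,
# for example:
#
#   ETL::Engine.timestamped_log = true
#   ETL::Engine.logger.level = Logger::DEBUG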
attr_accessor :logger
def logger #:nodoc:
unless @logger
if timestamped_log
@logger = Logger.new("etl_#{timestamp}.log")
else
@logger = Logger.new(File.open('etl.log', log_write_mode))
end
@logger.level = Logger::WARN
@logger.formatter = Logger::Formatter.new
end
@logger
end
# Get a timestamp value as a string
def timestamp
Time.now.strftime("%Y%m%d%H%M%S")
end
# The current source
attr_accessor :current_source
# The current source row
attr_accessor :current_source_row
# The current destination
attr_accessor :current_destination
# Set to true to activate realtime activity. This will cause certain
# information messages to be printed to STDOUT
attr_accessor :realtime_activity
# Accessor for the total number of rows read from sources
attr_accessor :rows_read
def rows_read
@rows_read ||= 0
end
# Accessor for the total number of rows written to destinations
attr_accessor :rows_written
def rows_written
@rows_written ||= 0
end
# Access the current ETL::Execution::Job instance
attr_accessor :job
# Access the current ETL::Execution::Batch instance
attr_accessor :batch
# The limit on rows to load from the source, useful for testing the ETL
# process prior to executing the entire batch. Default value is nil and
# indicates that there is no limit
attr_accessor :limit
# The offset for the source to begin at, useful for testing the ETL
# process prior to executing the entire batch. Default value is nil and
# indicates that there is no offset
attr_accessor :offset
# Set to true to skip all bulk importing
attr_accessor :skip_bulk_import
# Set to true to read locally from the last source cache files
attr_accessor :read_locally
# Accessor for the average rows per second processed
attr_accessor :average_rows_per_second
# Get a named connection
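# The name must match an entry in the database configuration loaded during
# +init+. For example, with a hypothetical +data_warehouse+ entry in
# database.yml:
#
#   conn = ETL::Engine.connection(:data_warehouse)
#   conn.select_value("SELECT COUNT(*) FROM date_dimension")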
def connection(name)
logger.debug "Retrieving connection #{name}"
conn = connections[name] ||= establish_connection(name)
#conn.verify!(ActiveRecord::Base.verification_timeout)
conn.reconnect! unless conn.active?
conn
end
# Set to true to use temp tables
attr_accessor :use_temp_tables
# Get a registry of temp tables
def temp_tables
@temp_tables ||= {}
end
# Called when a batch job finishes, allowing for cleanup to occur
def finish
temp_tables.each do |temp_table, mapping|
actual_table = mapping[:table]
#puts "move #{temp_table} to #{actual_table}"
conn = mapping[:connection]
conn.transaction do
conn.rename_table(actual_table, "#{actual_table}_old")
conn.rename_table(temp_table, actual_table)
conn.drop_table("#{actual_table}_old")
end
end
end
# Return true if using temp tables
def use_temp_tables?
use_temp_tables ? true : false
end
# Modify the table name if necessary
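# When temp tables are enabled, writes are redirected to a +tmp_+ copy of the
# target table until +finish+ swaps it back in. A sketch, using a hypothetical
# +people+ table and connection name:
#
#   ETL::Engine.use_temp_tables = true
#   conn = ETL::Engine.connection(:data_warehouse)
#   ETL::Engine.table('people', conn)   # => "tmp_people"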
def table(table_name, connection)
if use_temp_tables?
returning "tmp_#{table_name}" do |temp_table_name|
if temp_tables[temp_table_name].nil?
# Create the temp table and add it to the mapping
begin connection.drop_table(temp_table_name); rescue; end
connection.copy_table(table_name, temp_table_name)
temp_tables[temp_table_name] = {
:table => table_name,
:connection => connection
}
end
end
else
table_name
end
end
protected
# Hash of database connections that can be used throughout the ETL
# process
def connections
@connections ||= {}
end
# Establish the named connection and return the database specific connection
def establish_connection(name)
logger.debug "Establishing connection to #{name}"
conn_config = ETL::Base.configurations[name.to_s]
raise ETL::ETLError, "No connection found for #{name}" unless conn_config
connection_method = "#{conn_config['adapter']}_connection"
ETL::Base.send(connection_method, conn_config)
end
end # class << self
# Say the specified message, with a newline
def say(message)
say_without_newline(message + "\n")
end
# Say the specified message without a newline
def say_without_newline(message)
if ETL::Engine.realtime_activity
$stdout.print message
$stdout.flush
end
end
# Say the message on its own line
def say_on_own_line(message)
say("\n" + message)
end
# Array of errors encountered during execution of the ETL process
def errors
@errors ||= []
end
# Get a Hash of benchmark values where each value represents the total
# amount of time in seconds spent processing in that portion of the ETL
# pipeline. Keys include:
# * :transforms
# * :after_reads
# * :before_writes
# * :writes
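#
# For example, after running a (hypothetical) control file through an engine
# instance, the time spent writing rows can be inspected:
#
#   ETL::Engine.init
#   engine = ETL::Engine.new
#   engine.process('extract_customers.ctl')
#   engine.benchmarks[:writes]   # => seconds spent writing to destinations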
def benchmarks
@benchmarks ||= {
:transforms => 0,
:after_reads => 0,
:before_writes => 0,
:writes => 0,
}
end
# Process a file, control object or batch object. Acceptable values for
# file are:
# * Path to a file
# * File object
# * ETL::Control::Control instance
# * ETL::Batch::Batch instance
def process(file)
case file
when String
process(File.new(file))
when File
process_control(file) if file.path =~ /\.ctl$/
process_batch(file) if file.path =~ /\.ebf$/
when ETL::Control::Control
process_control(file)
when ETL::Batch::Batch
process_batch(file)
else
raise RuntimeError, "Process object must be a String, File, Control
instance or Batch instance"
end
end
protected
# Process the specified batch file
def process_batch(batch)
batch = ETL::Batch::Batch.resolve(batch, self)
say "Processing batch #{batch.file}"
ETL::Engine.batch = ETL::Execution::Batch.create!(
:batch_file => batch.file,
:status => 'executing'
)
batch.execute
ETL::Engine.batch.completed_at = Time.now
ETL::Engine.batch.status = (errors.length > 0 ? 'completed with errors' : 'completed')
ETL::Engine.batch.save!
end
# Process the specified control file
def process_control(control)
control = ETL::Control::Control.resolve(control)
say_on_own_line "Processing control #{control.file}"
ETL::Engine.job = ETL::Execution::Job.create!(
:control_file => control.file,
:status => 'executing',
:batch_id => ETL::Engine.batch ? ETL::Engine.batch.id : nil
)
execute_dependencies(control)
start_time = Time.now
pre_process(control)
sources = control.sources
destinations = control.destinations
say "Skipping bulk import" if Engine.skip_bulk_import
sources.each do |source|
Engine.current_source = source
Engine.logger.debug "Processing source #{source}"
say "Source: #{source}"
say "Limiting enabled: #{Engine.limit}" if Engine.limit != nil
say "Offset enabled: #{Engine.offset}" if Engine.offset != nil
source.each_with_index do |row, index|
# Break out of the row loop if +Engine.limit+ is specified and the
# number of rows read has reached that value.
if Engine.limit != nil && Engine.rows_read >= Engine.limit
puts "Reached limit of #{Engine.limit}"
break
end
Engine.logger.debug "Row #{index}: #{row.inspect}"
Engine.rows_read += 1
Engine.current_source_row = index + 1
say_without_newline "." if Engine.realtime_activity && index > 0 && index % 1000 == 0
# At this point a single row may be turned into multiple rows via row
# processors. All code after this line should work with the array of
# rows rather than the single row.
rows = [row]
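# execute row-level "after read" processing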
t = Benchmark.realtime do
begin
Engine.logger.debug "Processing after read"
control.after_read_processors.each do |processor|
processed_rows = []
rows.each do |row|
processed_rows << processor.process(row)
end
rows = processed_rows.flatten
end
rescue => e
msg = "Error processing rows after read from #{Engine.current_source} on line #{Engine.current_source_row}: #{e}"
errors << msg
Engine.logger.error(msg)
exceeded_error_threshold?(control) ? break : next
end
end
benchmarks[:after_reads] += t unless t.nil?
t = Benchmark.realtime do
begin
Engine.logger.debug "Executing transforms"
rows.each do |row|
control.transforms.each do |transform|
name = transform.name.to_sym
row[name] = transform.transform(name, row[name], row)
end
end
rescue ResolverError => e
Engine.logger.error(e.message)
errors << e.message
rescue => e
msg = "Error transforming from #{Engine.current_source} on line #{Engine.current_source_row}: #{e}"
errors << msg
Engine.logger.error(msg)
e.backtrace.each { |line| Engine.logger.error(line) }
ensure
begin
exceeded_error_threshold?(control) ? break : next
rescue => inner_error
puts inner_error
end
end
end
benchmarks[:transforms] += t unless t.nil?
t = Benchmark.realtime do
begin
# execute row-level "before write" processing
Engine.logger.debug "Processing before write"
control.before_write_processors.each do |processor|
processed_rows = []
rows.each { |row| processed_rows << processor.process(row) }
rows = processed_rows.flatten.compact
end
rescue => e
msg = "Error processing rows before write from #{Engine.current_source} on line #{Engine.current_source_row}: #{e}"
errors << msg
Engine.logger.error(msg)
e.backtrace.each { |line| Engine.logger.error(line) }
exceeded_error_threshold?(control) ? break : next
end
end
benchmarks[:before_writes] += t unless t.nil?
t = Benchmark.realtime do
begin
# write the rows to each destination
destinations.each_with_index do |destination, index|
Engine.current_destination = destination
rows.each do |row|
destination.write(row)
Engine.rows_written += 1 if index == 0
end
end
rescue => e
msg = "Error writing to #{Engine.current_destination}: #{e}"
errors << msg
Engine.logger.error msg
e.backtrace.each { |line| Engine.logger.error(line) }
exceeded_error_threshold?(control) ? break : next
end
end
benchmarks[:writes] += t unless t.nil?
end
if exceeded_error_threshold?(control)
say_on_own_line "Exiting due to exceeding error threshold: #{control.error_threshold}"
return
end
end
destinations.each do |destination|
destination.close
end
say_on_own_line "Executing before post-process screens"
begin
execute_screens(control)
rescue FatalScreenError => e
say "Fatal screen error during job execution: #{e.message}"
exit
rescue ScreenError => e
say "Screen error during job execution: #{e.message}"
return
else
say "Screens passed"
end
post_process(control)
if sources.length > 0
say_on_own_line "Read #{Engine.rows_read} lines from sources"
end
if destinations.length > 0
say "Wrote #{Engine.rows_written} lines to destinations"
end
say_on_own_line "Executing after post-process screens"
begin
execute_screens(control, :after_post_process)
rescue FatalScreenError => e
say "Fatal screen error during job execution: #{e.message}"
exit
rescue ScreenError => e
say "Screen error during job execution: #{e.message}"
return
else
say "Screens passed"
end
say_on_own_line "Completed #{control.file} in #{distance_of_time_in_words(start_time)} with #{errors.length} errors."
say "Processing average: #{Engine.average_rows_per_second} rows/sec)"
say "Avg after_reads: #{Engine.rows_read/benchmarks[:after_reads]} rows/sec" if benchmarks[:after_reads] > 0
say "Avg before_writes: #{Engine.rows_read/benchmarks[:before_writes]} rows/sec" if benchmarks[:before_writes] > 0
say "Avg transforms: #{Engine.rows_read/benchmarks[:transforms]} rows/sec" if benchmarks[:transforms] > 0
say "Avg writes: #{Engine.rows_read/benchmarks[:writes]} rows/sec" if benchmarks[:writes] > 0
# say "Avg time writing execution records: #{ETL::Execution::Record.average_time_spent}"
#
# ETL::Transform::Transform.benchmarks.each do |klass, t|
# say "Avg #{klass}: #{Engine.rows_read/t} rows/sec"
# end
ETL::Engine.job.completed_at = Time.now
ETL::Engine.job.status = (errors.length > 0 ? 'completed with errors' : 'completed')
ETL::Engine.job.save!
end
private
# Return true if the error threshold is exceeded
def exceeded_error_threshold?(control)
errors.length > control.error_threshold
end
# Execute all preprocessors
def pre_process(control)
Engine.logger.debug "Pre-processing #{control.file}"
control.pre_processors.each do |processor|
processor.process
end
Engine.logger.debug "Pre-processing complete"
end
# Execute all postprocessors
def post_process(control)
say_on_own_line "Executing post processes"
Engine.logger.debug "Post-processing #{control.file}"
control.post_processors.each do |processor|
processor.process
end
Engine.logger.debug "Post-processing complete"
say "Post-processing complete"
end
# Execute all dependencies
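# Each dependency may be given as a Symbol, which resolves to "<name>.ctl",
# or as a String path to a control file. For example, a control whose
# dependencies are +[:extract_customers, 'load_orders.ctl']+ (hypothetical
# names) would run extract_customers.ctl and then load_orders.ctl before
# the control itself.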
def execute_dependencies(control)
Engine.logger.debug "Executing dependencies"
control.dependencies.flatten.each do |dependency|
case dependency
when Symbol
f = dependency.to_s + '.ctl'
Engine.logger.debug "Executing dependency: #{f}"
say "Executing dependency: #{f}"
process(f)
when String
Engine.logger.debug "Executing dependency: #{f}"
say "Executing dependency: #{f}"
process(dependency)
else
raise "Invalid dependency type: #{dependency.class}"
end
end
end
# Execute all screens
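# Screens are pulled from the control grouped by severity (:fatal, :error and
# :warn), each group being an array of callable blocks. A sketch of the
# expected shape, with a hypothetical row-count check:
#
#   {
#     :fatal => [ lambda { raise "row counts differ" if ETL::Engine.rows_read != ETL::Engine.rows_written } ],
#     :error => [],
#     :warn  => []
#   }
#
# A failing :fatal screen raises FatalScreenError, a failing :error screen
# raises ScreenError, and a failing :warn screen is only reported via +say+.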
def execute_screens(control, timing = :before_post_process)
screens = case timing
when :after_post_process
control.after_post_process_screens
else # default to before post-process screens
control.screens
end
[:fatal,:error,:warn].each do |type|
screens[type].each do |block|
begin
block.call
rescue => e
case type
when :fatal
raise FatalScreenError, e
when :error
raise ScreenError, e
when :warn
say "Screen warning: #{e}"
end
end
end
end
end
end
end