module ETL #:nodoc:
module ActiveRecord #:nodoc:
# Base class used for ActiveRecord connections. This is necessary
# since AR connections are tied to the class, and using ActiveRecord::Base
# directly can cause problems if that connection is closed.
class Base < ::ActiveRecord::Base
end
end
# The main ETL engine class
class Engine
class << self
# Initialization that is run when a job is executed.
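# Recognized keys in +options+ (taken from the assignments below) are :limit,
# :offset, :newlog, :skip_bulk_import, :read_locally and :config, where :config
# defaults to 'database.yml'.
#
# Illustrative call (the values shown are examples, not defaults):
#
#   ETL::Engine.init(
#     :config => 'database.yml',
#     :limit => 100,
#     :newlog => true
#   )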
def init(options={})
unless @initialized
@limit = options[:limit]
@offset = options[:offset]
@log_write_mode = 'w' if options[:newlog]
@skip_bulk_import = options[:skip_bulk_import]
@read_locally = options[:read_locally]
options[:config] ||= 'database.yml'
database_configuration = YAML::load(ERB.new(IO.read(options[:config])).result + "\n")
ETL::ActiveRecord::Base.configurations = database_configuration
ActiveRecord::Base.configurations.merge!(ETL::ActiveRecord::Base.configurations)
require 'etl/execution'
ETL::Execution::Base.establish_connection :etl_execution
ETL::Execution::Execution.migrate
@initialized = true
end
end
# Process the specified control file. Acceptable values for control_file are
# * Path to a file
# * File object
# * ETL::Control::Control instance
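#
# Illustrative example (the control file name here is hypothetical):
#
#   ETL::Engine.process('my_etl_job.ctl')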
def process(control_file)
new().process(control_file)
end
# Set to true to write log output to a timestamped file named
# etl_YYYYMMDDHHMMSS.log instead of the default etl.log
attr_accessor :timestamped_log
# Accessor for the log write mode. Default is 'a' for append.
attr_accessor :log_write_mode
def log_write_mode
@log_write_mode ||= 'a'
end
# A logger for the engine
attr_accessor :logger
def logger #:nodoc:
unless @logger
if timestamped_log
@logger = Logger.new("etl_#{timestamp}.log")
else
@logger = Logger.new(File.open('etl.log', log_write_mode))
end
@logger.level = Logger::ERROR
@logger.formatter = Logger::Formatter.new
end
@logger
end
# Get a timestamp value as a string
def timestamp
Time.now.strftime("%Y%m%d%H%M%S")
end
# The current source
attr_accessor :current_source
# The current source row
attr_accessor :current_source_row
# The current destination
attr_accessor :current_destination
# Set to true to print progress and informational messages to STDOUT while
# the engine runs
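#
# For example (illustrative):
#
#   ETL::Engine.realtime_activity = true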
attr_accessor :realtime_activity
# Accessor for the total number of rows read from sources
attr_accessor :rows_read
def rows_read
@rows_read ||= 0
end
# Accessor for the total number of rows written to destinations (each row is
# counted once even when there are multiple destinations)
attr_accessor :rows_written
def rows_written
@rows_written ||= 0
end
# Access the current ETL::Execution::Job instance
attr_accessor :job
# The limit on rows to load from the source, useful for testing the ETL
# process prior to executing the entire batch. Default value is nil and
# indicates that there is no limit
attr_accessor :limit
# The offset for the source to begin at, useful for testing the ETL
# process prior to executing the entire batch. Default value is nil and
# indicates that there is no offset
attr_accessor :offset
# Set to true to skip all bulk importing
attr_accessor :skip_bulk_import
# Set to true to read locally from the last source cache files
attr_accessor :read_locally
# Accessor for the average rows per second processed
attr_accessor :average_rows_per_second
end
# Say the specified message, with a newline
def say(message)
say_without_newline(message + "\n")
end
# Say the specified message without a newline
def say_without_newline(message)
if Engine.realtime_activity
$stdout.print message
$stdout.flush
end
end
# Say the message on its own line
def say_on_own_line(message)
say("\n" + message)
end
# Array of errors encountered during execution of the ETL process
def errors
@errors ||= []
end
# Get a Hash of benchmark values where each value represents the total
# amount of time in seconds spent processing in that portion of the ETL
# pipeline. Keys include:
# * :transforms
# * :after_reads
# * :before_writes
# * :writes
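#
# Illustrative read of the benchmarks after a run (the control file name is
# hypothetical):
#
#   engine = ETL::Engine.new
#   engine.process('my_etl_job.ctl')
#   engine.benchmarks[:writes] # => total seconds spent writing to destinations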
def benchmarks
@benchmarks ||= {
:transforms => 0,
:after_reads => 0,
:before_writes => 0,
:writes => 0,
}
end
# Process a control file or object. Acceptable values for control are:
# * Path to a file
# * File object
# * ETL::Control::Control instance
def process(control)
control = ETL::Control::Control.resolve(control)
ETL::Engine.job = ETL::Execution::Job.create!(
:control_file => control.file,
:status => 'executing'
)
execute_dependencies(control)
start_time = Time.now
Engine.logger.debug "Pre-processing #{control.file}"
pre_process(control)
Engine.logger.debug "Pre-processing complete"
sources = control.sources
destinations = control.destinations
say "Skipping bulk import" if Engine.skip_bulk_import
sources.each do |source|
Engine.current_source = source
Engine.logger.debug "Processing source #{source}"
say "Source: #{source}"
say "Limiting enabled: #{Engine.limit}" if Engine.limit != nil
say "Offset enabled: #{Engine.offset}" if Engine.offset != nil
source.each_with_index do |row, index|
# Break out of the row loop if +Engine.limit+ is specified and
# the number of rows read meets or exceeds that value.
if Engine.limit != nil && Engine.rows_read >= Engine.limit
puts "Reached limit of #{Engine.limit}"
break
end
Engine.logger.debug "Row #{index}: #{row.inspect}"
Engine.rows_read += 1
Engine.current_source_row = index + 1
if Engine.realtime_activity && index > 0 && index % 1000 == 0
say_without_newline "."
end
# At this point a single row may be turned into multiple rows via row
# processors; all code after this line should work with the array of
# rows rather than the single row
rows = [row]
t = Benchmark.realtime do
begin
Engine.logger.debug "Processing after read"
control.after_read_processors.each do |processor|
processed_rows = []
rows.each do |row|
processed_rows << processor.process(row)
end
rows = processed_rows.flatten
end
rescue => e
msg = "Error processing rows after read from #{Engine.current_source} on line #{Engine.current_source_row}: #{e}"
errors << msg
Engine.logger.error(msg)
exceeded_error_threshold?(control) ? break : next
end
end
benchmarks[:after_reads] += t unless t.nil?
t = Benchmark.realtime do
begin
# execute transforms
Engine.logger.debug "Executing transforms"
rows.each do |row|
control.transforms.each do |transform|
name = transform.name.to_sym
row[name] = transform.transform(name, row[name], row)
end
end
rescue => e
msg = "Error transforming from #{Engine.current_source} on line #{Engine.current_source_row}: #{e}"
errors << msg
Engine.logger.error(msg)
e.backtrace.each { |line| Engine.logger.error(line) }
begin
exceeded_error_threshold?(control) ? break : next
rescue => inner_error
puts inner_error
end
end
end
benchmarks[:transforms] += t unless t.nil?
t = Benchmark.realtime do
begin
# execute row-level "before write" processing
Engine.logger.debug "Processing before write"
control.before_write_processors.each do |processor|
processed_rows = []
rows.each do |row|
processed_rows << processor.process(row)
end
rows = processed_rows.flatten.compact
end
rescue => e
msg = "Error processing rows before write from #{Engine.current_source} on line #{Engine.current_source_row}: #{e}"
errors << msg
Engine.logger.error(msg)
e.backtrace.each { |line| Engine.logger.error(line) }
exceeded_error_threshold?(control) ? break : next
end
end
benchmarks[:before_writes] += t unless t.nil?
t = Benchmark.realtime do
begin
# write the row to the destination
destinations.each_with_index do |destination, index|
Engine.current_destination = destination
rows.each do |row|
destination.write(row)
Engine.rows_written += 1 if index == 0
end
end
rescue => e
msg = "Error writing to #{Engine.current_destination}: #{e}"
errors << msg
Engine.logger.error msg
e.backtrace.each { |line| Engine.logger.error(line) }
exceeded_error_threshold?(control) ? break : next
end
end
benchmarks[:writes] += t unless t.nil?
end
if exceeded_error_threshold?(control)
say_on_own_line "Exiting due to exceeding error threshold: #{control.error_threshold}"
return
end
end
destinations.each do |destination|
destination.close
end
say_on_own_line "Executing post processes"
Engine.logger.debug "Post-processing #{control.file}"
post_process(control)
Engine.logger.debug "Post-processing complete"
say "Post-processing complete"
if sources.length > 0
say_on_own_line "Read #{Engine.rows_read} lines from sources"
end
if destinations.length > 0
say "Wrote #{Engine.rows_written} lines to destinations"
end
say "Completed #{control.file} in #{distance_of_time_in_words(start_time)} with #{errors.length} errors."
say "Processing average: #{Engine.average_rows_per_second} rows/sec)"
say "Avg after_reads: #{Engine.rows_read/benchmarks[:after_reads]} rows/sec" if benchmarks[:after_reads] > 0
say "Avg before_writes: #{Engine.rows_read/benchmarks[:before_writes]} rows/sec" if benchmarks[:before_writes] > 0
say "Avg transforms: #{Engine.rows_read/benchmarks[:transforms]} rows/sec" if benchmarks[:transforms] > 0
say "Avg writes: #{Engine.rows_read/benchmarks[:writes]} rows/sec" if benchmarks[:writes] > 0
say "Avg time writing execution records: #{ETL::Execution::Record.average_time_spent}"
# ETL::Transform::Transform.benchmarks.each do |klass, t|
# say "Avg #{klass}: #{Engine.rows_read/t} rows/sec"
# end
ETL::Engine.job.completed_at = Time.now
ETL::Engine.job.status = (errors.length > 0 ? 'completed with errors' : 'completed')
ETL::Engine.job.save!
end
private
# Return true if the error threshold is exceeded
def exceeded_error_threshold?(control)
errors.length > control.error_threshold
end
# Execute all preprocessors
def pre_process(control)
control.pre_processors.each do |processor|
processor.process
end
end
# Execute all postprocessors
def post_process(control)
control.post_processors.each do |processor|
processor.process
end
end
# Execute all dependencies
def execute_dependencies(control)
Engine.logger.debug "Executing dependencies"
control.dependencies.flatten.each do |dependency|
case dependency
when Symbol
f = dependency.to_s + '.ctl'
Engine.logger.debug "Executing dependency: #{f}"
say "Executing dependency: #{f}"
process(f)
when String
Engine.logger.debug "Executing dependency: #{f}"
say "Executing dependency: #{f}"
process(dependency)
else
raise "Invalid dependency type: #{dependency.class}"
end
end
end
# Return the distance of time in words from the given from_time to the given to_time. If to_time
# is not specified then Time.now is used. Days, hours, minutes and seconds are included in the
# result as applicable.
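#
# Illustrative example:
#
#   t = Time.now
#   distance_of_time_in_words(t, t + 7325) # => "2 hours, 2 minutes, 5 seconds"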
def distance_of_time_in_words(from_time, to_time=Time.now)
from_time = from_time.to_time if from_time.respond_to?(:to_time)
to_time = to_time.to_time if to_time.respond_to?(:to_time)
seconds = (to_time - from_time).round
distance_in_days = (seconds/(60*60*24)).round
seconds = seconds % (60*60*24)
distance_in_hours = (seconds/(60*60)).round
seconds = seconds % (60*60)
distance_in_minutes = (seconds/60).round
seconds = seconds % 60
distance_in_seconds = seconds
s = ''
s << "#{distance_in_days} days," if distance_in_days > 0
s << "#{distance_in_hours} hours, " if distance_in_hours > 0
s << "#{distance_in_minutes} minutes, " if distance_in_minutes > 0
s << "#{distance_in_seconds} seconds"
s
end
# Get the approximate distance of time in words from the given from_time
# to the given to_time. If to_time is not specified then it is set
# to Time.now. By default seconds are taken into account; set the include_seconds
# argument to false to disable the seconds.
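#
# Illustrative examples:
#
#   t = Time.now
#   approximate_distance_of_time_in_words(t, t + 30)   # => "half a minute"
#   approximate_distance_of_time_in_words(t, t + 3600) # => "about 1 hour"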
def approximate_distance_of_time_in_words(from_time, to_time=Time.now, include_seconds=true)
from_time = from_time.to_time if from_time.respond_to?(:to_time)
to_time = to_time.to_time if to_time.respond_to?(:to_time)
distance_in_minutes = (((to_time - from_time).abs)/60).round
distance_in_seconds = ((to_time - from_time).abs).round
case distance_in_minutes
when 0..1
return (distance_in_minutes == 0) ? 'less than a minute' : '1 minute' unless include_seconds
case distance_in_seconds
when 0..4 then 'less than 5 seconds'
when 5..9 then 'less than 10 seconds'
when 10..19 then 'less than 20 seconds'
when 20..39 then 'half a minute'
when 40..59 then 'less than a minute'
else '1 minute'
end
when 2..44 then "#{distance_in_minutes} minutes"
when 45..89 then 'about 1 hour'
when 90..1439 then "about #{(distance_in_minutes.to_f / 60.0).round} hours"
when 1440..2879 then '1 day'
when 2880..43199 then "#{(distance_in_minutes / 1440).round} days"
when 43200..86399 then 'about 1 month'
when 86400..525959 then "#{(distance_in_minutes / 43200).round} months"
when 525960..1051919 then 'about 1 year'
else "over #{(distance_in_minutes / 525960).round} years"
end
end
end
end