require 'fileutils'
module ETL #:nodoc:
# Raised when an offset is requested without a corresponding limit,
# which cannot be expressed in a SQL LIMIT/OFFSET clause.
class NoLimitSpecifiedError < StandardError
end
class Source < ::ActiveRecord::Base #:nodoc:
# ActiveRecord-backed base class for database sources. The body is empty
# here: it exists so subclasses (e.g. Control::DatabaseSource) inherit an
# ActiveRecord connection. NOTE(review): subclasses read @store_locally
# and call methods such as local_base/read_locally that are not defined
# in this visible span — presumably provided by this class or a mixin
# elsewhere in the file; confirm against the full source.
end
module Control #:nodoc:
# Source object which extracts data from a database using ActiveRecord.
class DatabaseSource < Source
  attr_accessor :target
  attr_accessor :table

  # Initialize the source.
  #
  # Arguments:
  # * control: The ETL::Control::Control instance
  # * configuration: The configuration Hash
  # * definition: The source definition
  #
  # Required configuration options:
  # * :target: The target connection
  # * :table: The source table name
  # * :database: The database name
  #
  # Other options:
  # * :join: Optional join part for the query (ignored unless specified)
  # * :select: Optional select part for the query (defaults to '*')
  # * :group: Optional group by part for the query (ignored unless specified)
  # * :order: Optional order part for the query (ignored unless specified)
  # * :new_records_only: Specify the column to use when comparing
  #   timestamps against the last successful ETL job execution for the
  #   current control file.
  # * :store_locally: Set to false to not store a copy of the source data
  #   locally in a flat file (defaults to true)
  def initialize(control, configuration, definition)
    super
    @target = configuration[:target]
    @table = configuration[:table]
    @query = configuration[:query]
    # NOTE(review): @store_locally is read in #count and #each but never
    # assigned in this class — presumably set by super from
    # configuration[:store_locally]; confirm against the base class.
  end

  # Get a String identifier for the source, "host/database/table".
  def to_s
    "#{host}/#{database}/#{@table}"
  end

  # Get the local directory to use, which is a combination of the
  # local_base, the db hostname, the db database name and the db table.
  def local_directory
    File.join(local_base, to_s)
  end

  # Get the join part of the query, defaults to nil
  def join
    configuration[:join]
  end

  # Get the select part of the query, defaults to '*'
  def select
    configuration[:select] || '*'
  end

  # Get the group by part of the query, defaults to nil
  def group
    configuration[:group]
  end

  # Get the order for the query, defaults to nil
  def order
    configuration[:order]
  end

  # Return the column which is used in the where clause to identify
  # new rows
  def new_records_only
    configuration[:new_records_only]
  end

  # Get the number of rows in the source. The value is memoized; pass
  # use_cache=false to force a recount.
  def count(use_cache=true)
    return @count if @count && use_cache
    @count = if @store_locally || read_locally
      count_locally
    else
      # Rewrite the SELECT list into a COUNT; assumes the query begins
      # with a single "SELECT ... FROM" clause.
      connection.select_value(query.gsub(/SELECT .* FROM/, 'SELECT count(1) FROM'))
    end
  end

  # Get the list of columns to read. This is defined in the source
  # definition as either an Array or Hash
  def columns
    # weird default is required for writing to cache correctly
    @columns ||= query_rows.any? ? query_rows.first.keys : ['']
  end

  # Returns each row from the source. If read_locally is specified then
  # this method will attempt to read from the last stored local file.
  # If no locally stored file exists or if the trigger file for the last
  # locally stored file does not exist then this method will raise an
  # error.
  def each(&block)
    if read_locally
      # Read from the last stored source
      ETL::Engine.logger.debug "Reading from local cache"
      read_rows(last_local_file, &block)
    elsif @store_locally
      # Write a local cache file first, then stream rows back out of it.
      file = local_file
      write_local(file)
      read_rows(file, &block)
    else
      # Stream directly from the database.
      query_rows.each do |r|
        row = ETL::Row.new()
        r.symbolize_keys.each_pair { |key, value|
          row[key] = value
        }
        row.source = self
        yield row
      end
    end
  end

  private

  # Read rows from the local cache file, yielding each as an ETL::Row.
  # Raises a RuntimeError if the cache file or its trigger file is
  # missing.
  def read_rows(file)
    # File.exist? replaces File.exists?, which was removed in Ruby 3.2.
    raise "Local cache file not found" unless File.exist?(file)
    raise "Local cache trigger file not found" unless File.exist?(local_file_trigger(file))
    t = Benchmark.realtime do
      # CSV.foreach closes the file when iteration finishes, unlike the
      # previous blockless CSV.open(...).each which leaked the handle.
      CSV.foreach(file, :headers => true) do |row|
        result_row = ETL::Row.new
        result_row.source = self
        row.each do |header, field|
          result_row[header.to_sym] = field
        end
        yield result_row
      end
    end
    ETL::Engine.average_rows_per_second = ETL::Engine.rows_read / t
  end

  # Count the lines in the last locally-stored file. File.foreach does
  # not leak a file handle, unlike the previous File.open(...).each.
  # NOTE(review): this counts every line, including the CSV header row —
  # semantics preserved from the original; confirm whether callers
  # compensate for the header.
  def count_locally
    File.foreach(last_local_file).count
  end

  # Write rows to the local cache as CSV (header row of column names
  # followed by one line per source row), then create the trigger file
  # marking the cache as complete.
  def write_local(file)
    lines = 0
    t = Benchmark.realtime do
      CSV.open(file, 'w') do |f|
        f << columns
        query_rows.each do |row|
          f << columns.collect { |column| row[column.to_s] }
          lines += 1
        end
      end
      # Create the (empty) trigger file; fileutils is required at the
      # top of this file.
      FileUtils.touch(local_file_trigger(file))
    end
    ETL::Engine.logger.info "Stored locally in #{t}s (avg: #{lines/t} lines/sec)"
  end

  # Get the SQL query to use, building it from the configuration unless
  # an explicit :query was supplied. The built query is memoized in
  # @query.
  def query
    return @query if @query
    q = "SELECT #{select} FROM #{@table}"
    q << " #{join}" if join

    conditions = []
    if new_records_only
      # Only pull rows newer than the last completed job for this
      # control file.
      last_completed = ETL::Execution::Job.maximum('created_at',
        :conditions => ['control_file = ? and completed_at is not null', control.file]
      )
      if last_completed
        conditions << "#{new_records_only} > #{connection.quote(last_completed.to_s(:db))}"
      end
    end
    conditions << configuration[:conditions] if configuration[:conditions]
    q << " WHERE #{conditions.join(' AND ')}" unless conditions.empty?

    q << " GROUP BY #{group}" if group
    q << " ORDER BY #{order}" if order

    limit = ETL::Engine.limit
    offset = ETL::Engine.offset
    if limit || offset
      # An offset without a limit cannot be expressed in a LIMIT clause.
      raise NoLimitSpecifiedError, "Specifying offset without limit is not allowed" if offset && limit.nil?
      q << " LIMIT #{limit}"
      q << " OFFSET #{offset}" if offset
    end

    q = q.gsub(/\n/, ' ')
    ETL::Engine.logger.info "Query: #{q}"
    @query = q
  end

  # Execute the query and memoize the resulting rows. Memoization was
  # previously broken: @query_rows was read but never assigned, so the
  # query re-executed on every call (columns alone calls this twice).
  def query_rows
    @query_rows ||= if configuration[:mysqlstream] == true
      MySqlStreamer.new(query, @target, connection)
    else
      connection.select_all(query)
    end
  end

  # Get the database connection to use
  def connection
    ETL::Engine.connection(target)
  end

  # Get the host, defaults to 'localhost'
  def host
    ETL::Base.configurations[target.to_s]['host'] || 'localhost'
  end

  # Get the database name from the target's configuration.
  def database
    ETL::Base.configurations[target.to_s]['database']
  end
end
end
end