require 'fileutils'
module ETL #:nodoc:
class Source < ::ActiveRecord::Base #:nodoc:
# Connection holder for database sources.
#
# DatabaseSource#connect calls establish_connection on this class and
# DatabaseSource#connection returns this class's connection, so every
# DatabaseSource instance goes through this single class.
# NOTE(review): presumably the most recent connect call decides which database
# the shared connection points at; confirm before using several database
# sources with different databases in one control file.
end
module Control #:nodoc:
# Source object which extracts data from a database using ActiveRecord.
class DatabaseSource < Source
# Create the database source and immediately connect to its database.
#
# Arguments:
# * control: The ETL::Control::Control instance
# * configuration: The configuration Hash
# * definition: The source definition
#
# Required configuration options:
# * :table: The source table name
# * :database: The database name
#
# Other options:
# * :adapter: The adapter to use (defaults to :mysql)
# * :username: The database username (defaults to 'root')
# * :password: The password to the database (defaults to nothing)
# * :host: The host for the database (defaults to 'localhost')
# * :join: Optional join part for the query (ignored unless specified)
# * :select: Optional select part for the query (defaults to '*')
# * :group: Optional group by part for the query (ignored unless specified)
# * :order: Optional order part for the query (ignored unless specified)
# * :conditions: Optional SQL condition string added to the WHERE clause
#   (ignored unless specified)
# * :new_records_only: Optional column name; when given, only rows whose value
#   in that column is later than the created_at time of the last completed job
#   for this control file are extracted
# * :store_locally: Set to false to not store a copy of the source data
#   locally in a flat file (defaults to true)
def initialize(control, configuration, definition)
  super(control, configuration, definition)
  connect
end
# Identify the source as "host/database/table", e.g. "localhost/shop/orders".
def to_s
  segments = [host, configuration[:database], configuration[:table]]
  segments.map { |segment| segment.to_s }.join('/')
end
# Directory for this source's local cache files: local_base joined with the
# database host, the database name and the table name.
def local_directory
  segments = [local_base, host, configuration[:database], configuration[:table]]
  File.join(*segments)
end
# Get the join part of the query: the raw SQL from the :join option, which
# query appends right after the table name. Returns nil when :join is unset.
def join
configuration[:join]
end
# Column list for the SELECT clause: the :select option when it is set
# (any truthy value), otherwise '*'.
def select
  explicit = configuration[:select]
  explicit ? explicit : '*'
end
# Get the GROUP BY expression from the :group option, or nil when it is unset
# (query then adds no GROUP BY clause).
def group
configuration[:group]
end
# Get the ORDER BY expression from the :order option, or nil when it is unset
# (query then adds no ORDER BY clause).
def order
configuration[:order]
end
# Return the name of the column used in the WHERE clause to identify new rows
# (the :new_records_only option), or nil when it is unset. query compares this
# column against the created_at time of the last completed job for the
# control file.
def new_records_only
configuration[:new_records_only]
end
# Get the number of rows in the source.
#
# When the source is stored or read locally, the rows in the local cache file
# are counted. Otherwise the full source query is wrapped in a derived table
# and counted by the database. Rewriting the SELECT list with a regex would
# break on subqueries and miscount GROUP BY or LIMIT/OFFSET queries.
#
# Arguments:
# * use_cache: when true (the default) a previously computed count is returned
#   without counting again
#
# Returns the row count as an Integer.
def count(use_cache=true)
  return @count if @count && use_cache
  if store_locally || read_locally
    @count = count_locally
  else
    # Derived tables must be aliased (MySQL, PostgreSQL). Some adapters return
    # the value as a String, so normalise it to match count_locally.
    @count = connection.select_value("SELECT count(1) FROM (#{query}) etl_source_count").to_i
  end
end
# Column names to read, as Symbols. The source definition is either an Array
# of column names or a Hash keyed by column name. Any other definition raises
# a RuntimeError.
def columns
  source_definition = definition
  names = if source_definition.is_a?(Array)
    source_definition
  elsif source_definition.is_a?(Hash)
    source_definition.keys
  else
    raise "Definition must be either an Array or a Hash"
  end
  names.map { |name| name.to_sym }
end
# Yield every source row as an ETL::Row whose source is this object.
#
# * read_locally: rows come from the last stored local file. read_rows raises
#   if that file or its trigger file is missing.
# * store_locally: the query result is first written to a new local file,
#   then read back from it.
# * otherwise: rows are streamed straight from the database query.
def each(&block)
  if read_locally
    ETL::Engine.logger.debug "Reading from local cache"
    return read_rows(last_local_file, &block)
  end

  if store_locally
    cache_file = local_file
    write_local(cache_file)
    return read_rows(cache_file, &block)
  end

  connection.select_all(query).each do |record|
    etl_row = ETL::Row.new(record.symbolize_keys)
    etl_row.source = self
    yield etl_row
  end
end
private
# Read rows from a local cache file and yield each one as an ETL::Row.
#
# The first line of the file is the header row. Each yielded row maps every
# header (converted to a Symbol) to its field value and has its source set to
# this object.
#
# Raises a RuntimeError when the cache file or its trigger file is missing.
# write_local creates the trigger file only after the cache file is written,
# so a missing trigger means the cache may be incomplete.
#
# Afterwards ETL::Engine.average_rows_per_second is set to
# ETL::Engine.rows_read divided by the elapsed read time.
def read_rows(file)
  # File.exist? instead of File.exists?, which is deprecated and was removed
  # in Ruby 3.2.
  raise "Local cache file not found" unless File.exist?(file)
  raise "Local cache trigger file not found" unless File.exist?(local_file_trigger(file))
  t = Benchmark.realtime do
    # The block form closes the file when reading finishes or when the
    # caller's block raises. The non-block form never closed it.
    FasterCSV.open(file, :headers => true) do |csv|
      csv.each do |row|
        result_row = ETL::Row.new
        result_row.source = self
        row.each do |header, field|
          result_row[header.to_sym] = field
        end
        yield result_row
      end
    end
  end
  ETL::Engine.average_rows_per_second = ETL::Engine.rows_read / t
end
# Count the data rows in the last locally stored cache file.
#
# write_local writes a header row before the data, so that first line is not
# counted. Counting is line based: a quoted field containing an embedded
# newline would be counted as more than one row.
def count_locally
  lines = 0
  # IO.foreach closes the file when done. File.open(...).each left it open.
  File.foreach(last_local_file) { lines += 1 }
  lines > 0 ? lines - 1 : 0
end
# Write the result of the source query to a local CSV cache file.
#
# The first line is the header row (the column names). Each following line
# holds one result row's values, in column order. After the CSV file is
# closed, an empty trigger file is created next to it. read_rows checks for
# the trigger file before reading the cache. The elapsed time and average
# lines per second are logged at info level.
def write_local(file)
  lines = 0
  # Resolve the column list once. columns rebuilds it from the definition on
  # every call, and it was previously called once per row.
  column_names = columns
  column_keys = column_names.collect { |column| column.to_s }
  t = Benchmark.realtime do
    FasterCSV.open(file, 'w') do |f|
      f << column_names
      connection.select_all(query).each do |row|
        f << column_keys.collect { |key| row[key] }
        lines += 1
      end
    end
    File.open(local_file_trigger(file), 'w') { |f| }
  end
  ETL::Engine.logger.info "Stored locally in #{t}s (avg: #{lines/t} lines/sec)"
end
# Build (and memoize) the SQL used to extract rows from the source table.
#
# The query is assembled from the :select, :join, :conditions, :group and
# :order options. When :new_records_only names a column, only rows whose value
# in that column is later than the created_at time of the last completed job
# for this control file are selected. ETL::Engine.limit / ETL::Engine.offset
# are applied through the connection's add_limit_offset!. Newlines are
# replaced by spaces, and the final query is logged.
def query
  return @query if @query
  q = "SELECT #{select} FROM #{configuration[:table]}"
  q << " #{join}" if join
  conditions = []
  if new_records_only
    last_completed = ETL::Execution::Job.maximum('created_at',
      :conditions => ['control_file = ? and completed_at is not null', control.file]
    )
    if last_completed
      conditions << "#{new_records_only} > #{connection.quote(last_completed.to_s(:db))}"
    end
  end
  conditions << configuration[:conditions] if configuration[:conditions]
  unless conditions.empty?
    # Parenthesize each condition so an OR inside a user-supplied :conditions
    # string cannot bypass the new-records filter through AND/OR precedence.
    where_clause = conditions.collect { |condition| "(#{condition})" }.join(' AND ')
    q << " WHERE #{where_clause}"
  end
  q << " GROUP BY #{group}" if group
  q << " ORDER BY #{order}" if order
  if ETL::Engine.limit || ETL::Engine.offset
    options = {}
    options[:limit] = ETL::Engine.limit if ETL::Engine.limit
    options[:offset] = ETL::Engine.offset if ETL::Engine.offset
    connection.add_limit_offset!(q, options)
  end
  q = q.gsub(/\n/,' ')
  ETL::Engine.logger.info "Query: #{q}"
  @query = q
end
# The database connection for queries: the connection of the ETL::Source
# model class. Every DatabaseSource instance reads from that same class.
def connection
  source_model = ETL::Source
  source_model.connection
end
# Database adapter from the :adapter option; :mysql when it is not set.
def adapter
  chosen = configuration[:adapter]
  return :mysql unless chosen
  chosen
end
# Database host from the :host option; "localhost" when it is not set.
def host
  if (configured = configuration[:host])
    configured
  else
    "localhost"
  end
end
# Database user from the :username option; 'root' when it is not set.
def username
  name = configuration[:username]
  name ? name : 'root'
end
# Establish the connection on the ETL::Source model class.
#
# Required options:
# * :database: The database name
#
# Options:
# * :adapter: The adapter to use (defaults to :mysql)
# * :username: The database username (defaults to 'root')
# * :password: The password to the database (defaults to nothing)
# * :host: The host for the database (defaults to 'localhost')
def connect
  connection_spec = {
    :adapter => adapter,
    :username => username,
    :host => host,
    :password => configuration[:password],
    :database => configuration[:database]
  }
  ETL::Source.establish_connection(connection_spec)
end
end
end
end