require 'fileutils'
require 'benchmark'

module ETL #:nodoc:
  class Source < ::ActiveRecord::Base #:nodoc:
    # Connection for database sources
  end

  module Control #:nodoc:
    # Source object which extracts data from a database using ActiveRecord.
    class DatabaseSource < Source
      # Initialize the source and establish the database connection.
      #
      # Arguments:
      # * control: The ETL::Control::Control instance
      # * configuration: The configuration Hash
      # * definition: The source definition
      #
      # Required configuration options:
      # * :table: The source table name
      # * :database: The database name
      #
      # Other options:
      # * :adapter: The adapter to use (defaults to :mysql)
      # * :username: The database username (defaults to 'root')
      # * :password: The password to the database (defaults to nothing)
      # * :host: The host for the database (defaults to 'localhost')
      # * :join: Optional join part for the query (ignored unless specified)
      # * :select: Optional select part for the query (defaults to '*')
      # * :group: Optional group by part for the query (ignored unless
      #   specified)
      # * :order: Optional order part for the query (ignored unless specified)
      # * :conditions: Optional SQL fragment ANDed into the WHERE clause
      # * :new_records_only: Column compared against the creation time of
      #   the last completed job for this control file
      # * :store_locally: Set to false to not store a copy of the source data
      #   locally in a flat file (defaults to true)
      def initialize(control, configuration, definition)
        super
        connect
      end

      # Get a String identifier for the source: "host/database/table".
      def to_s
        "#{host}/#{configuration[:database]}/#{configuration[:table]}"
      end

      # Get the local directory to use, which is a combination of the
      # local_base, the db hostname, the db database name and the db table.
      def local_directory
        File.join(local_base, host, configuration[:database], configuration[:table])
      end

      # Get the join part of the query, defaults to nil
      def join
        configuration[:join]
      end

      # Get the select part of the query, defaults to '*'
      def select
        configuration[:select] || '*'
      end

      # Get the group by part of the query, defaults to nil
      def group
        configuration[:group]
      end

      # Get the order for the query, defaults to nil
      def order
        configuration[:order]
      end

      # Return the column which is used in the where clause to identify
      # new rows (nil disables the new-records-only filter).
      def new_records_only
        configuration[:new_records_only]
      end

      # Get the number of rows in the source.
      #
      # use_cache - when true (the default) a previously computed count is
      #             returned without touching the database or the cache file.
      #
      # Counts the local cache file when storing or reading locally,
      # otherwise asks the database.
      def count(use_cache = true)
        return @count if @count && use_cache

        @count =
          if store_locally || read_locally
            count_locally
          else
            # NOTE(review): the greedy `.*` matches up to the LAST " FROM " in
            # the query (e.g. one inside a subquery in :conditions), and with a
            # :group option select_value returns only the first group's count.
            # Confirm this is acceptable for the control files in use.
            connection.select_value(query.gsub(/SELECT .* FROM/, 'SELECT count(1) FROM'))
          end
      end

      # Get the list of columns to read. This is defined in the source
      # definition as either an Array or a Hash (whose keys are the columns).
      #
      # Raises a RuntimeError for any other definition type.
      def columns
        case definition
        when Array then definition.map(&:to_sym)
        when Hash  then definition.keys.map(&:to_sym)
        else raise "Definition must be either an Array or a Hash"
        end
      end

      # Yields each row from the source as an ETL::Row.
      #
      # If read_locally is specified then this method reads from the last
      # stored local file; it raises if that file or its trigger file does
      # not exist. If store_locally is set, the query result is first written
      # to a new local file and then read back from it. Otherwise rows are
      # streamed straight from the database.
      def each(&block)
        if read_locally
          # Read from the last stored source
          ETL::Engine.logger.debug "Reading from local cache"
          read_rows(last_local_file, &block)
        elsif store_locally
          # Snapshot the source into a fresh local file, then read it back
          file = local_file
          write_local(file)
          read_rows(file, &block)
        else
          connection.select_all(query).each do |record|
            etl_row = ETL::Row.new(record.symbolize_keys)
            etl_row.source = self
            yield etl_row
          end
        end
      end

      private

      # Read rows from the local cache file, yielding one ETL::Row per CSV
      # record (keys are the header names as symbols).
      #
      # Raises a RuntimeError if the cache file or its trigger file is
      # missing (the trigger is only written once the cache is complete).
      def read_rows(file)
        raise "Local cache file not found" unless File.exist?(file)
        raise "Local cache trigger file not found" unless File.exist?(local_file_trigger(file))

        t = Benchmark.realtime do
          # Block form closes the underlying File when done.
          FasterCSV.open(file, :headers => true) do |csv|
            csv.each do |row|
              result_row = ETL::Row.new
              result_row.source = self
              row.each { |header, field| result_row[header.to_sym] = field }
              yield result_row
            end
          end
        end
        # Avoid storing NaN/Infinity when the read was too fast to measure.
        ETL::Engine.average_rows_per_second = ETL::Engine.rows_read / t if t > 0
      end

      # Count the data rows in the last local cache file. The first line is
      # the header row written by write_local, so it is not counted.
      def count_locally
        lines = 0
        File.foreach(last_local_file) { lines += 1 }
        lines > 0 ? lines - 1 : 0
      end

      # Write the query result to the local cache file as CSV (header row
      # first), then create the trigger file marking the cache as complete.
      def write_local(file)
        cols = columns
        keys = cols.map(&:to_s)
        lines = 0
        t = Benchmark.realtime do
          FasterCSV.open(file, 'w') do |f|
            f << cols
            connection.select_all(query).each do |row|
              f << keys.map { |key| row[key] }
              lines += 1
            end
          end
          FileUtils.touch(local_file_trigger(file))
        end
        ETL::Engine.logger.info "Stored locally in #{t}s (avg: #{lines/t} lines/sec)"
      end

      # Get the query to use. Built once, logged, and memoized.
      def query
        return @query if @query

        q = "SELECT #{select} FROM #{configuration[:table]}"
        q << " #{join}" if join

        conditions = query_conditions
        q << " WHERE #{conditions.join(' AND ')}" unless conditions.empty?
        q << " GROUP BY #{group}" if group
        q << " ORDER BY #{order}" if order

        if ETL::Engine.limit || ETL::Engine.offset
          options = {}
          options[:limit] = ETL::Engine.limit if ETL::Engine.limit
          options[:offset] = ETL::Engine.offset if ETL::Engine.offset
          connection.add_limit_offset!(q, options)
        end

        q = q.gsub(/\n/, ' ')
        ETL::Engine.logger.info "Query: #{q}"
        @query = q
      end

      # Collect the SQL fragments to AND together in the WHERE clause: the
      # new-records-only filter (when a completed job exists for this control
      # file) followed by any user-supplied :conditions.
      def query_conditions
        conditions = []
        if new_records_only
          last_completed = ETL::Execution::Job.maximum('created_at',
            :conditions => ['control_file = ? and completed_at is not null', control.file]
          )
          if last_completed
            conditions << "#{new_records_only} > #{connection.quote(last_completed.to_s(:db))}"
          end
        end
        conditions << configuration[:conditions] if configuration[:conditions]
        conditions
      end

      # Get the database connection to use (shared by ETL::Source).
      def connection
        ETL::Source.connection
      end

      # Get the adapter name, defaults to :mysql
      def adapter
        configuration[:adapter] || :mysql
      end

      # Get the host, defaults to 'localhost'
      def host
        configuration[:host] || "localhost"
      end

      # Get the username, defaults to 'root'
      def username
        configuration[:username] || 'root'
      end

      # Connect to the database via ETL::Source.establish_connection.
      #
      # Required options:
      # * :database: The database name
      #
      # Options:
      # * :adapter: The adapter to use (defaults to :mysql)
      # * :username: The database username (defaults to 'root')
      # * :password: The password to the database (defaults to nothing)
      # * :host: The host for the database (defaults to 'localhost')
      def connect
        ETL::Source.establish_connection(
          :adapter  => adapter,
          :username => username,
          :host     => host,
          :password => configuration[:password],
          :database => configuration[:database]
        )
      end
    end
  end
end