module Forklift
  module Patterns
    class Mysql

      def self.pipe(source, from_table, destination, to_table, tmp_table="_forklift_tmp")
        start = Time.new.to_i
        from_db = source.current_database
        to_db = destination.current_database
        source.forklift.logger.log("mysql pipe: `#{from_db}`.`#{from_table}` => `#{to_db}`.`#{to_table}`")

        source.q("drop table if exists `#{to_db}`.`#{tmp_table}`")
        source.q("create table `#{to_db}`.`#{tmp_table}` like `#{from_db}`.`#{from_table}`")
        source.q("insert into `#{to_db}`.`#{tmp_table}` select * from `#{from_db}`.`#{from_table}`")
        source.q("drop table if exists `#{to_db}`.`#{to_table}`")
        source.q("rename table `#{to_db}`.`#{tmp_table}` to `#{to_db}`.`#{to_table}`")

        delta = Time.new.to_i - start
        source.forklift.logger.log(" ^ moved #{destination.count(to_table, to_db)} rows in #{delta}s")
      end

      def self.incremental_pipe(source, from_table, destination, to_table, matcher=source.default_matcher, primary_key='id')
        start = Time.new.to_i
        from_db = source.current_database
        to_db = destination.current_database
        source.forklift.logger.log("mysql incremental_pipe: `#{from_db}`.`#{from_table}` => `#{to_db}`.`#{to_table}`")
        source.q("create table if not exists `#{to_db}`.`#{to_table}` like `#{from_db}`.`#{from_table}`")

        # Count the number of rows already in to_table
        original_count = source.count(to_table, to_db)

        # Find the latest/max/newest timestamp in the final table
        # in order to determine the last copied row.
        latest_timestamp = source.max_timestamp(to_table, matcher, to_db)

        # If to_table has existing rows, ensure none of them are "stale."
        # A stale row in to_table means a previously copied row was
        # updated in from_table, so delete it from to_table
        # so we can get a fresh copy of that row.
        if original_count > 0
          # Get the ids of rows in from_table that are newer than the newest row in to_table.
          # Each of these rows is either a) stale or b) new.
          source.read("select `#{primary_key}` from `#{from_db}`.`#{from_table}` where `#{matcher}` > \"#{latest_timestamp}\" order by `#{matcher}`") do |stale_rows|
            if stale_rows.length > 0
              # Delete these ids from to_table.
              # If an id is stale, its row is deleted; if it is new, it doesn't exist in to_table yet and nothing happens.
              stale_ids = stale_rows.map { |row| row[primary_key.to_sym] }.join(',')
              source.q("delete from `#{to_db}`.`#{to_table}` where `#{primary_key}` in (#{stale_ids})")
              source.forklift.logger.log(" ^ deleted up to #{stale_rows.length} stale rows from `#{to_db}`.`#{to_table}`")
            end
          end
        end

        # Do the insert into to_table
        destination.q("insert into `#{to_db}`.`#{to_table}` select * from `#{from_db}`.`#{from_table}` where `#{matcher}` > \"#{latest_timestamp}\" order by `#{matcher}`")

        delta = Time.new.to_i - start
        new_count = destination.count(to_table, to_db) - original_count
        source.forklift.logger.log(" ^ created #{new_count} new rows in #{delta}s")
      end

      def self.optimistic_pipe(source, from_table, destination, to_table, matcher=source.default_matcher, primary_key='id')
        from_db = source.current_database
        to_db = destination.current_database
        if self.can_incremental_pipe?(source, from_table, destination, to_table, matcher)
          begin
            incremental_pipe(source, from_table, destination, to_table, matcher, primary_key)
          rescue Exception => e
            source.forklift.logger.log("! incremental_pipe failure on #{from_table} => #{to_table}: #{e}")
            source.forklift.logger.log("!   falling back to pipe...")
            pipe(source, from_table, destination, to_table)
          end
        else
          pipe(source, from_table, destination, to_table)
        end
      end

      def self.can_incremental_pipe?(source, from_table, destination, to_table, matcher=source.default_matcher)
        return false unless source.tables.include?(from_table)
        return false unless destination.tables.include?(to_table)

        source_cols      = source.columns(from_table, source.current_database)
        destination_cols = destination.columns(to_table, destination.current_database)

        # Both tables must have the matcher column and identical column sets.
        return false unless source_cols.include?(matcher)
        return false unless destination_cols.include?(matcher)

        source_cols.each do |source_col|
          return false unless destination_cols.include?(source_col)
        end
        destination_cols.each do |destination_col|
          return false unless source_cols.include?(destination_col)
        end

        true
      end

      ## For copying data to and from mysql when the databases live on separate servers;
      ## an implementation of "pipe" for remote databases.
      def self.mysql_optimistic_import(source, destination, matcher=source.default_matcher)
        source.tables.each do |table|
          if(
            source.columns(table).include?(matcher) &&
            destination.tables.include?(table) &&
            destination.columns(table).include?(matcher)
          )
            # Incremental import: only read rows newer than the destination's newest row.
            since = destination.max_timestamp(table)
            source.read_since(table, since){ |data| destination.write(data, table) }
          else
            # Full import: rebuild the destination table from scratch.
            # destination.truncate table
            destination.drop! table if destination.tables.include?(table)
            source.read("select * from #{table}"){ |data| destination.write(data, table) }
          end
        end
      end

      # The high water mark method stubs a row into every table that has a `default_matcher`
      # column, pretending to have a record from `time`.
      # This enables partial forklift runs which only extract data "later than X".
      # TODO: assumes all columns have a default NULL setting
      def self.write_high_water_mark(db, time, matcher=db.default_matcher)
        db.tables.each do |table|
          columns, types = db.columns(table, db.current_database, true)
          if columns.include?(matcher)
            row = {}
            i = 0
            while( i < columns.length )
              if(columns[i] == matcher)
                row[columns[i]] = time.to_s(:db)
              elsif( types[i] =~ /text/ )
                row[columns[i]] = "~~stub~~"
              elsif( types[i] =~ /varchar/ )
                row[columns[i]] = "~~stub~~"
              elsif( types[i] =~ /float/ || types[i] =~ /int/ )
                row[columns[i]] = 0
              elsif( types[i] =~ /datetime/ || types[i] =~ /timestamp/ )
                row[columns[i]] = time.to_s(:db)
              elsif( types[i] =~ /date/ )
                row[columns[i]] = time.to_s(:db).split(" ").first
              else
                row[columns[i]] = "NULL"
              end
              i = i + 1
            end
            db.write([row], table)
          end
        end
      end

    end
  end
end
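
# A minimal usage sketch (not part of the library): these patterns are normally invoked
# from inside a Forklift plan. The mysql connection names (:source, :destination) below
# are hypothetical and depend on the files in your config/connections/mysql/ directory.
#
#   plan = Forklift::Plan.new
#   plan.do! do
#     source      = plan.connections[:mysql][:source]
#     destination = plan.connections[:mysql][:destination]
#
#     # Same-server copy: incremental_pipe when the schemas match, full pipe otherwise.
#     source.tables.each do |table|
#       Forklift::Patterns::Mysql.optimistic_pipe(source, table, destination, table)
#     end
#
#     # Cross-server copy: reads rows from the source connection and writes them
#     # to the destination connection.
#     Forklift::Patterns::Mysql.mysql_optimistic_import(source, destination)
#   end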