require 'rest_client' require 'sequel' require 'zlib' require 'taps/progress_bar' require 'taps/config' require 'taps/utils' require 'taps/data_stream' require 'taps/errors' # disable warnings, rest client makes a lot of noise right now $VERBOSE = nil module Taps class Operation attr_reader :database_url, :remote_url, :opts attr_reader :session_uri def initialize(database_url, remote_url, opts={}) @database_url = database_url @remote_url = remote_url @opts = opts @exiting = false @session_uri = opts[:session_uri] end def file_prefix "op" end def skip_schema? !!opts[:skip_schema] end def indexes_first? !!opts[:indexes_first] end def table_filter opts[:table_filter] end def exclude_tables opts[:exclude_tables] || [] end def apply_table_filter(tables) return tables unless table_filter || exclude_tables re = table_filter ? Regexp.new(table_filter) : nil if tables.kind_of?(Hash) ntables = {} tables.each do |t, d| if !exclude_tables.include?(t.to_s) && (!re || !re.match(t.to_s).nil?) ntables[t] = d end end ntables else tables.reject { |t| exclude_tables.include?(t.to_s) || (re && re.match(t.to_s).nil?) } end end def log Taps.log end def store_session file = "#{file_prefix}_#{Time.now.strftime("%Y%m%d%H%M")}.dat" puts "\nSaving session to #{file}.." File.open(file, 'w') do |f| f.write(::OkJson.encode(to_hash)) end end def to_hash { :klass => self.class.to_s, :database_url => database_url, :remote_url => remote_url, :session_uri => session_uri, :stream_state => stream_state, :completed_tables => completed_tables, :table_filter => table_filter, } end def exiting? !!@exiting end def setup_signal_trap trap("INT") { puts "\nCompleting current action..." @exiting = true } trap("TERM") { puts "\nCompleting current action..." @exiting = true } end def resuming? opts[:resume] == true end def default_chunksize opts[:default_chunksize] end def completed_tables opts[:completed_tables] ||= [] end def stream_state opts[:stream_state] ||= {} end def stream_state=(val) opts[:stream_state] = val end def compression_disabled? !!opts[:disable_compression] end def db @db ||= Sequel.connect(database_url) end def server @server ||= RestClient::Resource.new(remote_url) end def session_resource @session_resource ||= begin @session_uri ||= server['sessions'].post('', http_headers).to_s server[@session_uri] end end def set_session(uri) session_uri = uri @session_resource = server[session_uri] end def close_session @session_resource.delete(http_headers) if @session_resource end def safe_url(url) url.sub(/\/\/(.+?)?:(.*?)@/, '//\1:[hidden]@') end def safe_remote_url safe_url(remote_url) end def safe_database_url safe_url(database_url) end def http_headers(extra = {}) base = { :taps_version => Taps.version } if compression_disabled? base[:accept_encoding] = "" else base[:accept_encoding] = "gzip, deflate" end base.merge(extra) end def format_number(num) num.to_s.gsub(/(\d)(?=(\d\d\d)+(?!\d))/, "\\1,") end def verify_server begin server['/'].get(http_headers) rescue RestClient::RequestFailed => e if e.http_code == 417 puts "#{safe_remote_url} is running a different minor version of taps." puts "#{e.response.to_s}" exit(1) else raise end rescue RestClient::Unauthorized puts "Bad credentials given for #{safe_remote_url}" exit(1) rescue Errno::ECONNREFUSED puts "Can't connect to #{safe_remote_url}. Please check that it's running" exit(1) end end def catch_errors(&blk) verify_server begin blk.call close_session rescue RestClient::Exception, Taps::BaseError => e store_session if e.kind_of?(Taps::BaseError) puts "!!! Caught Server Exception" puts "#{e.class}: #{e.message}" puts "\n#{e.original_backtrace}" if e.original_backtrace exit(1) elsif e.respond_to?(:response) puts "!!! Caught Server Exception" puts "HTTP CODE: #{e.http_code}" puts "#{e.response.to_s}" exit(1) else raise end end end def self.factory(type, database_url, remote_url, opts) type = :resume if opts[:resume] klass = case type when :pull then Taps::Pull when :push then Taps::Push when :resume then eval(opts[:klass]) else raise "Unknown Operation Type -> #{type}" end klass.new(database_url, remote_url, opts) end end class Pull < Operation def file_prefix "pull" end def to_hash super.merge(:remote_tables_info => remote_tables_info) end def run catch_errors do unless resuming? pull_schema if !skip_schema? pull_indexes if indexes_first? && !skip_schema? end setup_signal_trap pull_partial_data if resuming? pull_data pull_indexes if !indexes_first? && !skip_schema? pull_reset_sequences end end def pull_schema puts "Receiving schema" progress = ProgressBar.new('Schema', tables.size) tables.each do |table_name, count| schema_data = session_resource['pull/schema'].post({:table_name => table_name}, http_headers).to_s log.debug "Table: #{table_name}\n#{schema_data}\n" output = Taps::Utils.load_schema(database_url, schema_data) output = output.to_s.strip puts output unless output.empty? progress.inc(1) end progress.finish end def pull_data puts "Receiving data" puts "#{tables.size} tables, #{format_number(record_count)} records" tables.each do |table_name, count| progress = ProgressBar.new(table_name.to_s, count) stream = Taps::DataStream.factory(db, { :chunksize => default_chunksize, :table_name => table_name }) pull_data_from_table(stream, progress) end end def pull_partial_data return if stream_state == {} table_name = stream_state[:table_name] record_count = tables[table_name.to_s] puts "Resuming #{table_name}, #{format_number(record_count)} records" progress = ProgressBar.new(table_name.to_s, record_count) stream = Taps::DataStream.factory(db, stream_state) pull_data_from_table(stream, progress) end def pull_data_from_table(stream, progress) loop do begin if exiting? store_session exit 0 end size = stream.fetch_remote(session_resource['pull/table'], http_headers) break if stream.complete? progress.inc(size) unless exiting? stream.error = false self.stream_state = stream.to_hash rescue Taps::CorruptedData => e puts "Corrupted Data Received #{e.message}, retrying..." stream.error = true next end end progress.finish completed_tables << stream.table_name.to_s self.stream_state = {} end def tables h = {} remote_tables_info.each do |table_name, count| next if completed_tables.include?(table_name.to_s) h[table_name.to_s] = count end h end def record_count @record_count ||= remote_tables_info.values.inject(0) { |a,c| a += c } end def remote_tables_info opts[:remote_tables_info] ||= fetch_remote_tables_info end def fetch_remote_tables_info retries = 0 max_retries = 10 begin tables = ::OkJson.decode(session_resource['pull/table_names'].get(http_headers).to_s) rescue RestClient::Exception retries += 1 retry if retries <= max_retries puts "Unable to fetch tables information from #{remote_url}. Please check the server log." exit(1) end data = {} apply_table_filter(tables).each do |table_name| retries = 0 begin count = Integer(session_resource['pull/table_count'].post({:table => table_name}, http_headers).to_s) data[table_name] = count rescue RestClient::Exception retries += 1 retry if retries <= max_retries puts "Unable to fetch tables information from #{remote_url}. Please check the server log." exit(1) end end data end def pull_indexes puts "Receiving indexes" idxs = ::OkJson.decode(session_resource['pull/indexes'].get(http_headers).to_s) apply_table_filter(idxs).each do |table, indexes| next unless indexes.size > 0 progress = ProgressBar.new(table, indexes.size) indexes.each do |idx| output = Taps::Utils.load_indexes(database_url, idx) output = output.to_s.strip puts output unless output.empty? progress.inc(1) end progress.finish end end def pull_reset_sequences puts "Resetting sequences" output = Taps::Utils.schema_bin(:reset_db_sequences, database_url) output = output.to_s.strip puts output unless output.empty? end end class Push < Operation def file_prefix "push" end def to_hash super.merge(:local_tables_info => local_tables_info) end def run catch_errors do unless resuming? push_schema if !skip_schema? push_indexes if indexes_first? && !skip_schema? end setup_signal_trap push_partial_data if resuming? push_data push_indexes if !indexes_first? && !skip_schema? push_reset_sequences end end def push_indexes idxs = ::OkJson.decode(Taps::Utils.schema_bin(:indexes_individual, database_url)) return unless idxs.size > 0 puts "Sending indexes" apply_table_filter(idxs).each do |table, indexes| next unless indexes.size > 0 progress = ProgressBar.new(table, indexes.size) indexes.each do |idx| session_resource['push/indexes'].post(idx, http_headers) progress.inc(1) end progress.finish end end def push_schema puts "Sending schema" progress = ProgressBar.new('Schema', tables.size) tables.each do |table, count| schema_data = Taps::Utils.schema_bin(:dump_table, database_url, table) log.debug "Table: #{table}\n#{schema_data}\n" session_resource['push/schema'].post(schema_data, http_headers) progress.inc(1) end progress.finish end def push_reset_sequences puts "Resetting sequences" session_resource['push/reset_sequences'].post('', http_headers) end def push_partial_data return if stream_state == {} table_name = stream_state[:table_name] record_count = tables[table_name.to_s] puts "Resuming #{table_name}, #{format_number(record_count)} records" progress = ProgressBar.new(table_name.to_s, record_count) stream = Taps::DataStream.factory(db, stream_state) push_data_from_table(stream, progress) end def push_data puts "Sending data" puts "#{tables.size} tables, #{format_number(record_count)} records" tables.each do |table_name, count| stream = Taps::DataStream.factory(db, :table_name => table_name, :chunksize => default_chunksize) progress = ProgressBar.new(table_name.to_s, count) push_data_from_table(stream, progress) end end def push_data_from_table(stream, progress) loop do if exiting? store_session exit 0 end row_size = 0 chunksize = stream.state[:chunksize] begin chunksize = Taps::Utils.calculate_chunksize(chunksize) do |c| stream.state[:chunksize] = c.to_i encoded_data, row_size, elapsed_time = nil d1 = c.time_delta do encoded_data, row_size, elapsed_time = stream.fetch end break if stream.complete? data = nil d2 = c.time_delta do data = { :state => stream.to_hash, :checksum => Taps::Utils.checksum(encoded_data).to_s } end begin content, content_type = nil d3 = c.time_delta do content, content_type = Taps::Multipart.create do |r| r.attach :name => :encoded_data, :payload => encoded_data, :content_type => 'application/octet-stream' r.attach :name => :json, :payload => ::OkJson.encode(data), :content_type => 'application/json' end end session_resource['push/table'].post(content, http_headers(:content_type => content_type)) self.stream_state = stream.to_hash rescue => e Taps::Utils.reraise_server_exception(e) end c.idle_secs = (d1 + d2 + d3) elapsed_time end rescue Taps::CorruptedData => e # retry the same data, it got corrupted somehow. next rescue Taps::DuplicatePrimaryKeyError => e # verify the stream and retry it stream = stream.verify_remote_stream(session_resource['push/verify_stream'], http_headers) next end stream.state[:chunksize] = chunksize progress.inc(row_size) stream.increment(row_size) break if stream.complete? end progress.finish completed_tables << stream.table_name.to_s self.stream_state = {} end def local_tables_info opts[:local_tables_info] ||= fetch_local_tables_info end def tables h = {} local_tables_info.each do |table_name, count| next if completed_tables.include?(table_name.to_s) h[table_name.to_s] = count end h end def record_count @record_count ||= local_tables_info.values.inject(0) { |a,c| a += c } end def fetch_local_tables_info tables_with_counts = {} db.tables.each do |table| tables_with_counts[table] = db[table.to_sym.identifier].count end apply_table_filter(tables_with_counts) end end end