require 'rest_client'
require 'sequel'
require 'zlib'

require 'taps/progress_bar'
require 'taps/config'
require 'taps/utils'
require 'taps/data_stream'

# disable warnings, rest client makes a lot of noise right now
$VERBOSE = nil

module Taps
  # Shared behaviour for a transfer between a local database (reached through
  # Sequel) and a remote taps server (reached through RestClient). Subclasses
  # supply the direction-specific steps.
  class Operation
    attr_reader :database_url, :remote_url, :opts
    attr_reader :session_uri

    # database_url:: connection URL handed to Sequel.connect
    # remote_url::   base URL of the remote server
    # opts::         option hash; keys read in this file include :session_uri,
    #                :indexes_first, :table_filter, :resume, :default_chunksize,
    #                :completed_tables, :stream_state and :disable_compression
    def initialize(database_url, remote_url, opts={})
      @database_url = database_url
      @remote_url = remote_url
      @opts = opts
      @exiting = false
      @session_uri = opts[:session_uri]
    end

    # Prefix of the session file written by store_session; subclasses override.
    def file_prefix
      "op"
    end

    # True when opts[:indexes_first] is truthy.
    def indexes_first?
      !!opts[:indexes_first]
    end

    # Regular-expression source used to select tables, or nil for no filter.
    def table_filter
      opts[:table_filter]
    end

    # Keeps only the tables whose name matches table_filter.
    #
    # Accepts either a Hash keyed by table name (a new Hash is returned) or any
    # other collection responding to #reject. Returns +tables+ untouched when
    # no filter is configured.
    def apply_table_filter(tables)
      return tables unless table_filter

      re = Regexp.new(table_filter)
      if tables.kind_of?(Hash)
        ntables = {}
        tables.each do |t, d|
          ntables[t] = d if re.match(t.to_s)
        end
        ntables
      else
        tables.reject { |t| re.match(t.to_s).nil? }
      end
    end

    def log
      Taps.log
    end

    # Writes to_hash as JSON to "<file_prefix>_<YYYYmmddHHMM>.dat", a path
    # relative to the current working directory.
    def store_session
      file = "#{file_prefix}_#{Time.now.strftime("%Y%m%d%H%M")}.dat"
      puts "\nSaving session to #{file}.."
      File.open(file, 'w') do |f|
        f.write(to_hash.to_json)
      end
    end

    # State persisted by store_session; subclasses merge in their table info.
    def to_hash
      {
        :klass => self.class.to_s,
        :database_url => database_url,
        :remote_url => remote_url,
        :session_uri => session_uri,
        :stream_state => stream_state,
        :completed_tables => completed_tables,
        :table_filter => table_filter,
      }
    end

    # True once an INT or TERM signal has been received (see setup_signal_trap).
    def exiting?
      !!@exiting
    end

    # Installs INT and TERM handlers that only raise the exiting flag; the
    # transfer loops poll exiting? and save the session themselves.
    def setup_signal_trap
      %w[INT TERM].each do |signal|
        trap(signal) do
          puts "\nCompleting current action..."
          @exiting = true
        end
      end
    end

    # Note: strictly compares against +true+, unlike Operation.factory which
    # only checks truthiness of opts[:resume].
    def resuming?
      opts[:resume] == true
    end

    def default_chunksize
      opts[:default_chunksize]
    end

    # Names of tables already transferred; stored in (and created on) opts.
    def completed_tables
      opts[:completed_tables] ||= []
    end

    # State of the table transfer in progress; an empty Hash means none.
    def stream_state
      opts[:stream_state] ||= {}
    end

    def stream_state=(val)
      opts[:stream_state] = val
    end

    def compression_disabled?
      !!opts[:disable_compression]
    end

    # Memoized Sequel connection to the local database.
    def db
      @db ||= Sequel.connect(database_url)
    end

    # Memoized RestClient resource rooted at remote_url.
    def server
      @server ||= RestClient::Resource.new(remote_url)
    end

    # Resource for the current session. When no session URI is known yet, one
    # is obtained by POSTing to the server's 'sessions' resource.
    def session_resource
      @session_resource ||= begin
        @session_uri ||= server['sessions'].post('', http_headers).to_s
        server[@session_uri]
      end
    end

    # Points this operation at an existing session.
    #
    # Assigns the instance variable: a bare `session_uri = uri` would only
    # create a local, leaving the session_uri reader (and therefore the hash
    # saved by store_session) out of date.
    def set_session(uri)
      @session_uri = uri
      @session_resource = server[uri]
    end

    # Deletes the remote session if a session resource was ever created.
    def close_session
      @session_resource.delete(http_headers) if @session_resource
    end

    # Returns +url+ with the password part of its credentials replaced by
    # "[hidden]" so it can be printed.
    def safe_url(url)
      url.sub(/\/\/(.+?)?:(.*?)@/, '//\1:[hidden]@')
    end

    def safe_remote_url
      safe_url(remote_url)
    end

    def safe_database_url
      safe_url(database_url)
    end

    # Headers sent with every request: the taps version plus an
    # Accept-Encoding value that depends on compression_disabled?. Entries in
    # +extra+ override the defaults.
    def http_headers(extra = {})
      base = { :taps_version => Taps.version }
      if compression_disabled?
        base[:accept_encoding] = ""
      else
        base[:accept_encoding] = "gzip, deflate"
      end
      base.merge(extra)
    end

    # Formats an integer with comma thousands separators (1234567 -> "1,234,567").
    def format_number(num)
      num.to_s.gsub(/(\d)(?=(\d\d\d)+(?!\d))/, "\\1,")
    end

    # Issues a GET on the server root and exits the process with status 1 on
    # bad credentials, an HTTP 417 response, or a refused connection. Any other
    # request failure is re-raised.
    def verify_server
      server['/'].get(http_headers)
    rescue RestClient::Unauthorized
      # Listed before RequestFailed: in rest-client releases where Unauthorized
      # descends from RequestFailed the broader clause would otherwise win and
      # re-raise instead of printing this message.
      puts "Bad credentials given for #{safe_remote_url}"
      exit(1)
    rescue RestClient::RequestFailed => e
      if e.http_code == 417
        puts "#{safe_remote_url} is running a different minor version of taps."
        puts "#{e.response.to_s}"
        exit(1)
      else
        raise
      end
    rescue Errno::ECONNREFUSED
      puts "Can't connect to #{safe_remote_url}. Please check that it's running"
      exit(1)
    end

    # Builds the operation for +type+ (:pull, :push or :resume). A truthy
    # opts[:resume] forces :resume, in which case the class is taken from
    # opts[:klass].
    def self.factory(type, database_url, remote_url, opts)
      type = :resume if opts[:resume]
      klass = case type
        when :pull then Taps::Pull
        when :push then Taps::Push
        # NOTE(review): eval executes whatever string is in opts[:klass]. If
        # that value is read back from a session file it is external input;
        # consider restricting it to known class names.
        when :resume then eval(opts[:klass])
        else raise "Unknown Operation Type -> #{type}"
      end

      klass.new(database_url, remote_url, opts)
    end
  end

  # Copies schema, data and indexes from the remote server into the local
  # database.
  class Pull < Operation
    def file_prefix
      "pull"
    end

    def to_hash
      super.merge(:remote_tables_info => remote_tables_info)
    end

    # Runs the whole pull. On a RestClient error the session is saved first;
    # errors that carry a response are reported and end the process with
    # status 1, others are re-raised.
    def run
      verify_server

      begin
        unless resuming?
          pull_schema
          pull_indexes if indexes_first?
        end
        setup_signal_trap
        pull_partial_data if resuming?
        pull_data
        pull_indexes unless indexes_first?
        pull_reset_sequences
        close_session
      rescue RestClient::Exception => e
        store_session
        if e.respond_to?(:response)
          puts "!!! Caught Server Exception"
          puts "HTTP CODE: #{e.http_code}"
          puts "#{e.response.to_s}"
          exit(1)
        else
          raise
        end
      end
    end

    # Fetches the schema of every pending table and loads it locally.
    def pull_schema
      puts "Receiving schema"

      progress = ProgressBar.new('Schema', tables.size)
      tables.each do |table_name, count|
        schema_data = session_resource['pull/schema'].post({:table_name => table_name}, http_headers).to_s
        log.debug "Table: #{table_name}\n#{schema_data}\n"
        output = Taps::Utils.load_schema(database_url, schema_data)
        puts output if output
        progress.inc(1)
      end
      progress.finish
    end

    # Streams every pending table from the server.
    def pull_data
      puts "Receiving data"

      puts "#{tables.size} tables, #{format_number(record_count)} records"

      tables.each do |table_name, count|
        progress = ProgressBar.new(table_name.to_s, count)
        stream = Taps::DataStream.factory(db, {
          :chunksize => default_chunksize,
          :table_name => table_name
        })
        pull_data_from_table(stream, progress)
      end
    end

    # Finishes the table described by stream_state; does nothing when
    # stream_state is empty.
    def pull_partial_data
      return if stream_state == {}

      table_name = stream_state[:table_name]
      record_count = tables[table_name.to_s]
      puts "Resuming #{table_name}, #{format_number(record_count)} records"

      progress = ProgressBar.new(table_name.to_s, record_count)
      stream = Taps::DataStream.factory(db, stream_state)
      pull_data_from_table(stream, progress)
    end

    # Fetches chunks for +stream+ until it reports completion, recording the
    # stream state after each one. If an exit was requested, the session is
    # saved and the process exits with status 0 before the next fetch.
    def pull_data_from_table(stream, progress)
      loop do
        begin
          if exiting?
            store_session
            exit 0
          end

          size = stream.fetch_remote(session_resource['pull/table'], http_headers)
          break if stream.complete?
          progress.inc(size) unless exiting?
          stream.error = false
          self.stream_state = stream.to_hash
        rescue DataStream::CorruptedData => e
          puts "Corrupted Data Received #{e.message}, retrying..."
          # Flag the stream and go round the loop again.
          stream.error = true
          next
        end
      end

      progress.finish
      completed_tables << stream.table_name.to_s
      self.stream_state = {}
    end

    # Tables still to be transferred: { "table name" => row count }.
    def tables
      h = {}
      remote_tables_info.each do |table_name, count|
        next if completed_tables.include?(table_name.to_s)
        h[table_name.to_s] = count
      end
      h
    end

    # Total row count over all remote tables, including completed ones.
    def record_count
      @record_count ||= remote_tables_info.values.inject(0) { |sum, count| sum + count }
    end

    def remote_tables_info
      opts[:remote_tables_info] ||= fetch_remote_tables_info
    end

    # Asks the server for its table names, then for the row count of each table
    # that passes the filter. Each request is retried on RestClient errors up
    # to max_retries times; after that the process exits with status 1.
    def fetch_remote_tables_info
      retries = 0
      max_retries = 10
      begin
        # JSON.parse rather than JSON.load: the payload comes from the network,
        # and pull_indexes already parses server responses this way.
        tables = JSON.parse(session_resource['pull/table_names'].get(http_headers).to_s)
      rescue RestClient::Exception
        retries += 1
        retry if retries <= max_retries
        puts "Unable to fetch tables information from #{remote_url}. Please check the server log."
        exit(1)
      end

      data = {}
      apply_table_filter(tables).each do |table_name|
        retries = 0
        begin
          count = session_resource['pull/table_count'].post({:table => table_name}, http_headers).to_s.to_i
          data[table_name] = count
        rescue RestClient::Exception
          retries += 1
          retry if retries <= max_retries
          puts "Unable to fetch tables information from #{remote_url}. Please check the server log."
          exit(1)
        end
      end
      data
    end

    # Fetches index definitions and loads them locally, table by table.
    def pull_indexes
      puts "Receiving indexes"

      idxs = JSON.parse(session_resource['pull/indexes'].get(http_headers).to_s)

      apply_table_filter(idxs).each do |table, indexes|
        next unless indexes.size > 0
        progress = ProgressBar.new(table, indexes.size)
        indexes.each do |idx|
          output = Taps::Utils.load_indexes(database_url, idx)
          puts output if output
          progress.inc(1)
        end
        progress.finish
      end
    end

    def pull_reset_sequences
      puts "Resetting sequences"

      output = Taps::Utils.schema_bin(:reset_db_sequences, database_url)
      puts output if output
    end
  end

  # Copies schema, data and indexes from the local database to the remote
  # server.
  class Push < Operation
    def file_prefix
      "push"
    end

    def to_hash
      super.merge(:local_tables_info => local_tables_info)
    end

    # Runs the whole push. On a RestClient error the session is saved first;
    # errors that carry a response are reported and end the process with
    # status 1, others are re-raised.
    def run
      verify_server
      begin
        unless resuming?
          push_schema
          push_indexes if indexes_first?
        end
        setup_signal_trap
        push_partial_data if resuming?
        push_data
        push_indexes unless indexes_first?
        push_reset_sequences
        close_session
      rescue RestClient::Exception => e
        store_session
        if e.respond_to?(:response)
          puts "!!! Caught Server Exception"
          puts "HTTP CODE: #{e.http_code}"
          puts "#{e.response.to_s}"
          exit(1)
        else
          raise
        end
      end
    end

    # Dumps the local index definitions and posts them one at a time.
    def push_indexes
      idxs = JSON.parse(Taps::Utils.schema_bin(:indexes_individual, database_url))

      return unless idxs.size > 0

      puts "Sending indexes"

      apply_table_filter(idxs).each do |table, indexes|
        next unless indexes.size > 0
        progress = ProgressBar.new(table, indexes.size)
        indexes.each do |idx|
          session_resource['push/indexes'].post(idx, http_headers)
          progress.inc(1)
        end
        progress.finish
      end
    end

    # Dumps the schema of every pending table and posts it to the server.
    def push_schema
      puts "Sending schema"

      progress = ProgressBar.new('Schema', tables.size)
      tables.each do |table, count|
        schema_data = Taps::Utils.schema_bin(:dump_table, database_url, table)
        log.debug "Table: #{table}\n#{schema_data}\n"
        session_resource['push/schema'].post(schema_data, http_headers)
        progress.inc(1)
      end
      progress.finish
    end

    def push_reset_sequences
      puts "Resetting sequences"

      session_resource['push/reset_sequences'].post('', http_headers)
    end

    # Finishes the table described by stream_state; does nothing when
    # stream_state is empty.
    def push_partial_data
      return if stream_state == {}

      table_name = stream_state[:table_name]
      record_count = tables[table_name.to_s]
      puts "Resuming #{table_name}, #{format_number(record_count)} records"
      progress = ProgressBar.new(table_name.to_s, record_count)
      stream = Taps::DataStream.factory(db, stream_state)
      push_data_from_table(stream, progress)
    end

    # Streams every pending table to the server.
    def push_data
      puts "Sending data"

      puts "#{tables.size} tables, #{format_number(record_count)} records"

      tables.each do |table_name, count|
        stream = Taps::DataStream.factory(db,
          :table_name => table_name,
          :chunksize => default_chunksize)
        progress = ProgressBar.new(table_name.to_s, count)
        push_data_from_table(stream, progress)
      end
    end

    # Sends +stream+ to the server chunk by chunk until it reports completion.
    # If an exit was requested, the session is saved and the process exits with
    # status 0 before the next chunk.
    def push_data_from_table(stream, progress)
      loop do
        if exiting?
          store_session
          exit 0
        end

        row_size = 0
        chunksize = stream.state[:chunksize]
        # The block performs one upload at the chunk size it is given and
        # evaluates to the fetch's elapsed time; calculate_chunksize returns
        # the chunk size stored back on the stream below.
        chunksize = Taps::Utils.calculate_chunksize(chunksize) do |c|
          stream.state[:chunksize] = c
          encoded_data, row_size, elapsed_time = stream.fetch
          # `break` leaves calculate_chunksize itself, so chunksize becomes nil.
          break if stream.complete?

          data = {
            :state => stream.to_hash,
            :checksum => Taps::Utils.checksum(encoded_data).to_s
          }

          begin
            content, content_type = Taps::Multipart.create do |r|
              r.attach :name => :encoded_data,
                :payload => encoded_data,
                :content_type => 'application/octet-stream'
              r.attach :name => :json,
                :payload => data.to_json,
                :content_type => 'application/json'
            end
            session_resource['push/table'].post(content, http_headers(:content_type => content_type))
            self.stream_state = stream.to_hash
          rescue RestClient::RequestFailed => e
            # retry the same data, it got corrupted somehow.
            # (`next` ends this block call with nil; presumably
            # calculate_chunksize then calls the block again - confirm there.)
            if e.http_code == 412
              next
            end
            raise
          end
          elapsed_time
        end
        stream.state[:chunksize] = chunksize

        progress.inc(row_size)

        stream.increment(row_size)
        break if stream.complete?
      end

      progress.finish
      completed_tables << stream.table_name.to_s
      self.stream_state = {}
    end

    def local_tables_info
      opts[:local_tables_info] ||= fetch_local_tables_info
    end

    # Tables still to be transferred: { "table name" => row count }.
    def tables
      h = {}
      local_tables_info.each do |table_name, count|
        next if completed_tables.include?(table_name.to_s)
        h[table_name.to_s] = count
      end
      h
    end

    # Total row count over all local tables, including completed ones.
    def record_count
      @record_count ||= local_tables_info.values.inject(0) { |sum, count| sum + count }
    end

    # Counts the rows of every local table, then applies the table filter.
    def fetch_local_tables_info
      tables_with_counts = {}
      db.tables.each do |table|
        tables_with_counts[table] = db[table.to_sym.identifier].count
      end
      apply_table_filter(tables_with_counts)
    end
  end
end