require 'rest_client'
require 'sequel'
require 'zlib'

require 'taps/progress_bar'
require 'taps/config'
require 'taps/utils'
require 'taps/data_stream'
require 'taps/errors'

# disable warnings, rest client makes a lot of noise right now
$VERBOSE = nil

module Taps
  # Shared behaviour for a transfer between a local database (reached through
  # Sequel) and a remote taps server (reached through RestClient).
  #
  # Subclasses (Pull, Push) implement #run plus the direction-specific steps.
  # All tunables are read from the +opts+ hash handed to the constructor.
  class Operation
    attr_reader :database_url, :remote_url, :opts
    attr_reader :session_uri

    # @param database_url [String] URL passed to Sequel.connect
    # @param remote_url [String] base URL of the remote server
    # @param opts [Hash] options; keys read in this class include
    #   :session_uri, :skip_schema, :indexes_first, :table_filter,
    #   :exclude_tables, :resume, :default_chunksize, :completed_tables,
    #   :stream_state and :disable_compression
    def initialize(database_url, remote_url, opts = {})
      @database_url = database_url
      @remote_url = remote_url
      @opts = opts
      @exiting = false
      @session_uri = opts[:session_uri]
    end

    # Prefix of the session file written by #store_session.
    def file_prefix
      'op'
    end

    def skip_schema?
      !!opts[:skip_schema]
    end

    def indexes_first?
      !!opts[:indexes_first]
    end

    # Source of the Regexp used by #apply_table_filter, or nil.
    def table_filter
      opts[:table_filter]
    end

    # Table names to skip; always a collection (empty Array when unset).
    def exclude_tables
      opts[:exclude_tables] || []
    end

    # Removes tables that are excluded or do not match the table filter.
    #
    # @param tables [Hash, Array] table names, or a Hash keyed by table name
    # @return [Hash, Array] same kind of collection as the input; the input
    #   object itself is returned when no filtering is configured
    def apply_table_filter(tables)
      # exclude_tables is never nil, so test for emptiness rather than
      # truthiness; otherwise this guard could never fire.
      return tables unless table_filter || !exclude_tables.empty?

      re = table_filter ? Regexp.new(table_filter) : nil
      keep = lambda do |name|
        !exclude_tables.include?(name) && (re.nil? || !re.match(name).nil?)
      end

      if tables.is_a?(Hash)
        tables.select { |t, _d| keep.call(t.to_s) }
      else
        tables.select { |t| keep.call(t.to_s) }
      end
    end

    def log
      Taps.log
    end

    # Writes #to_hash, JSON-encoded, to "<file_prefix>_<timestamp>.dat" in the
    # current directory so the operation can be resumed later.
    def store_session
      file = "#{file_prefix}_#{Time.now.strftime('%Y%m%d%H%M')}.dat"
      puts "\nSaving session to #{file}.."
      File.open(file, 'w') do |f|
        f.write(::OkJson.encode(to_hash))
      end
    end

    # State persisted by #store_session. Subclasses merge in their own keys.
    def to_hash
      {
        klass: self.class.to_s,
        database_url: database_url,
        remote_url: remote_url,
        session_uri: session_uri,
        stream_state: stream_state,
        completed_tables: completed_tables,
        table_filter: table_filter
      }
    end

    # True once an INT or TERM signal was received (see #setup_signal_trap).
    def exiting?
      !!@exiting
    end

    # Installs INT/TERM handlers that only set a flag; the transfer loops
    # poll #exiting? and save the session before leaving.
    def setup_signal_trap
      %w[INT TERM].each do |signal|
        trap(signal) do
          puts "\nCompleting current action..."
          @exiting = true
        end
      end
    end

    def resuming?
      opts[:resume] == true
    end

    def default_chunksize
      opts[:default_chunksize]
    end

    # Names of tables already transferred; stored back into opts on first use.
    def completed_tables
      opts[:completed_tables] ||= []
    end

    # State of the in-flight data stream; an empty Hash means "none".
    def stream_state
      opts[:stream_state] ||= {}
    end

    def stream_state=(val)
      opts[:stream_state] = val
    end

    def compression_disabled?
      !!opts[:disable_compression]
    end

    # Memoized Sequel connection to the local database.
    def db
      @db ||= Sequel.connect(database_url)
    end

    # Memoized RestClient resource for the remote server root.
    def server
      @server ||= RestClient::Resource.new(remote_url)
    end

    # Resource for the current session. When no session URI is known yet,
    # one is obtained by POSTing to 'sessions' on the server.
    def session_resource
      @session_resource ||= begin
        @session_uri ||= server['sessions'].post('', http_headers).to_s
        server[@session_uri]
      end
    end

    # Points this operation at an existing session.
    #
    # @param uri [String] session URI relative to the server root
    def set_session(uri)
      # Must assign the instance variable: a bare `session_uri = uri` only
      # creates a local, leaving #session_uri (and #to_hash) stale.
      @session_uri = uri
      @session_resource = server[uri]
    end

    # Deletes the remote session, if one was ever opened.
    def close_session
      @session_resource.delete(http_headers) if @session_resource
    end

    # Returns +url+ with the password part of its credentials replaced by
    # "[hidden]" so it can be printed.
    def safe_url(url)
      url.sub(%r{//(.+?)?:(.*?)@}, '//\1:[hidden]@')
    end

    def safe_remote_url
      safe_url(remote_url)
    end

    def safe_database_url
      safe_url(database_url)
    end

    # Headers sent with every request.
    #
    # @param extra [Hash] additional headers; they win over the defaults
    # @return [Hash]
    def http_headers(extra = {})
      base = { taps_version: Taps::Version.current }
      base[:accept_encoding] = compression_disabled? ? '' : 'gzip, deflate'
      base.merge(extra)
    end

    # Inserts thousands separators: 1234567 -> "1,234,567".
    def format_number(num)
      num.to_s.gsub(/(\d)(?=(\d\d\d)+(?!\d))/, '\\1,')
    end

    # Performs a GET on the server root and exits the process with status 1
    # on bad credentials, an HTTP 417 response, or a refused connection.
    # Any other request failure is re-raised.
    def verify_server
      server['/'].get(http_headers)
    rescue RestClient::Unauthorized
      # Must precede RequestFailed: Unauthorized is one of its subclasses in
      # rest-client, so the broader clause would otherwise swallow it.
      puts "Bad credentials given for #{safe_remote_url}"
      exit(1)
    rescue RestClient::RequestFailed => e
      raise unless e.http_code == 417

      puts "#{safe_remote_url} is running a different minor version of taps."
      puts e.response.to_s
      exit(1)
    rescue Errno::ECONNREFUSED
      puts "Can't connect to #{safe_remote_url}.  Please check that it's running"
      exit(1)
    end

    # Verifies the server, runs the given block, then closes the session.
    # On a RestClient or Taps error the session is saved to disk first; the
    # error is then reported and the process exits with status 1, unless the
    # exception carries no response, in which case it is re-raised.
    def catch_errors
      verify_server

      begin
        yield
        close_session
      rescue RestClient::Exception, Taps::BaseError => e
        store_session
        if e.is_a?(Taps::BaseError)
          puts '!!! Caught Server Exception'
          puts "#{e.class}: #{e.message}"
          puts "\n#{e.original_backtrace}" if e.original_backtrace
          exit(1)
        elsif e.respond_to?(:response)
          puts '!!! Caught Server Exception'
          puts "HTTP CODE: #{e.http_code}"
          puts e.response.to_s
          exit(1)
        else
          raise
        end
      end
    end

    # Builds the operation for +type+ (:pull, :push or :resume). A truthy
    # opts[:resume] forces :resume regardless of +type+.
    #
    # @raise [RuntimeError] when +type+ is not recognised
    def self.factory(type, database_url, remote_url, opts)
      type = :resume if opts[:resume]
      klass = case type
              when :pull then Taps::Pull
              when :push then Taps::Push
              # SECURITY(review): opts[:klass] is evaluated as Ruby code. It
              # presumably comes from a session file written by #store_session;
              # if that file can be tampered with this executes arbitrary code.
              # Consider a whitelist lookup instead - confirm callers first.
              when :resume then eval(opts[:klass])
              else raise "Unknown Operation Type -> #{type}"
              end

      klass.new(database_url, remote_url, opts)
    end
  end

  # Copies schema, data and indexes from the remote server into the local
  # database.
  class Pull < Operation
    def file_prefix
      'pull'
    end

    def to_hash
      super.merge(remote_tables_info: remote_tables_info)
    end

    # Runs the whole pull. Schema (and, with indexes_first, indexes) are only
    # fetched on a fresh run; a resumed run first finishes the interrupted
    # table.
    def run
      catch_errors do
        unless resuming?
          pull_schema unless skip_schema?
          pull_indexes if indexes_first? && !skip_schema?
        end
        setup_signal_trap
        pull_partial_data if resuming?
        pull_data
        pull_indexes if !indexes_first? && !skip_schema?
        pull_reset_sequences
      end
    end

    # Requests the schema of each pending table and loads it locally.
    def pull_schema
      puts 'Receiving schema'

      progress = ProgressBar.new('Schema', tables.size)
      tables.each do |table_name, _count|
        schema_data = session_resource['pull/schema'].post({ table_name: table_name }, http_headers).to_s
        log.debug "Table: #{table_name}\n#{schema_data}\n"
        output = Taps::Utils.load_schema(database_url, schema_data)
        output = output.to_s.strip
        puts output unless output.empty?
        progress.inc(1)
      end
      progress.finish
    end

    # Pulls the rows of every pending table.
    def pull_data
      puts 'Receiving data'

      puts "#{tables.size} tables, #{format_number(record_count)} records"

      tables.each do |table_name, count|
        progress = ProgressBar.new(table_name.to_s, count)
        stream = Taps::DataStream.factory(db, chunksize: default_chunksize,
                                              table_name: table_name)
        pull_data_from_table(stream, progress)
      end
    end

    # Finishes the table described by the saved stream state, if any.
    def pull_partial_data
      return if stream_state == {}

      table_name = stream_state[:table_name]
      count = tables[table_name.to_s]
      puts "Resuming #{table_name}, #{format_number(count)} records"

      progress = ProgressBar.new(table_name.to_s, count)
      stream = Taps::DataStream.factory(db, stream_state)
      pull_data_from_table(stream, progress)
    end

    # Fetches chunks for +stream+ until it reports completion, recording the
    # stream state after every chunk so an interrupted run can resume.
    def pull_data_from_table(stream, progress)
      loop do
        begin
          # A signal arrived: persist progress and leave cleanly.
          if exiting?
            store_session
            exit 0
          end

          size = stream.fetch_remote(session_resource['pull/table'], http_headers)
          break if stream.complete?

          progress.inc(size) unless exiting?
          stream.error = false
          self.stream_state = stream.to_hash
        rescue Taps::CorruptedData => e
          puts "Corrupted Data Received #{e.message}, retrying..."
          stream.error = true
          next
        end
      end

      progress.finish
      completed_tables << stream.table_name.to_s
      self.stream_state = {}
    end

    # Pending tables: remote tables minus the completed ones.
    #
    # @return [Hash{String => Integer}] table name => row count
    def tables
      h = {}
      remote_tables_info.each do |table_name, count|
        next if completed_tables.include?(table_name.to_s)
        h[table_name.to_s] = count
      end
      h
    end

    # Total rows across all remote tables (memoized).
    def record_count
      @record_count ||= remote_tables_info.values.inject(0) { |a, c| a + c }
    end

    def remote_tables_info
      opts[:remote_tables_info] ||= fetch_remote_tables_info
    end

    # Asks the server for its table names and a row count per (filtered)
    # table. Each request is retried up to 10 times on RestClient errors;
    # after that the process exits with status 1.
    #
    # @return [Hash] table name => row count
    def fetch_remote_tables_info
      retries = 0
      max_retries = 10
      begin
        table_names = ::OkJson.decode(session_resource['pull/table_names'].get(http_headers).to_s)
      rescue RestClient::Exception
        retries += 1
        retry if retries <= max_retries
        # safe_remote_url keeps the password out of the terminal output.
        puts "Unable to fetch tables information from #{safe_remote_url}. Please check the server log."
        exit(1)
      end

      data = {}
      apply_table_filter(table_names).each do |table_name|
        retries = 0
        begin
          count = Integer(session_resource['pull/table_count'].post({ table: table_name }, http_headers).to_s)
          data[table_name] = count
        rescue RestClient::Exception
          retries += 1
          retry if retries <= max_retries
          puts "Unable to fetch tables information from #{safe_remote_url}. Please check the server log."
          exit(1)
        end
      end
      data
    end

    # Fetches index definitions and loads them one by one.
    def pull_indexes
      puts 'Receiving indexes'

      idxs = ::OkJson.decode(session_resource['pull/indexes'].get(http_headers).to_s)

      apply_table_filter(idxs).each do |table, indexes|
        next if indexes.empty?
        progress = ProgressBar.new(table, indexes.size)
        indexes.each do |idx|
          output = Taps::Utils.load_indexes(database_url, idx)
          output = output.to_s.strip
          puts output unless output.empty?
          progress.inc(1)
        end
        progress.finish
      end
    end

    def pull_reset_sequences
      puts 'Resetting sequences'

      output = Taps::Utils.schema_bin(:reset_db_sequences, database_url)
      output = output.to_s.strip
      puts output unless output.empty?
    end
  end

  # Copies schema, data and indexes from the local database to the remote
  # server.
  class Push < Operation
    def file_prefix
      'push'
    end

    def to_hash
      super.merge(local_tables_info: local_tables_info)
    end

    # Runs the whole push; mirrors Pull#run.
    def run
      catch_errors do
        unless resuming?
          push_schema unless skip_schema?
          push_indexes if indexes_first? && !skip_schema?
        end
        setup_signal_trap
        push_partial_data if resuming?
        push_data
        push_indexes if !indexes_first? && !skip_schema?
        push_reset_sequences
      end
    end

    # Sends every index definition of the (filtered) local tables.
    def push_indexes
      idxs = ::OkJson.decode(Taps::Utils.schema_bin(:indexes_individual, database_url))

      return if idxs.empty?

      puts 'Sending indexes'

      apply_table_filter(idxs).each do |table, indexes|
        next if indexes.empty?
        progress = ProgressBar.new(table, indexes.size)
        indexes.each do |idx|
          session_resource['push/indexes'].post(idx, http_headers)
          progress.inc(1)
        end
        progress.finish
      end
    end

    # Dumps the schema of each pending table and posts it to the server.
    def push_schema
      puts 'Sending schema'

      progress = ProgressBar.new('Schema', tables.size)
      tables.each do |table, _count|
        schema_data = Taps::Utils.schema_bin(:dump_table, database_url, table)
        log.debug "Table: #{table}\n#{schema_data}\n"
        session_resource['push/schema'].post(schema_data, http_headers)
        progress.inc(1)
      end
      progress.finish
    end

    def push_reset_sequences
      puts 'Resetting sequences'

      session_resource['push/reset_sequences'].post('', http_headers)
    end

    # Finishes the table described by the saved stream state, if any.
    def push_partial_data
      return if stream_state == {}

      table_name = stream_state[:table_name]
      count = tables[table_name.to_s]
      puts "Resuming #{table_name}, #{format_number(count)} records"
      progress = ProgressBar.new(table_name.to_s, count)
      stream = Taps::DataStream.factory(db, stream_state)
      push_data_from_table(stream, progress)
    end

    # Pushes the rows of every pending table.
    def push_data
      puts 'Sending data'

      puts "#{tables.size} tables, #{format_number(record_count)} records"

      tables.each do |table_name, count|
        stream = Taps::DataStream.factory(db, table_name: table_name,
                                              chunksize: default_chunksize)
        progress = ProgressBar.new(table_name.to_s, count)
        push_data_from_table(stream, progress)
      end
    end

    # Sends +stream+ chunk by chunk until it reports completion. The chunk
    # size is driven by Taps::Utils.calculate_chunksize, whose block returns
    # the time spent fetching rows (elapsed_time) and reports the remaining
    # measured time through +idle_secs+.
    def push_data_from_table(stream, progress)
      loop do
        # A signal arrived: persist progress and leave cleanly.
        if exiting?
          store_session
          exit 0
        end

        row_size = 0
        chunksize = stream.state[:chunksize]

        begin
          chunksize = Taps::Utils.calculate_chunksize(chunksize) do |c|
            stream.state[:chunksize] = c.to_i
            encoded_data, row_size, elapsed_time = nil
            d1 = c.time_delta do
              encoded_data, row_size, elapsed_time = stream.fetch
            end
            # Leaves the calculate_chunksize call early (it then yields nil).
            break if stream.complete?

            data = nil
            d2 = c.time_delta do
              data = {
                state: stream.to_hash,
                checksum: Taps::Utils.checksum(encoded_data).to_s
              }
            end

            begin
              content, content_type = nil
              d3 = c.time_delta do
                content, content_type = Taps::Multipart.create do |r|
                  r.attach name: :encoded_data,
                           payload: encoded_data,
                           content_type: 'application/octet-stream'
                  r.attach name: :json,
                           payload: ::OkJson.encode(data),
                           content_type: 'application/json'
                end
              end
              session_resource['push/table'].post(content, http_headers(content_type: content_type))
              self.stream_state = stream.to_hash
            rescue => e
              Taps::Utils.reraise_server_exception(e)
            end

            c.idle_secs = (d1 + d2 + d3)

            # Block value handed back to calculate_chunksize.
            elapsed_time
          end
        rescue Taps::CorruptedData
          # retry the same data, it got corrupted somehow.
          next
        rescue Taps::DuplicatePrimaryKeyError
          # verify the stream and retry it
          stream = stream.verify_remote_stream(session_resource['push/verify_stream'], http_headers)
          next
        end
        stream.state[:chunksize] = chunksize

        progress.inc(row_size)

        stream.increment(row_size)
        break if stream.complete?
      end

      progress.finish
      completed_tables << stream.table_name.to_s
      self.stream_state = {}
    end

    def local_tables_info
      opts[:local_tables_info] ||= fetch_local_tables_info
    end

    # Pending tables: local tables minus the completed ones.
    #
    # @return [Hash{String => Integer}] table name => row count
    def tables
      h = {}
      local_tables_info.each do |table_name, count|
        next if completed_tables.include?(table_name.to_s)
        h[table_name.to_s] = count
      end
      h
    end

    # Total rows across all local tables (memoized).
    def record_count
      @record_count ||= local_tables_info.values.inject(0) { |a, c| a + c }
    end

    # Counts the rows of every local table, then applies the table filter.
    #
    # @return [Hash] table name => row count
    def fetch_local_tables_info
      tables_with_counts = {}
      db.tables.each do |table|
        tables_with_counts[table] = db[table.to_sym.identifier].count
      end
      apply_table_filter(tables_with_counts)
    end
  end
end