require 'rest_client'
require 'sequel'
require 'zlib'

require 'taps/progress_bar'
require 'taps/config'
require 'taps/utils'
require 'taps/data_stream'

# disable warnings, rest client makes a lot of noise right now
$VERBOSE = nil

module Taps

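# Base class for the Pull and Push operations. Wraps the local Sequel
# connection, the remote taps server resource, and the resumable session
# state shared by both directions of transfer.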
class Operation
	attr_reader :database_url, :remote_url, :opts
	attr_reader :session_uri

	def initialize(database_url, remote_url, opts={})
		@database_url = database_url
		@remote_url = remote_url
		@opts = opts
		@exiting = false
		@session_uri = opts[:session_uri]
	end

	def file_prefix
		"op"
	end

	def table_filter
		opts[:table_filter]
	end

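	# Filters a table collection against the :table_filter regexp.
	# Accepts either a Hash of table name => data or an Array of names,
	# and returns the same shape it was given.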
	def apply_table_filter(tables)
		return tables unless table_filter
		re = Regexp.new(table_filter)
		if tables.kind_of?(Hash)
			ntables = {}
			tables.each do |t, d|
				ntables[t] = d if re.match(t.to_s)
			end
			ntables
		else
			tables.select { |t| re.match(t.to_s) }
		end
	end

	def log
		Taps.log
	end

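	# Serializes the operation state to a timestamped file so an
	# interrupted transfer can be resumed later.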
	def store_session
		file = "#{file_prefix}_#{Time.now.strftime("%Y%m%d%H%M")}.dat"
		puts "Saving session to #{file}.."
		File.open(file, 'w') do |f|
			f.write(to_hash.to_json)
		end
	end

	def to_hash
		{
			:klass => self.class.to_s,
			:database_url => database_url,
			:remote_url => remote_url,
			:session_uri => session_uri,
			:stream_state => stream_state,
			:completed_tables => completed_tables,
			:table_filter => table_filter,
		}
	end

	def exiting?
		!!@exiting
	end

	def setup_signal_trap
		["INT", "TERM"].each do |signal|
			trap(signal) {
				puts "\nCompleting current action..."
				@exiting = true
			}
		end
	end

	def resuming?
		opts[:resume] == true
	end

	def default_chunksize
		opts[:default_chunksize]
	end

	def completed_tables
		opts[:completed_tables] ||= []
	end

	def stream_state
		opts[:stream_state] ||= {}
	end

	def stream_state=(val)
		opts[:stream_state] = val
	end

	def compression_disabled?
		!!opts[:disable_compression]
	end

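	# Lazily-opened Sequel connection to the local database.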
	def db
		@db ||= Sequel.connect(database_url)
	end

	def server
		@server ||= RestClient::Resource.new(remote_url)
	end

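	# Creates a session on the remote server on first use (POST to
	# 'sessions') and memoizes the per-session resource.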
	def session_resource
		@session_resource ||= begin
			@session_uri ||= server['sessions'].post('', http_headers).to_s
			server[@session_uri]
		end
	end

	def set_session(uri)
		@session_uri = uri
		@session_resource = server[session_uri]
	end

	def close_session
		@session_resource.delete(http_headers) if @session_resource
	end

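	# Masks the password in a URL for display, e.g.
	# http://user:secret@host => http://user:[hidden]@host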
	def safe_url(url)
		url.sub(/\/\/(.+?)?:(.*?)@/, '//\1:[hidden]@')
	end

	def safe_remote_url
		safe_url(remote_url)
	end

	def safe_database_url
		safe_url(database_url)
	end

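	# Default headers for every request: the taps protocol version, plus
	# an Accept-Encoding that is emptied when compression is disabled.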
	def http_headers(extra = {})
		base = { :taps_version => Taps.compatible_version }
		base[:accept_encoding] = compression_disabled? ? "" : "gzip, deflate"
		base.merge(extra)
	end

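	# Inserts thousands separators, e.g. format_number(1234567) => "1,234,567"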
	def format_number(num)
		num.to_s.gsub(/(\d)(?=(\d\d\d)+(?!\d))/, "\\1,")
	end

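	# Pings the server root to confirm it is reachable, the credentials
	# are accepted, and both ends speak a compatible taps version
	# (HTTP 417 signals a minor-version mismatch).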
	def verify_server
		begin
			server['/'].get(http_headers)
		rescue RestClient::RequestFailed => e
			if e.http_code == 417
				puts "#{safe_remote_url} is running a different minor version of taps."
				puts "#{e.response.to_s}"
				exit(1)
			else
				raise
			end
		rescue RestClient::Unauthorized
			puts "Bad credentials given for #{safe_remote_url}"
			exit(1)
		rescue Errno::ECONNREFUSED
			puts "Can't connect to #{safe_remote_url}. Please check that it's running"
			exit(1)
		end
	end

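	# Instantiates the Operation subclass for the given operation type.
	# When resuming, the concrete class name recorded in the session file
	# is re-instantiated via eval. Example (URLs are illustrative):
	#
	#   Taps::Operation.factory(:pull, "sqlite://local.db",
	#     "http://user:pass@remote.example.com:5000", {})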
	def self.factory(type, database_url, remote_url, opts)
		type = :resume if opts[:resume]
		klass = case type
			when :pull then Taps::Pull
			when :push then Taps::Push
			when :resume then eval(opts[:klass])
			else raise "Unknown Operation Type -> #{type}"
		end

		klass.new(database_url, remote_url, opts)
	end
end

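# Pulls schema, data, indexes, and sequence state from a remote taps
# server into the local database.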
class Pull < Operation
	def file_prefix
		"pull"
	end

	def to_hash
		super.merge(:remote_tables_info => remote_tables_info)
	end

	def run
		verify_server

		begin
			pull_schema unless resuming?

			setup_signal_trap

			pull_partial_data if resuming?

			pull_data
			pull_indexes
			pull_reset_sequences
			close_session
		rescue RestClient::Exception => e
			store_session
			if e.respond_to?(:response)
				puts "!!! Caught Server Exception"
				puts "HTTP CODE: #{e.http_code}"
				puts "#{e.response.to_s}"
				exit(1)
			else
				raise
			end
		end
	end

	def pull_schema
		puts "Receiving schema"

		tables.each do |table_name, count|
			schema_data = session_resource['pull/schema'].post({:table_name => table_name}, http_headers).to_s
			output = Taps::Utils.load_schema(database_url, schema_data)
			puts output if output
		end
	end

	def pull_data
		puts "Receiving data"

		puts "#{tables.size} tables, #{format_number(record_count)} records"

		tables.each do |table_name, count|
			progress = ProgressBar.new(table_name.to_s, count)
			stream = Taps::DataStream.factory(db, {
				:chunksize => default_chunksize,
				:table_name => table_name
			})
			pull_data_from_table(stream, progress)
		end
	end

	def pull_partial_data
		return if stream_state == {}

		table_name = stream_state[:table_name]
		record_count = tables[table_name.to_s]
		puts "Resuming #{table_name}, #{format_number(record_count)} records"

		progress = ProgressBar.new(table_name.to_s, record_count)
		stream = Taps::DataStream.factory(db, stream_state)
		pull_data_from_table(stream, progress)
	end

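	# Fetches chunks from the server until the stream reports completion,
	# saving stream state after each chunk so an interrupted pull can
	# resume; corrupted chunks are flagged and re-requested.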
	def pull_data_from_table(stream, progress)
		loop do
			begin
				if exiting?
					store_session
					exit 0
				end

				size = stream.fetch_remote(session_resource['pull/table'], http_headers)
				break if stream.complete?
				progress.inc(size) unless exiting?
				stream.error = false
				self.stream_state = stream.to_hash
			rescue DataStream::CorruptedData => e
				puts "Corrupted Data Received #{e.message}, retrying..."
				stream.error = true
				next
			end
		end

		progress.finish
		completed_tables << stream.table_name.to_s
		self.stream_state = {}
	end

	def tables
		h = {}
		remote_tables_info.each do |table_name, count|
			next if completed_tables.include?(table_name.to_s)
			h[table_name.to_s] = count
		end
		h
	end

	def record_count
		@record_count ||= remote_tables_info.values.inject(0) { |a, c| a + c }
	end

	def remote_tables_info
		opts[:remote_tables_info] ||= fetch_remote_tables_info
	end

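	# Asks the server for the table names and a row count per table,
	# retrying transient HTTP failures up to max_retries times.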
	def fetch_remote_tables_info
		retries = 0
		max_retries = 10
		begin
			table_names = JSON.load(session_resource['pull/table_names'].get(http_headers).to_s)
		rescue RestClient::Exception
			retries += 1
			retry if retries <= max_retries
			puts "Unable to fetch table information from #{remote_url}. Please check the server log."
			exit(1)
		end

		data = {}
		apply_table_filter(table_names).each do |table_name|
			retries = 0
			begin
				count = session_resource['pull/table_count'].post({:table => table_name}, http_headers).to_s.to_i
				data[table_name] = count
			rescue RestClient::Exception
				retries += 1
				retry if retries <= max_retries
				puts "Unable to fetch a row count for #{table_name} from #{remote_url}. Please check the server log."
				exit(1)
			end
		end
		data
	end

	def pull_indexes
		puts "Receiving indexes"

		idxs = JSON.parse(session_resource['pull/indexes'].get(http_headers).to_s)

		apply_table_filter(idxs).each do |table, indexes|
			next if indexes.empty?
			progress = ProgressBar.new(table, indexes.size)
			indexes.each do |idx|
				output = Taps::Utils.load_indexes(database_url, idx)
				puts output if output
				progress.inc(1)
			end
			progress.finish
		end
	end

	def pull_reset_sequences
		puts "Resetting sequences"

		output = Taps::Utils.schema_bin(:reset_db_sequences, database_url)
		puts output if output
	end
end

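# Pushes the local database's schema, data, indexes, and sequence state
# to a remote taps server.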
class Push < Operation
	def file_prefix
		"push"
	end

	def to_hash
		super.merge(:local_tables_info => local_tables_info)
	end

	def run
		verify_server
		begin
			push_schema unless resuming?

			setup_signal_trap

			push_partial_data if resuming?

			push_data
			push_indexes
			push_reset_sequences
			close_session
		rescue RestClient::Exception => e
			store_session
			if e.respond_to?(:response)
				puts "!!! Caught Server Exception"
				puts "HTTP CODE: #{e.http_code}"
				puts "#{e.response.to_s}"
				exit(1)
			else
				raise
			end
		end
	end

	def push_indexes
		idxs = JSON.parse(Taps::Utils.schema_bin(:indexes_individual, database_url))

		return if idxs.empty?

		puts "Sending indexes"

		apply_table_filter(idxs).each do |table, indexes|
			progress = ProgressBar.new(table, indexes.size)
			indexes.each do |idx|
				session_resource['push/indexes'].post(idx, http_headers)
				progress.inc(1)
			end
			progress.finish
		end
	end

	def push_schema
		puts "Sending schema"

		tables.each do |table, count|
			schema_data = Taps::Utils.schema_bin(:dump_table, database_url, table)
			session_resource['push/schema'].post(schema_data, http_headers)
		end
	end

	def push_reset_sequences
		puts "Resetting sequences"

		session_resource['push/reset_sequences'].post('', http_headers)
	end

	def push_partial_data
		return if stream_state == {}

		table_name = stream_state[:table_name]
		record_count = tables[table_name.to_s]
		puts "Resuming #{table_name}, #{format_number(record_count)} records"
		progress = ProgressBar.new(table_name.to_s, record_count)
		stream = Taps::DataStream.factory(db, stream_state)
		push_data_from_table(stream, progress)
	end

	def push_data
		puts "Sending data"

		puts "#{tables.size} tables, #{format_number(record_count)} records"

		tables.each do |table_name, count|
			stream = Taps::DataStream.factory(db,
				:table_name => table_name,
				:chunksize => default_chunksize)
			progress = ProgressBar.new(table_name.to_s, count)
			push_data_from_table(stream, progress)
		end
	end

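	# Sends chunks inside Taps::Utils.calculate_chunksize, which tunes the
	# chunk size from the elapsed time the block reports. Each chunk goes
	# up as a multipart POST with a checksum, and stream state is saved
	# after every successful post so an interrupted push can resume.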
	def push_data_from_table(stream, progress)
		loop do
			if exiting?
				store_session
				exit 0
			end

			row_size = 0
			chunksize = stream.state[:chunksize]
			chunksize = Taps::Utils.calculate_chunksize(chunksize) do |c|
				stream.state[:chunksize] = c
				encoded_data, row_size, elapsed_time = stream.fetch
				break if stream.complete?

				data = {
					:state => stream.to_hash,
					:checksum => Taps::Utils.checksum(encoded_data).to_s
				}

				begin
					content, content_type = Taps::Multipart.create do |r|
						r.attach :name => :encoded_data,
							:payload => encoded_data,
							:content_type => 'application/octet-stream'
						r.attach :name => :json,
							:payload => data.to_json,
							:content_type => 'application/json'
					end
					session_resource['push/table'].post(content, http_headers(:content_type => content_type))
					self.stream_state = stream.to_hash
				rescue RestClient::RequestFailed => e
					# HTTP 412: the server rejected the chunk (it arrived
					# corrupted somehow), so retry the same data
					if e.http_code == 412
						next
					end
					raise
				end
				elapsed_time
			end
			stream.state[:chunksize] = chunksize

			progress.inc(row_size)

			stream.increment(row_size)
			break if stream.complete?
		end

		progress.finish
		completed_tables << stream.table_name.to_s
		self.stream_state = {}
	end

	def local_tables_info
		opts[:local_tables_info] ||= fetch_local_tables_info
	end

	def tables
		h = {}
		local_tables_info.each do |table_name, count|
			next if completed_tables.include?(table_name.to_s)
			h[table_name.to_s] = count
		end
		h
	end

	def record_count
		@record_count ||= local_tables_info.values.inject(0) { |a, c| a + c }
	end

	def fetch_local_tables_info
		tables_with_counts = {}
		db.tables.each do |table|
			tables_with_counts[table] = db[table].count
		end
		apply_table_filter(tables_with_counts)
	end

end

end