lib/pgsync/client.rb in pgsync-0.4.3 vs lib/pgsync/client.rb in pgsync-0.5.0
- old
+ new
@@ -3,11 +3,10 @@
def initialize(args)
$stdout.sync = true
$stderr.sync = true
@exit = false
@arguments, @options = parse_args(args)
- @mutex = windows? ? Mutex.new : MultiProcessing::Mutex.new
end
# TODO clean up this mess
def perform
return if @exit
@@ -90,12 +89,12 @@
end
unless opts[:schema_only]
confirm_tables_exist(destination, tables, "destination")
- in_parallel(tables) do |table, table_opts|
- TableSync.new.sync(@mutex, config, table, opts.merge(table_opts), source.url, destination.url, source.search_path.find { |sp| sp != "pg_catalog" })
+ in_parallel(tables, first_schema: source.search_path.find { |sp| sp != "pg_catalog" }) do |table, table_opts|
+ TableSync.new.sync(config, table, opts.merge(table_opts), source.url, destination.url)
end
end
log_completed(start_time)
end
@@ -179,10 +178,11 @@
o.boolean "--truncate", "truncate existing rows", default: false
o.boolean "--schema-first", "schema first", default: false
o.boolean "--schema-only", "schema only", default: false
o.boolean "--all-schemas", "all schemas", default: false
o.boolean "--no-rules", "do not apply data rules", default: false
+ o.boolean "--no-sequences", "do not sync sequences", default: false
o.boolean "--init", "init", default: false
o.boolean "--setup", "setup", default: false, help: false
o.boolean "--in-batches", "in batches", default: false, help: false
o.integer "--batch-size", "batch size", default: 10000, help: false
o.float "--sleep", "sleep", default: 0, help: false
@@ -265,20 +265,62 @@
def log(message = nil)
$stderr.puts message
end
- def in_parallel(tables, &block)
+ def in_parallel(tables, first_schema:, &block)
+ spinners = TTY::Spinner::Multi.new(format: :dots)
+ item_spinners = {}
+
+ start = lambda do |item, i|
+ table, opts = item
+ message = String.new(":spinner ")
+ message << table.sub("#{first_schema}.", "")
+ # maybe output later
+ # message << " #{opts[:sql]}" if opts[:sql]
+ spinner = spinners.register(message)
+ spinner.auto_spin
+ item_spinners[item] = spinner
+ end
+
+ errors = 0
+
+ finish = lambda do |item, i, result|
+ spinner = item_spinners[item]
+ if result[:status] == "success"
+ spinner.success(display_message(result))
+ else
+ # TODO add option to fail fast
+ spinner.error(display_message(result))
+ errors += 1
+ end
+
+ unless spinner.send(:tty?)
+ status = result[:status] == "success" ? "✔" : "✖"
+ log [status, item.first.sub("#{first_schema}.", ""), display_message(result)].compact.join(" ")
+ end
+ end
+
+ options = {start: start, finish: finish}
if @options[:debug] || @options[:in_batches]
- tables.each(&block)
+ options[:in_processes] = 0
else
- options = {}
options[:in_threads] = 4 if windows?
- Parallel.each(tables, options, &block)
end
+
+ Parallel.each(tables, **options, &block)
+
+ raise PgSync::Error, "Synced failed for #{errors} table#{errors == 1 ? nil : "s"}" if errors > 0
end
+ def display_message(result)
+ message = String.new("")
+ message << "- #{result[:time]}s" if result[:time]
+ message << "(#{result[:message].gsub("\n", " ").strip})" if result[:message]
+ message
+ end
+
def pretty_list(items)
items.each do |item|
log item
end
end
@@ -287,13 +329,22 @@
log "[DEPRECATED] #{message}"
end
def log_completed(start_time)
time = Time.now - start_time
- log "Completed in #{time.round(1)}s"
+ message = "Completed in #{time.round(1)}s"
+ log self.class.colorize(message, 32) # green
end
def windows?
Gem.win_platform?
+ end
+
+ def self.colorize(message, color_code)
+ if $stderr.tty?
+ "\e[#{color_code}m#{message}\e[0m"
+ else
+ message
+ end
end
end
end