lib/pgsync/task.rb in pgsync-0.6.3 vs lib/pgsync/task.rb in pgsync-0.6.4
- old
+ new
@@ -1,18 +1,20 @@
module PgSync
class Task
include Utils
attr_reader :source, :destination, :config, :table, :opts
- attr_accessor :from_columns, :to_columns
+ attr_accessor :from_columns, :to_columns, :from_sequences, :to_sequences, :to_primary_key
def initialize(source:, destination:, config:, table:, opts:)
@source = source
@destination = destination
@config = config
@table = table
@opts = opts
+ @from_sequences = []
+ @to_sequences = []
end
def quoted_table
quote_ident_full(table)
end
@@ -37,18 +39,10 @@
def shared_fields
@shared_fields ||= to_fields & from_fields
end
- def from_sequences
- @from_sequences ||= opts[:no_sequences] ? [] : source.sequences(table, shared_fields)
- end
-
- def to_sequences
- @to_sequences ||= opts[:no_sequences] ? [] : destination.sequences(table, shared_fields)
- end
-
def shared_sequences
@shared_sequences ||= to_sequences & from_sequences
end
def notes
@@ -86,19 +80,14 @@
sql_clause = String.new("")
sql_clause << " #{opts[:sql]}" if opts[:sql]
bad_fields = opts[:no_rules] ? [] : config["data_rules"]
- primary_key = destination.primary_key(table)
+ primary_key = to_primary_key
copy_fields = shared_fields.map { |f| f2 = bad_fields.to_a.find { |bf, _| rule_match?(table, f, bf) }; f2 ? "#{apply_strategy(f2[1], table, f, primary_key)} AS #{quote_ident(f)}" : "#{quoted_table}.#{quote_ident(f)}" }.join(", ")
fields = shared_fields.map { |f| quote_ident(f) }.join(", ")
- seq_values = {}
- shared_sequences.each do |seq|
- seq_values[seq] = source.last_value(seq)
- end
-
copy_to_command = "COPY (SELECT #{copy_fields} FROM #{quoted_table}#{sql_clause}) TO STDOUT"
if opts[:in_batches]
raise Error, "No primary key" if primary_key.empty?
primary_key = primary_key.first
@@ -161,12 +150,15 @@
else
destination.truncate(table)
end
copy(copy_to_command, dest_table: table, dest_fields: fields)
end
- seq_values.each do |seq, value|
- destination.execute("SELECT setval(#{escape(seq)}, #{escape(value)})")
+
+ # update sequences
+ shared_sequences.each do |seq|
+ value = source.last_value(seq)
+ destination.execute("SELECT setval(#{escape(quote_ident_full(seq))}, #{escape(value)})")
end
{status: "success"}
end
@@ -212,9 +204,13 @@
{status: "error", message: message}
end
def copy(source_command, dest_table:, dest_fields:)
destination_command = "COPY #{quote_ident_full(dest_table)} (#{dest_fields}) FROM STDIN"
+
+ source.log_sql(source_command)
+ destination.log_sql(destination_command)
+
destination.conn.copy_data(destination_command) do
source.conn.copy_data(source_command) do
while (row = source.conn.get_copy_data)
destination.conn.put_copy_data(row)
end