lib/pgsync/task.rb in pgsync-0.6.3 vs lib/pgsync/task.rb in pgsync-0.6.4

- old
+ new

@@ -1,18 +1,20 @@ module PgSync class Task include Utils attr_reader :source, :destination, :config, :table, :opts - attr_accessor :from_columns, :to_columns + attr_accessor :from_columns, :to_columns, :from_sequences, :to_sequences, :to_primary_key def initialize(source:, destination:, config:, table:, opts:) @source = source @destination = destination @config = config @table = table @opts = opts + @from_sequences = [] + @to_sequences = [] end def quoted_table quote_ident_full(table) end @@ -37,18 +39,10 @@ def shared_fields @shared_fields ||= to_fields & from_fields end - def from_sequences - @from_sequences ||= opts[:no_sequences] ? [] : source.sequences(table, shared_fields) - end - - def to_sequences - @to_sequences ||= opts[:no_sequences] ? [] : destination.sequences(table, shared_fields) - end - def shared_sequences @shared_sequences ||= to_sequences & from_sequences end def notes @@ -86,19 +80,14 @@ sql_clause = String.new("") sql_clause << " #{opts[:sql]}" if opts[:sql] bad_fields = opts[:no_rules] ? [] : config["data_rules"] - primary_key = destination.primary_key(table) + primary_key = to_primary_key copy_fields = shared_fields.map { |f| f2 = bad_fields.to_a.find { |bf, _| rule_match?(table, f, bf) }; f2 ? "#{apply_strategy(f2[1], table, f, primary_key)} AS #{quote_ident(f)}" : "#{quoted_table}.#{quote_ident(f)}" }.join(", ") fields = shared_fields.map { |f| quote_ident(f) }.join(", ") - seq_values = {} - shared_sequences.each do |seq| - seq_values[seq] = source.last_value(seq) - end - copy_to_command = "COPY (SELECT #{copy_fields} FROM #{quoted_table}#{sql_clause}) TO STDOUT" if opts[:in_batches] raise Error, "No primary key" if primary_key.empty? primary_key = primary_key.first @@ -161,12 +150,15 @@ else destination.truncate(table) end copy(copy_to_command, dest_table: table, dest_fields: fields) end - seq_values.each do |seq, value| - destination.execute("SELECT setval(#{escape(seq)}, #{escape(value)})") + + # update sequences + shared_sequences.each do |seq| + value = source.last_value(seq) + destination.execute("SELECT setval(#{escape(quote_ident_full(seq))}, #{escape(value)})") end {status: "success"} end @@ -212,9 +204,13 @@ {status: "error", message: message} end def copy(source_command, dest_table:, dest_fields:) destination_command = "COPY #{quote_ident_full(dest_table)} (#{dest_fields}) FROM STDIN" + + source.log_sql(source_command) + destination.log_sql(destination_command) + destination.conn.copy_data(destination_command) do source.conn.copy_data(source_command) do while (row = source.conn.get_copy_data) destination.conn.put_copy_data(row) end