lib/postgres_upsert/writer.rb in postgres_upsert-4.0.0 vs lib/postgres_upsert/writer.rb in postgres_upsert-5.0.0
- old
+ new
@@ -2,45 +2,62 @@
class Writer
def initialize(klass, source, options = {})
@klass = klass
@options = options.reverse_merge({
- :delimiter => ",",
- :header => true,
- :key_column => primary_key,
+ :delimiter => ",",
+ :header => true,
+ :unique_key => [primary_key],
:update_only => false})
+ @options[:unique_key] = Array.wrap(@options[:unique_key])
@source = source.instance_of?(String) ? File.open(source, 'r') : source
@columns_list = get_columns
generate_temp_table_name
end
def write
- if @columns_list.empty?
+ if @columns_list.empty?
raise "Either the :columns option or :header => true are required"
end
csv_options = "DELIMITER '#{@options[:delimiter]}' CSV"
copy_table = @temp_table_name
columns_string = columns_string_for_copy
create_temp_table
- ActiveRecord::Base.connection.raw_connection.copy_data %{COPY #{copy_table} #{columns_string} FROM STDIN #{csv_options}} do
+ @copy_result = database_connection.raw_connection.copy_data %{COPY #{copy_table} #{columns_string} FROM STDIN #{csv_options}} do
while line = @source.gets do
next if line.strip.size == 0
- ActiveRecord::Base.connection.raw_connection.put_copy_data line
+ database_connection.raw_connection.put_copy_data line
end
end
upsert_from_temp_table
drop_temp_table
- PostgresUpsert::Result.new(@insert_result, @update_result)
+
+ summarize_results
end
private
+ def database_connection
+ @klass.connection
+ end
+
+ def summarize_results
+ result = PostgresUpsert::Result.new(@insert_result, @update_result, @copy_result)
+ expected_rows = @options[:update_only] ? result.updated_rows : result.copied_rows
+
+ if result.changed_rows != expected_rows
+ raise "#{expected_rows} rows were copied, but #{result.changed_rows} were upserted to destination table. Check to make sure your key is unique."
+ end
+
+ return result
+ end
+
def primary_key
@klass.primary_key
end
def column_names
@@ -91,11 +108,13 @@
str
end
def select_string_for_create
columns = @columns_list.map(&:to_sym)
- columns << @options[:key_column].to_sym unless columns.include?(@options[:key_column].to_sym)
+ @options[:unique_key].each do |key_component|
+ columns << key_component.to_sym unless columns.include?(key_component.to_sym)
+ end
get_columns_string(columns)
end
def get_columns_string(columns = nil)
columns ||= @columns_list
@@ -110,16 +129,16 @@
update_from_temp_table
insert_from_temp_table unless @options[:update_only]
end
def update_from_temp_table
- @update_result = ActiveRecord::Base.connection.execute <<-SQL
+ @update_result = database_connection.execute <<-SQL
UPDATE #{quoted_table_name} AS d
#{update_set_clause}
FROM #{@temp_table_name} as t
- WHERE t.#{@options[:key_column]} = d.#{@options[:key_column]}
- AND d.#{@options[:key_column]} IS NOT NULL;
+ WHERE #{unique_key_select("t", "d")}
+ AND #{unique_key_present("d")}
SQL
end
def update_set_clause
command = @columns_list.map do |col|
@@ -130,42 +149,51 @@
end
def insert_from_temp_table
columns_string = columns_string_for_insert
select_string = select_string_for_insert
- @insert_result = ActiveRecord::Base.connection.execute <<-SQL
+ @insert_result = database_connection.execute <<-SQL
INSERT INTO #{quoted_table_name} (#{columns_string})
SELECT #{select_string}
FROM #{@temp_table_name} as t
- WHERE NOT EXISTS
- (SELECT 1
- FROM #{quoted_table_name} as d
- WHERE d.#{@options[:key_column]} = t.#{@options[:key_column]})
- AND t.#{@options[:key_column]} IS NOT NULL;
+ WHERE NOT EXISTS
+ (SELECT 1
+ FROM #{quoted_table_name} as d
+ WHERE #{unique_key_select("t", "d")});
SQL
end
+ def unique_key_select(source, dest)
+ @options[:unique_key].map {|field| "#{source}.#{field} = #{dest}.#{field}"}.join(' AND ')
+ end
+
+ def unique_key_present(source)
+ @options[:unique_key].map {|field| "#{source}.#{field} IS NOT NULL"}.join(' AND ')
+ end
+
def create_temp_table
columns_string = select_string_for_create
verify_temp_has_key
- ActiveRecord::Base.connection.execute <<-SQL
+ database_connection.execute <<-SQL
SET client_min_messages=WARNING;
DROP TABLE IF EXISTS #{@temp_table_name};
- CREATE TEMP TABLE #{@temp_table_name}
+ CREATE TEMP TABLE #{@temp_table_name}
AS SELECT #{columns_string} FROM #{quoted_table_name} WHERE 0 = 1;
SQL
end
def verify_temp_has_key
- unless @columns_list.include?(@options[:key_column].to_s)
- raise "Expected a unique column '#{@options[:key_column]}' but the source data does not include this column. Update the :columns list or explicitly set the :key_column option.}"
+ @options[:unique_key].each do |key_component|
+ unless @columns_list.include?(key_component.to_s)
+ raise "Expected a unique column '#{key_component}' but the source data does not include this column. Update the :columns list or explicitly set the unique_key option.}"
+ end
end
end
def drop_temp_table
- ActiveRecord::Base.connection.execute <<-SQL
- DROP TABLE #{@temp_table_name}
+ database_connection.execute <<-SQL
+ DROP TABLE #{@temp_table_name}
SQL
end
end
end