lib/postgres_upsert/writer.rb in postgres_upsert-5.0.0 vs lib/postgres_upsert/writer.rb in postgres_upsert-5.1.0

- old
+ new

@@ -1,130 +1,143 @@ module PostgresUpsert class Writer - def initialize(klass, source, options = {}) + def initialize(klass, destination, source, options = {}) @klass = klass + @destination = destination + @source = source @options = options.reverse_merge({ - :delimiter => ",", - :header => true, - :unique_key => [primary_key], - :update_only => false}) + delimiter: ',', + header: true, + unique_key: [primary_key], + update_only: false + }) + @source = source @options[:unique_key] = Array.wrap(@options[:unique_key]) - @source = source.instance_of?(String) ? File.open(source, 'r') : source - @columns_list = get_columns - generate_temp_table_name + end def write - if @columns_list.empty? - raise "Either the :columns option or :header => true are required" - end + validate_options - csv_options = "DELIMITER '#{@options[:delimiter]}' CSV" - - copy_table = @temp_table_name - columns_string = columns_string_for_copy + create_temp_table - @copy_result = database_connection.raw_connection.copy_data %{COPY #{copy_table} #{columns_string} FROM STDIN #{csv_options}} do - - while line = @source.gets do - next if line.strip.size == 0 - database_connection.raw_connection.put_copy_data line - end + if @source.continuous_write_enabled + write_continuous + else + write_batched end upsert_from_temp_table drop_temp_table summarize_results end private + def write_continuous + csv_options = "DELIMITER '#{@options[:delimiter]}' CSV" + @copy_result = database_connection.raw_connection.copy_data %{COPY #{@temp_table_name} #{columns_string_for_copy} FROM STDIN #{csv_options}} do + while (line = @source.gets) + next if line.strip.empty? + + database_connection.raw_connection.put_copy_data line + end + end + end + + def write_batched + @source.gets do |line| + @copy_result = database_connection.raw_connection.copy_data %{COPY #{@temp_table_name} #{columns_string_for_copy} FROM STDIN} do + database_connection.raw_connection.put_copy_data line + end + end + end + def database_connection - @klass.connection + @destination.database_connection end def summarize_results result = PostgresUpsert::Result.new(@insert_result, @update_result, @copy_result) expected_rows = @options[:update_only] ? result.updated_rows : result.copied_rows - + if result.changed_rows != expected_rows raise "#{expected_rows} rows were copied, but #{result.changed_rows} were upserted to destination table. Check to make sure your key is unique." end - return result + result end def primary_key - @klass.primary_key + @destination.primary_key end - def column_names - @klass.column_names + def destination_columns + @destination.column_names end def quoted_table_name - @klass.quoted_table_name + @destination.quoted_table_name end - def get_columns - columns_list = @options[:columns] ? @options[:columns].map(&:to_s) : [] - if @options[:header] - #if header is present, we need to strip it from io, whether we use it for the columns list or not. - line = @source.gets - if columns_list.empty? - columns_list = line.strip.split(@options[:delimiter]) - end - end - columns_list = columns_list.map{|c| @options[:map][c.to_s] } if @options[:map] - return columns_list + def source_columns + @source.columns end def columns_string_for_copy str = get_columns_string str.empty? ? str : "(#{str})" end def columns_string_for_select - columns = @columns_list.clone - columns << "created_at" if column_names.include?("created_at") - columns << "updated_at" if column_names.include?("updated_at") - str = get_columns_string(columns) + columns = source_columns.clone + columns << 'created_at' if inject_create_timestamp? + columns << 'updated_at' if inject_update_timestamp? + get_columns_string(columns) end def columns_string_for_insert - columns = @columns_list.clone - columns << "created_at" if column_names.include?("created_at") - columns << "updated_at" if column_names.include?("updated_at") - str = get_columns_string(columns) + columns = source_columns.clone + columns << 'created_at' if inject_create_timestamp? + columns << 'updated_at' if inject_update_timestamp? + get_columns_string(columns) end def select_string_for_insert - columns = @columns_list.clone + columns = source_columns.clone str = get_columns_string(columns) - str << ",'#{DateTime.now.utc}'" if column_names.include?("created_at") - str << ",'#{DateTime.now.utc}'" if column_names.include?("updated_at") + str << ",'#{DateTime.now.utc}'" if inject_create_timestamp? + str << ",'#{DateTime.now.utc}'" if inject_update_timestamp? str end + def inject_create_timestamp? + destination_columns.include?('created_at') && !source_columns.include?('created_at') + end + + def inject_update_timestamp? + destination_columns.include?('updated_at') && !source_columns.include?('updated_at') + end + def select_string_for_create - columns = @columns_list.map(&:to_sym) + columns = source_columns.map(&:to_sym) @options[:unique_key].each do |key_component| columns << key_component.to_sym unless columns.include?(key_component.to_sym) end get_columns_string(columns) end def get_columns_string(columns = nil) - columns ||= @columns_list - columns.size > 0 ? "\"#{columns.join('","')}\"" : "" + columns ||= source_columns + !columns.empty? ? "\"#{columns.join('","')}\"" : '' end def generate_temp_table_name - @temp_table_name = "#{@table_name}_temp_#{rand(1000)}" + @temp_table_name ||= "#{@table_name}_temp_#{rand(1000)}" end def upsert_from_temp_table update_from_temp_table insert_from_temp_table unless @options[:update_only] @@ -133,21 +146,23 @@ def update_from_temp_table @update_result = database_connection.execute <<-SQL UPDATE #{quoted_table_name} AS d #{update_set_clause} FROM #{@temp_table_name} as t - WHERE #{unique_key_select("t", "d")} - AND #{unique_key_present("d")} + WHERE #{unique_key_select('t', 'd')} + AND #{unique_key_present('d')} SQL end def update_set_clause - command = @columns_list.map do |col| + command = source_columns.map do |col| "\"#{col}\" = t.\"#{col}\"" end - command << "\"updated_at\" = '#{DateTime.now.utc}'" if column_names.include?("updated_at") - "SET #{command.join(',')}" + unless source_columns.include?('updated_at') + command << "\"updated_at\" = '#{DateTime.now.utc}'" if destination_columns.include?('updated_at') + end + "SET #{command.join(',')}" end def insert_from_temp_table columns_string = columns_string_for_insert select_string = select_string_for_insert @@ -156,38 +171,41 @@ SELECT #{select_string} FROM #{@temp_table_name} as t WHERE NOT EXISTS (SELECT 1 FROM #{quoted_table_name} as d - WHERE #{unique_key_select("t", "d")}); + WHERE #{unique_key_select('t', 'd')}); SQL end def unique_key_select(source, dest) - @options[:unique_key].map {|field| "#{source}.#{field} = #{dest}.#{field}"}.join(' AND ') + @options[:unique_key].map { |field| "#{source}.#{field} = #{dest}.#{field}" }.join(' AND ') end def unique_key_present(source) - @options[:unique_key].map {|field| "#{source}.#{field} IS NOT NULL"}.join(' AND ') + @options[:unique_key].map { |field| "#{source}.#{field} IS NOT NULL" }.join(' AND ') end def create_temp_table - columns_string = select_string_for_create - verify_temp_has_key + generate_temp_table_name database_connection.execute <<-SQL SET client_min_messages=WARNING; DROP TABLE IF EXISTS #{@temp_table_name}; CREATE TEMP TABLE #{@temp_table_name} - AS SELECT #{columns_string} FROM #{quoted_table_name} WHERE 0 = 1; + AS SELECT #{select_string_for_create} FROM #{quoted_table_name} WHERE 0 = 1; SQL end - def verify_temp_has_key + def validate_options + if source_columns.empty? + raise 'Either the :columns option or :header => true are required' + end + @options[:unique_key].each do |key_component| - unless @columns_list.include?(key_component.to_s) - raise "Expected a unique column '#{key_component}' but the source data does not include this column. Update the :columns list or explicitly set the unique_key option.}" - end + unless source_columns.include?(key_component.to_s) + raise "Expected column '#{key_component}' was not found in source" + end end end def drop_temp_table database_connection.execute <<-SQL