lib/upsert/buffer/pg_connection.rb in upsert-0.0.1 vs lib/upsert/buffer/pg_connection.rb in upsert-0.1.0
- old
+ new
@@ -1,72 +1,66 @@
require 'upsert/buffer/pg_connection/column_definition'
class Upsert
class Buffer
+ # @private
class PG_Connection < Buffer
- attr_reader :db_function_name
+ include Quoter
- def compose(targets)
- target = targets.first
- unless created_db_function?
- create_db_function target
+ attr_reader :merge_function
+
+ def chunk
+ return false if rows.empty?
+ row = rows.shift
+ unless merge_function
+ create_merge_function row
end
- hsh = target.to_hash
+ hsh = row.to_hash
ordered_args = column_definitions.map do |c|
- if hsh.has_key? c.name
- hsh[c.name]
- else
- nil
- end
+ hsh[c.name]
end
- %{ SELECT #{db_function_name}(#{quote_values(ordered_args)}) }
+ %{SELECT #{merge_function}(#{quote_values(ordered_args)})}
end
def execute(sql)
connection.exec sql
end
- def max_length
- INFINITY
+ def quote_string(v)
+ SINGLE_QUOTE + connection.escape_string(v) + SINGLE_QUOTE
end
- def max_targets
- 1
+ def quote_binary(v)
+ E_AND_SINGLE_QUOTE + connection.escape_bytea(v) + SINGLE_QUOTE
end
- include Quoter
-
- def quote_ident(k)
- SINGLE_QUOTE + connection.quote_ident(k) + SINGLE_QUOTE
+ def quote_time(v)
+ quote_string [v.strftime(ISO8601_DATETIME), sprintf(USEC_SPRINTF, v.usec)].join('.')
end
-
- # FIXME escape_bytea with (v, k = nil)
- def quote_value(v)
- case v
- when NilClass
- 'NULL'
- when String, Symbol
- SINGLE_QUOTE + connection.escape_string(v.to_s) + SINGLE_QUOTE
- else
- v
- end
+
+ def quote_big_decimal(v)
+ v.to_s('F')
end
-
+
+ def quote_boolean(v)
+ v ? 'TRUE' : 'FALSE'
+ end
+
+ def quote_ident(k)
+ DOUBLE_QUOTE + connection.quote_ident(k.to_s) + DOUBLE_QUOTE
+ end
+
def column_definitions
@column_definitions ||= ColumnDefinition.all(connection, table_name)
end
private
- def created_db_function?
- !!@created_db_function_query
- end
-
- def create_db_function(example_row)
- @db_function_name = "pg_temp.merge_#{table_name}_#{Kernel.rand(1e11)}"
+ def create_merge_function(example_row)
+ @merge_function = "pg_temp.merge_#{table_name}_#{Kernel.rand(1e11)}"
execute <<-EOS
-CREATE FUNCTION #{db_function_name}(#{column_definitions.map { |c| "#{c.name}_input #{c.sql_type} DEFAULT #{c.default || 'NULL'}" }.join(',') }) RETURNS VOID AS
+CREATE FUNCTION #{merge_function}(#{column_definitions.map { |c| "#{c.name}_input #{c.sql_type} DEFAULT #{c.default || 'NULL'}" }.join(',') }) RETURNS VOID AS
$$
BEGIN
LOOP
-- first try to update the key
UPDATE #{table_name} SET #{column_definitions.map { |c| "#{c.name} = #{c.name}_input" }.join(',')} WHERE #{example_row.selector.keys.map { |k| "#{k} = #{k}_input" }.join(' AND ') };
@@ -85,10 +79,9 @@
END LOOP;
END;
$$
LANGUAGE plpgsql;
EOS
- @created_db_function_query = true
end
end
end
end