lib/upsert/buffer/pg_connection.rb in upsert-0.0.1 vs lib/upsert/buffer/pg_connection.rb in upsert-0.1.0

- old
+ new

@@ -1,72 +1,66 @@ require 'upsert/buffer/pg_connection/column_definition' class Upsert class Buffer + # @private class PG_Connection < Buffer - attr_reader :db_function_name + include Quoter - def compose(targets) - target = targets.first - unless created_db_function? - create_db_function target + attr_reader :merge_function + + def chunk + return false if rows.empty? + row = rows.shift + unless merge_function + create_merge_function row end - hsh = target.to_hash + hsh = row.to_hash ordered_args = column_definitions.map do |c| - if hsh.has_key? c.name - hsh[c.name] - else - nil - end + hsh[c.name] end - %{ SELECT #{db_function_name}(#{quote_values(ordered_args)}) } + %{SELECT #{merge_function}(#{quote_values(ordered_args)})} end def execute(sql) connection.exec sql end - def max_length - INFINITY + def quote_string(v) + SINGLE_QUOTE + connection.escape_string(v) + SINGLE_QUOTE end - def max_targets - 1 + def quote_binary(v) + E_AND_SINGLE_QUOTE + connection.escape_bytea(v) + SINGLE_QUOTE end - include Quoter - - def quote_ident(k) - SINGLE_QUOTE + connection.quote_ident(k) + SINGLE_QUOTE + def quote_time(v) + quote_string [v.strftime(ISO8601_DATETIME), sprintf(USEC_SPRINTF, v.usec)].join('.') end - - # FIXME escape_bytea with (v, k = nil) - def quote_value(v) - case v - when NilClass - 'NULL' - when String, Symbol - SINGLE_QUOTE + connection.escape_string(v.to_s) + SINGLE_QUOTE - else - v - end + + def quote_big_decimal(v) + v.to_s('F') end - + + def quote_boolean(v) + v ? 'TRUE' : 'FALSE' + end + + def quote_ident(k) + DOUBLE_QUOTE + connection.quote_ident(k.to_s) + DOUBLE_QUOTE + end + def column_definitions @column_definitions ||= ColumnDefinition.all(connection, table_name) end private - def created_db_function? - !!@created_db_function_query - end - - def create_db_function(example_row) - @db_function_name = "pg_temp.merge_#{table_name}_#{Kernel.rand(1e11)}" + def create_merge_function(example_row) + @merge_function = "pg_temp.merge_#{table_name}_#{Kernel.rand(1e11)}" execute <<-EOS -CREATE FUNCTION #{db_function_name}(#{column_definitions.map { |c| "#{c.name}_input #{c.sql_type} DEFAULT #{c.default || 'NULL'}" }.join(',') }) RETURNS VOID AS +CREATE FUNCTION #{merge_function}(#{column_definitions.map { |c| "#{c.name}_input #{c.sql_type} DEFAULT #{c.default || 'NULL'}" }.join(',') }) RETURNS VOID AS $$ BEGIN LOOP -- first try to update the key UPDATE #{table_name} SET #{column_definitions.map { |c| "#{c.name} = #{c.name}_input" }.join(',')} WHERE #{example_row.selector.keys.map { |k| "#{k} = #{k}_input" }.join(' AND ') }; @@ -85,10 +79,9 @@ END LOOP; END; $$ LANGUAGE plpgsql; EOS - @created_db_function_query = true end end end end