lib/upsert/buffer/pg_connection/merge_function.rb in upsert-0.4.0 vs lib/upsert/buffer/pg_connection/merge_function.rb in upsert-0.5.0
- old
+ new
@@ -1,138 +1,179 @@
require 'digest/md5'
class Upsert
- # @private
class Buffer
class PG_Connection < Buffer
+ # @private
class MergeFunction
class << self
def execute(buffer, row)
- first_try = true
- begin
- buffer.parent.connection.execute sql(buffer, row)
- rescue PG::Error => pg_error
- if first_try and pg_error.message =~ /function upsert_(.+) does not exist/
- Upsert.logger.info %{[upsert] Function #{"upsert_#{$1}".inspect} went missing, trying to recreate}
- first_try = false
- @lookup.clear
- retry
- else
- raise pg_error
- end
- end
- end
-
- def sql(buffer, row)
merge_function = lookup buffer, row
- %{SELECT #{merge_function.name}(#{merge_function.values_sql(row)})}
+ merge_function.execute row
end
- def unique_key(table_name, selector, columns)
- [
+ def unique_name(table_name, selector, setter)
+ parts = [
+ 'upsert',
table_name,
- selector.join(','),
- columns.join(',')
- ].join '/'
+ 'SEL',
+ selector.join('_A_'),
+ 'SET',
+ setter.join('_A_')
+ ].join('_')
+ # maybe i should md5 instead
+ crc32 = Zlib.crc32(parts).to_s
+ [ parts.first(MAX_NAME_LENGTH-11), crc32 ].join
end
def lookup(buffer, row)
@lookup ||= {}
- s = row.selector.keys
- c = row.columns
- @lookup[unique_key(buffer.parent.table_name, s, c)] ||= new(buffer, s, c)
+ selector = row.selector.keys
+ setter = row.setter.keys
+ key = [buffer.parent.table_name, selector, setter]
+ @lookup[key] ||= new(buffer, selector, setter)
end
+
+ def clear(buffer)
+ connection = buffer.parent.connection
+ # http://stackoverflow.com/questions/7622908/postgresql-drop-function-without-knowing-the-number-type-of-parameters
+ connection.execute <<-EOS
+CREATE OR REPLACE FUNCTION pg_temp.upsert_delfunc(text)
+ RETURNS void AS
+$BODY$
+DECLARE
+ _sql text;
+BEGIN
+
+FOR _sql IN
+ SELECT 'DROP FUNCTION ' || quote_ident(n.nspname)
+ || '.' || quote_ident(p.proname)
+ || '(' || pg_catalog.pg_get_function_identity_arguments(p.oid) || ');'
+ FROM pg_catalog.pg_proc p
+ LEFT JOIN pg_catalog.pg_namespace n ON n.oid = p.pronamespace
+ WHERE p.proname = $1
+ AND pg_catalog.pg_function_is_visible(p.oid) -- you may or may not want this
+LOOP
+ EXECUTE _sql;
+END LOOP;
+
+END;
+$BODY$
+ LANGUAGE plpgsql;
+EOS
+ res = connection.execute(%{SELECT proname FROM pg_proc WHERE proname LIKE 'upsert_%'})
+ res.each do |row|
+ k = row['proname']
+ next if k == 'upsert_delfunc'
+ Upsert.logger.info %{[upsert] Dropping function #{k.inspect}}
+ connection.execute %{SELECT pg_temp.upsert_delfunc('#{k}')}
+ end
+ end
end
+ MAX_NAME_LENGTH = 63
+
attr_reader :buffer
attr_reader :selector
- attr_reader :columns
+ attr_reader :setter
- def initialize(buffer, selector, columns)
+ def initialize(buffer, selector, setter)
@buffer = buffer
@selector = selector
- @columns = columns
+ @setter = setter
create!
end
def name
- @name ||= "upsert_#{Digest::MD5.hexdigest(unique_key)}"
+ @name ||= MergeFunction.unique_name table_name, selector, setter
end
- def values_sql(row)
- ordered_args = columns.map do |k|
- row.quoted_value(k) || NULL_WORD
- end.join(',')
+ def execute(row)
+ first_try = true
+ bind_selector_values = row.selector.values.map(&:bind_value)
+ bind_setter_values = row.setter.values.map(&:bind_value)
+ begin
+ connection.execute sql, (bind_selector_values + bind_setter_values)
+ rescue PG::Error => pg_error
+ if pg_error.message =~ /function #{name}.* does not exist/i
+ if first_try
+ Upsert.logger.info %{[upsert] Function #{name.inspect} went missing, trying to recreate}
+ first_try = false
+ create!
+ retry
+ else
+ Upsert.logger.info %{[upsert] Failed to create function #{name.inspect} for some reason}
+ raise pg_error
+ end
+ else
+ raise pg_error
+ end
+ end
end
private
- def unique_key
- @unique_key ||= MergeFunction.unique_key buffer.parent.table_name, selector, columns
+ def sql
+ @sql ||= begin
+ bind_params = []
+ 1.upto(selector.length + setter.length) { |i| bind_params << "$#{i}" }
+ %{SELECT #{name}(#{bind_params.join(', ')})}
+ end
end
def connection
buffer.parent.connection
end
+ def table_name
+ buffer.parent.table_name
+ end
+
def quoted_table_name
buffer.parent.quoted_table_name
end
- ColumnDefinition = Struct.new(:quoted_name, :quoted_input_name, :sql_type, :default)
-
- # activerecord-3.2.5/lib/active_record/connection_adapters/postgresql_adapter.rb#column_definitions
- def get_column_definitions
- res = connection.execute <<-EOS
-SELECT a.attname AS name, format_type(a.atttypid, a.atttypmod) AS sql_type, d.adsrc AS default
-FROM pg_attribute a LEFT JOIN pg_attrdef d
- ON a.attrelid = d.adrelid AND a.attnum = d.adnum
-WHERE a.attrelid = '#{quoted_table_name}'::regclass
- AND a.attnum > 0 AND NOT a.attisdropped
-EOS
- unsorted = res.select do |row|
- columns.include? row['name']
- end.inject({}) do |memo, row|
- k = row['name']
- memo[k] = ColumnDefinition.new connection.quote_ident(k), connection.quote_ident("#{k}_input"), row['sql_type'], row['default']
- memo
- end
- columns.map do |k|
- unsorted[k]
- end
- end
-
# the "canonical example" from http://www.postgresql.org/docs/9.1/static/plpgsql-control-structures.html#PLPGSQL-UPSERT-EXAMPLE
+ # differentiate between selector and setter
def create!
- Upsert.logger.info "[upsert] Creating or replacing database function #{name.inspect} on table #{buffer.parent.table_name.inspect} for selector #{selector.map(&:inspect).join(', ')} and columns #{columns.map(&:inspect).join(', ')}"
- column_definitions = get_column_definitions
+ Upsert.logger.info "[upsert] Creating or replacing database function #{name.inspect} on table #{table_name.inspect} for selector #{selector.map(&:inspect).join(', ')} and setter #{setter.map(&:inspect).join(', ')}"
+ column_definitions = ColumnDefinition.all buffer, table_name
+ selector_column_definitions = column_definitions.select { |cd| selector.include?(cd.name) }
+ setter_column_definitions = column_definitions.select { |cd| setter.include?(cd.name) }
connection.execute <<-EOS
-CREATE OR REPLACE FUNCTION #{name}(#{column_definitions.map { |c| "#{c.quoted_input_name} #{c.sql_type} DEFAULT #{c.default || 'NULL'}" }.join(',') }) RETURNS VOID AS
+CREATE OR REPLACE FUNCTION #{name}(#{(selector_column_definitions.map(&:to_selector_arg) + setter_column_definitions.map(&:to_setter_arg)).join(', ')}) RETURNS VOID AS
$$
+DECLARE
+ first_try INTEGER := 1;
BEGIN
LOOP
-- first try to update the key
- UPDATE #{quoted_table_name} SET #{column_definitions.map { |c| "#{c.quoted_name} = #{c.quoted_input_name}" }.join(',')}
- WHERE #{selector.map { |k| "#{connection.quote_ident(k)} = #{connection.quote_ident([k,'input'].join('_'))}" }.join(' AND ') };
+ UPDATE #{quoted_table_name} SET #{setter_column_definitions.map(&:to_setter).join(', ')}
+ WHERE #{selector_column_definitions.map(&:to_selector).join(' AND ') };
IF found THEN
RETURN;
END IF;
-- not there, so try to insert the key
-- if someone else inserts the same key concurrently,
-- we could get a unique-key failure
BEGIN
- INSERT INTO #{quoted_table_name}(#{column_definitions.map { |c| c.quoted_name }.join(',')}) VALUES (#{column_definitions.map { |c| c.quoted_input_name }.join(',')});
+ INSERT INTO #{quoted_table_name}(#{setter_column_definitions.map(&:quoted_name).join(', ')}) VALUES (#{setter_column_definitions.map(&:quoted_setter_name).join(', ')});
RETURN;
EXCEPTION WHEN unique_violation THEN
+ -- seamusabshere 9/20/12 only retry once
+ IF (first_try = 1) THEN
+ first_try := 0;
+ ELSE
+ RETURN;
+ END IF;
-- Do nothing, and loop to try the UPDATE again.
END;
END LOOP;
END;
$$
LANGUAGE plpgsql;
EOS
end
-
end
end
end
end