require 'digest/md5' class Upsert class Buffer class PG_Connection < Buffer # @private class MergeFunction class << self def execute(buffer, row) merge_function = lookup buffer, row merge_function.execute row end def unique_name(table_name, selector, setter) parts = [ 'upsert', table_name, 'SEL', selector.join('_A_'), 'SET', setter.join('_A_') ].join('_') # maybe i should md5 instead crc32 = Zlib.crc32(parts).to_s [ parts.first(MAX_NAME_LENGTH-11), crc32 ].join end def lookup(buffer, row) @lookup ||= {} selector = row.selector.keys setter = row.setter.keys key = [buffer.parent.table_name, selector, setter] @lookup[key] ||= new(buffer, selector, setter) end def clear(buffer) connection = buffer.parent.connection # connection.execute <<-EOS CREATE OR REPLACE FUNCTION pg_temp.upsert_delfunc(text) RETURNS void AS $BODY$ DECLARE _sql text; BEGIN FOR _sql IN SELECT 'DROP FUNCTION ' || quote_ident(n.nspname) || '.' || quote_ident(p.proname) || '(' || pg_catalog.pg_get_function_identity_arguments(p.oid) || ');' FROM pg_catalog.pg_proc p LEFT JOIN pg_catalog.pg_namespace n ON n.oid = p.pronamespace WHERE p.proname = $1 AND pg_catalog.pg_function_is_visible(p.oid) -- you may or may not want this LOOP EXECUTE _sql; END LOOP; END; $BODY$ LANGUAGE plpgsql; EOS res = connection.execute(%{SELECT proname FROM pg_proc WHERE proname LIKE 'upsert_%'}) res.each do |row| k = row['proname'] next if k == 'upsert_delfunc' %{[upsert] Dropping function #{k.inspect}} connection.execute %{SELECT pg_temp.upsert_delfunc('#{k}')} end end end MAX_NAME_LENGTH = 63 attr_reader :buffer attr_reader :selector attr_reader :setter def initialize(buffer, selector, setter) @buffer = buffer @selector = selector @setter = setter create! end def name @name ||= MergeFunction.unique_name table_name, selector, setter end def execute(row) first_try = true bind_selector_values = bind_setter_values = begin connection.execute sql, (bind_selector_values + bind_setter_values) rescue PG::Error => pg_error if pg_error.message =~ /function #{name}.* does not exist/i if first_try %{[upsert] Function #{name.inspect} went missing, trying to recreate} first_try = false create! retry else %{[upsert] Failed to create function #{name.inspect} for some reason} raise pg_error end else raise pg_error end end end private def sql @sql ||= begin bind_params = [] 1.upto(selector.length + setter.length) { |i| bind_params << "$#{i}" } %{SELECT #{name}(#{bind_params.join(', ')})} end end def connection buffer.parent.connection end def table_name buffer.parent.table_name end def quoted_table_name buffer.parent.quoted_table_name end # the "canonical example" from # differentiate between selector and setter def create! "[upsert] Creating or replacing database function #{name.inspect} on table #{table_name.inspect} for selector #{', ')} and setter #{', ')}" column_definitions = ColumnDefinition.all buffer, table_name selector_column_definitions = { |cd| selector.include?( } setter_column_definitions = { |cd| setter.include?( } connection.execute <<-EOS CREATE OR REPLACE FUNCTION #{name}(#{( +', ')}) RETURNS VOID AS $$ DECLARE first_try INTEGER := 1; BEGIN LOOP -- first try to update the key UPDATE #{quoted_table_name} SET #{', ')} WHERE #{' AND ') }; IF found THEN RETURN; END IF; -- not there, so try to insert the key -- if someone else inserts the same key concurrently, -- we could get a unique-key failure BEGIN INSERT INTO #{quoted_table_name}(#{', ')}) VALUES (#{', ')}); RETURN; EXCEPTION WHEN unique_violation THEN -- seamusabshere 9/20/12 only retry once IF (first_try = 1) THEN first_try := 0; ELSE RETURN; END IF; -- Do nothing, and loop to try the UPDATE again. END; END LOOP; END; $$ LANGUAGE plpgsql; EOS end end end end end