# Migration helpers that wrap ActiveRecord schema statements. The module is
# prepended onto ActiveRecord::Migration at the bottom of this file, so `super`
# inside these methods reaches the stock ActiveRecord implementation.
#
# Methods that take a table lock go through #safely_acquire_lock_for_table,
# which waits for blocking transactions and retries on lock timeout.
module PgHaMigrations::SafeStatements
  # Creates a table by delegating directly to unsafe_create_table.
  def safe_create_table(table, options = {}, &block)
    unsafe_create_table(table, options, &block)
  end

  # Creates a Postgres enum type.
  #
  # @param name [#to_s] type name (quoted as an identifier)
  # @param values [Enumerable, nil] enum labels; pass [] for an empty enum
  # @raise [ArgumentError] when values is nil
  def safe_create_enum_type(name, values = nil)
    if values.nil?
      raise ArgumentError, "safe_create_enum_type expects a set of values; if you want an enum with no values please pass an empty array"
    end

    # An empty collection yields "ENUM ()", so no special case is needed.
    escaped_values = values.map { |value| "'#{PG::Connection.escape_string(value.to_s)}'" }
    unsafe_execute("CREATE TYPE #{PG::Connection.quote_ident(name.to_s)} AS ENUM (#{escaped_values.join(',')})")
  end

  # Adds a label to an existing enum type.
  #
  # @param name [#to_s] enum type name
  # @param value [#to_s] new label; symbols are accepted, matching safe_create_enum_type
  def safe_add_enum_value(name, value)
    unsafe_execute("ALTER TYPE #{PG::Connection.quote_ident(name.to_s)} ADD VALUE '#{PG::Connection.escape_string(value.to_s)}'")
  end

  # Adds a column, refusing the :default and :null => false options.
  #
  # @raise [PgHaMigrations::UnsafeMigrationError] when options has a :default
  #   key or sets :null to false
  def safe_add_column(table, column, type, options = {})
    if options.key?(:default)
      raise PgHaMigrations::UnsafeMigrationError, ":default is NOT SAFE! Use safe_change_column_default afterwards then backfill the data to prevent locking the table"
    end

    # Compare against false explicitly: an absent (nil) :null option is allowed.
    if options[:null] == false
      raise PgHaMigrations::UnsafeMigrationError, ":null => false is NOT SAFE if the table has data! If you _really_ want to do this, use unsafe_make_column_not_nullable"
    end

    unsafe_add_column(table, column, type, options)
  end

  # Adds a column via the superclass implementation while holding the table lock
  # obtained by safely_acquire_lock_for_table.
  def unsafe_add_column(table, column, type, options = {})
    safely_acquire_lock_for_table(table) do
      super(table, column, type, options)
    end
  end

  # Sets a column default.
  #
  # @param default_value [Proc, nil, #to_s] a Proc is called and its result is
  #   inserted as an (escaped but unquoted) SQL expression; nil sets the default
  #   to NULL; anything else is inserted as a quoted string literal
  def safe_change_column_default(table, column, default_value)
    default_sql =
      case default_value
      when Proc
        PG::Connection.escape_string(default_value.call.to_s)
      when nil
        # nil.to_s would otherwise produce the empty-string literal ''.
        "NULL"
      else
        "'#{PG::Connection.escape_string(default_value.to_s)}'"
      end

    safely_acquire_lock_for_table(table) do
      unsafe_execute <<-SQL.strip_heredoc
        ALTER TABLE #{PG::Connection.quote_ident(table.to_s)} ALTER COLUMN #{PG::Connection.quote_ident(column.to_s)} SET DEFAULT #{default_sql}
      SQL
    end
  end

  # Drops the NOT NULL constraint from a column while holding the table lock.
  def safe_make_column_nullable(table, column)
    quoted_table = connection.quote_table_name(table.to_s)
    quoted_column = connection.quote_column_name(column.to_s)

    safely_acquire_lock_for_table(table) do
      unsafe_execute "ALTER TABLE #{quoted_table} ALTER COLUMN #{quoted_column} DROP NOT NULL"
    end
  end

  # Adds a NOT NULL constraint to a column while holding the table lock.
  #
  # The options arg is only present for backwards compatibility and is unused.
  def unsafe_make_column_not_nullable(table, column, options = {})
    quoted_table = connection.quote_table_name(table.to_s)
    quoted_column = connection.quote_column_name(column.to_s)

    safely_acquire_lock_for_table(table) do
      unsafe_execute "ALTER TABLE #{quoted_table} ALTER COLUMN #{quoted_column} SET NOT NULL"
    end
  end

  # Adds an index, forcing :algorithm => :concurrently.
  def safe_add_concurrent_index(table, columns, options = {})
    unsafe_add_index(table, columns, options.merge(:algorithm => :concurrently))
  end

  # Removes an index, forcing :algorithm => :concurrently, after reporting the
  # index's on-disk size.
  #
  # @param options [Hash] must include :name
  # @raise [ArgumentError] when options is not a Hash with a :name key
  # @raise [PgHaMigrations::InvalidMigrationError] when the server version is
  #   below 90600
  def safe_remove_concurrent_index(table, options = {})
    unless options.is_a?(Hash) && options.key?(:name)
      raise ArgumentError, "Expected safe_remove_concurrent_index to be called with arguments (table_name, :name => ...)"
    end

    unless ActiveRecord::Base.connection.postgresql_version >= 90600
      raise PgHaMigrations::InvalidMigrationError, "Removing an index concurrently is not supported on Postgres databases older than 9.6"
    end

    index_name = options[:name]
    index_size = select_value("SELECT pg_size_pretty(pg_relation_size('#{PG::Connection.escape_string(index_name.to_s)}'))")
    say "Preparing to drop index #{index_name} which is #{index_size} on disk..."

    unsafe_remove_index(table, options.merge(:algorithm => :concurrently))
  end

  # Sets maintenance_work_mem for the current session, in gigabytes.
  def safe_set_maintenance_work_mem_gb(gigabytes)
    unsafe_execute("SET maintenance_work_mem = '#{PG::Connection.escape_string(gigabytes.to_s)} GB'")
  end

  # Memoizes the call stack captured the first time this is invoked.
  def _per_migration_caller
    @_per_migration_caller ||= Kernel.caller
  end

  # @raise [PgHaMigrations::UnsupportedAdapter] unless the current adapter is
  #   named "PostgreSQL"
  def _check_postgres_adapter!
    expected_adapter = "PostgreSQL"
    actual_adapter = ActiveRecord::Base.connection.adapter_name
    return if actual_adapter == expected_adapter

    raise PgHaMigrations::UnsupportedAdapter, "This gem only works with the #{expected_adapter} adapter, found #{actual_adapter} instead"
  end

  # Verifies the adapter before handing off to the superclass exec_migration.
  def exec_migration(conn, direction)
    _check_postgres_adapter!
    super(conn, direction)
  end

  # Runs the block inside a transaction that holds a LOCK on the table.
  #
  # Each attempt first waits until no blocking transaction reports a lock on
  # the table, then tries to LOCK it under a timeout of
  # PgHaMigrations::LOCK_TIMEOUT_SECONDS. On timeout the transaction is rolled
  # back, the method sleeps, and the whole attempt is retried.
  def safely_acquire_lock_for_table(table, &block)
    _check_postgres_adapter!
    table = table.to_s
    quoted_table_name = connection.quote_table_name(table)

    successfully_acquired_lock = false

    until successfully_acquired_lock
      # The parenthesised condition is two statements: refresh the list, then
      # test whether any entry holds a lock on our table.
      while (
        blocking_transactions = PgHaMigrations::BlockingDatabaseTransactions.find_blocking_transactions("#{PgHaMigrations::LOCK_TIMEOUT_SECONDS} seconds")
        blocking_transactions.any? { |query| query.tables_with_locks.include?(table) }
      )
        say "Waiting on blocking transactions:"
        blocking_transactions.each do |blocking_transaction|
          say blocking_transaction.description
        end
        sleep(PgHaMigrations::LOCK_TIMEOUT_SECONDS)
      end

      connection.transaction do
        # Servers older than 9.3 fall back to statement_timeout.
        adjust_timeout_method = connection.postgresql_version >= 90300 ? :adjust_lock_timeout : :adjust_statement_timeout

        begin
          method(adjust_timeout_method).call(PgHaMigrations::LOCK_TIMEOUT_SECONDS) do
            connection.execute("LOCK #{quoted_table_name};")
          end
          successfully_acquired_lock = true
        rescue ActiveRecord::StatementInvalid => e
          if e.message =~ /PG::LockNotAvailable.+ lock timeout/ || e.message =~ /PG::QueryCanceled.+ statement timeout/
            sleep_seconds = PgHaMigrations::LOCK_FAILURE_RETRY_DELAY_MULTLIPLIER * PgHaMigrations::LOCK_TIMEOUT_SECONDS
            say "Timed out trying to acquire an exclusive lock on the #{quoted_table_name} table."
            say "Sleeping for #{sleep_seconds}s to allow potentially queued up queries to finish before continuing."
            sleep(sleep_seconds)

            # Abort this transaction quietly; the outer loop retries.
            raise ActiveRecord::Rollback
          else
            raise
          end
        end

        if successfully_acquired_lock
          block.call
        end
      end
    end
  end

  # Runs the block with lock_timeout set to timeout_seconds, then restores the
  # previous lock_timeout.
  def adjust_lock_timeout(timeout_seconds = PgHaMigrations::LOCK_TIMEOUT_SECONDS, &block)
    _with_temporary_timeout("lock_timeout", timeout_seconds, &block)
  end

  # Runs the block with statement_timeout set to timeout_seconds, then restores
  # the previous statement_timeout.
  def adjust_statement_timeout(timeout_seconds, &block)
    _with_temporary_timeout("statement_timeout", timeout_seconds, &block)
  end

  # Shared implementation for adjust_lock_timeout / adjust_statement_timeout.
  #
  # @param setting [String] name of the timeout setting to change
  # @param timeout_seconds [Numeric] temporary value, sent to the server in ms
  def _with_temporary_timeout(setting, timeout_seconds, &block)
    _check_postgres_adapter!

    # Keep the SHOW output verbatim (e.g. "0", "5s", "500ms", "1min") and hand
    # it back as a quoted literal, so the unit is never misread on restore.
    original_timeout = ActiveRecord::Base.value_from_sql("SHOW #{setting}")

    begin
      connection.execute("SET #{setting} = #{PG::Connection.escape_string((timeout_seconds * 1000).to_s)};")
      block.call
    ensure
      begin
        connection.execute("SET #{setting} = '#{PG::Connection.escape_string(original_timeout.to_s)}';")
      rescue ActiveRecord::StatementInvalid => e
        # If we're in a failed transaction the `SET` will be rolled back, so we
        # don't need to worry about cleaning up, and we can't execute SQL anyway.
        raise unless e.message =~ /PG::InFailedSqlTransaction/
      end
    end
  end
end

ActiveRecord::Migration.send(:prepend, PgHaMigrations::SafeStatements)