lib/good_job/lockable.rb in good_job-1.2.4 vs lib/good_job/lockable.rb in good_job-1.2.5

- old
+ new

@@ -1,49 +1,129 @@ module GoodJob + # + # Adds Postgres advisory locking capabilities to an ActiveRecord record. + # For details on advisory locks, see the Postgres documentation: + # - {https://www.postgresql.org/docs/current/explicit-locking.html#ADVISORY-LOCKS Advisory Locks Overview} + # - {https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS Advisory Locks Functions} + # + # @example Add this concern to a +MyRecord+ class: + # class MyRecord < ActiveRecord::Base + # include Lockable + # + # def my_method + # ... + # end + # end + # module Lockable extend ActiveSupport::Concern + # Indicates an advisory lock is already held on a record by another + # database session. RecordAlreadyAdvisoryLockedError = Class.new(StandardError) included do + # Attempt to acquire an advisory lock on the selected records and + # return only those records for which a lock could be acquired. + # @!method advisory_lock + # @!scope class + # @return [ActiveRecord::Relation] + # A relation selecting only the records that were locked. scope :advisory_lock, (lambda do original_query = self cte_table = Arel::Table.new(:rows) composed_cte = Arel::Nodes::As.new(cte_table, original_query.select(primary_key).except(:limit).arel) query = cte_table.project(cte_table[:id]) .with(composed_cte) - .where(Arel.sql(sanitize_sql_for_conditions(["pg_try_advisory_lock(('x'||substr(md5(:table_name || \"#{cte_table.name}\".\"#{primary_key}\"::text), 1, 16))::bit(64)::bigint)", { table_name: table_name }]))) + .where(Arel.sql(sanitize_sql_for_conditions(["pg_try_advisory_lock(('x' || substr(md5(:table_name || #{connection.quote_table_name(cte_table.name)}.#{quoted_primary_key}::text), 1, 16))::bit(64)::bigint)", { table_name: table_name }]))) limit = original_query.arel.ast.limit query.limit = limit.value if limit.present? - unscoped.where(arel_table[:id].in(query)).merge(original_query.only(:order)) + unscoped.where(arel_table[primary_key].in(query)).merge(original_query.only(:order)) end) + # Joins the current query with Postgres's +pg_locks+ table (it provides + # data about existing locks) such that each row in the main query joins + # to all the advisory locks associated with that row. + # + # For details on +pg_locks+, see + # {https://www.postgresql.org/docs/current/view-pg-locks.html}. + # @!method joins_advisory_locks + # @!scope class + # @return [ActiveRecord::Relation] + # @example Get the records that have a session awaiting a lock: + # MyLockableRecord.joins_advisory_locks.where("pg_locks.granted = ?", false) scope :joins_advisory_locks, (lambda do join_sql = <<~SQL LEFT JOIN pg_locks ON pg_locks.locktype = 'advisory' AND pg_locks.objsubid = 1 - AND pg_locks.classid = ('x'||substr(md5(:table_name || #{quoted_table_name}.#{quoted_primary_key}::text), 1, 16))::bit(32)::int - AND pg_locks.objid = (('x'||substr(md5(:table_name || #{quoted_table_name}.#{quoted_primary_key}::text), 1, 16))::bit(64) << 32)::bit(32)::int + AND pg_locks.classid = ('x' || substr(md5(:table_name || #{quoted_table_name}.#{quoted_primary_key}::text), 1, 16))::bit(32)::int + AND pg_locks.objid = (('x' || substr(md5(:table_name || #{quoted_table_name}.#{quoted_primary_key}::text), 1, 16))::bit(64) << 32)::bit(32)::int SQL joins(sanitize_sql_for_conditions([join_sql, { table_name: table_name }])) end) + # Find records that do not have an advisory lock on them. + # @!method advisory_unlocked + # @!scope class + # @return [ActiveRecord::Relation] scope :advisory_unlocked, -> { joins_advisory_locks.where(pg_locks: { locktype: nil }) } + + # Find records that have an advisory lock on them. + # @!method advisory_locked + # @!scope class + # @return [ActiveRecord::Relation] scope :advisory_locked, -> { joins_advisory_locks.where.not(pg_locks: { locktype: nil }) } + + # Find records with advisory locks owned by the current Postgres + # session/connection. + # @!method advisory_locked + # @!scope class + # @return [ActiveRecord::Relation] scope :owns_advisory_locked, -> { joins_advisory_locks.where('"pg_locks"."pid" = pg_backend_pid()') } + # @!attribute [r] create_with_advisory_lock + # @return [Boolean] + # Whether an advisory lock should be acquired in the same transaction + # that created the record. + # + # This helps prevent another thread or database session from acquiring a + # lock on the record between the time you create it and the time you + # request a lock, since other sessions will not be able to see the new + # record until the transaction that creates it is completed (at which + # point you have already acquired the lock). + # + # @example + # record = MyLockableRecord.create(create_with_advisory_lock: true) + # record.advisory_locked? + # => true attr_accessor :create_with_advisory_lock after_create -> { advisory_lock }, if: :create_with_advisory_lock end class_methods do + # Acquires an advisory lock on the selected record(s) and safely releases + # it after the passed block is completed. The block will be passed an + # array of the locked records as its first argument. + # + # Note that this will not block and wait for locks to be acquired. + # Instead, it will acquire a lock on all the selected records that it + # can (as in {Lockable.advisory_lock}) and only pass those that could be + # locked to the block. + # + # @yield [Array<Lockable>] the records that were successfully locked. + # @return [Object] the result of the block. + # + # @example Work on the first two +MyLockableRecord+ objects that could be locked: + # MyLockableRecord.order(created_at: :asc).limit(2).with_advisory_lock do |record| + # do_something_with record + # end def with_advisory_lock raise ArgumentError, "Must provide a block" unless block_given? records = advisory_lock.to_a begin @@ -52,45 +132,78 @@ records.each(&:advisory_unlock) end end end + # Acquires an advisory lock on this record if it is not already locked by + # another database session. Be careful to ensure you release the lock when + # you are done with {#advisory_unlock} (or {#advisory_unlock!} to release + # all remaining locks). + # @return [Boolean] whether the lock was acquired. def advisory_lock where_sql = <<~SQL - pg_try_advisory_lock(('x'||substr(md5(:table_name || :id::text), 1, 16))::bit(64)::bigint) + pg_try_advisory_lock(('x' || substr(md5(:table_name || :id::text), 1, 16))::bit(64)::bigint) SQL self.class.unscoped.where(where_sql, { table_name: self.class.table_name, id: send(self.class.primary_key) }).exists? end + # Releases an advisory lock on this record if it is locked by this database + # session. Note that advisory locks stack, so you must call + # {#advisory_unlock} and {#advisory_lock} the same number of times. + # @return [Boolean] whether the lock was released. def advisory_unlock where_sql = <<~SQL - pg_advisory_unlock(('x'||substr(md5(:table_name || :id::text), 1, 16))::bit(64)::bigint) + pg_advisory_unlock(('x' || substr(md5(:table_name || :id::text), 1, 16))::bit(64)::bigint) SQL self.class.unscoped.where(where_sql, { table_name: self.class.table_name, id: send(self.class.primary_key) }).exists? end + # Acquires an advisory lock on this record or raises + # {RecordAlreadyAdvisoryLockedError} if it is already locked by another + # database session. + # @raise [RecordAlreadyAdvisoryLockedError] + # @return [Boolean] +true+ def advisory_lock! result = advisory_lock result || raise(RecordAlreadyAdvisoryLockedError) end + # Acquires an advisory lock on this record and safely releases it after the + # passed block is completed. If the record is locked by another database + # session, this raises {RecordAlreadyAdvisoryLockedError}. + # + # @yield Nothing + # @return [Object] The result of the block. + # + # @example + # record = MyLockableRecord.first + # record.with_advisory_lock do + # do_something_with record + # end def with_advisory_lock raise ArgumentError, "Must provide a block" unless block_given? advisory_lock! yield ensure advisory_unlock unless $ERROR_INFO.is_a? RecordAlreadyAdvisoryLockedError end + # Tests whether this record has an advisory lock on it. + # @return [Boolean] def advisory_locked? self.class.unscoped.advisory_locked.where(id: send(self.class.primary_key)).exists? end + # Tests whether this record is locked by the current database session. + # @return [Boolean] def owns_advisory_lock? self.class.unscoped.owns_advisory_locked.where(id: send(self.class.primary_key)).exists? end + # Releases all advisory locks on the record that are held by the current + # database session. + # @return [void] def advisory_unlock! advisory_unlock while advisory_locked? end private