lib/good_job/lockable.rb in good_job-1.11.0 vs lib/good_job/lockable.rb in good_job-1.11.1

- old
+ new

@@ -21,24 +21,24 @@ # database session. RecordAlreadyAdvisoryLockedError = Class.new(StandardError) included do # Default column to be used when creating Advisory Locks - cattr_accessor(:advisory_lockable_column, instance_accessor: false) { primary_key } + class_attribute :advisory_lockable_column, instance_accessor: false, default: Concurrent::Delay.new { primary_key } # Default Postgres function to be used for Advisory Locks - cattr_accessor(:advisory_lockable_function) { "pg_try_advisory_lock" } + class_attribute :advisory_lockable_function, default: "pg_try_advisory_lock" # Attempt to acquire an advisory lock on the selected records and # return only those records for which a lock could be acquired. - # @!method advisory_lock(column: advisory_lockable_column, function: advisory_lockable_function) + # @!method advisory_lock(column: _advisory_lockable_column, function: advisory_lockable_function) # @!scope class # @param column [String, Symbol] column values to Advisory Lock against # @param function [String, Symbol] Postgres Advisory Lock function name to use # @return [ActiveRecord::Relation] # A relation selecting only the records that were locked. - scope :advisory_lock, (lambda do |column: advisory_lockable_column, function: advisory_lockable_function| + scope :advisory_lock, (lambda do |column: _advisory_lockable_column, function: advisory_lockable_function| original_query = self cte_table = Arel::Table.new(:rows) cte_query = original_query.select(primary_key, column).except(:limit) cte_type = if supports_cte_materialization_specifiers? @@ -62,17 +62,17 @@ # data about existing locks) such that each row in the main query joins # to all the advisory locks associated with that row. # # For details on +pg_locks+, see # {https://www.postgresql.org/docs/current/view-pg-locks.html}. - # @!method joins_advisory_locks(column: advisory_lockable_column) + # @!method joins_advisory_locks(column: _advisory_lockable_column) # @!scope class # @param column [String, Symbol] column values to Advisory Lock against # @return [ActiveRecord::Relation] # @example Get the records that have a session awaiting a lock: # MyLockableRecord.joins_advisory_locks.where("pg_locks.granted = ?", false) - scope :joins_advisory_locks, (lambda do |column: advisory_lockable_column| + scope :joins_advisory_locks, (lambda do |column: _advisory_lockable_column| join_sql = <<~SQL.squish LEFT JOIN pg_locks ON pg_locks.locktype = 'advisory' AND pg_locks.objsubid = 1 AND pg_locks.classid = ('x' || substr(md5(:table_name || #{quoted_table_name}.#{connection.quote_column_name(column)}::text), 1, 16))::bit(32)::int AND pg_locks.objid = (('x' || substr(md5(:table_name || #{quoted_table_name}.#{connection.quote_column_name(column)}::text), 1, 16))::bit(64) << 32)::bit(32)::int @@ -80,30 +80,30 @@ joins(sanitize_sql_for_conditions([join_sql, { table_name: table_name }])) end) # Find records that do not have an advisory lock on them. - # @!method advisory_unlocked(column: advisory_lockable_column) + # @!method advisory_unlocked(column: _advisory_lockable_column) # @!scope class # @param column [String, Symbol] column values to Advisory Lock against # @return [ActiveRecord::Relation] - scope :advisory_unlocked, ->(column: advisory_lockable_column) { joins_advisory_locks(column: column).where(pg_locks: { locktype: nil }) } + scope :advisory_unlocked, ->(column: _advisory_lockable_column) { joins_advisory_locks(column: column).where(pg_locks: { locktype: nil }) } # Find records that have an advisory lock on them. - # @!method advisory_locked(column: advisory_lockable_column) + # @!method advisory_locked(column: _advisory_lockable_column) # @!scope class # @param column [String, Symbol] column values to Advisory Lock against # @return [ActiveRecord::Relation] - scope :advisory_locked, ->(column: advisory_lockable_column) { joins_advisory_locks(column: column).where.not(pg_locks: { locktype: nil }) } + scope :advisory_locked, ->(column: _advisory_lockable_column) { joins_advisory_locks(column: column).where.not(pg_locks: { locktype: nil }) } # Find records with advisory locks owned by the current Postgres # session/connection. - # @!method advisory_locked(column: advisory_lockable_column) + # @!method advisory_locked(column: _advisory_lockable_column) # @!scope class # @param column [String, Symbol] column values to Advisory Lock against # @return [ActiveRecord::Relation] - scope :owns_advisory_locked, ->(column: advisory_lockable_column) { joins_advisory_locks(column: column).where('"pg_locks"."pid" = pg_backend_pid()') } + scope :owns_advisory_locked, ->(column: _advisory_lockable_column) { joins_advisory_locks(column: column).where('"pg_locks"."pid" = pg_backend_pid()') } # Whether an advisory lock should be acquired in the same transaction # that created the record. # # This helps prevent another thread or database session from acquiring a @@ -141,28 +141,34 @@ # # @example Work on the first two +MyLockableRecord+ objects that could be locked: # MyLockableRecord.order(created_at: :asc).limit(2).with_advisory_lock do |record| # do_something_with record # end - def with_advisory_lock(column: advisory_lockable_column, function: advisory_lockable_function, unlock_session: false) + def with_advisory_lock(column: _advisory_lockable_column, function: advisory_lockable_function, unlock_session: false) raise ArgumentError, "Must provide a block" unless block_given? records = advisory_lock(column: column, function: function).to_a begin yield(records) ensure if unlock_session advisory_unlock_session else records.each do |record| - key = [table_name, record[advisory_lockable_column]].join + key = [table_name, record[_advisory_lockable_column]].join record.advisory_unlock(key: key, function: advisory_unlockable_function(function)) end end end end + # Allow advisory_lockable_column to be a `Concurrent::Delay` + def _advisory_lockable_column + column = advisory_lockable_column + column.respond_to?(:value) ? column.value : column + end + def supports_cte_materialization_specifiers? return @_supports_cte_materialization_specifiers if defined?(@_supports_cte_materialization_specifiers) @_supports_cte_materialization_specifiers = connection.postgresql_version >= 120000 end @@ -306,10 +312,10 @@ end # Default Advisory Lock key # @return [String] def lockable_key - [self.class.table_name, self[self.class.advisory_lockable_column]].join + [self.class.table_name, self[self.class._advisory_lockable_column]].join end delegate :pg_or_jdbc_query, to: :class end end