lib/good_job/lockable.rb in good_job-1.9.6 vs lib/good_job/lockable.rb in good_job-1.10.0

- old
+ new

@@ -20,32 +20,45 @@ # Indicates an advisory lock is already held on a record by another # database session. RecordAlreadyAdvisoryLockedError = Class.new(StandardError) included do + # Default column to be used when creating Advisory Locks + cattr_accessor(:advisory_lockable_column, instance_accessor: false) { primary_key } + + # Default Postgres function to be used for Advisory Locks + cattr_accessor(:advisory_lockable_function) { "pg_try_advisory_lock" } + # Attempt to acquire an advisory lock on the selected records and # return only those records for which a lock could be acquired. - # @!method advisory_lock + # @!method advisory_lock(column: advisory_lockable_column, function: advisory_lockable_function) # @!scope class + # @param column [String, Symbol] column values to Advisory Lock against + # @param function [String, Symbol] Postgres Advisory Lock function name to use # @return [ActiveRecord::Relation] # A relation selecting only the records that were locked. - scope :advisory_lock, (lambda do + scope :advisory_lock, (lambda do |column: advisory_lockable_column, function: advisory_lockable_function| original_query = self cte_table = Arel::Table.new(:rows) - cte_query = original_query.select(primary_key).except(:limit) + cte_query = original_query.select(primary_key, column).except(:limit) cte_type = if supports_cte_materialization_specifiers? 'MATERIALIZED' else '' end composed_cte = Arel::Nodes::As.new(cte_table, Arel::Nodes::SqlLiteral.new([cte_type, "(", cte_query.to_sql, ")"].join(' '))) + # In addition to an advisory lock, there is also a FOR UPDATE SKIP LOCKED + # because this causes the query to skip jobs that were completed (and deleted) + # by another session in the time since the table snapshot was taken. + # In rare cases under high concurrency levels, leaving this out can result in double executions. query = cte_table.project(cte_table[:id]) .with(composed_cte) - .where(Arel.sql(sanitize_sql_for_conditions(["pg_try_advisory_lock(('x' || substr(md5(:table_name || #{connection.quote_table_name(cte_table.name)}.#{quoted_primary_key}::text), 1, 16))::bit(64)::bigint)", { table_name: table_name }]))) + .where(Arel.sql(sanitize_sql_for_conditions(["#{function}(('x' || substr(md5(:table_name || #{connection.quote_table_name(cte_table.name)}.#{connection.quote_column_name(column)}::text), 1, 16))::bit(64)::bigint)", { table_name: table_name }]))) + .lock(Arel.sql("FOR UPDATE SKIP LOCKED")) limit = original_query.arel.ast.limit query.limit = limit.value if limit.present? unscoped.where(arel_table[primary_key].in(query)).merge(original_query.only(:order)) @@ -55,44 +68,48 @@ # data about existing locks) such that each row in the main query joins # to all the advisory locks associated with that row. # # For details on +pg_locks+, see # {https://www.postgresql.org/docs/current/view-pg-locks.html}. - # @!method joins_advisory_locks + # @!method joins_advisory_locks(column: advisory_lockable_column) # @!scope class + # @param column [String, Symbol] column values to Advisory Lock against # @return [ActiveRecord::Relation] # @example Get the records that have a session awaiting a lock: # MyLockableRecord.joins_advisory_locks.where("pg_locks.granted = ?", false) - scope :joins_advisory_locks, (lambda do + scope :joins_advisory_locks, (lambda do |column: advisory_lockable_column| join_sql = <<~SQL.squish LEFT JOIN pg_locks ON pg_locks.locktype = 'advisory' AND pg_locks.objsubid = 1 - AND pg_locks.classid = ('x' || substr(md5(:table_name || #{quoted_table_name}.#{quoted_primary_key}::text), 1, 16))::bit(32)::int - AND pg_locks.objid = (('x' || substr(md5(:table_name || #{quoted_table_name}.#{quoted_primary_key}::text), 1, 16))::bit(64) << 32)::bit(32)::int + AND pg_locks.classid = ('x' || substr(md5(:table_name || #{quoted_table_name}.#{connection.quote_column_name(column)}::text), 1, 16))::bit(32)::int + AND pg_locks.objid = (('x' || substr(md5(:table_name || #{quoted_table_name}.#{connection.quote_column_name(column)}::text), 1, 16))::bit(64) << 32)::bit(32)::int SQL joins(sanitize_sql_for_conditions([join_sql, { table_name: table_name }])) end) # Find records that do not have an advisory lock on them. - # @!method advisory_unlocked + # @!method advisory_unlocked(column: advisory_lockable_column) # @!scope class + # @param column [String, Symbol] column values to Advisory Lock against # @return [ActiveRecord::Relation] - scope :advisory_unlocked, -> { joins_advisory_locks.where(pg_locks: { locktype: nil }) } + scope :advisory_unlocked, ->(column: advisory_lockable_column) { joins_advisory_locks(column: column).where(pg_locks: { locktype: nil }) } # Find records that have an advisory lock on them. - # @!method advisory_locked + # @!method advisory_locked(column: advisory_lockable_column) # @!scope class + # @param column [String, Symbol] column values to Advisory Lock against # @return [ActiveRecord::Relation] - scope :advisory_locked, -> { joins_advisory_locks.where.not(pg_locks: { locktype: nil }) } + scope :advisory_locked, ->(column: advisory_lockable_column) { joins_advisory_locks(column: column).where.not(pg_locks: { locktype: nil }) } # Find records with advisory locks owned by the current Postgres # session/connection. - # @!method advisory_locked + # @!method advisory_locked(column: advisory_lockable_column) # @!scope class + # @param column [String, Symbol] column values to Advisory Lock against # @return [ActiveRecord::Relation] - scope :owns_advisory_locked, -> { joins_advisory_locks.where('"pg_locks"."pid" = pg_backend_pid()') } + scope :owns_advisory_locked, ->(column: advisory_lockable_column) { joins_advisory_locks(column: column).where('"pg_locks"."pid" = pg_backend_pid()') } # Whether an advisory lock should be acquired in the same transaction # that created the record. # # This helps prevent another thread or database session from acquiring a @@ -120,140 +137,179 @@ # Note that this will not block and wait for locks to be acquired. # Instead, it will acquire a lock on all the selected records that it # can (as in {Lockable.advisory_lock}) and only pass those that could be # locked to the block. # + # @param column [String, Symbol] name of advisory lock or unlock function + # @param function [String, Symbol] Postgres Advisory Lock function name to use + # @param unlock_session [Boolean] Whether to unlock all advisory locks in the session afterwards # @yield [Array<Lockable>] the records that were successfully locked. # @return [Object] the result of the block. # # @example Work on the first two +MyLockableRecord+ objects that could be locked: # MyLockableRecord.order(created_at: :asc).limit(2).with_advisory_lock do |record| # do_something_with record # end - def with_advisory_lock + def with_advisory_lock(column: advisory_lockable_column, function: advisory_lockable_function, unlock_session: false) raise ArgumentError, "Must provide a block" unless block_given? - records = advisory_lock.to_a + records = advisory_lock(column: column, function: function).to_a begin yield(records) ensure - records.each(&:advisory_unlock) + if unlock_session + advisory_unlock_session + else + records.each do |record| + key = [table_name, record[advisory_lockable_column]].join + record.advisory_unlock(key: key, function: advisory_unlockable_function(function)) + end + end end end def supports_cte_materialization_specifiers? return @_supports_cte_materialization_specifiers if defined?(@_supports_cte_materialization_specifiers) @_supports_cte_materialization_specifiers = connection.postgresql_version >= 120000 end + + # Postgres advisory unlocking function for the class + # @param function [String, Symbol] name of advisory lock or unlock function + # @return [Boolean] + def advisory_unlockable_function(function = advisory_lockable_function) + function.to_s.sub("_lock", "_unlock").sub("_try_", "_") + end + + # Unlocks all advisory locks active in the current database session/connection + # @return [void] + def advisory_unlock_session + connection.exec_query("SELECT pg_advisory_unlock_all()::text AS unlocked", 'GoodJob::Lockable Unlock Session').first[:unlocked] + end + + # Converts SQL query strings between PG-compatible and JDBC-compatible syntax + # @param query [String] + # @return [Boolean] + def pg_or_jdbc_query(query) + if Concurrent.on_jruby? + # Replace $1 bind parameters with ? + query.gsub(/\$\d*/, '?') + else + query + end + end end # Acquires an advisory lock on this record if it is not already locked by # another database session. Be careful to ensure you release the lock when # you are done with {#advisory_unlock} (or {#advisory_unlock!} to release # all remaining locks). + # @param key [String, Symbol] Key to Advisory Lock against + # @param function [String, Symbol] Postgres Advisory Lock function name to use # @return [Boolean] whether the lock was acquired. - def advisory_lock + def advisory_lock(key: lockable_key, function: advisory_lockable_function) query = <<~SQL.squish - SELECT 1 AS one - WHERE pg_try_advisory_lock(('x'||substr(md5($1 || $2::text), 1, 16))::bit(64)::bigint) + SELECT #{function}(('x'||substr(md5($1::text), 1, 16))::bit(64)::bigint) AS locked SQL - binds = [[nil, self.class.table_name], [nil, send(self.class.primary_key)]] - self.class.connection.exec_query(pg_or_jdbc_query(query), 'GoodJob::Lockable Advisory Lock', binds).any? + binds = [[nil, key]] + self.class.connection.exec_query(pg_or_jdbc_query(query), 'GoodJob::Lockable Advisory Lock', binds).first['locked'] end # Releases an advisory lock on this record if it is locked by this database # session. Note that advisory locks stack, so you must call # {#advisory_unlock} and {#advisory_lock} the same number of times. + # @param key [String, Symbol] Key to lock against + # @param function [String, Symbol] Postgres Advisory Lock function name to use # @return [Boolean] whether the lock was released. - def advisory_unlock + def advisory_unlock(key: lockable_key, function: self.class.advisory_unlockable_function(advisory_lockable_function)) query = <<~SQL.squish - SELECT 1 AS one - WHERE pg_advisory_unlock(('x'||substr(md5($1 || $2::text), 1, 16))::bit(64)::bigint) + SELECT #{function}(('x'||substr(md5($1::text), 1, 16))::bit(64)::bigint) AS unlocked SQL - binds = [[nil, self.class.table_name], [nil, send(self.class.primary_key)]] - self.class.connection.exec_query(pg_or_jdbc_query(query), 'GoodJob::Lockable Advisory Unlock', binds).any? + binds = [[nil, key]] + self.class.connection.exec_query(pg_or_jdbc_query(query), 'GoodJob::Lockable Advisory Unlock', binds).first['unlocked'] end # Acquires an advisory lock on this record or raises # {RecordAlreadyAdvisoryLockedError} if it is already locked by another # database session. + # @param key [String, Symbol] Key to lock against + # @param function [String, Symbol] Postgres Advisory Lock function name to use # @raise [RecordAlreadyAdvisoryLockedError] # @return [Boolean] +true+ - def advisory_lock! - result = advisory_lock + def advisory_lock!(key: lockable_key, function: advisory_lockable_function) + result = advisory_lock(key: key, function: function) result || raise(RecordAlreadyAdvisoryLockedError) end # Acquires an advisory lock on this record and safely releases it after the # passed block is completed. If the record is locked by another database # session, this raises {RecordAlreadyAdvisoryLockedError}. - # + # @param key [String, Symbol] Key to lock against + # @param function [String, Symbol] Postgres Advisory Lock function name to use # @yield Nothing # @return [Object] The result of the block. # # @example # record = MyLockableRecord.first # record.with_advisory_lock do # do_something_with record # end - def with_advisory_lock + def with_advisory_lock(key: lockable_key, function: advisory_lockable_function) raise ArgumentError, "Must provide a block" unless block_given? - advisory_lock! + advisory_lock!(key: key, function: function) yield ensure - advisory_unlock unless $ERROR_INFO.is_a? RecordAlreadyAdvisoryLockedError + advisory_unlock(key: key, function: self.class.advisory_unlockable_function(function)) unless $ERROR_INFO.is_a? RecordAlreadyAdvisoryLockedError end # Tests whether this record has an advisory lock on it. + # @param key [String, Symbol] Key to test lock against # @return [Boolean] - def advisory_locked? + def advisory_locked?(key: lockable_key) query = <<~SQL.squish SELECT 1 AS one FROM pg_locks WHERE pg_locks.locktype = 'advisory' AND pg_locks.objsubid = 1 - AND pg_locks.classid = ('x' || substr(md5($1 || $2::text), 1, 16))::bit(32)::int - AND pg_locks.objid = (('x' || substr(md5($3 || $4::text), 1, 16))::bit(64) << 32)::bit(32)::int + AND pg_locks.classid = ('x' || substr(md5($1::text), 1, 16))::bit(32)::int + AND pg_locks.objid = (('x' || substr(md5($2::text), 1, 16))::bit(64) << 32)::bit(32)::int SQL - binds = [[nil, self.class.table_name], [nil, send(self.class.primary_key)], [nil, self.class.table_name], [nil, send(self.class.primary_key)]] + binds = [[nil, key], [nil, key]] self.class.connection.exec_query(pg_or_jdbc_query(query), 'GoodJob::Lockable Advisory Locked?', binds).any? end # Tests whether this record is locked by the current database session. + # @param key [String, Symbol] Key to test lock against # @return [Boolean] - def owns_advisory_lock? + def owns_advisory_lock?(key: lockable_key) query = <<~SQL.squish SELECT 1 AS one FROM pg_locks WHERE pg_locks.locktype = 'advisory' AND pg_locks.objsubid = 1 - AND pg_locks.classid = ('x' || substr(md5($1 || $2::text), 1, 16))::bit(32)::int - AND pg_locks.objid = (('x' || substr(md5($3 || $4::text), 1, 16))::bit(64) << 32)::bit(32)::int + AND pg_locks.classid = ('x' || substr(md5($1::text), 1, 16))::bit(32)::int + AND pg_locks.objid = (('x' || substr(md5($2::text), 1, 16))::bit(64) << 32)::bit(32)::int AND pg_locks.pid = pg_backend_pid() SQL - binds = [[nil, self.class.table_name], [nil, send(self.class.primary_key)], [nil, self.class.table_name], [nil, send(self.class.primary_key)]] + binds = [[nil, key], [nil, key]] self.class.connection.exec_query(pg_or_jdbc_query(query), 'GoodJob::Lockable Owns Advisory Lock?', binds).any? end # Releases all advisory locks on the record that are held by the current # database session. + # @param key [String, Symbol] Key to lock against + # @param function [String, Symbol] Postgres Advisory Lock function name to use # @return [void] - def advisory_unlock! - advisory_unlock while advisory_locked? + def advisory_unlock!(key: lockable_key, function: self.class.advisory_unlockable_function(advisory_lockable_function)) + advisory_unlock(key: key, function: function) while advisory_locked? end - private - - # @param query [String] - # @return [Boolean] - def pg_or_jdbc_query(query) - if Concurrent.on_jruby? - # Replace $1 bind parameters with ? - query.gsub(/\$\d*/, '?') - else - query - end + # Default Advisory Lock key + # @return [String] + def lockable_key + [self.class.table_name, self[self.class.advisory_lockable_column]].join end + + delegate :pg_or_jdbc_query, to: :class end end