lib/good_job/lockable.rb in good_job-1.3.4 vs lib/good_job/lockable.rb in good_job-1.3.5
- old
+ new
@@ -30,15 +30,22 @@
# A relation selecting only the records that were locked.
scope :advisory_lock, (lambda do
original_query = self
cte_table = Arel::Table.new(:rows)
- composed_cte = Arel::Nodes::As.new(cte_table, original_query.select(primary_key).except(:limit).arel)
+ cte_query = original_query.select(primary_key).except(:limit)
+ cte_type = if supports_cte_materialization_specifiers?
+ 'MATERIALIZED'
+ else
+ ''
+ end
+ composed_cte = Arel::Nodes::As.new(cte_table, Arel::Nodes::SqlLiteral.new([cte_type, "(", cte_query.to_sql, ")"].join(' ')))
+
query = cte_table.project(cte_table[:id])
- .with(composed_cte)
- .where(Arel.sql(sanitize_sql_for_conditions(["pg_try_advisory_lock(('x' || substr(md5(:table_name || #{connection.quote_table_name(cte_table.name)}.#{quoted_primary_key}::text), 1, 16))::bit(64)::bigint)", { table_name: table_name }])))
+ .with(composed_cte)
+ .where(Arel.sql(sanitize_sql_for_conditions(["pg_try_advisory_lock(('x' || substr(md5(:table_name || #{connection.quote_table_name(cte_table.name)}.#{quoted_primary_key}::text), 1, 16))::bit(64)::bigint)", { table_name: table_name }])))
limit = original_query.arel.ast.limit
query.limit = limit.value if limit.present?
unscoped.where(arel_table[primary_key].in(query)).merge(original_query.only(:order))
@@ -130,10 +137,16 @@
yield(records)
ensure
records.each(&:advisory_unlock)
end
end
+
+ def supports_cte_materialization_specifiers?
+ return @supports_cte_materialization_specifiers if defined?(@supports_cte_materialization_specifiers)
+
+ @supports_cte_materialization_specifiers = ActiveRecord::Base.connection.postgresql_version >= 120000
+ end
end
# Acquires an advisory lock on this record if it is not already locked by
# another database session. Be careful to ensure you release the lock when
# you are done with {#advisory_unlock} (or {#advisory_unlock!} to release
@@ -149,13 +162,14 @@
# Releases an advisory lock on this record if it is locked by this database
# session. Note that advisory locks stack, so you must call
# {#advisory_unlock} and {#advisory_lock} the same number of times.
# @return [Boolean] whether the lock was released.
def advisory_unlock
- where_sql = <<~SQL.squish
- pg_advisory_unlock(('x' || substr(md5(:table_name || :id::text), 1, 16))::bit(64)::bigint)
+ query = <<~SQL.squish
+ SELECT 1 AS one
+ WHERE pg_advisory_unlock(('x'||substr(md5(:table_name || :id::text), 1, 16))::bit(64)::bigint)
SQL
- self.class.unscoped.exists?([where_sql, { table_name: self.class.table_name, id: send(self.class.primary_key) }])
+ self.class.connection.execute(sanitize_sql_for_conditions([query, { table_name: self.class.table_name, id: send(self.class.primary_key) }])).ntuples.positive?
end
# Acquires an advisory lock on this record or raises
# {RecordAlreadyAdvisoryLockedError} if it is already locked by another
# database session.