lib/good_job/lockable.rb in good_job-1.3.4 vs lib/good_job/lockable.rb in good_job-1.3.5

- old
+ new

@@ -30,15 +30,22 @@ # A relation selecting only the records that were locked. scope :advisory_lock, (lambda do original_query = self cte_table = Arel::Table.new(:rows) - composed_cte = Arel::Nodes::As.new(cte_table, original_query.select(primary_key).except(:limit).arel) + cte_query = original_query.select(primary_key).except(:limit) + cte_type = if supports_cte_materialization_specifiers? + 'MATERIALIZED' + else + '' + end + composed_cte = Arel::Nodes::As.new(cte_table, Arel::Nodes::SqlLiteral.new([cte_type, "(", cte_query.to_sql, ")"].join(' '))) + query = cte_table.project(cte_table[:id]) - .with(composed_cte) - .where(Arel.sql(sanitize_sql_for_conditions(["pg_try_advisory_lock(('x' || substr(md5(:table_name || #{connection.quote_table_name(cte_table.name)}.#{quoted_primary_key}::text), 1, 16))::bit(64)::bigint)", { table_name: table_name }]))) + .with(composed_cte) + .where(Arel.sql(sanitize_sql_for_conditions(["pg_try_advisory_lock(('x' || substr(md5(:table_name || #{connection.quote_table_name(cte_table.name)}.#{quoted_primary_key}::text), 1, 16))::bit(64)::bigint)", { table_name: table_name }]))) limit = original_query.arel.ast.limit query.limit = limit.value if limit.present? unscoped.where(arel_table[primary_key].in(query)).merge(original_query.only(:order)) @@ -130,10 +137,16 @@ yield(records) ensure records.each(&:advisory_unlock) end end + + def supports_cte_materialization_specifiers? + return @supports_cte_materialization_specifiers if defined?(@supports_cte_materialization_specifiers) + + @supports_cte_materialization_specifiers = ActiveRecord::Base.connection.postgresql_version >= 120000 + end end # Acquires an advisory lock on this record if it is not already locked by # another database session. Be careful to ensure you release the lock when # you are done with {#advisory_unlock} (or {#advisory_unlock!} to release @@ -149,13 +162,14 @@ # Releases an advisory lock on this record if it is locked by this database # session. Note that advisory locks stack, so you must call # {#advisory_unlock} and {#advisory_lock} the same number of times. # @return [Boolean] whether the lock was released. def advisory_unlock - where_sql = <<~SQL.squish - pg_advisory_unlock(('x' || substr(md5(:table_name || :id::text), 1, 16))::bit(64)::bigint) + query = <<~SQL.squish + SELECT 1 AS one + WHERE pg_advisory_unlock(('x'||substr(md5(:table_name || :id::text), 1, 16))::bit(64)::bigint) SQL - self.class.unscoped.exists?([where_sql, { table_name: self.class.table_name, id: send(self.class.primary_key) }]) + self.class.connection.execute(sanitize_sql_for_conditions([query, { table_name: self.class.table_name, id: send(self.class.primary_key) }])).ntuples.positive? end # Acquires an advisory lock on this record or raises # {RecordAlreadyAdvisoryLockedError} if it is already locked by another # database session.