lib/good_job/lockable.rb in good_job-0.3.0 vs lib/good_job/lockable.rb in good_job-0.4.0

- old
+ new

@@ -3,81 +3,89 @@ extend ActiveSupport::Concern RecordAlreadyAdvisoryLockedError = Class.new(StandardError) included do + scope :advisory_lock, (lambda do + original_query = self + + cte_table = Arel::Table.new(:rows) + composed_cte = Arel::Nodes::As.new(cte_table, original_query.select(primary_key).except(:limit).arel) + + query = cte_table.project(cte_table[:id]) + .with(composed_cte) + .where(Arel.sql(sanitize_sql_for_conditions(["pg_try_advisory_lock(('x'||substr(md5(:table_name || \"#{cte_table.name}\".\"#{primary_key}\"::text), 1, 16))::bit(64)::bigint)", { table_name: table_name }]))) + + limit = original_query.arel.ast.limit + query.limit = limit.value if limit.present? + + unscoped.where(arel_table[:id].in(query)).merge(original_query.only(:order)) + end) + scope :joins_advisory_locks, (lambda do - joins(<<~SQL) + join_sql = <<~SQL LEFT JOIN pg_locks ON pg_locks.locktype = 'advisory' AND pg_locks.objsubid = 1 - AND pg_locks.classid = ('x'||substr(md5(good_jobs.id::text), 1, 16))::bit(32)::int - AND pg_locks.objid = (('x'||substr(md5(good_jobs.id::text), 1, 16))::bit(64) << 32)::bit(32)::int + AND pg_locks.classid = ('x'||substr(md5(:table_name || "#{table_name}"."#{primary_key}"::text), 1, 16))::bit(32)::int + AND pg_locks.objid = (('x'||substr(md5(:table_name || "#{table_name}"."#{primary_key}"::text), 1, 16))::bit(64) << 32)::bit(32)::int SQL + + joins(sanitize_sql_for_conditions([join_sql, { table_name: table_name }])) end) scope :advisory_unlocked, -> { joins_advisory_locks.where(pg_locks: { locktype: nil }) } + scope :advisory_locked, -> { joins_advisory_locks.where.not(pg_locks: { locktype: nil }) } + scope :owns_advisory_locked, -> { joins_advisory_locks.where('"pg_locks"."pid" = pg_backend_pid()') } attr_accessor :create_with_advisory_lock - after_create -> { advisory_lock }, if: :create_with_advisory_lock end class_methods do - def first_advisory_locked_row(query) - find_by_sql(<<~SQL).first - WITH rows AS (#{query.to_sql}) - SELECT rows.* - FROM rows - WHERE pg_try_advisory_lock(('x'||substr(md5(id::text), 1, 16))::bit(64)::bigint) - LIMIT 1 - SQL + def with_advisory_lock(&block) + records = advisory_lock.to_a + begin + block.call(records) + ensure + records.each(&:advisory_unlock) + end end end def advisory_lock - self.class.connection.execute(sanitize_sql_for_conditions(["SELECT 1 as one WHERE pg_try_advisory_lock(('x'||substr(md5(?), 1, 16))::bit(64)::bigint)", id])).ntuples.positive? + query = <<~SQL + SELECT 1 AS one + WHERE pg_try_advisory_lock(('x'||substr(md5(:table_name || :id::text), 1, 16))::bit(64)::bigint) + SQL + self.class.connection.execute(sanitize_sql_for_conditions([query, { table_name: self.class.table_name, id: send(self.class.primary_key) }])).ntuples.positive? end + def advisory_unlock + query = <<~SQL + SELECT 1 AS one + WHERE pg_advisory_unlock(('x'||substr(md5(:table_name || :id::text), 1, 16))::bit(64)::bigint) + SQL + self.class.connection.execute(sanitize_sql_for_conditions([query, { table_name: self.class.table_name, id: send(self.class.primary_key) }])).ntuples.positive? + end + def advisory_lock! result = advisory_lock result || raise(RecordAlreadyAdvisoryLockedError) end def with_advisory_lock advisory_lock! yield - rescue StandardError => e - advisory_unlock unless e.is_a? RecordAlreadyAdvisoryLockedError - raise + ensure + advisory_unlock unless $ERROR_INFO.is_a? RecordAlreadyAdvisoryLockedError end def advisory_locked? - self.class.connection.execute(<<~SQL).ntuples.positive? - SELECT 1 as one - FROM pg_locks - WHERE - locktype = 'advisory' - AND objsubid = 1 - AND classid = ('x'||substr(md5('#{id}'), 1, 16))::bit(32)::int - AND objid = (('x'||substr(md5('#{id}'), 1, 16))::bit(64) << 32)::bit(32)::int - SQL + self.class.advisory_locked.where(id: send(self.class.primary_key)).any? end def owns_advisory_lock? - self.class.connection.execute(<<~SQL).ntuples.positive? - SELECT 1 as one - FROM pg_locks - WHERE - locktype = 'advisory' - AND objsubid = 1 - AND classid = ('x'||substr(md5('#{id}'), 1, 16))::bit(32)::int - AND objid = (('x'||substr(md5('#{id}'), 1, 16))::bit(64) << 32)::bit(32)::int - AND pid = pg_backend_pid() - SQL - end - - def advisory_unlock - self.class.connection.execute("SELECT pg_advisory_unlock(('x'||substr(md5('#{id}'), 1, 16))::bit(64)::bigint)").first["pg_advisory_unlock"] + self.class.owns_advisory_locked.where(id: send(self.class.primary_key)).any? end def advisory_unlock! advisory_unlock while advisory_locked? end