lib/good_job/lockable.rb in good_job-0.2.2 vs lib/good_job/lockable.rb in good_job-0.3.0

- old
+ new

@@ -13,90 +13,81 @@ AND pg_locks.objid = (('x'||substr(md5(good_jobs.id::text), 1, 16))::bit(64) << 32)::bit(32)::int SQL end) scope :advisory_unlocked, -> { joins_advisory_locks.where(pg_locks: { locktype: nil }) } - scope :with_advisory_lock, (lambda do - where(<<~SQL) - pg_try_advisory_lock(('x'||substr(md5(id::text), 1, 16))::bit(64)::bigint) - SQL - end) - def self.first_advisory_locked_row(query) - find_by_sql(<<~SQL) + attr_accessor :create_with_advisory_lock + + after_create -> { advisory_lock }, if: :create_with_advisory_lock + end + + class_methods do + def first_advisory_locked_row(query) + find_by_sql(<<~SQL).first WITH rows AS (#{query.to_sql}) - SELECT rows.id + SELECT rows.* FROM rows WHERE pg_try_advisory_lock(('x'||substr(md5(id::text), 1, 16))::bit(64)::bigint) + LIMIT 1 SQL end - # private_class_method :first_advisory_locked_row + end - # https://www.postgresql.org/docs/9.6/view-pg-locks.html - # Advisory locks can be acquired on keys consisting of either a single bigint value or two integer values. - # A bigint key is displayed with its high-order half in the classid column, its low-order half in the objid column, and objsubid equal to 1. - # The original bigint value can be reassembled with the expression (classid::bigint << 32) | objid::bigint. - # Integer keys are displayed with the first key in the classid column, the second key in the objid column, and objsubid equal to 2. - # The actual meaning of the keys is up to the user. Advisory locks are local to each database, so the database column is meaningful for an advisory lock. - def self.advisory_lock_details - connection.select("SELECT * FROM pg_locks WHERE locktype = 'advisory' AND objsubid = 1") - end + def advisory_lock + self.class.connection.execute(sanitize_sql_for_conditions(["SELECT 1 as one WHERE pg_try_advisory_lock(('x'||substr(md5(?), 1, 16))::bit(64)::bigint)", id])).ntuples.positive? + end - def advisory_lock - self.class.connection.execute(sanitize_sql_for_conditions(["SELECT 1 as one WHERE pg_try_advisory_lock(('x'||substr(md5(?), 1, 16))::bit(64)::bigint)", id])).ntuples.positive? - end + def advisory_lock! + result = advisory_lock + result || raise(RecordAlreadyAdvisoryLockedError) + end - def advisory_lock! - result = advisory_lock - result || raise(RecordAlreadyAdvisoryLockedError) - end + def with_advisory_lock + advisory_lock! + yield + rescue StandardError => e + advisory_unlock unless e.is_a? RecordAlreadyAdvisoryLockedError + raise + end - def with_advisory_lock - advisory_lock! - yield - rescue StandardError => e - advisory_unlock unless e.is_a? RecordAlreadyAdvisoryLockedError - raise - end + def advisory_locked? + self.class.connection.execute(<<~SQL).ntuples.positive? + SELECT 1 as one + FROM pg_locks + WHERE + locktype = 'advisory' + AND objsubid = 1 + AND classid = ('x'||substr(md5('#{id}'), 1, 16))::bit(32)::int + AND objid = (('x'||substr(md5('#{id}'), 1, 16))::bit(64) << 32)::bit(32)::int + SQL + end - def advisory_locked? - self.class.connection.execute(<<~SQL).ntuples.positive? - SELECT 1 as one - FROM pg_locks - WHERE - locktype = 'advisory' - AND objsubid = 1 - AND classid = ('x'||substr(md5('#{id}'), 1, 16))::bit(32)::int - AND objid = (('x'||substr(md5('#{id}'), 1, 16))::bit(64) << 32)::bit(32)::int - SQL - end + def owns_advisory_lock? + self.class.connection.execute(<<~SQL).ntuples.positive? + SELECT 1 as one + FROM pg_locks + WHERE + locktype = 'advisory' + AND objsubid = 1 + AND classid = ('x'||substr(md5('#{id}'), 1, 16))::bit(32)::int + AND objid = (('x'||substr(md5('#{id}'), 1, 16))::bit(64) << 32)::bit(32)::int + AND pid = pg_backend_pid() + SQL + end - def owns_advisory_lock? - self.class.connection.execute(<<~SQL).ntuples.positive? - SELECT 1 as one - FROM pg_locks - WHERE - locktype = 'advisory' - AND objsubid = 1 - AND classid = ('x'||substr(md5('#{id}'), 1, 16))::bit(32)::int - AND objid = (('x'||substr(md5('#{id}'), 1, 16))::bit(64) << 32)::bit(32)::int - AND pid = pg_backend_pid() - SQL - end + def advisory_unlock + self.class.connection.execute("SELECT pg_advisory_unlock(('x'||substr(md5('#{id}'), 1, 16))::bit(64)::bigint)").first["pg_advisory_unlock"] + end - def advisory_unlock - self.class.connection.execute("SELECT pg_advisory_unlock(('x'||substr(md5('#{id}'), 1, 16))::bit(64)::bigint)").first["pg_advisory_unlock"] - end + def advisory_unlock! + advisory_unlock while advisory_locked? + end - def advisory_unlock! - advisory_unlock while advisory_locked? - end + private - private - - def sanitize_sql_for_conditions(*args) - # Made public in Rails 5.2 - self.class.send(:sanitize_sql_for_conditions, *args) - end + def sanitize_sql_for_conditions(*args) + # Made public in Rails 5.2 + self.class.send(:sanitize_sql_for_conditions, *args) end end end