Sha256: 7931568ba2213fc82968b182e1377a183d3a8ddace59ed60af7eaa541a74c413
Contents?: true
Size: 1.76 KB
Versions: 1
Compression:
Stored size: 1.76 KB
Contents
# frozen_string_literal: true

module ActiveJob
  module TrafficControl
    # Concern that wraps a job's `perform` in a lock obtained through
    # `with_lock_client`. A job class opts in by calling `concurrency` in its
    # body; classes that never call it run without any locking.
    module Concurrency
      extend ::ActiveSupport::Concern

      # Delay range handed to `reenqueue` when the lock could not be taken and
      # the job is configured not to drop. A much shorter range is used when
      # RACK_ENV is "test".
      # NOTE(review): presumably seconds -- confirm against `reenqueue`.
      CONCURRENCY_REENQUEUE_DELAY =
        if ENV["RACK_ENV"] == "test"
          1...5
        else
          30...(60 * 5)
        end

      class_methods do
        # Settings hash stored by `concurrency`; nil until it has been called
        # on this class.
        attr_accessor :job_concurrency

        # Stores the concurrency settings for this job class.
        #
        # @param threshold [#to_i] coerced with `to_i`; must be at least 1
        # @param drop [Boolean] when truthy, a job that fails to get the lock
        #   is passed to `drop`; otherwise it is passed to `reenqueue`
        # @param key [Object, nil] stored as-is and forwarded to `lock_key`
        # @param wait_timeout [#to_f] forwarded as `acquisition_lock`
        # @param stale_timeout [#to_f] forwarded as `stale_lock_expiration`
        # @raise [ArgumentError] if `threshold.to_i` is below 1
        # @return [Hash] the stored settings
        def concurrency(threshold, drop: true, key: nil, wait_timeout: 0.1, stale_timeout: 60 * 10)
          limit = threshold.to_i
          raise ArgumentError, "Concurrent jobs needs to be an integer > 0" if limit < 1

          @job_concurrency = {
            threshold: limit,
            drop: drop,
            wait_timeout: wait_timeout.to_f,
            stale_timeout: stale_timeout.to_f,
            key: key
          }
        end

        # Builds the lock key for `job` by delegating to `lock_key` with the
        # "concurrency" prefix and this class's settings.
        def concurrency_lock_key(job)
          lock_key("concurrency", job, job_concurrency)
        end
      end

      included do
        include ActiveJob::TrafficControl::Base

        around_perform do |job, block|
          settings = self.class.job_concurrency

          if settings.blank?
            # No concurrency configured for this class: run the job untouched.
            block.call
          else
            lock_options = {
              resources: settings[:threshold],
              acquisition_lock: settings[:wait_timeout],
              stale_lock_expiration: settings[:stale_timeout]
            }
            lock_name = self.class.concurrency_lock_key(job)

            with_lock_client(lock_name, lock_options) do |client|
              # The inner block ends in `true` so a completed run is always
              # truthy, whatever the job itself returned.
              acquired = client.lock do
                block.call
                true
              end

              next if acquired

              # The lock was not obtained, so the job body never ran.
              if settings[:drop]
                drop("concurrency")
              else
                reenqueue(CONCURRENCY_REENQUEUE_DELAY, "concurrency")
              end
            end
          end
        end
      end
    end
  end
end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
activejob-traffic_control-0.1.1 | lib/active_job/traffic_control/concurrency.rb |