Sha256: 7931568ba2213fc82968b182e1377a183d3a8ddace59ed60af7eaa541a74c413

Contents?: true

Size: 1.76 KB

Versions: 1

Compression:

Stored size: 1.76 KB

Contents

# frozen_string_literal: true

module ActiveJob
  module TrafficControl
    # Concern that limits how many instances of a job run at the same time.
    #
    # Including it adds a +concurrency+ class macro and registers an
    # +around_perform+ callback. When a concurrency setting is present, the
    # callback runs the job inside +client.lock+, using the client yielded by
    # +with_lock_client+. If the lock call returns a falsy value the job is
    # handed to +drop+ or +reenqueue+, depending on the +drop:+ option.
    #
    # +lock_key+, +with_lock_client+, +drop+ and +reenqueue+ are not defined in
    # this file; presumably they come from ActiveJob::TrafficControl::Base,
    # which is included together with this concern.
    module Concurrency
      extend ::ActiveSupport::Concern

      # Range passed to +reenqueue+ when a job could not get a lock and is not
      # dropped: 1...5 when RACK_ENV is "test", 30...300 otherwise.
      # NOTE(review): presumably a delay window in seconds -- confirm against
      # the +reenqueue+ implementation.
      CONCURRENCY_REENQUEUE_DELAY = ENV["RACK_ENV"] == "test" ? 1...5 : 30...(60 * 5)

      class_methods do
        # Declares the concurrency limit for the job class.
        #
        # @param threshold [#to_i] number of lock resources; coerced with +to_i+
        # @param drop [Boolean] when truthy, a job that cannot get the lock is
        #   passed to +drop+; otherwise it is passed to +reenqueue+
        # @param key [Object, nil] stored as-is and forwarded to +lock_key+ via
        #   the settings hash (its interpretation is up to +lock_key+)
        # @param wait_timeout [#to_f] forwarded as the +acquisition_lock+ option
        # @param stale_timeout [#to_f] forwarded as the +stale_lock_expiration+
        #   option
        # @return [Hash] the stored settings
        # @raise [ArgumentError] if +threshold.to_i+ is less than 1
        def concurrency(threshold, drop: true, key: nil, wait_timeout: 0.1, stale_timeout: 60 * 10)
          raise ArgumentError, "Concurrent jobs needs to be an integer > 0" if threshold.to_i < 1

          # Assign through the writer (not a raw class-level ivar) so the value
          # is visible to subclasses of the declaring job class.
          self.job_concurrency = {
            threshold: threshold.to_i,
            drop: drop,
            wait_timeout: wait_timeout.to_f,
            stale_timeout: stale_timeout.to_f,
            key: key
          }
        end

        # Builds the lock key for +job+ by delegating to +lock_key+ with the
        # "concurrency" prefix and the current settings.
        def concurrency_lock_key(job)
          lock_key("concurrency", job, job_concurrency)
        end
      end

      included do
        include ActiveJob::TrafficControl::Base

        # Inheritable class-level setting. A plain class-level instance
        # variable is not seen by subclasses, which would let a subclass of a
        # limited job run without any limit.
        class_attribute :job_concurrency, instance_accessor: false

        around_perform do |job, block|
          settings = self.class.job_concurrency

          if settings.present?
            lock_options = {
              resources: settings[:threshold],
              acquisition_lock: settings[:wait_timeout],
              stale_lock_expiration: settings[:stale_timeout]
            }

            with_lock_client(self.class.concurrency_lock_key(job), lock_options) do |client|
              # The block ends with +true+ so that a completed run yields a
              # truthy result; a falsy result from +client.lock+ is treated as
              # "lock not acquired".
              locked = client.lock do
                block.call
                true
              end

              unless locked
                if settings[:drop]
                  drop("concurrency")
                else
                  reenqueue(CONCURRENCY_REENQUEUE_DELAY, "concurrency")
                end
              end
            end
          else
            # No concurrency setting declared: run the job unrestricted.
            block.call
          end
        end
      end
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
activejob-traffic_control-0.1.1 lib/active_job/traffic_control/concurrency.rb