Sha256: 4c531431395a7e0c1bde198db45fbfb32c7316e003e7970096ec83f6708da586

Contents?: true

Size: 931 Bytes

Versions: 3

Compression:

Stored size: 931 Bytes

Contents

module Sidekiq
  module ThrottledWorker
    # Fetch strategy layered on Sidekiq::BasicFetch that defers jobs whose
    # worker class is currently concurrency-limited.
    #
    # A worker opts in through the "concurrency" key of its sidekiq options;
    # jobs whose worker has no such key are passed through untouched.
    class Fetch < Sidekiq::BasicFetch
      # Kept so existing callers of #original_retrieve_work keep working.
      alias original_retrieve_work retrieve_work

      # Per-deferral increment (seconds) of the random back-off ceiling.
      BACKOFF_STEP = 0.01
      # Hard cap (seconds) on the random back-off ceiling.
      MAX_BACKOFF = 0.1

      # Fetches the next unit of work from the parent strategy and decides
      # whether it may run now.
      #
      # @return [Object, nil] the fetched unit of work, or nil when nothing was
      #   fetched or when the job was pushed back onto its queue because
      #   Concurrency.limit? reported the worker as limited.
      def retrieve_work
        work = original_retrieve_work
        return work if work.nil?

        job_hash = Sidekiq.load_json(work.job)
        worker_class = throttled_worker_class(job_hash['class'])
        # Unresolvable or non-throttled workers are handed on unchanged.
        return work if worker_class.nil?
        return work unless Concurrency.limit?(worker_class, job_hash['jid'])

        defer_throttled_job(work, job_hash)
        nil
      end

      private

      # Resolves the job's worker class and reports it only when it declares a
      # "concurrency" sidekiq option.
      #
      # The job has already been taken from the parent fetch at this point, so
      # raising here would keep it from ever reaching the caller. A class name
      # that is missing (TypeError) or cannot be resolved in this process
      # (NameError), or a class without get_sidekiq_options, is therefore
      # treated as "not throttled" instead of raising.
      #
      # @param class_name [String, nil] value of the job payload's 'class' key
      # @return [Class, nil] the worker class, or nil when throttling does not apply
      def throttled_worker_class(class_name)
        worker_class = Object.const_get(class_name)
        return nil unless worker_class.respond_to?(:get_sidekiq_options)
        return nil if worker_class.get_sidekiq_options["concurrency"].nil?

        worker_class
      rescue NameError, TypeError
        nil
      end

      # Records one more deferral on the job, sleeps for a short random
      # back-off, and pushes the updated payload back onto the job's queue.
      #
      # @param work [Object] unit of work returned by the parent fetch
      # @param job_hash [Hash] parsed job payload; mutated in place
      # @return [void]
      def defer_throttled_job(work, job_hash)
        job_hash["concurrency_limit_count"] ||= 0
        job_hash["concurrency_limit_count"] += 1

        # The ceiling grows with each previous deferral and is capped; the
        # first deferral therefore sleeps for 0 seconds.
        ceiling = [(job_hash["concurrency_limit_count"] - 1) * BACKOFF_STEP, MAX_BACKOFF].min
        sleep(rand * ceiling)

        Sidekiq.redis do |conn|
          # Original author's intent: LPUSH puts the job at the end of the
          # queue so it waits behind other jobs in a busy sidekiq cluster.
          conn.lpush("queue:#{work.queue_name}", Sidekiq.dump_json(job_hash))
        end
      end
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygem

Version Path
sidekiq-throttled-worker-0.1.3 lib/sidekiq/throttled_worker/fetch.rb
sidekiq-throttled-worker-0.1.2 lib/sidekiq/throttled_worker/fetch.rb
sidekiq-throttled-worker-0.1.1 lib/sidekiq/throttled_worker/fetch.rb