Sha256: b26db4c7197744fb8e5ea8b141e9ff9a50c5751a0a302441fa512c4d8d7e37b2

Contents?: true

Size: 1.43 KB

Versions: 2

Compression:

Stored size: 1.43 KB

Contents

require 'forwardable'
require 'timeout'

require 'sidekiq/launcher'

module Sidekiq
  # Boots an in-process Sidekiq launcher for the given queue(s), runs a
  # caller-supplied block once the launcher is up, and shuts the launcher
  # down again afterwards.
  #
  # @example
  #   Sidekiq::Simulator.process_queue(:default) { sleep 0.5 }
  class Simulator
    extend Forwardable
    def_delegator SidekiqUniqueJobs, :logger

    # Maximum number of seconds to wait for the launcher to report alive.
    STARTUP_TIMEOUT = 15
    # Seconds to sleep between liveness checks while the launcher starts.
    POLL_INTERVAL = 0.001

    attr_reader :queues, :launcher

    # Builds a simulator for +queue+ and runs the given block against it.
    #
    # @param queue [Object, Array] a single queue or a (nested) list of queues
    # @yield runs while the launcher is up
    # @return [Object] whatever the block returns
    def self.process_queue(queue, &block)
      new(queue).process_queue(&block)
    end

    # @param queue [Object, Array] a single queue or a (nested) list of
    #   queues; nested lists are flattened and duplicates removed
    def initialize(queue)
      @queues = [queue].flatten.uniq
      @launcher = Sidekiq::Launcher.new(sidekiq_options(queues))
    end

    # Starts the launcher, yields to the block, and always terminates the
    # launcher afterwards, even when startup or the block raises.
    #
    # @yield runs while the launcher is up
    # @return [Object] whatever the block returns
    def process_queue(&block)
      run_launcher(&block)
    ensure
      terminate_launcher
    end

    private

    # Starts the launcher and waits (up to STARTUP_TIMEOUT seconds) for it to
    # report alive, then yields.
    #
    # A startup timeout is logged and tolerated: the block still runs. Any
    # other exception propagates WITHOUT running the block; yielding from an
    # `ensure` clause would execute the block while an unrelated error (or an
    # interrupt) is unwinding the stack.
    def run_launcher
      begin
        using_timeout(STARTUP_TIMEOUT) do
          launcher.run
          sleep POLL_INTERVAL until alive?
        end
      rescue Timeout::Error => e
        logger.warn { "Timeout while running #{__method__}" }
        logger.warn { e }
      end

      yield
    end

    # Shuts the launcher down using whichever API it exposes.
    def terminate_launcher
      if launcher.respond_to?(:alive?)
        launcher.terminate # Better to be fast than graceful for our purposes
      else
        launcher.stop # New sidekiq works better
      end
    end

    # @return [Boolean] the launcher's own +alive?+ answer when it has one,
    #   otherwise whether its manager currently has any workers
    def alive?
      if launcher.respond_to?(:alive?)
        launcher.alive?
      else
        launcher.manager.workers.any?
      end
    end

    # @return [Boolean] the negation of {#alive?}
    def stopped?
      !alive?
    end

    # Runs the block, raising Timeout::Error if it takes longer than +value+
    # seconds.
    #
    # @param value [Numeric] timeout in seconds
    def using_timeout(value, &block)
      Timeout.timeout(value, &block)
    end

    # Options hash handed to Sidekiq::Launcher.new.
    #
    # @param queues [Array] queues the launcher should process
    # @return [Hash]
    def sidekiq_options(queues = [])
      {
        queues: queues,
        concurrency: 3,
        timeout: 3,
        verbose: false,
        logfile: './tmp/sidekiq.log'
      }
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
sidekiq-unique-jobs-5.0.2 lib/sidekiq/simulator.rb
sidekiq-unique-jobs-5.0.1 lib/sidekiq/simulator.rb