Sha256: 69a265552172e026e582de1a51a0d6fd4c1e46540fc3ae665c3ba807b90c29f0

Contents?: true

Size: 1.42 KB

Versions: 1

Compression:

Stored size: 1.42 KB

Contents

require 'digest'
require 'forwardable'

module SidekiqUniqueJobs
  module Server
    # Server-side middleware. For each job it stores the call arguments and
    # then dispatches to the private handler method whose name is returned by
    # +unique_lock+ (+until_executing+, +until_executed+, +while_executing+ or
    # +until_timeout+ are the handlers defined here).
    #
    # +unique_lock+, +lock+ and +reschedule_on_lock_fail+ are not defined in
    # this class; presumably they come from OptionsWithFallback -- verify
    # against that module.
    class Middleware
      extend Forwardable
      def_delegators :Sidekiq, :logger
      def_instance_delegator :@worker, :class, :worker_class
      # The delegator above is the one and only definition of +worker_class+.
      # It must not also be listed in an +attr_reader+: that would redefine it
      # as a reader for the never-assigned +@worker_class+ and make it return
      # nil. Keep it private, matching the other readers.
      private :worker_class

      include OptionsWithFallback

      # Middleware entry point.
      #
      # @param worker [Object] the worker instance about to run the job
      # @param item [Hash] the job payload (re-pushed as-is by +reschedule+)
      # @param queue [String] the queue name; stored but not read in this class
      # @param redis_pool [Object, nil] pool handed to Sidekiq::Client on reschedule
      # @yield runs the job
      # @return [Object] whatever the selected handler returns
      def call(worker, item, queue, redis_pool = nil, &blk)
        @worker = worker
        @redis_pool = redis_pool
        @queue = queue
        @item = item

        # +send+ is required because the handlers are private.
        send(unique_lock, &blk)
      end

      private

      attr_reader :redis_pool, :worker, :item

      # Releases the lock first, then runs the job.
      def until_executing
        unlock
        yield
      end

      # Runs the job and releases the lock afterwards, whether the job
      # returned normally or raised. The single exception is
      # Sidekiq::Shutdown: in that case the lock is left in place and the
      # error is re-raised.
      def until_executed(&block)
        operative = true
        after_yield_yield(&block)
      rescue Sidekiq::Shutdown
        operative = false
        raise
      ensure
        unlock if operative
      end

      # Thin wrapper around +yield+ used by +until_executed+.
      def after_yield_yield
        yield
      end

      # Runs the job inside +lock.synchronize+. When that raises
      # SidekiqUniqueJobs::RunLockFailed the job is re-pushed if
      # +reschedule_on_lock_fail+ is truthy; otherwise the error propagates.
      def while_executing
        lock.synchronize do
          yield
        end
      rescue SidekiqUniqueJobs::RunLockFailed
        return reschedule if reschedule_on_lock_fail
        raise
      end

      # Runs the job (when a block is given) without touching the lock.
      def until_timeout
        yield if block_given?
      end

      protected

      # Asks the lock to unlock in the +:server+ scope and fires the worker
      # hook only when that call returns a truthy value.
      def unlock
        after_unlock_hook if lock.unlock(:server)
      end

      # Invokes the worker's optional +after_unlock+ callback.
      def after_unlock_hook
        worker.after_unlock if worker.respond_to?(:after_unlock)
      end

      # Pushes the current job payload back through a Sidekiq client built on
      # the pool received in +call+.
      #
      # NOTE(review): confirm +raw_push+ is publicly callable on
      # Sidekiq::Client for every supported Sidekiq version.
      def reschedule
        Sidekiq::Client.new(redis_pool).raw_push([item])
      end
    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
sidekiq-unique-jobs-4.0.0 lib/sidekiq_unique_jobs/server/middleware.rb