Sha256: 2873d6f6ebef32affd59b58d833267b5af29b241a839526455b04f4a78ec3e04

Contents?: true

Size: 1.55 KB

Versions: 1

Compression:

Stored size: 1.55 KB

Contents

module SidekiqUniqueJobs
  module Lock
    # Lock strategy whose lock is acquired by the client middleware and
    # released by the server middleware once the job block has been run.
    #
    # The lock is identified by {#unique_key} together with the job id stored
    # under JID_KEY in the job item.
    class UntilExecuted
      OK ||= 'OK'.freeze

      include SidekiqUniqueJobs::Unlockable

      extend Forwardable
      def_delegators :Sidekiq, :logger

      # @param item [Hash] the job payload; read via +item[JID_KEY]+ and passed
      #   to UniqueArgs / QueueLockTimeoutCalculator
      # @param redis_pool [Object, nil] forwarded untouched to the lock and
      #   unlock helpers
      def initialize(item, redis_pool = nil)
        @item = item
        @redis_pool = redis_pool
      end

      # Runs the given block and then tries to release the lock.
      #
      # The release happens in an +ensure+ clause, so it is also attempted when
      # the block raises something other than Sidekiq::Shutdown. On
      # Sidekiq::Shutdown no unlock is attempted and the exception is re-raised.
      # Whenever the lock was not released (shutdown, or +unlock+ returned a
      # falsy value) a fatal message is logged instead of calling +callback+.
      #
      # @param callback [#call] invoked only after a successful unlock
      # @yield the job body
      # @return [Object] whatever the block returns
      def execute(callback, &blk)
        operative = true
        # NOTE(review): +send+ presumably allows subclasses to make
        # after_yield_yield private - confirm before replacing with a plain call.
        send(:after_yield_yield, &blk)
      rescue Sidekiq::Shutdown
        operative = false
        raise
      ensure
        if operative && unlock(:server)
          callback.call
        else
          logger.fatal { "the unique_key: #{unique_key} needs to be unlocked manually" }
        end
      end

      # Releases the lock held for this item.
      #
      # The scope is normalised with +to_sym+ so that Symbol and String scopes
      # are treated alike, matching how {#lock} interprets its scope.
      #
      # @param scope [Symbol, String] one of :server, :api or :test
      # @raise [ArgumentError] when the scope is not allowed to unlock
      # @return [Object] the result of +unlock_by_key+ (presumably supplied by
      #   SidekiqUniqueJobs::Unlockable)
      def unlock(scope)
        unless [:server, :api, :test].include?(scope.to_sym)
          raise ArgumentError, "#{scope} middleware can't #{__method__} #{unique_key}"
        end

        unlock_by_key(unique_key, item[JID_KEY], redis_pool)
      end

      # Acquires the lock for this item.
      #
      # @param scope [Symbol, String] must be :client (or "client")
      # @raise [ArgumentError] when called from any other scope
      # @return [Object] the result of Scripts::AcquireLock.execute
      def lock(scope)
        unless scope.to_sym == :client
          raise ArgumentError, "#{scope} middleware can't #{__method__} #{unique_key}"
        end

        Scripts::AcquireLock.execute(
          redis_pool,
          unique_key,
          item[JID_KEY],
          max_lock_time
        )
      end

      # @return [Object] memoized digest of the item computed by UniqueArgs.digest
      def unique_key
        @unique_key ||= UniqueArgs.digest(item)
      end

      # @return [Object] memoized +seconds+ value of the timeout calculator
      #   built for this item; passed to the acquire script as the lock time
      def max_lock_time
        @max_lock_time ||= QueueLockTimeoutCalculator.for_item(item).seconds
      end

      # Yields to the job block; the unlock step happens in {#execute} after
      # this method returns or raises.
      #
      # @return [Object] the block's return value
      def after_yield_yield
        yield
      end

      private

      attr_reader :item, :redis_pool
    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
sidekiq-unique-jobs-5.0.1 lib/sidekiq_unique_jobs/lock/until_executed.rb