Sha256: e52ade3d002ab2d61b7a7445c7c50b82ee6b1ee4f32579ecaf584bfb11871004

Contents?: true

Size: 1.37 KB

Versions: 1

Compression:

Stored size: 1.37 KB

Contents

require 'sidekiq/debounce/version'
require 'sidekiq/api'

module Sidekiq
  class Debounce
    def call(worker, msg, _queue, redis_pool)
      @worker = worker.is_a?(String) ? worker.constantize : worker
      @msg = msg

      return yield unless debounce?

      redis_pool.with do |conn|
        # Get JID of the already-scheduled job, if there is one
        scheduled_jid = conn.get(debounce_key)

        # Reschedule the old job to when this new job is scheduled for
        # Or yield if there isn't one scheduled yet
        jid = scheduled_jid ? reschedule(scheduled_jid, @msg['at']) : yield

        store_expiry(conn, jid, @msg['at'])
        return false if scheduled_jid
        jid
      end
    end

    private

    def store_expiry(conn, job, time)
      jid = job.respond_to?(:has_key?) && job.key?('jid') ? job['jid'] : job
      conn.set(debounce_key, jid)
      conn.expireat(debounce_key, time.to_i)
    end

    def debounce_key
      hash = Digest::MD5.hexdigest(@msg['args'].to_json)
      @debounce_key ||= "sidekiq_debounce:#{@worker.name}:#{hash}"
    end

    def scheduled_set
      @scheduled_set ||= Sidekiq::ScheduledSet.new
    end

    def reschedule(jid, at)
      job = scheduled_set.find_job(jid)
      job.reschedule(at) unless job.nil?
      jid
    end

    def debounce?
      (@msg['at'] && @worker.get_sidekiq_options['debounce']) || false
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
sidekiq-debounce-1.0.2 lib/sidekiq/debounce.rb