Sha256: 1913195d047d45af2b5fa3af2b4ee61bd2b48f5fd7eadaac61a658bc613ba3d0
Contents?: true
Size: 1.24 KB
Versions: 1
Compression:
Stored size: 1.24 KB
Contents
require 'digest'
require 'json'

require 'sidekiq/debounce/version'
require 'sidekiq/api'

module Sidekiq
  # Middleware that debounces scheduled jobs: when a job with the same worker
  # and arguments is already scheduled, that job is moved to the new run time
  # instead of a second job being enqueued.
  #
  # Debouncing only applies to jobs that carry an 'at' timestamp and whose
  # worker sets a truthy 'debounce' sidekiq option.
  class Debounce
    # @param worker [Class] worker class; must respond to +name+ and
    #   +get_sidekiq_options+
    # @param msg [Hash] job payload; 'at' and 'args' are read
    # @param _queue [String] unused
    # @param redis_pool [#with] pool yielding a Redis connection
    # @return [Object, false] whatever the rest of the chain returned when the
    #   job was passed on, or +false+ when an already-scheduled job was
    #   rescheduled instead (presumably a falsy return stops the push of the
    #   duplicate; see the Sidekiq middleware documentation)
    def call(worker, msg, _queue, redis_pool)
      @worker = worker
      @msg = msg
      # The key depends on worker and args, so a memo from a previous call on
      # the same instance must not leak into this one.
      @debounce_key = nil

      return yield unless debounce?

      redis_pool.with do |conn|
        # JID of the already-scheduled job, if there is one
        scheduled_jid = conn.get(debounce_key)

        if scheduled_jid && reschedule(scheduled_jid, @msg['at'])
          # The old job now runs at the new time; keep the key alive until then
          store_expiry(conn, scheduled_jid, @msg['at'])
          false
        else
          # Nothing scheduled yet, or the remembered job is no longer in the
          # scheduled set: let this job through and remember it instead
          pushed = yield
          jid = extract_jid(pushed)
          store_expiry(conn, jid, @msg['at']) if jid
          pushed
        end
      end
    end

    private

    # Remembers +jid+ under the debounce key and expires the key at +time+.
    def store_expiry(conn, jid, time)
      conn.set(debounce_key, jid)
      conn.expireat(debounce_key, time.to_i)
    end

    # Redis key identifying this worker/arguments combination. The digest is
    # computed once per call and memoized together with the key.
    def debounce_key
      @debounce_key ||= begin
        digest = Digest::MD5.hexdigest(@msg['args'].to_json)
        "sidekiq_debounce:#{@worker.name}:#{digest}"
      end
    end

    def scheduled_set
      @scheduled_set ||= Sidekiq::ScheduledSet.new
    end

    # Moves the scheduled job +jid+ to run at +at+.
    #
    # @return [String, nil] +jid+ when the job was found and rescheduled, +nil+
    #   when it is no longer in the scheduled set
    def reschedule(jid, at)
      job = scheduled_set.find_job(jid)
      return unless job

      job.reschedule(at)
      jid
    end

    # Normalises the value returned by the rest of the chain to a JID.
    # The chain may hand back the job payload hash rather than a bare JID
    # (NOTE(review): this appears to be what Sidekiq's client does; confirm for
    # the supported Sidekiq versions), or a falsy value if the push was halted.
    #
    # @return [Object, nil] the JID, or +nil+ when nothing was pushed
    def extract_jid(pushed)
      return pushed['jid'] if pushed.respond_to?(:key?) && pushed.key?('jid')

      pushed || nil
    end

    # Truthy only for scheduled jobs whose worker opted in to debouncing.
    def debounce?
      (@msg['at'] && @worker.get_sidekiq_options['debounce']) || false
    end
  end
end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
sidekiq-debounce-1.0.0 | lib/sidekiq/debounce.rb |