Sha256: 64318d6f7b4c1275caf48829fdbbe15152ed57efdc5732528b389e2cd7c61938

Contents?: true

Size: 1.94 KB

Versions: 6

Compression:

Stored size: 1.94 KB

Contents

require 'sidekiq'
require 'sidekiq/util'
require 'celluloid'
require 'multi_json'

module Sidekiq
  ##
  # Sidekiq's retry support assumes a typical development lifecycle:
  # 0. push some code changes with a bug in it
  # 1. bug causes message processing to fail, sidekiq's middleware captures
  #    the message and pushes it onto a retry queue
  # 2. sidekiq retries messages in the retry queue multiple times with
  #    an exponential delay, the message continues to fail
  # 3. after a few days, a developer deploys a fix.  the message is
  #    reprocessed successfully.
  # 4. if 3 never happens, sidekiq will eventually give up and throw the
  #    message away.
  module Retry

    # delayed_job uses the same basic formula
    MAX_COUNT = 25
    DELAY = proc { |count| (count ** 4) + 15 }
    POLL_INTERVAL = 15

    ##
    # The Poller checks Redis every N seconds for messages in the retry
    # set have passed their retry timestamp and should be retried.  If so, it
    # just pops the message back onto its original queue so the
    # workers can pick it up like any other message.
    class Poller
      include Celluloid
      include Sidekiq::Util

      def poll
        watchdog('retry poller thread died!') do

          Sidekiq.redis do |conn|
            # A message's "score" in Redis is the time at which it should be retried.
            # Just check Redis for the set of messages with a timestamp before now.
            messages = nil
            now = Time.now.to_f.to_s
            (messages, _) = conn.multi do
              conn.zrangebyscore('retry', '-inf', now)
              conn.zremrangebyscore('retry', '-inf', now)
            end

            messages.each do |message|
              logger.debug { "Retrying #{message}" }
              msg = Sidekiq.load_json(message)
              conn.rpush("queue:#{msg['queue']}", message)
            end
          end

          after(POLL_INTERVAL) { poll }
        end
      end
    end
  end
end

Version data entries

6 entries across 6 versions & 2 rubygems

Version Path
kulesa-sidekiq-1.2.3 lib/sidekiq/retry.rb
kulesa-sidekiq-1.2.2 lib/sidekiq/retry.rb
sidekiq-1.2.1 lib/sidekiq/retry.rb
sidekiq-1.2.0 lib/sidekiq/retry.rb
sidekiq-1.1.4 lib/sidekiq/retry.rb
sidekiq-1.1.3 lib/sidekiq/retry.rb