Sha256: 3db5d205ada3969ae2f294da0a01e08d1ce63f54e25f03d7ca81345e4b4184cb

Contents?: true

Size: 1.66 KB

Versions: 1

Compression:

Stored size: 1.66 KB

Contents

require 'sidekiq'
require 'sidekiq/util'
require 'celluloid'

module Sidekiq
  module Scheduled

    # Number of seconds between two consecutive polling passes.
    POLL_INTERVAL = 15

    ##
    # The Poller checks Redis every POLL_INTERVAL seconds for messages in the
    # retry or schedule sets whose timestamp has passed.  Each such message is
    # removed from its sorted set and pushed back onto its original queue so
    # the workers can pick it up like any other message.
    class Poller
      include Celluloid
      include Sidekiq::Util

      # Names of the Redis sorted sets scanned on every pass.
      SETS = %w(retry schedule).freeze

      # Runs one polling pass and schedules the next one POLL_INTERVAL seconds
      # later.
      #
      # first_time - when true, sleep for a random fraction of POLL_INTERVAL
      #              before the pass (see add_jitter).
      #
      # An error raised during the pass (for example a failed Redis call) is
      # logged and the next pass is still scheduled; previously the reschedule
      # was skipped, so a single transient failure ended polling for good.
      def poll(first_time=false)
        # NOTE(review): watchdog is not defined in this file; presumably it
        # comes from Sidekiq::Util -- confirm how it treats exceptions.
        watchdog('scheduling poller thread died!') do
          add_jitter if first_time

          begin
            enqueue_due_messages
          rescue Celluloid::Task::TerminatedError
            # Termination is not a polling failure: let it propagate exactly
            # as it did before instead of scheduling more work.
            raise
          rescue => ex
            logger.error("scheduled poll failed, will retry in #{POLL_INTERVAL}s: #{ex.class}: #{ex.message}")
            logger.error(ex.backtrace.join("\n")) if ex.backtrace
          end

          after(POLL_INTERVAL) { poll }
        end
      end

      private

      # Moves every due message from each set in SETS onto the Redis list
      # "queue:<name>", where <name> is the message's 'queue' field.
      def enqueue_due_messages
        # A message's "score" in Redis is the time at which it should be processed.
        # Just check Redis for the set of messages with a timestamp before now.
        now = Time.now.to_f.to_s
        Sidekiq.redis do |conn|
          SETS.each do |sorted_set|
            # Read and delete the due range inside a single MULTI; only the
            # first reply (the messages themselves) is needed.
            (messages, _) = conn.multi do
              conn.zrangebyscore(sorted_set, '-inf', now)
              conn.zremrangebyscore(sorted_set, '-inf', now)
            end

            # The messages are already gone from the sorted set here, so an
            # exception inside this loop drops the rest of the batch.
            messages.each do |message|
              logger.debug { "enqueued #{sorted_set}: #{message}" }
              msg = Sidekiq.load_json(message)
              conn.rpush("queue:#{msg['queue']}", message)
            end
          end
        end
      end

      # Sleeps for a random duration in [0, POLL_INTERVAL) seconds before the
      # first pass.
      def add_jitter
        sleep(POLL_INTERVAL * rand)
      rescue Celluloid::Task::TerminatedError
        # Hit Ctrl-C when Sidekiq is finished booting and we have a chance
        # to get here.
      end

    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
sidekiq-2.0.3 lib/sidekiq/scheduled.rb