Sha256: 1a122b69ba8e36f9458412fab80e148f7ebf4cc1a4b6bf5509740e8d4c12ae7f

Contents?: true

Size: 1.7 KB

Versions: 3

Compression:

Stored size: 1.7 KB

Contents

module QPush
  module Server
    # The Delay worker requeues any jobs that have been delayed on our Redis
    # server. Delayed jobs are pulled by a 'zrangebyscore', with the score
    # representing the time the job should be performed.
    #
    class Delay
      def initialize
        @done = false
        # Only set while a connection is checked out inside #poll.
        @conn = nil
      end

      # Starts our delay process. This will run until instructed to stop,
      # pausing two seconds between passes.
      #
      def start
        until @done
          poll
          # No point pausing if a shutdown was requested during this pass.
          sleep 2 unless @done
        end
      end

      # Shuts down our delay process. The loop in #start exits once the pass
      # currently in progress (if any) has finished.
      #
      def shutdown
        @done = true
      end

      private

      # Runs a single pass: checks out a connection from QPush.redis, watches
      # the delay key and requeues whatever is due. The connection reference is
      # always dropped afterwards, even when an error propagates, so we never
      # hold on to a connection that has been handed back.
      #
      def poll
        QPush.redis.with do |conn|
          @conn = conn
          watch_delay { retrieve_delays }
        end
      ensure
        @conn = nil
      end

      # Retrieves delayed jobs based on the time they should be performed
      # (score between 0 and the current epoch second). If any are found,
      # begin to update them; otherwise release the watch.
      #
      def retrieve_delays
        delays = @conn.zrangebyscore(QPush.config.delay_namespace, 0, Time.now.to_i)
        delays.any? ? update_delays(delays) : @conn.unwatch
      end

      # Removes jobs that have been retrieved and sets them up to be performed.
      # Both steps are issued from within the same 'multi' block.
      #
      def update_delays(delays)
        @conn.multi do |multi|
          multi.zrem(QPush.config.delay_namespace, delays)
          delays.each { |job| perform_job(job) }
        end
      end

      # Add a delayed job to the appropriate perform list.
      #
      # json - the job payload as a JSON string.
      #
      # Raises ServerError (carrying the original message) if the payload
      # cannot be parsed or the job cannot be performed.
      #
      def perform_job(json)
        job = Job.new(JSON.parse(json))
        job.api.perform
      rescue => e
        raise ServerError, e.message
      end

      # Performs a watch on our delay key for the duration of the given block.
      # Does nothing when called without a block.
      #
      def watch_delay(&block)
        return unless block

        @conn.watch(QPush.config.delay_namespace, &block)
      end
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
qpush-0.1.2 lib/qpush/server/delay.rb
qpush-0.1.1 lib/qpush/server/delay.rb
qpush-0.1.0 lib/qpush/server/delay.rb