Sha256: d31781432be3d7a7d0807962c641e6f5186538f56395177b5118cc4428df45fc

Contents?: true

Size: 1.55 KB

Versions: 1

Compression:

Stored size: 1.55 KB

Contents

module Reqless
  module JobReservers
    module Strategies
    end
  end
end

# This module provides the different kinds of queue sources used by qmore
module Reqless::JobReservers::Strategies::Sources
  # Direct source uses a client to generate the queues we should
  # pull work from. Ignores any queues that do not have tasks available.
  def self.direct(client)
    Enumerator.new do |yielder|
      queues = client.queues.counts.select do |queue|
        %w(waiting recurring depends stalled scheduled).any? {|state| queue[state].to_i > 0 }
      end

      queues.each do |queue|
        yielder << client.queues[queue['name']]
      end
    end
  end

  # Background Queue source runs in a background thread
  # to periodically update the queues available.
  class Background
    include Enumerable
    attr_reader :delegate, :delay
    # @param [Enumerator] delegate queue source to load the queues from.
    # @param [Integer] delay - how long between updates
    def initialize(delegate, delay)
      @delegate = delegate
      @delay = delay
    end

    # Spawns a thread to periodically update the
    # queues.
    # @return [Thread] returns the spawned thread.
    def start
      @stop   = false
      @queues = delegate.to_a
      Thread.new do
        begin
          loop do
            sleep delay
            break if @stop
            @queues = delegate.to_a
          end
        rescue => e
          retry
        end
      end
    end

    def stop
      @stop = true
    end

    def each(&block)
      @queues.each { |q| block.call(q) }
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
reqless-0.0.5 lib/reqless/job_reservers/strategies/sources.rb