Sha256: 9e09dbe834a711d87e56dd6eaca9c0dc887cda7706ce91710939fc4e727b3d14

Contents?: true

Size: 961 Bytes

Versions: 7

Compression:

Stored size: 961 Bytes

Contents

module Toro
  # Pops jobs from the configured queues (via the `toro_pop` SQL function) and
  # hands them to a manager object.
  class Fetcher
    include Actor

    # @param options [Hash]
    #   :queues  - queue names to pop from; defaults to
    #              [Toro.options[:default_queue]]
    #   :manager - object that receives jobs through #assign (required)
    # @raise [RuntimeError] 'No manager provided' when :manager is blank
    def initialize(options={})
      defaults = {
        queues: [Toro.options[:default_queue]]
      }
      # Non-bang reverse_merge: fill in defaults without mutating the hash the
      # caller passed in.
      settings = options.reverse_merge(defaults)
      @queues = settings[:queues]
      @manager = settings[:manager]
      raise 'No manager provided' if @manager.blank?
    end

    # Retrieves a job and assigns it to the manager, but only when the manager
    # reports that it is ready. Does nothing otherwise.
    def notify
      return unless @manager.is_ready?

      job = retrieve
      @manager.assign(job) if job
    end

    # Retrieves a job and, if one was found, passes it to the manager through
    # the manager's `async` proxy.
    def fetch
      job = retrieve
      @manager.async.assign(job) if job
    end

    # Runs `toro_pop` for the configured queues and this process identity.
    #
    # @return [Object, nil] the result of Job.instantiate for the popped row,
    #   or nil when the query yields no row or a row whose 'id' is nil
    def retrieve
      queue_list = @queues.map { |queue| quote_literal(queue) }.join(', ')
      sql = "SELECT * FROM toro_pop(ARRAY[#{queue_list}]::TEXT[], #{quote_literal(Toro.process_identity)})"
      row = nil
      Toro::Database.with_connection do
        row = Toro::Database.query(sql).first
      end
      # Guard against both an empty result set (row is nil) and a row with a
      # nil id, which is treated as "nothing to pop".
      return nil if row.nil? || row['id'].nil?

      Job.instantiate(row)
    end

    private

    # Wraps +value+ in single quotes for use as an SQL string literal, doubling
    # any embedded single quote so it cannot terminate the literal early.
    def quote_literal(value)
      "'#{value.to_s.gsub("'", "''")}'"
    end
  end
end

Version data entries

7 entries across 7 versions & 1 rubygem

Version Path
toro-0.2.2 lib/toro/fetcher.rb
toro-0.2.1 lib/toro/fetcher.rb
toro-0.2.0 lib/toro/fetcher.rb
toro-0.1.1 lib/toro/fetcher.rb
toro-0.1.0 lib/toro/fetcher.rb
toro-0.0.3 lib/toro/fetcher.rb
toro-0.0.2 lib/toro/fetcher.rb