Sha256: 9e09dbe834a711d87e56dd6eaca9c0dc887cda7706ce91710939fc4e727b3d14
Contents?: true
Size: 961 Bytes
Versions: 7
Compression:
Stored size: 961 Bytes
Contents
module Toro
  # Retrieves jobs from the configured queues by calling the `toro_pop` SQL
  # function and hands each retrieved job to the manager.
  class Fetcher
    include Actor

    # @param options [Hash]
    #   :queues  - list of queue names to pop from; defaults to a one-element
    #              list holding Toro.options[:default_queue]
    #   :manager - object that receives jobs through #assign (required)
    # @raise [RuntimeError] when no manager is supplied
    def initialize(options = {})
      defaults = { queues: [Toro.options[:default_queue]] }
      # Non-destructive merge: the caller's hash must not be modified.
      options = options.reverse_merge(defaults)
      @queues = options[:queues]
      @manager = options[:manager]
      raise 'No manager provided' if @manager.blank?
    end

    # Retrieves a job and assigns it to the manager, but only when the
    # manager reports that it is ready.
    def notify
      return unless @manager.is_ready?

      job = retrieve
      @manager.assign(job) if job
    end

    # Retrieves a job and assigns it through the manager's `async` proxy
    # (presumably the actor library's asynchronous call proxy - confirm).
    def fetch
      job = retrieve
      @manager.async.assign(job) if job
    end

    # Runs `toro_pop` for the configured queues and this process identity.
    #
    # @return [Object, nil] the result of Job.instantiate for the returned
    #   row, or nil when the query yields no row or a row whose 'id' is nil
    def retrieve
      queue_list = @queues.map { |queue| quote_literal(queue) }.join(', ')
      sql = "SELECT * FROM toro_pop(ARRAY[#{queue_list}]::TEXT[], #{quote_literal(Toro.process_identity)})"

      result = nil
      Toro::Database.with_connection do
        result = Toro::Database.query(sql).first
        # `first` is nil when no rows come back; guard before indexing.
        result = nil if result && result['id'].nil?
      end
      return nil if result.nil?

      Job.instantiate(result)
    end

    private

    # Wraps a value in single quotes for use as an SQL string literal,
    # doubling any embedded single quotes so they cannot end the literal.
    # NOTE(review): assumes the server uses standard-conforming strings
    # (backslashes are not escape characters) - confirm for the target DB.
    def quote_literal(value)
      "'#{value.to_s.gsub("'", "''")}'"
    end
  end
end
Version data entries
7 entries across 7 versions & 1 rubygems