Sha256: 1a15f587f19c8a6a27572a5e1f1fbe13c7cc161e1cc2d67cbf4fe31b6d0951cb
Contents?: true
Size: 1.39 KB
Versions: 2
Compression:
Stored size: 1.39 KB
Contents
require 'thread' require 'time' module RCelery class Pool def initialize(options={}) @task_queue = Queue.new RCelery.start(options) unless RCelery.running? end def start subscribe end def subscribe # amqp-client has a nice fat TODO in the delivery handler to # ack if necessary; we'll just manually do it, however, the # call to subscribe still needs :ack => true so the server # expects our ack RCelery.queue.subscribe(:ack => true) do |header, payload| begin message = JSON.parse(payload) RCelery::Events.task_received(message['id'], message['task'], message['args'], message['kwargs'], nil, message['eta']) if message['eta'] && Time.parse(message['eta']) > Time.now defer({:message => message, :header => header}) else @task_queue.push({:message => message, :header => header}) end rescue JSON::ParserError # not a message we care about header.ack end end end def defer(task) time_difference = (Time.parse(task[:message]['eta']) - Time.now).to_i EM.add_timer(time_difference) do @task_queue.push(task) end end def poll @task_queue.pop end def unsubscribe RCelery.queue.unsubscribe end def stop unsubscribe RCelery.stop end end end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
rcelery-1.0.1 | lib/rcelery/pool.rb |
rcelery-1.0.0 | lib/rcelery/pool.rb |