Sha256: d6b27fd803228913d700a4df4a26f23ca3a86ef08c7ddb83801daac6497952f7

Contents?: true

Size: 633 Bytes

Versions: 7

Compression:

Stored size: 633 Bytes

Contents

require 'mq'

# AMQP-backed adapter for JobQueue, built on the `mq` library.
#
# Construction declares a durable direct exchange named 'photo' and a durable
# queue named 'photo_worker', then binds the queue to the exchange.
class JobQueue::AMQPAdapter
  # Declares the exchange and the queue and binds them together.
  #
  # options - currently unused by this adapter.
  def initialize(options = {})
    amq = MQ.new
    @exchange = amq.direct('photo', :durable => true)
    @queue = amq.queue('photo_worker', :durable => true)
    @queue.bind(@exchange)
  end

  # Publishes a job payload, flagged as persistent.
  #
  # string - the message body to enqueue.
  def put(string)
    @queue.publish(string, :persistent => true)
  end

  # Polls the queue from an EventMachine periodic timer (interval 0) and
  # passes each received message body to the given block.
  #
  # error_report - callable invoked as error_report.call(body, exception).
  #                body is the message being handled when the error occurred,
  #                or nil when the failure happened while requesting a message.
  # block        - called with each non-nil message body.
  def subscribe(error_report, &block)
    EM.add_periodic_timer(0) do
      begin
        @queue.pop do |_header, body|
          # A nil body means there was nothing to hand to the job handler.
          next unless body

          # The pop callback runs later than the surrounding begin/rescue, so
          # handler failures have to be rescued here, where the body is known.
          begin
            JobQueue.logger.debug "AMQP received #{body}"
            block.call(body)
          rescue => e
            error_report.call(body, e)
          end
        end
      rescue => e
        # Failure raised by the pop request itself; no message body exists yet.
        error_report.call(nil, e)
      end
    end
  end
end

Version data entries

7 entries across 7 versions & 2 rubygems

Version Path
mloughran-job_queue-0.0.10 lib/job_queue/adapters/amqp_adapter.rb
mloughran-job_queue-0.0.11 lib/job_queue/adapters/amqp_adapter.rb
mloughran-job_queue-0.0.9 lib/job_queue/adapters/amqp_adapter.rb
job_queue-0.0.12 lib/job_queue/adapters/amqp_adapter.rb
job_queue-0.0.11 lib/job_queue/adapters/amqp_adapter.rb
job_queue-0.0.10 lib/job_queue/adapters/amqp_adapter.rb
job_queue-0.0.9 lib/job_queue/adapters/amqp_adapter.rb