Sha256: 7e5d417237544a159c968d953b9427b8e5f74b6cef2bf34cebdc270c2685ffcf

Contents?: true

Size: 632 Bytes

Versions: 9

Compression:

Stored size: 632 Bytes

Contents

require 'mq'

# JobQueue adapter backed by AMQP (via the `mq` library).
#
# Declares a durable direct exchange named 'photo' and a durable queue named
# 'photo_worker', binds the queue to the exchange, and exposes `put` /
# `subscribe` for enqueueing and consuming string payloads.
class JobQueue::AMQPAdapter
  # Sets up the exchange, the queue and the binding between them.
  #
  # options - accepted for interface compatibility; not read by this adapter.
  def initialize(options = {})
    amq = MQ.new
    @exchange = amq.direct('photo', :durable => true)
    @queue = amq.queue('photo_worker', :durable => true)
    @queue.bind(@exchange)
  end

  # Enqueues a job payload, marked persistent.
  #
  # string - the payload to publish.
  def put(string)
    @queue.publish(string, :persistent => true)
  end

  # Polls the queue from a periodic EventMachine timer and hands every
  # received message body to the given block.
  #
  # error_report - callable invoked as error_report.call(body, exception)
  #                when handling a message raises. `body` is nil when the
  #                failure happened while issuing the pop itself, since no
  #                message is available at that point.
  # block        - handler called with each non-empty message body.
  def subscribe(error_report, &block)
    EM.add_periodic_timer(0) do
      begin
        @queue.pop do |_header, body|
          # An empty queue yields no body; nothing to do for this tick.
          next unless body

          # The rescue must live inside this callback: the callback is run
          # later by the reactor when the broker replies (see the amqp gem's
          # MQ::Queue#pop), so a rescue wrapped around the pop call alone
          # would never see errors raised by the job handler.
          begin
            JobQueue.logger.info "AMQP received #{body}"
            block.call(body)
          rescue => e
            error_report.call(body, e)
          end
        end
      rescue => e
        # Failure while requesting a message; there is no body to report.
        error_report.call(nil, e)
      end
    end
  end
end

Version data entries

9 entries across 9 versions & 2 rubygems

Version Path
mloughran-job_queue-0.0.4 lib/job_queue/adapters/amqp_adapter.rb
mloughran-job_queue-0.0.5 lib/job_queue/adapters/amqp_adapter.rb
mloughran-job_queue-0.0.6 lib/job_queue/adapters/amqp_adapter.rb
mloughran-job_queue-0.0.7 lib/job_queue/adapters/amqp_adapter.rb
mloughran-job_queue-0.0.8 lib/job_queue/adapters/amqp_adapter.rb
newbamboo-job_queue-0.0.5 lib/job_queue/adapters/amqp_adapter.rb
newbamboo-job_queue-0.0.6 lib/job_queue/adapters/amqp_adapter.rb
newbamboo-job_queue-0.0.7 lib/job_queue/adapters/amqp_adapter.rb
newbamboo-job_queue-0.0.8 lib/job_queue/adapters/amqp_adapter.rb