Sha256: fcc75d3c73be1a39abff4c47c30d08595de7ad3ed1f12304ed033901e018d77f

Contents?: true

Size: 618 Bytes

Versions: 2

Compression:

Stored size: 618 Bytes

Contents

require 'mq'

# JobQueue adapter backed by AMQP (via the +mq+ library).
#
# On construction it declares a durable direct exchange named 'photo' and a
# durable queue named 'photo_worker', and binds the queue to the exchange.
class JobQueue::AMQPAdapter
  def initialize
    amq = MQ.new
    @exchange = amq.direct('photo', :durable => true)
    @queue = amq.queue('photo_worker', :durable => true)
    @queue.bind(@exchange)
  end

  # Publishes +string+ as a persistent message through the queue object.
  #
  # @param string [String] the job payload to enqueue
  def put(string)
    @queue.publish(string, :persistent => true)
  end

  # Polls the queue from an EventMachine periodic timer and hands each
  # received message body to the given block.
  #
  # Any StandardError raised while handling a message is passed to
  # +error_report+ together with the body that was being processed.
  #
  # @param error_report [#call] invoked as +error_report.call(body, exception)+;
  #   +body+ is nil when the failure happened while requesting a message
  #   rather than while processing one
  # @yield [body] the payload of each message popped from the queue
  def subscribe(error_report, &block)
    EM.add_periodic_timer(0) do
      begin
        @queue.pop do |_header, body|
          # An empty queue yields a nil body; nothing to process.
          next unless body
          dispatch(body, error_report, block)
        end
      rescue => e
        # Failure issuing the pop itself: there is no message body to report.
        error_report.call(nil, e)
      end
    end
  end

  private

  # Logs and processes a single message body, reporting handler failures.
  #
  # The rescue lives here, inside the pop callback, because that callback is
  # run by the reactor after +pop+ has returned; a rescue wrapped around the
  # +pop+ call would not see exceptions raised by the handler.
  def dispatch(body, error_report, handler)
    JobQueue.logger.info "AMQP received #{body}"
    handler.call(body)
  rescue => e
    error_report.call(body, e)
  end
end

Version data entries

2 entries across 2 versions & 2 rubygems

Version Path
mloughran-job_queue-0.0.3 lib/job_queue/adapters/amqp_adapter.rb
strobemonkey-job_queue-0.0.5 lib/job_queue/adapters/amqp_adapter.rb