Sha256: d6b27fd803228913d700a4df4a26f23ca3a86ef08c7ddb83801daac6497952f7
Contents?: true
Size: 633 Bytes
Versions: 7
Compression:
Stored size: 633 Bytes
Contents
require 'mq' class JobQueue::AMQPAdapter def initialize(options = {}) amq = MQ.new @exchange = amq.direct('photo', :durable => true) @queue = amq.queue('photo_worker', :durable => true) @queue.bind(@exchange) end def put(string) @queue.publish(string, :persistent => true) end def subscribe(error_report, &block) EM.add_periodic_timer(0) do begin @queue.pop do |header, body| next unless body JobQueue.logger.debug "AMQP received #{body}" yield body end rescue => e error_report.call(job.body, e) end end end end
Version data entries
7 entries across 7 versions & 2 rubygems