Sha256: 7e5d417237544a159c968d953b9427b8e5f74b6cef2bf34cebdc270c2685ffcf
Contents?: true
Size: 632 Bytes
Versions: 9
Compression:
Stored size: 632 Bytes
Contents
require 'mq'

# Job queue adapter backed by AMQP.
#
# Declares a durable direct exchange named 'photo' and a durable queue named
# 'photo_worker', binds the queue to the exchange, and exposes publish
# (#put) and polling-consume (#subscribe) operations on that queue.
class JobQueue::AMQPAdapter
  # Opens an MQ channel and declares/binds the exchange and queue.
  #
  # @param options [Hash] accepted for interface compatibility; currently unused
  def initialize(options = {})
    amq = MQ.new
    @exchange = amq.direct('photo', :durable => true)
    @queue = amq.queue('photo_worker', :durable => true)
    @queue.bind(@exchange)
  end

  # Publishes a message as persistent.
  #
  # @param string [String] the message payload
  def put(string)
    @queue.publish(string, :persistent => true)
  end

  # Polls the queue from an EventMachine periodic timer and hands every
  # received payload to the given block.
  #
  # Any StandardError raised while handling a payload is passed to
  # +error_report+ together with that payload.
  #
  # @param error_report [#call] invoked as error_report.call(body, exception)
  # @yieldparam body [String] the payload of a received message
  def subscribe(error_report, &block)
    EM.add_periodic_timer(0) do
      @queue.pop do |_header, body|
        # A pop against an empty queue yields no body; nothing to do.
        next unless body

        # The rescue has to live inside the pop callback: the callback is
        # expected to run from the reactor after `pop` has already returned
        # (see the amqp gem's MQ::Queue#pop), so a rescue wrapped around the
        # `pop` call itself would never see errors raised by the consumer.
        begin
          JobQueue.logger.info "AMQP received #{body}"
          block.call(body)
        rescue => e
          # Report the payload that failed (previously referenced an
          # undefined `job`, which raised NameError instead of reporting).
          error_report.call(body, e)
        end
      end
    end
  end
end
Version data entries
9 entries across 9 versions & 2 rubygems