Sha256: b8a3b729fa9a819b2073a3d32884761774f64761782b7a85263634b2b7785ce0
Contents?: true
Size: 1019 Bytes
Versions: 1
Compression:
Stored size: 1019 Bytes
Contents
# Represents one named queue: it opens a Bunny connection, binds the queue to
# a direct exchange under its own name, and hands delivered messages to a
# worker.
class Sneakers::Queue
  attr_reader :name, :opts, :exchange

  # name - the queue name; also used as the routing key when binding.
  # opts - options hash; the keys that are read are listed on #subscribe.
  #
  # The handler class is looked up from Sneakers::Config[:handler] here, at
  # construction time.
  def initialize(name, opts)
    @name = name
    @opts = opts
    @handler_klass = Sneakers::Config[:handler]
  end

  # Connects, declares the exchange and queue, and registers a consumer that
  # forwards every delivery to worker.do_work along with a handler instance
  # built around the channel.
  #
  # Keys read from opts:
  #   :heartbeat_interval - given to Bunny.new
  #   :amqp               - given to the connection's #start
  #   :prefetch           - given to the channel's #prefetch
  #   :exchange           - name of the direct exchange
  #   :durable            - applied to both the exchange and the queue
  #   :ack                - given to the queue subscription
  #
  # Always returns nil.
  def subscribe(worker)
    open_channel

    durable = @opts[:durable]
    @exchange = @channel.exchange(@opts[:exchange], type: :direct, durable: durable)

    message_handler = @handler_klass.new(@channel)

    bound_queue = @channel.queue(@name, durable: durable)
    bound_queue.bind(@exchange, routing_key: @name)

    # Subscribed with :block => false; the block arguments are passed to the
    # worker unchanged.
    @consumer = bound_queue.subscribe(block: false, ack: @opts[:ack]) do |hdr, props, msg|
      worker.do_work(hdr, props, msg, message_handler)
    end

    nil
  end

  # Cancels the consumer registered by #subscribe, if there is one, and
  # forgets it. Returns nil.
  def unsubscribe
    # XXX can we cancel bunny and channel too?
    return unless @consumer

    @consumer.cancel
    @consumer = nil
  end

  private

  # Starts the Bunny connection and creates a channel with the configured
  # prefetch; results are stored in @bunny and @channel.
  def open_channel
    @bunny = Bunny.new(heartbeat_interval: @opts[:heartbeat_interval])
    @bunny.start(@opts[:amqp])
    @channel = @bunny.create_channel
    @channel.prefetch(@opts[:prefetch])
  end
end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
sneakers-0.0.3 | lib/sneakers/queue.rb |