Sha256: 686dacad73c0195df8b4d2c05a7c95376cd3f865aa2cf3ac732c00b525838e0c
Contents?: true
Size: 1.41 KB
Versions: 4
Compression:
Stored size: 1.41 KB
Contents
class MQ
  # A named queue reached through an MQ object.
  #
  # Every protocol frame is sent from inside a block handed to
  # +@mq.callback+, so when a frame actually goes out is decided by the MQ
  # object, not by this class (presumably once the channel is ready --
  # confirm against MQ#callback).
  class Queue
    include AMQP

    # The queue name given to the constructor.
    attr_reader :name

    # Registers a Queue::Declare frame for +name+.
    #
    # mq   - the owning MQ object; must respond to +callback+ and +send+.
    # name - queue name, used as the :queue field of every frame.
    # opts - extra Declare fields; they override the defaults
    #        (:queue => name, :nowait => true) because they are merged last.
    def initialize(mq, name, opts = {})
      @mq = mq
      @name = name
      @mq.callback{
        @mq.send Protocol::Queue::Declare.new({ :queue => name,
                                                :nowait => true }.merge(opts))
      }
    end

    # Registers a Queue::Bind frame tying this queue to +exchange+.
    #
    # exchange - an object responding to +name+, or the exchange name itself.
    # opts     - extra Bind fields. :key is pulled out and sent as
    #            :routing_key; the remaining entries are merged over the
    #            defaults.
    #
    # Returns self so calls can be chained.
    def bind(exchange, opts = {})
      # Work on a private copy: removing :key from the caller's hash (and
      # doing so later, inside the deferred block) would silently change a
      # hash the caller may reuse for another bind.
      opts = opts.dup
      @mq.callback{
        @mq.send Protocol::Queue::Bind.new({ :queue => name,
                                             :exchange => exchange.respond_to?(:name) ? exchange.name : exchange,
                                             :routing_key => opts.delete(:key),
                                             :nowait => true }.merge(opts))
      }
      self
    end

    # Stores +blk+ as the message handler and registers a Basic::Consume
    # frame for this queue. The queue name doubles as the consumer tag.
    #
    # opts - extra Consume fields merged over the defaults
    #        (:queue, :consumer_tag, :no_ack => true, :nowait => true).
    # blk  - handler invoked by #receive; may be nil, in which case received
    #        messages are ignored.
    #
    # Returns self so calls can be chained.
    def subscribe(opts = {}, &blk)
      @on_msg = blk
      @mq.callback{
        @mq.send Protocol::Basic::Consume.new({ :queue => name,
                                                :consumer_tag => name,
                                                :no_ack => true,
                                                :nowait => true }.merge(opts))
      }
      self
    end

    # Publishes +data+ through the private exchange built by #exchange,
    # forwarding +opts+ unchanged. Returns whatever that publish returns.
    def publish(data, opts = {})
      exchange.publish(data, opts)
    end

    # Hands an incoming message to the handler stored by #subscribe.
    #
    # A handler whose arity is exactly 1 receives only +body+; any other
    # handler receives +headers+ and +body+. Returns nil when no handler has
    # been stored.
    def receive(headers, body)
      return unless @on_msg

      args = @on_msg.arity == 1 ? [body] : [headers, body]
      @on_msg.call(*args)
    end

    private

    # Lazily builds and memoizes the exchange used by #publish: a :direct
    # exchange with an empty name and this queue's name as its :key.
    def exchange
      @exchange ||= Exchange.new(@mq, :direct, '', :key => name)
    end
  end
end
Version data entries
4 entries across 4 versions & 2 rubygems
Version | Path |
---|---|
tmm1-amqp-0.5.1 | lib/mq/queue.rb |
tmm1-amqp-0.5.2 | lib/mq/queue.rb |
amqp-0.5.1 | lib/mq/queue.rb |
amqp-0.5.2 | lib/mq/queue.rb |