lib/amqp/queue.rb in famoseagle-carrot-0.2.0 vs lib/amqp/queue.rb in famoseagle-carrot-0.3.0

- old
+ new

@@ -1,6 +1,6 @@ -module AMQP +module Carrot::AMQP class Queue attr_reader :name, :server attr_accessor :delivery_tag def initialize(server, name, opts = {}) @@ -8,27 +8,19 @@ @opts = opts @name = name server.send_frame( Protocol::Queue::Declare.new({ :queue => name, :nowait => true }.merge(opts)) ) - nil end - def delete(opts = {}) - server.send_frame( - Protocol::Queue::Delete.new({ :queue => name, :nowait => true }.merge(opts)) - ) - nil - end - def pop(opts = {}) self.delivery_tag = nil server.send_frame( Protocol::Basic::Get.new({ :queue => name, :consumer_tag => name, :no_ack => !opts.delete(:ack), :nowait => true }.merge(opts)) ) method = server.next_method - return if method.is_a?(Protocol::Basic::GetEmpty) + return unless method.is_a?(Protocol::Basic::GetOk) self.delivery_tag = method.delivery_tag header = server.next_payload msg = server.next_payload @@ -61,11 +53,40 @@ ) method = server.next_method [method.message_count, method.consumer_count] end + def bind(exchange, opts = {}) + exchange = exchange.respond_to?(:name) ? exchange.name : exchange + bindings[exchange] = opts + server.send_frame( + Protocol::Queue::Bind.new({ :queue => name, :exchange => exchange, :routing_key => opts.delete(:key), :nowait => true }.merge(opts)) + ) + end + + def unbind(exchange, opts = {}) + exchange = exchange.respond_to?(:name) ? exchange.name : exchange + bindings.delete(exchange) + + server.send_frame( + Protocol::Queue::Unbind.new({ + :queue => name, :exchange => exchange, :routing_key => opts.delete(:key), :nowait => true }.merge(opts) + ) + ) + end + + def delete(opts = {}) + server.send_frame( + Protocol::Queue::Delete.new({ :queue => name, :nowait => true }.merge(opts)) + ) + end + private def exchange @exchange ||= Exchange.new(server, :direct, '', :key => name) + end + + def bindings + @bindings ||= {} end end end