lib/amqp/queue.rb in famoseagle-carrot-0.2.0 vs lib/amqp/queue.rb in famoseagle-carrot-0.3.0
- old
+ new
@@ -1,6 +1,6 @@
-module AMQP
+module Carrot::AMQP
class Queue
attr_reader :name, :server
attr_accessor :delivery_tag
def initialize(server, name, opts = {})
@@ -8,27 +8,19 @@
@opts = opts
@name = name
server.send_frame(
Protocol::Queue::Declare.new({ :queue => name, :nowait => true }.merge(opts))
)
- nil
end
- def delete(opts = {})
- server.send_frame(
- Protocol::Queue::Delete.new({ :queue => name, :nowait => true }.merge(opts))
- )
- nil
- end
-
def pop(opts = {})
self.delivery_tag = nil
server.send_frame(
Protocol::Basic::Get.new({ :queue => name, :consumer_tag => name, :no_ack => !opts.delete(:ack), :nowait => true }.merge(opts))
)
method = server.next_method
- return if method.is_a?(Protocol::Basic::GetEmpty)
+ return unless method.is_a?(Protocol::Basic::GetOk)
self.delivery_tag = method.delivery_tag
header = server.next_payload
msg = server.next_payload
@@ -61,11 +53,40 @@
)
method = server.next_method
[method.message_count, method.consumer_count]
end
+ def bind(exchange, opts = {})
+ exchange = exchange.respond_to?(:name) ? exchange.name : exchange
+ bindings[exchange] = opts
+ server.send_frame(
+ Protocol::Queue::Bind.new({ :queue => name, :exchange => exchange, :routing_key => opts.delete(:key), :nowait => true }.merge(opts))
+ )
+ end
+
+ def unbind(exchange, opts = {})
+ exchange = exchange.respond_to?(:name) ? exchange.name : exchange
+ bindings.delete(exchange)
+
+ server.send_frame(
+ Protocol::Queue::Unbind.new({
+ :queue => name, :exchange => exchange, :routing_key => opts.delete(:key), :nowait => true }.merge(opts)
+ )
+ )
+ end
+
+ def delete(opts = {})
+ server.send_frame(
+ Protocol::Queue::Delete.new({ :queue => name, :nowait => true }.merge(opts))
+ )
+ end
+
private
def exchange
@exchange ||= Exchange.new(server, :direct, '', :key => name)
+ end
+
+ def bindings
+ @bindings ||= {}
end
end
end