lib/amqp/queue.rb in amqp-0.7.1 vs lib/amqp/queue.rb in amqp-0.7.2
- old
+ new
@@ -1,11 +1,11 @@
# encoding: utf-8
module AMQP
class Queue
def self.add_default_options(name, opts, block)
- { :queue => name, :nowait => block.nil? }.merge(opts)
+ { :queue => name, :nowait => (block.nil? && !name.empty?) }.merge(opts)
end
# Queues store and forward messages. Queues can be configured in the server
# or created at runtime. Queues must be attached to at least one exchange
# in order to receive messages from publishers.
@@ -64,15 +64,23 @@
# If set, the server will not respond to the method. The client should
# not wait for a reply method. If the server could not complete the
# method it will raise a channel or connection exception.
#
def initialize(mq, name, opts = {}, &block)
- @mq = mq
- @opts = self.class.add_default_options(name, opts, block)
+ raise ArgumentError, "queue name must not be nil. Use '' (empty string) for server-named queues." if name.nil?
+
+ @mq = mq
+ @opts = self.class.add_default_options(name, opts, block)
@bindings ||= {}
- @name = name unless name.empty?
@status = @opts[:nowait] ? :unknown : :unfinished
+
+ if name.empty?
+ @mq.queues_awaiting_declare_ok.push(self)
+ else
+ @name = name
+ end
+
@mq.callback {
@mq.send Protocol::Queue::Declare.new(@opts)
}
self.callback = block
@@ -464,9 +472,12 @@
@mq.queues.delete(@name) if @opts[:auto_delete]
@consumer_tag = nil
end
def reset
+ unsubscribe
+ cancelled
+
@deferred_status = nil
initialize @mq, @name, @opts
binds = @bindings
@bindings = {}