lib/amqp/queue.rb in amqp-1.0.1 vs lib/amqp/queue.rb in amqp-1.0.2
- old
+ new
@@ -183,37 +183,32 @@
# a deferrable that we use to delay operations until this queue is actually declared.
# one reason for this is to support a case when a server-named queue is immediately bound.
# it's crazy, but 0.7.x supports it, so... MK.
@declaration_deferrable = AMQ::Client::EventMachineClient::Deferrable.new
- if @opts[:nowait]
- @status = :opened
- block.call(self) if block
- else
- @status = :opening
- end
-
super(channel.connection, channel, name)
shim = Proc.new do |q, declare_ok|
- @declaration_deferrable.succeed
-
case block.arity
when 1 then block.call(q)
else
block.call(q, declare_ok)
end
end
@channel.once_open do
+ if @opts[:nowait]
+ @declaration_deferrable.succeed
+ block.call(self) if block
+ end
+
if block
self.declare(@opts[:passive], @opts[:durable], @opts[:exclusive], @opts[:auto_delete], @opts[:nowait], @opts[:arguments], &shim)
else
- injected_callback = Proc.new { @declaration_deferrable.succeed }
# we cannot pass :nowait as true here, AMQ::Client::Queue will (rightfully) raise an exception because
# it has no idea about crazy edge cases we are trying to support for sake of backwards compatibility. MK.
- self.declare(@opts[:passive], @opts[:durable], @opts[:exclusive], @opts[:auto_delete], false, @opts[:arguments], &injected_callback)
+ self.declare(@opts[:passive], @opts[:durable], @opts[:exclusive], @opts[:auto_delete], false, @opts[:arguments])
end
end
end
# Defines a callback that will be executed once queue is declared. More than one callback can be defined.
@@ -317,11 +312,10 @@
@channel.queues.delete(old_name)
end
self.redeclare do
- @declaration_deferrable.succeed
self.rebind
@consumers.each { |tag, consumer| consumer.auto_recover }
self.exec_callback_yielding_self(:after_recovery)
@@ -888,9 +882,15 @@
def handle_connection_interruption(method = nil)
super(method)
@declaration_deferrable = EventMachine::DefaultDeferrable.new
end
+
+ def handle_declare_ok(method)
+ super(method)
+ @declaration_deferrable.succeed
+ end
+
protected
# @private
def self.add_default_options(name, opts, block)