lib/amqp/queue.rb in amqp-1.0.1 vs lib/amqp/queue.rb in amqp-1.0.2

- old
+ new

@@ -183,37 +183,32 @@ # a deferrable that we use to delay operations until this queue is actually declared. # one reason for this is to support a case when a server-named queue is immediately bound. # it's crazy, but 0.7.x supports it, so... MK. @declaration_deferrable = AMQ::Client::EventMachineClient::Deferrable.new - if @opts[:nowait] - @status = :opened - block.call(self) if block - else - @status = :opening - end - super(channel.connection, channel, name) shim = Proc.new do |q, declare_ok| - @declaration_deferrable.succeed - case block.arity when 1 then block.call(q) else block.call(q, declare_ok) end end @channel.once_open do + if @opts[:nowait] + @declaration_deferrable.succeed + block.call(self) if block + end + if block self.declare(@opts[:passive], @opts[:durable], @opts[:exclusive], @opts[:auto_delete], @opts[:nowait], @opts[:arguments], &shim) else - injected_callback = Proc.new { @declaration_deferrable.succeed } # we cannot pass :nowait as true here, AMQ::Client::Queue will (rightfully) raise an exception because # it has no idea about crazy edge cases we are trying to support for sake of backwards compatibility. MK. - self.declare(@opts[:passive], @opts[:durable], @opts[:exclusive], @opts[:auto_delete], false, @opts[:arguments], &injected_callback) + self.declare(@opts[:passive], @opts[:durable], @opts[:exclusive], @opts[:auto_delete], false, @opts[:arguments]) end end end # Defines a callback that will be executed once queue is declared. More than one callback can be defined. @@ -317,11 +312,10 @@ @channel.queues.delete(old_name) end self.redeclare do - @declaration_deferrable.succeed self.rebind @consumers.each { |tag, consumer| consumer.auto_recover } self.exec_callback_yielding_self(:after_recovery) @@ -888,9 +882,15 @@ def handle_connection_interruption(method = nil) super(method) @declaration_deferrable = EventMachine::DefaultDeferrable.new end + + def handle_declare_ok(method) + super(method) + @declaration_deferrable.succeed + end + protected # @private def self.add_default_options(name, opts, block)