lib/amqp/channel.rb in amqp-0.9.9 vs lib/amqp/channel.rb in amqp-0.9.10

- old
+ new

@@ -261,10 +261,13 @@ # # @api plugin def auto_recover return unless auto_recovering? + @channel_is_open_deferrable.fail + @channel_is_open_deferrable = AMQ::Client::EventMachineClient::Deferrable.new + self.open do @channel_is_open_deferrable.succeed # re-establish prefetch self.prefetch(@options[:prefetch], false) if @options[:prefetch] @@ -284,10 +287,13 @@ # must release after we allocate a new id, otherwise we will end up # with the same value. MK. @id = self.class.next_channel_id self.class.release_channel_id(old_id) + @channel_is_open_deferrable.fail + @channel_is_open_deferrable = AMQ::Client::EventMachineClient::Deferrable.new + self.open do @channel_is_open_deferrable.succeed # re-establish prefetch self.prefetch(@options[:prefetch], false) if @options[:prefetch] @@ -927,20 +933,31 @@ # Multiple callbacks are supported. If when this moment is called, channel is already # open, block is executed immediately. # # @api public def once_open(&block) - @channel_is_open_deferrable.callback(&block) + @channel_is_open_deferrable.callback do + # guards against cases when deferred operations + # don't complete before the channel is closed + block.call if open? + end end # once_open(&block) alias once_opened once_open + # @return [Boolean] + # @api public + def closing? + self.status == :closing + end + # Closes AMQP channel. # # @api public def close(reply_code = 200, reply_text = DEFAULT_REPLY_TEXT, class_id = 0, method_id = 0, &block) + self.status = :closing r = super(reply_code, reply_text, class_id, method_id, &block) - + r end # @endgroup @@ -1120,10 +1137,10 @@ super(method) self.class.error(method.reply_text) self.class.release_channel_id(@id) end - + # Overrides AMQ::Client::Channel version to also release the channel id # # @private # @api private def handle_close_ok(method)