lib/amqp/channel.rb in amqp-0.9.9 vs lib/amqp/channel.rb in amqp-0.9.10
- old
+ new
@@ -261,10 +261,13 @@
#
# @api plugin
def auto_recover
return unless auto_recovering?
+ @channel_is_open_deferrable.fail
+ @channel_is_open_deferrable = AMQ::Client::EventMachineClient::Deferrable.new
+
self.open do
@channel_is_open_deferrable.succeed
# re-establish prefetch
self.prefetch(@options[:prefetch], false) if @options[:prefetch]
@@ -284,10 +287,13 @@
# must release after we allocate a new id, otherwise we will end up
# with the same value. MK.
@id = self.class.next_channel_id
self.class.release_channel_id(old_id)
+ @channel_is_open_deferrable.fail
+ @channel_is_open_deferrable = AMQ::Client::EventMachineClient::Deferrable.new
+
self.open do
@channel_is_open_deferrable.succeed
# re-establish prefetch
self.prefetch(@options[:prefetch], false) if @options[:prefetch]
@@ -927,20 +933,31 @@
# Multiple callbacks are supported. If when this moment is called, channel is already
# open, block is executed immediately.
#
# @api public
def once_open(&block)
- @channel_is_open_deferrable.callback(&block)
+ @channel_is_open_deferrable.callback do
+ # guards against cases when deferred operations
+ # don't complete before the channel is closed
+ block.call if open?
+ end
end # once_open(&block)
alias once_opened once_open
+ # @return [Boolean]
+ # @api public
+ def closing?
+ self.status == :closing
+ end
+
# Closes AMQP channel.
#
# @api public
def close(reply_code = 200, reply_text = DEFAULT_REPLY_TEXT, class_id = 0, method_id = 0, &block)
+ self.status = :closing
r = super(reply_code, reply_text, class_id, method_id, &block)
-
+
r
end
# @endgroup
@@ -1120,10 +1137,10 @@
super(method)
self.class.error(method.reply_text)
self.class.release_channel_id(@id)
end
-
+
# Overrides AMQ::Client::Channel version to also release the channel id
#
# @private
# @api private
def handle_close_ok(method)