lib/amqp/channel.rb in amqp-0.9.10 vs lib/amqp/channel.rb in amqp-1.0.0.pre1
- old
+ new
@@ -5,11 +5,11 @@
require "amqp/queue"
module AMQP
# h2. What are AMQP channels
#
- # To quote {http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification}:
+ # To quote {http://bit.ly/hw2ELX AMQP 0.9.1 specification}:
#
# AMQP is a multi-channelled protocol. Channels provide a way to multiplex
# a heavyweight TCP/IP connection into several light weight connections.
# This makes the protocol more “firewall friendly” since port usage is predictable.
# It also means that traffic shaping and other network QoS features can be easily employed.
@@ -138,11 +138,11 @@
# h2. RabbitMQ extensions.
#
# AMQP gem supports several RabbitMQ extensions taht extend Channel functionality.
# Learn more in {file:docs/VendorSpecificExtensions.textile}
#
- # @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification (Section 2.2.5)
+ # @see http://bit.ly/hw2ELX AMQP 0.9.1 specification (Section 2.2.5)
class Channel < AMQ::Client::Channel
#
# API
#
@@ -261,13 +261,10 @@
#
# @api plugin
def auto_recover
return unless auto_recovering?
- @channel_is_open_deferrable.fail
- @channel_is_open_deferrable = AMQ::Client::EventMachineClient::Deferrable.new
-
self.open do
@channel_is_open_deferrable.succeed
# re-establish prefetch
self.prefetch(@options[:prefetch], false) if @options[:prefetch]
@@ -287,13 +284,10 @@
# must release after we allocate a new id, otherwise we will end up
# with the same value. MK.
@id = self.class.next_channel_id
self.class.release_channel_id(old_id)
- @channel_is_open_deferrable.fail
- @channel_is_open_deferrable = AMQ::Client::EventMachineClient::Deferrable.new
-
self.open do
@channel_is_open_deferrable.succeed
# re-establish prefetch
self.prefetch(@options[:prefetch], false) if @options[:prefetch]
@@ -377,11 +371,11 @@
#
#
# @see Channel#default_exchange
# @see Exchange
# @see Exchange#initialize
- # @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification (Section 3.1.3.1)
+ # @see http://bit.ly/hw2ELX AMQP 0.9.1 specification (Section 3.1.3.1)
#
# @return [Exchange]
# @api public
def direct(name = 'amq.direct', opts = {}, &block)
if exchange = find_exchange(name)
@@ -430,11 +424,11 @@
# end
#
#
#
# @see Exchange
- # @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification (Section 2.1.2.4)
+ # @see http://bit.ly/hw2ELX AMQP 0.9.1 specification (Section 2.1.2.4)
#
# @return [Exchange]
# @api public
def default_exchange
@default_exchange ||= Exchange.default(self)
@@ -485,11 +479,11 @@
# # publish a message
#
# @see Exchange
# @see Exchange#initialize
# @see Channel#default_exchange
- # @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification (Section 3.1.3.2)
+ # @see http://bit.ly/hw2ELX AMQP 0.9.1 specification (Section 3.1.3.2)
#
# @return [Exchange]
# @api public
def fanout(name = 'amq.fanout', opts = {}, &block)
if exchange = find_exchange(name)
@@ -601,11 +595,11 @@
# end
#
# @see Exchange
# @see Exchange#initialize
# @see http://www.rabbitmq.com/faq.html#Binding-and-Routing RabbitMQ FAQ on routing & wildcards
- # @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification (Section 3.1.3.3)
+ # @see http://bit.ly/hw2ELX AMQP 0.9.1 specification (Section 3.1.3.3)
#
# @return [Exchange]
# @api public
def topic(name = 'amq.topic', opts = {}, &block)
if exchange = find_exchange(name)
@@ -707,11 +701,11 @@
#
#
# @see Exchange
# @see Exchange#initialize
# @see Channel#default_exchange
- # @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification (Section 3.1.3.3)
+ # @see http://bit.ly/hw2ELX AMQP 0.9.1 specification (Section 3.1.3.3)
#
# @return [Exchange]
# @api public
def headers(name = 'amq.match', opts = {}, &block)
if exchange = find_exchange(name)
@@ -808,11 +802,11 @@
# @yieldparam [Queue] queue Queue that is successfully declared and is ready to be used.
# @yieldparam [AMQP::Protocol::Queue::DeclareOk] declare_ok AMQP queue.declare-ok) instance.
#
# @see Queue
# @see Queue#initialize
- # @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification (Section 2.1.4)
+ # @see http://bit.ly/hw2ELX AMQP 0.9.1 specification (Section 2.1.4)
#
# @return [Queue]
# @api public
def queue(name = AMQ::Protocol::EMPTY_STRING, opts = {}, &block)
raise ArgumentError.new("queue name must not be nil; if you want broker to generate queue name for you, pass an empty string") if name.nil?
@@ -933,31 +927,20 @@
# Multiple callbacks are supported. If when this moment is called, channel is already
# open, block is executed immediately.
#
# @api public
def once_open(&block)
- @channel_is_open_deferrable.callback do
- # guards against cases when deferred operations
- # don't complete before the channel is closed
- block.call if open?
- end
+ @channel_is_open_deferrable.callback(&block)
end # once_open(&block)
alias once_opened once_open
- # @return [Boolean]
- # @api public
- def closing?
- self.status == :closing
- end
-
# Closes AMQP channel.
#
# @api public
def close(reply_code = 200, reply_text = DEFAULT_REPLY_TEXT, class_id = 0, method_id = 0, &block)
- self.status = :closing
r = super(reply_code, reply_text, class_id, method_id, &block)
-
+
r
end
# @endgroup
@@ -972,11 +955,11 @@
# this method is not intended for window control. It does not affect contents returned to
# Queue#get callers.
#
# @param [Boolean] Desired flow state.
#
- # @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Reference.pdf AMQP 0.9.1 protocol documentation (Section 1.5.2.3.)
+ # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.5.2.3.)
# @api public
def flow(active = false, &block)
super(active, &block)
end
@@ -1013,32 +996,32 @@
# Acknowledge one or all messages on the channel.
#
# @api public
# @see #reject
# @see #recover
- # @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Reference.pdf AMQP 0.9.1 protocol documentation (Section 1.8.3.13.)
+ # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.8.3.13.)
def acknowledge(delivery_tag, multiple = false)
super(delivery_tag, multiple)
end # acknowledge(delivery_tag, multiple = false)
# Reject a message with given delivery tag.
#
# @api public
# @see #acknowledge
# @see #recover
- # @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Reference.pdf AMQP 0.9.1 protocol documentation (Section 1.8.3.14.)
+ # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.8.3.14.)
def reject(delivery_tag, requeue = true)
super(delivery_tag, requeue)
end # reject(delivery_tag, requeue = true)
# Notifies AMQ broker that consumer has recovered and unacknowledged messages need
# to be redelivered.
#
# @return [Channel] self
#
# @note RabbitMQ as of 2.3.1 does not support basic.recover with requeue = false.
- # @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Reference.pdf AMQP 0.9.1 protocol documentation (Section 1.8.3.16.)
+ # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.8.3.16.)
# @see #acknowledge
# @api public
def recover(requeue = true, &block)
super(requeue, &block)
end # recover(requeue = false, &block)
@@ -1135,12 +1118,11 @@
# @api private
def handle_close(method)
super(method)
self.class.error(method.reply_text)
- self.class.release_channel_id(@id)
end
-
+
# Overrides AMQ::Client::Channel version to also release the channel id
#
# @private
# @api private
def handle_close_ok(method)