lib/amqp/channel.rb in amqp-0.9.10 vs lib/amqp/channel.rb in amqp-1.0.0.pre1

- old
+ new

@@ -5,11 +5,11 @@ require "amqp/queue" module AMQP # h2. What are AMQP channels # - # To quote {http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification}: + # To quote {http://bit.ly/hw2ELX AMQP 0.9.1 specification}: # # AMQP is a multi-channelled protocol. Channels provide a way to multiplex # a heavyweight TCP/IP connection into several light weight connections. # This makes the protocol more “firewall friendly” since port usage is predictable. # It also means that traffic shaping and other network QoS features can be easily employed. @@ -138,11 +138,11 @@ # h2. RabbitMQ extensions. # # AMQP gem supports several RabbitMQ extensions taht extend Channel functionality. # Learn more in {file:docs/VendorSpecificExtensions.textile} # - # @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification (Section 2.2.5) + # @see http://bit.ly/hw2ELX AMQP 0.9.1 specification (Section 2.2.5) class Channel < AMQ::Client::Channel # # API # @@ -261,13 +261,10 @@ # # @api plugin def auto_recover return unless auto_recovering? - @channel_is_open_deferrable.fail - @channel_is_open_deferrable = AMQ::Client::EventMachineClient::Deferrable.new - self.open do @channel_is_open_deferrable.succeed # re-establish prefetch self.prefetch(@options[:prefetch], false) if @options[:prefetch] @@ -287,13 +284,10 @@ # must release after we allocate a new id, otherwise we will end up # with the same value. MK. @id = self.class.next_channel_id self.class.release_channel_id(old_id) - @channel_is_open_deferrable.fail - @channel_is_open_deferrable = AMQ::Client::EventMachineClient::Deferrable.new - self.open do @channel_is_open_deferrable.succeed # re-establish prefetch self.prefetch(@options[:prefetch], false) if @options[:prefetch] @@ -377,11 +371,11 @@ # # # @see Channel#default_exchange # @see Exchange # @see Exchange#initialize - # @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification (Section 3.1.3.1) + # @see http://bit.ly/hw2ELX AMQP 0.9.1 specification (Section 3.1.3.1) # # @return [Exchange] # @api public def direct(name = 'amq.direct', opts = {}, &block) if exchange = find_exchange(name) @@ -430,11 +424,11 @@ # end # # # # @see Exchange - # @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification (Section 2.1.2.4) + # @see http://bit.ly/hw2ELX AMQP 0.9.1 specification (Section 2.1.2.4) # # @return [Exchange] # @api public def default_exchange @default_exchange ||= Exchange.default(self) @@ -485,11 +479,11 @@ # # publish a message # # @see Exchange # @see Exchange#initialize # @see Channel#default_exchange - # @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification (Section 3.1.3.2) + # @see http://bit.ly/hw2ELX AMQP 0.9.1 specification (Section 3.1.3.2) # # @return [Exchange] # @api public def fanout(name = 'amq.fanout', opts = {}, &block) if exchange = find_exchange(name) @@ -601,11 +595,11 @@ # end # # @see Exchange # @see Exchange#initialize # @see http://www.rabbitmq.com/faq.html#Binding-and-Routing RabbitMQ FAQ on routing & wildcards - # @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification (Section 3.1.3.3) + # @see http://bit.ly/hw2ELX AMQP 0.9.1 specification (Section 3.1.3.3) # # @return [Exchange] # @api public def topic(name = 'amq.topic', opts = {}, &block) if exchange = find_exchange(name) @@ -707,11 +701,11 @@ # # # @see Exchange # @see Exchange#initialize # @see Channel#default_exchange - # @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification (Section 3.1.3.3) + # @see http://bit.ly/hw2ELX AMQP 0.9.1 specification (Section 3.1.3.3) # # @return [Exchange] # @api public def headers(name = 'amq.match', opts = {}, &block) if exchange = find_exchange(name) @@ -808,11 +802,11 @@ # @yieldparam [Queue] queue Queue that is successfully declared and is ready to be used. # @yieldparam [AMQP::Protocol::Queue::DeclareOk] declare_ok AMQP queue.declare-ok) instance. # # @see Queue # @see Queue#initialize - # @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification (Section 2.1.4) + # @see http://bit.ly/hw2ELX AMQP 0.9.1 specification (Section 2.1.4) # # @return [Queue] # @api public def queue(name = AMQ::Protocol::EMPTY_STRING, opts = {}, &block) raise ArgumentError.new("queue name must not be nil; if you want broker to generate queue name for you, pass an empty string") if name.nil? @@ -933,31 +927,20 @@ # Multiple callbacks are supported. If when this moment is called, channel is already # open, block is executed immediately. # # @api public def once_open(&block) - @channel_is_open_deferrable.callback do - # guards against cases when deferred operations - # don't complete before the channel is closed - block.call if open? - end + @channel_is_open_deferrable.callback(&block) end # once_open(&block) alias once_opened once_open - # @return [Boolean] - # @api public - def closing? - self.status == :closing - end - # Closes AMQP channel. # # @api public def close(reply_code = 200, reply_text = DEFAULT_REPLY_TEXT, class_id = 0, method_id = 0, &block) - self.status = :closing r = super(reply_code, reply_text, class_id, method_id, &block) - + r end # @endgroup @@ -972,11 +955,11 @@ # this method is not intended for window control. It does not affect contents returned to # Queue#get callers. # # @param [Boolean] Desired flow state. # - # @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Reference.pdf AMQP 0.9.1 protocol documentation (Section 1.5.2.3.) + # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.5.2.3.) # @api public def flow(active = false, &block) super(active, &block) end @@ -1013,32 +996,32 @@ # Acknowledge one or all messages on the channel. # # @api public # @see #reject # @see #recover - # @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Reference.pdf AMQP 0.9.1 protocol documentation (Section 1.8.3.13.) + # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.8.3.13.) def acknowledge(delivery_tag, multiple = false) super(delivery_tag, multiple) end # acknowledge(delivery_tag, multiple = false) # Reject a message with given delivery tag. # # @api public # @see #acknowledge # @see #recover - # @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Reference.pdf AMQP 0.9.1 protocol documentation (Section 1.8.3.14.) + # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.8.3.14.) def reject(delivery_tag, requeue = true) super(delivery_tag, requeue) end # reject(delivery_tag, requeue = true) # Notifies AMQ broker that consumer has recovered and unacknowledged messages need # to be redelivered. # # @return [Channel] self # # @note RabbitMQ as of 2.3.1 does not support basic.recover with requeue = false. - # @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Reference.pdf AMQP 0.9.1 protocol documentation (Section 1.8.3.16.) + # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.8.3.16.) # @see #acknowledge # @api public def recover(requeue = true, &block) super(requeue, &block) end # recover(requeue = false, &block) @@ -1135,12 +1118,11 @@ # @api private def handle_close(method) super(method) self.class.error(method.reply_text) - self.class.release_channel_id(@id) end - + # Overrides AMQ::Client::Channel version to also release the channel id # # @private # @api private def handle_close_ok(method)