lib/amqp/channel.rb in amqp-1.0.0.pre2 vs lib/amqp/channel.rb in amqp-1.0.0

- old
+ new

@@ -5,11 +5,11 @@ require "amqp/queue" module AMQP # h2. What are AMQP channels # - # To quote {http://bit.ly/hw2ELX AMQP 0.9.1 specification}: + # To quote {http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification}: # # AMQP is a multi-channelled protocol. Channels provide a way to multiplex # a heavyweight TCP/IP connection into several light weight connections. # This makes the protocol more “firewall friendly” since port usage is predictable. # It also means that traffic shaping and other network QoS features can be easily employed. @@ -135,14 +135,14 @@ # <script src="https://gist.github.com/939483.js?file=gistfile1.rb"></script> # # # h2. RabbitMQ extensions. # - # AMQP gem supports several RabbitMQ extensions taht extend Channel functionality. + # AMQP gem supports several RabbitMQ extensions that extend Channel functionality. # Learn more in {file:docs/VendorSpecificExtensions.textile} # - # @see http://bit.ly/hw2ELX AMQP 0.9.1 specification (Section 2.2.5) + # @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification (Section 2.2.5) class Channel < AMQ::Client::Channel # # API # @@ -261,10 +261,13 @@ # # @api plugin def auto_recover return unless auto_recovering? + @channel_is_open_deferrable.fail + @channel_is_open_deferrable = AMQ::Client::EventMachineClient::Deferrable.new + self.open do @channel_is_open_deferrable.succeed # re-establish prefetch self.prefetch(@options[:prefetch], false) if @options[:prefetch] @@ -284,10 +287,13 @@ # must release after we allocate a new id, otherwise we will end up # with the same value. MK. @id = self.class.next_channel_id self.class.release_channel_id(old_id) + @channel_is_open_deferrable.fail + @channel_is_open_deferrable = AMQ::Client::EventMachineClient::Deferrable.new + self.open do @channel_is_open_deferrable.succeed # re-establish prefetch self.prefetch(@options[:prefetch], false) if @options[:prefetch] @@ -371,11 +377,11 @@ # # # @see Channel#default_exchange # @see Exchange # @see Exchange#initialize - # @see http://bit.ly/hw2ELX AMQP 0.9.1 specification (Section 3.1.3.1) + # @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification (Section 3.1.3.1) # # @return [Exchange] # @api public def direct(name = 'amq.direct', opts = {}, &block) if exchange = find_exchange(name) @@ -424,11 +430,11 @@ # end # # # # @see Exchange - # @see http://bit.ly/hw2ELX AMQP 0.9.1 specification (Section 2.1.2.4) + # @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification (Section 2.1.2.4) # # @return [Exchange] # @api public def default_exchange @default_exchange ||= Exchange.default(self) @@ -479,11 +485,11 @@ # # publish a message # # @see Exchange # @see Exchange#initialize # @see Channel#default_exchange - # @see http://bit.ly/hw2ELX AMQP 0.9.1 specification (Section 3.1.3.2) + # @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification (Section 3.1.3.2) # # @return [Exchange] # @api public def fanout(name = 'amq.fanout', opts = {}, &block) if exchange = find_exchange(name) @@ -595,11 +601,11 @@ # end # # @see Exchange # @see Exchange#initialize # @see http://www.rabbitmq.com/faq.html#Binding-and-Routing RabbitMQ FAQ on routing & wildcards - # @see http://bit.ly/hw2ELX AMQP 0.9.1 specification (Section 3.1.3.3) + # @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification (Section 3.1.3.3) # # @return [Exchange] # @api public def topic(name = 'amq.topic', opts = {}, &block) if exchange = find_exchange(name) @@ -701,11 +707,11 @@ # # # @see Exchange # @see Exchange#initialize # @see Channel#default_exchange - # @see http://bit.ly/hw2ELX AMQP 0.9.1 specification (Section 3.1.3.3) + # @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification (Section 3.1.3.3) # # @return [Exchange] # @api public def headers(name = 'amq.match', opts = {}, &block) if exchange = find_exchange(name) @@ -802,11 +808,11 @@ # @yieldparam [Queue] queue Queue that is successfully declared and is ready to be used. # @yieldparam [AMQP::Protocol::Queue::DeclareOk] declare_ok AMQP queue.declare-ok) instance. # # @see Queue # @see Queue#initialize - # @see http://bit.ly/hw2ELX AMQP 0.9.1 specification (Section 2.1.4) + # @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification (Section 2.1.4) # # @return [Queue] # @api public def queue(name = AMQ::Protocol::EMPTY_STRING, opts = {}, &block) raise ArgumentError.new("queue name must not be nil; if you want broker to generate queue name for you, pass an empty string") if name.nil? @@ -927,20 +933,31 @@ # Multiple callbacks are supported. If when this moment is called, channel is already # open, block is executed immediately. # # @api public def once_open(&block) - @channel_is_open_deferrable.callback(&block) + @channel_is_open_deferrable.callback do + # guards against cases when deferred operations + # don't complete before the channel is closed + block.call if open? + end end # once_open(&block) alias once_opened once_open + # @return [Boolean] + # @api public + def closing? + self.status == :closing + end + # Closes AMQP channel. # # @api public def close(reply_code = 200, reply_text = DEFAULT_REPLY_TEXT, class_id = 0, method_id = 0, &block) + self.status = :closing r = super(reply_code, reply_text, class_id, method_id, &block) - + r end # @endgroup @@ -955,11 +972,11 @@ # this method is not intended for window control. It does not affect contents returned to # Queue#get callers. # # @param [Boolean] Desired flow state. # - # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.5.2.3.) + # @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Reference.pdf AMQP 0.9.1 protocol documentation (Section 1.5.2.3.) # @api public def flow(active = false, &block) super(active, &block) end @@ -996,32 +1013,32 @@ # Acknowledge one or all messages on the channel. # # @api public # @see #reject # @see #recover - # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.8.3.13.) + # @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Reference.pdf AMQP 0.9.1 protocol documentation (Section 1.8.3.13.) def acknowledge(delivery_tag, multiple = false) super(delivery_tag, multiple) end # acknowledge(delivery_tag, multiple = false) # Reject a message with given delivery tag. # # @api public # @see #acknowledge # @see #recover - # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.8.3.14.) + # @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Reference.pdf AMQP 0.9.1 protocol documentation (Section 1.8.3.14.) def reject(delivery_tag, requeue = true) super(delivery_tag, requeue) end # reject(delivery_tag, requeue = true) # Notifies AMQ broker that consumer has recovered and unacknowledged messages need # to be redelivered. # # @return [Channel] self # # @note RabbitMQ as of 2.3.1 does not support basic.recover with requeue = false. - # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.8.3.16.) + # @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Reference.pdf AMQP 0.9.1 protocol documentation (Section 1.8.3.16.) # @see #acknowledge # @api public def recover(requeue = true, &block) super(requeue, &block) end # recover(requeue = false, &block) @@ -1118,11 +1135,12 @@ # @api private def handle_close(method) super(method) self.class.error(method.reply_text) + self.class.release_channel_id(@id) end - + # Overrides AMQ::Client::Channel version to also release the channel id # # @private # @api private def handle_close_ok(method)