lib/amqp/channel.rb in amqp-0.8.0.rc12 vs lib/amqp/channel.rb in amqp-0.8.0.rc13
- old
+ new
@@ -203,10 +203,11 @@
# @see AMQP::Channel#prefetch
# @api public
def initialize(connection = nil, id = self.class.next_channel_id, options = {}, &block)
raise 'AMQP can only be used from within EM.run {}' unless EM.reactor_running?
+ @options = options
@connection = connection || AMQP.connection || AMQP.start
super(@connection, id)
@rpcs = Hash.new
@@ -239,24 +240,14 @@
self.prefetch(options[:prefetch], false) if options[:prefetch]
end # self.open
end # @connection.on_open
end
- # Takes a block that will be deferred till the moment when channel is considered open
- # (channel.open-ok is received from the broker). If you need to delay an operation
- # till the moment channel is open, this method is what you are looking for.
- #
- # Multiple callbacks are supported. If when this moment is called, channel is already
- # open, block is executed immediately.
- #
- # @api public
- def once_open(&block)
- @channel_is_open_deferrable.callback(&block)
- end # once_open(&block)
- alias once_opened once_open
+ # @group Declaring exchanges
+
# Defines, intializes and returns a direct Exchange instance.
#
# Learn more about direct exchanges in {Exchange Exchange class documentation}.
#
#
@@ -334,10 +325,11 @@
if exchange = find_exchange(name)
extended_opts = Exchange.add_default_options(:direct, name, opts, block)
validate_parameters_match!(exchange, extended_opts)
+ block.call(exchange) if block
exchange
else
register_exchange(Exchange.new(self, :direct, name, opts, &block))
end
end
@@ -441,10 +433,11 @@
if exchange = find_exchange(name)
extended_opts = Exchange.add_default_options(:fanout, name, opts, block)
validate_parameters_match!(exchange, extended_opts)
+ block.call(exchange) if block
exchange
else
register_exchange(Exchange.new(self, :fanout, name, opts, &block))
end
end
@@ -556,10 +549,11 @@
if exchange = find_exchange(name)
extended_opts = Exchange.add_default_options(:topic, name, opts, block)
validate_parameters_match!(exchange, extended_opts)
+ block.call(exchange) if block
exchange
else
register_exchange(Exchange.new(self, :topic, name, opts, &block))
end
end
@@ -599,15 +593,59 @@
#
# @raise [AMQP::Error] Raised when exchange is redeclared with parameters different from original declaration.
# @raise [AMQP::Error] Raised when exchange is declared with :passive => true and the exchange does not exist.
#
#
- # @example Using fanout exchange to deliver messages to multiple consumers
+ # @example Using headers exchange to route messages based on multiple attributes (OS, architecture, # of cores)
#
- # # TODO
+ # puts "=> Headers routing example"
+ # puts
+ # AMQP.start do |connection|
+ # channel = AMQP::Channel.new(connection)
+ # channel.on_error do |ch, channel_close|
+ # puts "A channel-level exception: #{channel_close.inspect}"
+ # end
#
+ # exchange = channel.headers("amq.match", :durable => true)
#
+ # channel.queue("", :auto_delete => true).bind(exchange, :arguments => { 'x-match' => 'all', :arch => "x64", :os => 'linux' }).subscribe do |metadata, payload|
+ # puts "[linux/x64] Got a message: #{payload}"
+ # end
+ # channel.queue("", :auto_delete => true).bind(exchange, :arguments => { 'x-match' => 'all', :arch => "x32", :os => 'linux' }).subscribe do |metadata, payload|
+ # puts "[linux/x32] Got a message: #{payload}"
+ # end
+ # channel.queue("", :auto_delete => true).bind(exchange, :arguments => { 'x-match' => 'any', :os => 'linux', :arch => "__any__" }).subscribe do |metadata, payload|
+ # puts "[linux] Got a message: #{payload}"
+ # end
+ # channel.queue("", :auto_delete => true).bind(exchange, :arguments => { 'x-match' => 'any', :os => 'macosx', :cores => 8 }).subscribe do |metadata, payload|
+ # puts "[macosx|octocore] Got a message: #{payload}"
+ # end
+ #
+ #
+ # EventMachine.add_timer(0.5) do
+ # exchange.publish "For linux/x64", :headers => { :arch => "x64", :os => 'linux' }
+ # exchange.publish "For linux/x32", :headers => { :arch => "x32", :os => 'linux' }
+ # exchange.publish "For linux", :headers => { :os => 'linux' }
+ # exchange.publish "For OS X", :headers => { :os => 'macosx' }
+ # exchange.publish "For solaris/x64", :headers => { :os => 'solaris', :arch => 'x64' }
+ # exchange.publish "For ocotocore", :headers => { :cores => 8 }
+ # end
+ #
+ #
+ # show_stopper = Proc.new do
+ # $stdout.puts "Stopping..."
+ # connection.close {
+ # EventMachine.stop { exit }
+ # }
+ # end
+ #
+ # Signal.trap "INT", show_stopper
+ # EventMachine.add_timer(2, show_stopper)
+ # end
+ #
+ #
+ #
# @see Exchange
# @see Exchange#initialize
# @see Channel#default_exchange
# @see http://bit.ly/hw2ELX AMQP 0.9.1 specification (Section 3.1.3.3)
#
@@ -617,17 +655,23 @@
if exchange = find_exchange(name)
extended_opts = Exchange.add_default_options(:headers, name, opts, block)
validate_parameters_match!(exchange, extended_opts)
+ block.call(exchange) if block
exchange
else
register_exchange(Exchange.new(self, :headers, name, opts, &block))
end
end
+ # @endgroup
+
+ # @group Declaring queues
+
+
# Declares and returns a Queue instance associated with this channel. See {Queue Queue class documentation} for
# more information about queues.
#
# To make broker generate queue name for you (a classic example is exclusive
# queues that are only used for a short period of time), pass empty string
@@ -712,23 +756,17 @@
if name && !name.empty? && (queue = find_queue(name))
extended_opts = Queue.add_default_options(name, opts, block)
validate_parameters_match!(queue, extended_opts)
+ block.call(queue) if block
queue
else
self.queue!(name, opts, &block)
end
end
- # Returns true if channel is not closed.
- # @return [Boolean]
- # @api public
- def open?
- self.status == :opened || self.status == :opening
- end # open?
-
# Same as {Channel#queue} but when queue with the same name already exists in this channel
# object's cache, this method will replace existing queue with a newly defined one. Consider
# using {Channel#queue} instead.
#
# @see Channel#queue
@@ -751,22 +789,25 @@
end
register_queue(queue)
end
+ # @endgroup
+
+
# Instantiates and returns an RPC instance associated with this channel.
#
# The optional object may be a class name, module name or object
# instance. When given a class or module name, the object is instantiated
# during this setup. The passed queue is automatically subscribed to so
# it passes all messages (and their arguments) to the object.
#
# Marshalling and unmarshalling the objects is handled internally. This
# marshalling is subject to the same restrictions as defined in the
- # Marshal[http://ruby-doc.org/core/classes/Marshal.html] standard
- # library. See that documentation for further reference.
+ # [http://ruby-doc.org/core/classes/Marshal.html Marshal module} in the Ruby standard
+ # library.
#
# When the optional object is not passed, the returned rpc reference is
# used to send messages and arguments to the queue. See {AMQP::RPC#method_missing}
# which does all of the heavy lifting with the proxy. Some client
# elsewhere must call this method *with* the optional block so that
@@ -785,11 +826,87 @@
RPC.new(self, name, obj)
end
+ # Returns a hash of all rpc proxy objects.
+ #
+ # Most of the time, this method is not
+ # called by application code.
+ # @api plugin
+ def rpcs
+ @rpcs.values
+ end
+
+
+ # @group Channel lifecycle
+
+ # Opens AMQP channel.
+ #
+ # @note Instantiated channels are opened by default. This method should only be used for error recovery after network connection loss.
+ # @api public
+ def open(&block)
+ super(&block)
+ end
+
+ # @return [Boolean] true if channel is not closed.
+ # @api public
+ def open?
+ self.status == :opened || self.status == :opening
+ end # open?
+
+ # Takes a block that will be deferred till the moment when channel is considered open
+ # (channel.open-ok is received from the broker). If you need to delay an operation
+ # till the moment channel is open, this method is what you are looking for.
+ #
+ # Multiple callbacks are supported. If when this moment is called, channel is already
+ # open, block is executed immediately.
+ #
+ # @api public
+ def once_open(&block)
+ @channel_is_open_deferrable.callback(&block)
+ end # once_open(&block)
+ alias once_opened once_open
+
+ # Closes AMQP channel.
+ #
+ # @api public
+ def close(reply_code = 200, reply_text = DEFAULT_REPLY_TEXT, class_id = 0, method_id = 0, &block)
+ super(reply_code, reply_text, class_id, method_id, &block)
+ end
+
+ # @endgroup
+
+
+
+
+ # @group QoS and flow handling
+
+ # Asks the peer to pause or restart the flow of content data sent to a consumer.
+ # This is a simple flowcontrol mechanism that a peer can use to avoid overflowing its
+ # queues or otherwise finding itself receiving more messages than it can process. Note that
+ # this method is not intended for window control. It does not affect contents returned to
+ # Queue#get callers.
+ #
+ # @param [Boolean] Desired flow state.
+ #
+ # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.5.2.3.)
+ # @api public
+ def flow(active = false, &block)
+ super(active, &block)
+ end
+
+ # @return [Boolean] True if flow in this channel is active (messages will be delivered to consumers that use this channel).
+ #
+ # @api public
+ def flow_is_active?
+ @flow_is_active
+ end # flow_is_active?
+
+
+
# @param [Fixnum] Message count
# @param [Boolean] global (false)
#
# @return [Channel] self
#
@@ -801,23 +918,111 @@
end
self
end
+ # @endgroup
- # Returns a hash of all rpc proxy objects.
+
+ # @group Message acknowledgements
+
+ # Acknowledge one or all messages on the channel.
#
- # Most of the time, this method is not
- # called by application code.
- # @api plugin
- def rpcs
- @rpcs.values
+ # @api public
+ # @see #reject
+ # @see #recover
+ # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.8.3.13.)
+ def acknowledge(delivery_tag, multiple = false)
+ super(delivery_tag, multiple)
+ end # acknowledge(delivery_tag, multiple = false)
+
+ # Reject a message with given delivery tag.
+ #
+ # @api public
+ # @see #acknowledge
+ # @see #recover
+ # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.8.3.14.)
+ def reject(delivery_tag, requeue = true)
+ super(delivery_tag, requeue)
+ end # reject(delivery_tag, requeue = true)
+
+ # Notifies AMQ broker that consumer has recovered and unacknowledged messages need
+ # to be redelivered.
+ #
+ # @return [Channel] self
+ #
+ # @note RabbitMQ as of 2.3.1 does not support basic.recover with requeue = false.
+ # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.8.3.16.)
+ # @see #acknowledge
+ # @api public
+ def recover(requeue = true, &block)
+ super(requeue, &block)
+ end # recover(requeue = false, &block)
+
+ # @endgroup
+
+
+
+
+ # @group Transactions
+
+ # Sets the channel to use standard transactions. One must use this method at least
+ # once on a channel before using #tx_tommit or tx_rollback methods.
+ #
+ # @api public
+ def tx_select(&block)
+ super(&block)
+ end # tx_select(&block)
+
+ # Commits AMQP transaction.
+ #
+ # @api public
+ def tx_commit(&block)
+ super(&block)
+ end # tx_commit(&block)
+
+ # Rolls AMQP transaction back.
+ #
+ # @api public
+ def tx_rollback(&block)
+ super(&block)
+ end # tx_rollback(&block)
+
+
+ # @endgroup
+
+
+
+
+
+ # @group Error handling
+
+ # Defines a callback that will be executed when channel is closed after
+ # channel-level exception.
+ #
+ # @api public
+ def on_error(&block)
+ super(&block)
end
+ # Defines a global callback to be run on channel-level exception across
+ # all channels. Consider using Channel#on_error instead. This method is here for sake
+ # of backwards compatibility with 0.6.x and 0.7.x releases.
+ # @see AMQP::Channel#on_error
+ # @deprecated
+ # @api public
+ def self.on_error(&block)
+ self.error(&block)
+ end # self.on_error(&block)
+ # @endgroup
+
+
+
+
#
# Implementation
#
@@ -836,19 +1041,10 @@
else
@global_error_handler.call(msg) if @global_error_handler && msg
end
end
- # Defines a global callback to be run on channel-level exception across
- # all channels. Consider using Channel#on_error instead. This method is here for sake
- # of backwards compatibility with 0.6.x and 0.7.x releases.
- # @see AMQP::Channel#on_error
- # @deprecated
- # @api public
- def self.on_error(&block)
- self.error(&block)
- end # self.on_error(&block)
# Overrides AMQ::Client::Channel version to also call global callback
# (if defined) for backwards compatibility.
#
# @private
@@ -865,16 +1061,23 @@
# Most of the time, this method is not
# called by application code.
#
# @private
# @api plugin
- def reset
+ def reset(&block)
# See AMQ::Client::Channel
self.reset_state!
# there is no way to reset a deferrable; we have to use a new instance. MK.
@channel_is_open_deferrable = AMQ::Client::EventMachineClient::Deferrable.new
+ @channel_is_open_deferrable.callback(&block)
+
+ @connection.on_connection do
+ @channel_is_open_deferrable.succeed
+
+ self.prefetch(@options[:prefetch], false) if @options[:prefetch]
+ end
end
# @private
# @api plugin
def reset_state!
@@ -928,12 +1131,13 @@
#
# Backwards compatibility with 0.6.x
#
- # unique identifier
+ # unique identifier of the default thread-local channel
# @deprecated
+ # @private
def self.id
Thread.current[:mq_id] ||= "#{`hostname`.strip}-#{Process.pid}-#{Thread.current.object_id}"
end
# @private
@@ -954,9 +1158,10 @@
protected
# @private
def validate_parameters_match!(entity, parameters)
+ parameters.delete(:no_declare)
unless entity.opts == parameters || parameters[:passive]
raise AMQP::IncompatibleOptionsError.new(entity.name, entity.opts, parameters)
end
end # validate_parameters_match!(entity, parameters)
end # Channel