lib/amqp/channel.rb in amqp-0.8.0.rc12 vs lib/amqp/channel.rb in amqp-0.8.0.rc13

- old
+ new

@@ -203,10 +203,11 @@ # @see AMQP::Channel#prefetch # @api public def initialize(connection = nil, id = self.class.next_channel_id, options = {}, &block) raise 'AMQP can only be used from within EM.run {}' unless EM.reactor_running? + @options = options @connection = connection || AMQP.connection || AMQP.start super(@connection, id) @rpcs = Hash.new @@ -239,24 +240,14 @@ self.prefetch(options[:prefetch], false) if options[:prefetch] end # self.open end # @connection.on_open end - # Takes a block that will be deferred till the moment when channel is considered open - # (channel.open-ok is received from the broker). If you need to delay an operation - # till the moment channel is open, this method is what you are looking for. - # - # Multiple callbacks are supported. If when this moment is called, channel is already - # open, block is executed immediately. - # - # @api public - def once_open(&block) - @channel_is_open_deferrable.callback(&block) - end # once_open(&block) - alias once_opened once_open + # @group Declaring exchanges + # Defines, intializes and returns a direct Exchange instance. # # Learn more about direct exchanges in {Exchange Exchange class documentation}. # # @@ -334,10 +325,11 @@ if exchange = find_exchange(name) extended_opts = Exchange.add_default_options(:direct, name, opts, block) validate_parameters_match!(exchange, extended_opts) + block.call(exchange) if block exchange else register_exchange(Exchange.new(self, :direct, name, opts, &block)) end end @@ -441,10 +433,11 @@ if exchange = find_exchange(name) extended_opts = Exchange.add_default_options(:fanout, name, opts, block) validate_parameters_match!(exchange, extended_opts) + block.call(exchange) if block exchange else register_exchange(Exchange.new(self, :fanout, name, opts, &block)) end end @@ -556,10 +549,11 @@ if exchange = find_exchange(name) extended_opts = Exchange.add_default_options(:topic, name, opts, block) validate_parameters_match!(exchange, extended_opts) + block.call(exchange) if block exchange else register_exchange(Exchange.new(self, :topic, name, opts, &block)) end end @@ -599,15 +593,59 @@ # # @raise [AMQP::Error] Raised when exchange is redeclared with parameters different from original declaration. # @raise [AMQP::Error] Raised when exchange is declared with :passive => true and the exchange does not exist. # # - # @example Using fanout exchange to deliver messages to multiple consumers + # @example Using headers exchange to route messages based on multiple attributes (OS, architecture, # of cores) # - # # TODO + # puts "=> Headers routing example" + # puts + # AMQP.start do |connection| + # channel = AMQP::Channel.new(connection) + # channel.on_error do |ch, channel_close| + # puts "A channel-level exception: #{channel_close.inspect}" + # end # + # exchange = channel.headers("amq.match", :durable => true) # + # channel.queue("", :auto_delete => true).bind(exchange, :arguments => { 'x-match' => 'all', :arch => "x64", :os => 'linux' }).subscribe do |metadata, payload| + # puts "[linux/x64] Got a message: #{payload}" + # end + # channel.queue("", :auto_delete => true).bind(exchange, :arguments => { 'x-match' => 'all', :arch => "x32", :os => 'linux' }).subscribe do |metadata, payload| + # puts "[linux/x32] Got a message: #{payload}" + # end + # channel.queue("", :auto_delete => true).bind(exchange, :arguments => { 'x-match' => 'any', :os => 'linux', :arch => "__any__" }).subscribe do |metadata, payload| + # puts "[linux] Got a message: #{payload}" + # end + # channel.queue("", :auto_delete => true).bind(exchange, :arguments => { 'x-match' => 'any', :os => 'macosx', :cores => 8 }).subscribe do |metadata, payload| + # puts "[macosx|octocore] Got a message: #{payload}" + # end + # + # + # EventMachine.add_timer(0.5) do + # exchange.publish "For linux/x64", :headers => { :arch => "x64", :os => 'linux' } + # exchange.publish "For linux/x32", :headers => { :arch => "x32", :os => 'linux' } + # exchange.publish "For linux", :headers => { :os => 'linux' } + # exchange.publish "For OS X", :headers => { :os => 'macosx' } + # exchange.publish "For solaris/x64", :headers => { :os => 'solaris', :arch => 'x64' } + # exchange.publish "For ocotocore", :headers => { :cores => 8 } + # end + # + # + # show_stopper = Proc.new do + # $stdout.puts "Stopping..." + # connection.close { + # EventMachine.stop { exit } + # } + # end + # + # Signal.trap "INT", show_stopper + # EventMachine.add_timer(2, show_stopper) + # end + # + # + # # @see Exchange # @see Exchange#initialize # @see Channel#default_exchange # @see http://bit.ly/hw2ELX AMQP 0.9.1 specification (Section 3.1.3.3) # @@ -617,17 +655,23 @@ if exchange = find_exchange(name) extended_opts = Exchange.add_default_options(:headers, name, opts, block) validate_parameters_match!(exchange, extended_opts) + block.call(exchange) if block exchange else register_exchange(Exchange.new(self, :headers, name, opts, &block)) end end + # @endgroup + + # @group Declaring queues + + # Declares and returns a Queue instance associated with this channel. See {Queue Queue class documentation} for # more information about queues. # # To make broker generate queue name for you (a classic example is exclusive # queues that are only used for a short period of time), pass empty string @@ -712,23 +756,17 @@ if name && !name.empty? && (queue = find_queue(name)) extended_opts = Queue.add_default_options(name, opts, block) validate_parameters_match!(queue, extended_opts) + block.call(queue) if block queue else self.queue!(name, opts, &block) end end - # Returns true if channel is not closed. - # @return [Boolean] - # @api public - def open? - self.status == :opened || self.status == :opening - end # open? - # Same as {Channel#queue} but when queue with the same name already exists in this channel # object's cache, this method will replace existing queue with a newly defined one. Consider # using {Channel#queue} instead. # # @see Channel#queue @@ -751,22 +789,25 @@ end register_queue(queue) end + # @endgroup + + # Instantiates and returns an RPC instance associated with this channel. # # The optional object may be a class name, module name or object # instance. When given a class or module name, the object is instantiated # during this setup. The passed queue is automatically subscribed to so # it passes all messages (and their arguments) to the object. # # Marshalling and unmarshalling the objects is handled internally. This # marshalling is subject to the same restrictions as defined in the - # Marshal[http://ruby-doc.org/core/classes/Marshal.html] standard - # library. See that documentation for further reference. + # [http://ruby-doc.org/core/classes/Marshal.html Marshal module} in the Ruby standard + # library. # # When the optional object is not passed, the returned rpc reference is # used to send messages and arguments to the queue. See {AMQP::RPC#method_missing} # which does all of the heavy lifting with the proxy. Some client # elsewhere must call this method *with* the optional block so that @@ -785,11 +826,87 @@ RPC.new(self, name, obj) end + # Returns a hash of all rpc proxy objects. + # + # Most of the time, this method is not + # called by application code. + # @api plugin + def rpcs + @rpcs.values + end + + + # @group Channel lifecycle + + # Opens AMQP channel. + # + # @note Instantiated channels are opened by default. This method should only be used for error recovery after network connection loss. + # @api public + def open(&block) + super(&block) + end + + # @return [Boolean] true if channel is not closed. + # @api public + def open? + self.status == :opened || self.status == :opening + end # open? + + # Takes a block that will be deferred till the moment when channel is considered open + # (channel.open-ok is received from the broker). If you need to delay an operation + # till the moment channel is open, this method is what you are looking for. + # + # Multiple callbacks are supported. If when this moment is called, channel is already + # open, block is executed immediately. + # + # @api public + def once_open(&block) + @channel_is_open_deferrable.callback(&block) + end # once_open(&block) + alias once_opened once_open + + # Closes AMQP channel. + # + # @api public + def close(reply_code = 200, reply_text = DEFAULT_REPLY_TEXT, class_id = 0, method_id = 0, &block) + super(reply_code, reply_text, class_id, method_id, &block) + end + + # @endgroup + + + + + # @group QoS and flow handling + + # Asks the peer to pause or restart the flow of content data sent to a consumer. + # This is a simple flow­control mechanism that a peer can use to avoid overflowing its + # queues or otherwise finding itself receiving more messages than it can process. Note that + # this method is not intended for window control. It does not affect contents returned to + # Queue#get callers. + # + # @param [Boolean] Desired flow state. + # + # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.5.2.3.) + # @api public + def flow(active = false, &block) + super(active, &block) + end + + # @return [Boolean] True if flow in this channel is active (messages will be delivered to consumers that use this channel). + # + # @api public + def flow_is_active? + @flow_is_active + end # flow_is_active? + + + # @param [Fixnum] Message count # @param [Boolean] global (false) # # @return [Channel] self # @@ -801,23 +918,111 @@ end self end + # @endgroup - # Returns a hash of all rpc proxy objects. + + # @group Message acknowledgements + + # Acknowledge one or all messages on the channel. # - # Most of the time, this method is not - # called by application code. - # @api plugin - def rpcs - @rpcs.values + # @api public + # @see #reject + # @see #recover + # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.8.3.13.) + def acknowledge(delivery_tag, multiple = false) + super(delivery_tag, multiple) + end # acknowledge(delivery_tag, multiple = false) + + # Reject a message with given delivery tag. + # + # @api public + # @see #acknowledge + # @see #recover + # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.8.3.14.) + def reject(delivery_tag, requeue = true) + super(delivery_tag, requeue) + end # reject(delivery_tag, requeue = true) + + # Notifies AMQ broker that consumer has recovered and unacknowledged messages need + # to be redelivered. + # + # @return [Channel] self + # + # @note RabbitMQ as of 2.3.1 does not support basic.recover with requeue = false. + # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.8.3.16.) + # @see #acknowledge + # @api public + def recover(requeue = true, &block) + super(requeue, &block) + end # recover(requeue = false, &block) + + # @endgroup + + + + + # @group Transactions + + # Sets the channel to use standard transactions. One must use this method at least + # once on a channel before using #tx_tommit or tx_rollback methods. + # + # @api public + def tx_select(&block) + super(&block) + end # tx_select(&block) + + # Commits AMQP transaction. + # + # @api public + def tx_commit(&block) + super(&block) + end # tx_commit(&block) + + # Rolls AMQP transaction back. + # + # @api public + def tx_rollback(&block) + super(&block) + end # tx_rollback(&block) + + + # @endgroup + + + + + + # @group Error handling + + # Defines a callback that will be executed when channel is closed after + # channel-level exception. + # + # @api public + def on_error(&block) + super(&block) end + # Defines a global callback to be run on channel-level exception across + # all channels. Consider using Channel#on_error instead. This method is here for sake + # of backwards compatibility with 0.6.x and 0.7.x releases. + # @see AMQP::Channel#on_error + # @deprecated + # @api public + def self.on_error(&block) + self.error(&block) + end # self.on_error(&block) + # @endgroup + + + + # # Implementation # @@ -836,19 +1041,10 @@ else @global_error_handler.call(msg) if @global_error_handler && msg end end - # Defines a global callback to be run on channel-level exception across - # all channels. Consider using Channel#on_error instead. This method is here for sake - # of backwards compatibility with 0.6.x and 0.7.x releases. - # @see AMQP::Channel#on_error - # @deprecated - # @api public - def self.on_error(&block) - self.error(&block) - end # self.on_error(&block) # Overrides AMQ::Client::Channel version to also call global callback # (if defined) for backwards compatibility. # # @private @@ -865,16 +1061,23 @@ # Most of the time, this method is not # called by application code. # # @private # @api plugin - def reset + def reset(&block) # See AMQ::Client::Channel self.reset_state! # there is no way to reset a deferrable; we have to use a new instance. MK. @channel_is_open_deferrable = AMQ::Client::EventMachineClient::Deferrable.new + @channel_is_open_deferrable.callback(&block) + + @connection.on_connection do + @channel_is_open_deferrable.succeed + + self.prefetch(@options[:prefetch], false) if @options[:prefetch] + end end # @private # @api plugin def reset_state! @@ -928,12 +1131,13 @@ # # Backwards compatibility with 0.6.x # - # unique identifier + # unique identifier of the default thread-local channel # @deprecated + # @private def self.id Thread.current[:mq_id] ||= "#{`hostname`.strip}-#{Process.pid}-#{Thread.current.object_id}" end # @private @@ -954,9 +1158,10 @@ protected # @private def validate_parameters_match!(entity, parameters) + parameters.delete(:no_declare) unless entity.opts == parameters || parameters[:passive] raise AMQP::IncompatibleOptionsError.new(entity.name, entity.opts, parameters) end end # validate_parameters_match!(entity, parameters) end # Channel