lib/amqp/channel.rb in amqp-1.0.4 vs lib/amqp/channel.rb in amqp-1.1.0.pre1

- old
+ new

@@ -211,11 +211,10 @@ id = self.class.next_channel_id end super(@connection, id, options) - @rpcs = Hash.new # we need this deferrable to mimic what AMQP gem 0.7 does to enable # the following (pseudo-synchronous) style of programming some people use in their # existing codebases: # # connection = AMQP.connect @@ -342,14 +341,10 @@ # @option opts [Boolean] :nowait (true) If set, the server will not respond to the method. The client should # not wait for a reply method. If the server could not complete the # method it will raise a channel or connection exception. # # - # @raise [AMQP::Error] Raised when exchange is redeclared with parameters different from original declaration. - # @raise [AMQP::Error] Raised when exchange is declared with :passive => true and the exchange does not exist. - # - # # @example Using default pre-declared direct exchange and no callbacks (pseudo-synchronous style) # # # an exchange application A will be using to publish updates # # to some search index # exchange = channel.direct("index.updates") @@ -473,15 +468,10 @@ # # @option opts [Boolean] :nowait (true) If set, the server will not respond to the method. The client should # not wait for a reply method. If the server could not complete the # method it will raise a channel or connection exception. # - # - # @raise [AMQP::Error] Raised when exchange is redeclared with parameters different from original declaration. - # @raise [AMQP::Error] Raised when exchange is declared with :passive => true and the exchange does not exist. - # - # # @example Using fanout exchange to deliver messages to multiple consumers # # # open up a channel # # declare a fanout exchange # # declare 3 queues, binds them @@ -538,15 +528,10 @@ # # @option opts [Boolean] :nowait (true) If set, the server will not respond to the method. The client should # not wait for a reply method. If the server could not complete the # method it will raise a channel or connection exception. # - # - # @raise [AMQP::Error] Raised when exchange is redeclared with parameters different from original declaration. - # @raise [AMQP::Error] Raised when exchange is declared with :passive => true and the exchange does not exist. - # - # # @example Using topic exchange to deliver relevant news updates # AMQP.connect do |connection| # channel = AMQP::Channel.new(connection) # exchange = channel.topic("pub/sub") # @@ -654,14 +639,10 @@ # @option opts [Boolean] :nowait (true) If set, the server will not respond to the method. The client should # not wait for a reply method. If the server could not complete the # method it will raise a channel or connection exception. # # - # @raise [AMQP::Error] Raised when exchange is redeclared with parameters different from original declaration. - # @raise [AMQP::Error] Raised when exchange is declared with :passive => true and the exchange does not exist. - # - # # @example Using headers exchange to route messages based on multiple attributes (OS, architecture, # of cores) # # puts "=> Headers routing example" # puts # AMQP.start do |connection| @@ -799,16 +780,10 @@ # # @option opts [Boolean] :nowait (true) If set, the server will not respond to the method. The client should # not wait for a reply method. If the server could not complete the # method it will raise a channel or connection exception. # - # - # @raise [AMQP::Error] Raised when queue is redeclared with parameters different from original declaration. - # @raise [AMQP::Error] Raised when queue is declared with :passive => true and the queue does not exist. - # @raise [AMQP::Error] Raised when queue is declared with :exclusive => true and queue with that name already exist. - # - # # @yield [queue, declare_ok] Yields successfully declared queue instance and AMQP method (queue.declare-ok) instance. The latter is optional. # @yieldparam [Queue] queue Queue that is successfully declared and is ready to be used. # @yieldparam [AMQP::Protocol::Queue::DeclareOk] declare_ok AMQP queue.declare-ok) instance. # # @see Queue @@ -867,54 +842,11 @@ # @endgroup - # Instantiates and returns an RPC instance associated with this channel. - # - # The optional object may be a class name, module name or object - # instance. When given a class or module name, the object is instantiated - # during this setup. The passed queue is automatically subscribed to so - # it passes all messages (and their arguments) to the object. - # - # Marshalling and unmarshalling the objects is handled internally. This - # marshalling is subject to the same restrictions as defined in the - # [http://ruby-doc.org/core/classes/Marshal.html Marshal module} in the Ruby standard - # library. - # - # When the optional object is not passed, the returned rpc reference is - # used to send messages and arguments to the queue. See {AMQP::RPC#method_missing} - # which does all of the heavy lifting with the proxy. Some client - # elsewhere must call this method *with* the optional block so that - # there is a valid destination. Failure to do so will just enqueue - # marshalled messages that are never consumed. - # - # @example Use of RPC - # - # # TODO - # - # - # @param [String, Queue] Queue to be used by RPC server. - # @return [RPC] - # @api public - def rpc(name, obj = nil) - RPC.new(self, name, obj) - end - - - # Returns a hash of all rpc proxy objects. - # - # Most of the time, this method is not - # called by application code. - # @api plugin - def rpcs - @rpcs.values - end - - - # @group Channel lifecycle # Opens AMQP channel. # # @note Instantiated channels are opened by default. This method should only be used for error recovery after network connection loss. @@ -1090,24 +1022,22 @@ # @api public def on_error(&block) super(&block) end + # @endgroup - # Defines a global callback to be run on channel-level exception across - # all channels. Consider using Channel#on_error instead. This method is here for sake - # of backwards compatibility with 0.6.x and 0.7.x releases. - # @see AMQP::Channel#on_error - # @deprecated - # @api public - def self.on_error(&block) - self.error(&block) - end # self.on_error(&block) - # @endgroup + # @group Publisher Confirms + def confirm_select(nowait = false, &block) + self.once_open do + super(nowait, &block) + end + end + # @endgroup # # Implementation # @@ -1175,11 +1105,10 @@ # @private # @api plugin def reset_state! super - @rpcs = Hash.new end # reset_state! # Overrides superclass method to also re-create @channel_is_open_deferrable # @@ -1245,49 +1174,9 @@ # TODO: ideally, this should be in agreement with agreed max number of channels of the connection, # but it is possible that value either not yet available. MK. max_channel = (1 << 16) - 1 @int_allocator ||= IntAllocator.new(1, max_channel) end # self.initialize_channel_id_allocator - - # @private - # @api plugin - def register_rpc(rpc) - raise ArgumentError, "argument is nil!" unless rpc - - @rpcs[rpc.name] = rpc - end # register_rpc(rpc) - - # @private - # @api plugin - def find_rpc(name) - @rpcs[name] - end - - - # - # Backwards compatibility with 0.6.x - # - - # unique identifier of the default thread-local channel - # @deprecated - # @private - def self.id - Thread.current[:mq_id] ||= "#{`hostname`.strip}-#{Process.pid}-#{Thread.current.object_id}" - end - - # @private - # @deprecated - def self.default - # TODO: clear this when connection is closed - Thread.current[:mq] ||= AMQP::Channel.new - end - - # Allows for calls to all MQ instance methods. This implicitly calls - # AMQP::Channel.new so that a new channel is allocated for subsequent operations. - # @deprecated - def self.method_missing(meth, *args, &blk) - self.default.__send__(meth, *args, &blk) - end protected @private