lib/amqp/channel.rb in amqp-1.0.4 vs lib/amqp/channel.rb in amqp-1.1.0.pre1
- old
+ new
@@ -211,11 +211,10 @@
id = self.class.next_channel_id
end
super(@connection, id, options)
- @rpcs = Hash.new
# we need this deferrable to mimic what AMQP gem 0.7 does to enable
# the following (pseudo-synchronous) style of programming some people use in their
# existing codebases:
#
# connection = AMQP.connect
@@ -342,14 +341,10 @@
# @option opts [Boolean] :nowait (true) If set, the server will not respond to the method. The client should
# not wait for a reply method. If the server could not complete the
# method it will raise a channel or connection exception.
#
#
- # @raise [AMQP::Error] Raised when exchange is redeclared with parameters different from original declaration.
- # @raise [AMQP::Error] Raised when exchange is declared with :passive => true and the exchange does not exist.
- #
- #
# @example Using default pre-declared direct exchange and no callbacks (pseudo-synchronous style)
#
# # an exchange application A will be using to publish updates
# # to some search index
# exchange = channel.direct("index.updates")
@@ -473,15 +468,10 @@
#
# @option opts [Boolean] :nowait (true) If set, the server will not respond to the method. The client should
# not wait for a reply method. If the server could not complete the
# method it will raise a channel or connection exception.
#
- #
- # @raise [AMQP::Error] Raised when exchange is redeclared with parameters different from original declaration.
- # @raise [AMQP::Error] Raised when exchange is declared with :passive => true and the exchange does not exist.
- #
- #
# @example Using fanout exchange to deliver messages to multiple consumers
#
# # open up a channel
# # declare a fanout exchange
# # declare 3 queues, binds them
@@ -538,15 +528,10 @@
#
# @option opts [Boolean] :nowait (true) If set, the server will not respond to the method. The client should
# not wait for a reply method. If the server could not complete the
# method it will raise a channel or connection exception.
#
- #
- # @raise [AMQP::Error] Raised when exchange is redeclared with parameters different from original declaration.
- # @raise [AMQP::Error] Raised when exchange is declared with :passive => true and the exchange does not exist.
- #
- #
# @example Using topic exchange to deliver relevant news updates
# AMQP.connect do |connection|
# channel = AMQP::Channel.new(connection)
# exchange = channel.topic("pub/sub")
#
@@ -654,14 +639,10 @@
# @option opts [Boolean] :nowait (true) If set, the server will not respond to the method. The client should
# not wait for a reply method. If the server could not complete the
# method it will raise a channel or connection exception.
#
#
- # @raise [AMQP::Error] Raised when exchange is redeclared with parameters different from original declaration.
- # @raise [AMQP::Error] Raised when exchange is declared with :passive => true and the exchange does not exist.
- #
- #
# @example Using headers exchange to route messages based on multiple attributes (OS, architecture, # of cores)
#
# puts "=> Headers routing example"
# puts
# AMQP.start do |connection|
@@ -799,16 +780,10 @@
#
# @option opts [Boolean] :nowait (true) If set, the server will not respond to the method. The client should
# not wait for a reply method. If the server could not complete the
# method it will raise a channel or connection exception.
#
- #
- # @raise [AMQP::Error] Raised when queue is redeclared with parameters different from original declaration.
- # @raise [AMQP::Error] Raised when queue is declared with :passive => true and the queue does not exist.
- # @raise [AMQP::Error] Raised when queue is declared with :exclusive => true and queue with that name already exist.
- #
- #
# @yield [queue, declare_ok] Yields successfully declared queue instance and AMQP method (queue.declare-ok) instance. The latter is optional.
# @yieldparam [Queue] queue Queue that is successfully declared and is ready to be used.
# @yieldparam [AMQP::Protocol::Queue::DeclareOk] declare_ok AMQP queue.declare-ok) instance.
#
# @see Queue
@@ -867,54 +842,11 @@
# @endgroup
- # Instantiates and returns an RPC instance associated with this channel.
- #
- # The optional object may be a class name, module name or object
- # instance. When given a class or module name, the object is instantiated
- # during this setup. The passed queue is automatically subscribed to so
- # it passes all messages (and their arguments) to the object.
- #
- # Marshalling and unmarshalling the objects is handled internally. This
- # marshalling is subject to the same restrictions as defined in the
- # [http://ruby-doc.org/core/classes/Marshal.html Marshal module} in the Ruby standard
- # library.
- #
- # When the optional object is not passed, the returned rpc reference is
- # used to send messages and arguments to the queue. See {AMQP::RPC#method_missing}
- # which does all of the heavy lifting with the proxy. Some client
- # elsewhere must call this method *with* the optional block so that
- # there is a valid destination. Failure to do so will just enqueue
- # marshalled messages that are never consumed.
- #
- # @example Use of RPC
- #
- # # TODO
- #
- #
- # @param [String, Queue] Queue to be used by RPC server.
- # @return [RPC]
- # @api public
- def rpc(name, obj = nil)
- RPC.new(self, name, obj)
- end
-
-
- # Returns a hash of all rpc proxy objects.
- #
- # Most of the time, this method is not
- # called by application code.
- # @api plugin
- def rpcs
- @rpcs.values
- end
-
-
-
# @group Channel lifecycle
# Opens AMQP channel.
#
# @note Instantiated channels are opened by default. This method should only be used for error recovery after network connection loss.
@@ -1090,24 +1022,22 @@
# @api public
def on_error(&block)
super(&block)
end
+ # @endgroup
- # Defines a global callback to be run on channel-level exception across
- # all channels. Consider using Channel#on_error instead. This method is here for sake
- # of backwards compatibility with 0.6.x and 0.7.x releases.
- # @see AMQP::Channel#on_error
- # @deprecated
- # @api public
- def self.on_error(&block)
- self.error(&block)
- end # self.on_error(&block)
- # @endgroup
+ # @group Publisher Confirms
+ def confirm_select(nowait = false, &block)
+ self.once_open do
+ super(nowait, &block)
+ end
+ end
+ # @endgroup
#
# Implementation
#
@@ -1175,11 +1105,10 @@
# @private
# @api plugin
def reset_state!
super
- @rpcs = Hash.new
end # reset_state!
# Overrides superclass method to also re-create @channel_is_open_deferrable
#
@@ -1245,49 +1174,9 @@
# TODO: ideally, this should be in agreement with agreed max number of channels of the connection,
# but it is possible that value either not yet available. MK.
max_channel = (1 << 16) - 1
@int_allocator ||= IntAllocator.new(1, max_channel)
end # self.initialize_channel_id_allocator
-
- # @private
- # @api plugin
- def register_rpc(rpc)
- raise ArgumentError, "argument is nil!" unless rpc
-
- @rpcs[rpc.name] = rpc
- end # register_rpc(rpc)
-
- # @private
- # @api plugin
- def find_rpc(name)
- @rpcs[name]
- end
-
-
- #
- # Backwards compatibility with 0.6.x
- #
-
- # unique identifier of the default thread-local channel
- # @deprecated
- # @private
- def self.id
- Thread.current[:mq_id] ||= "#{`hostname`.strip}-#{Process.pid}-#{Thread.current.object_id}"
- end
-
- # @private
- # @deprecated
- def self.default
- # TODO: clear this when connection is closed
- Thread.current[:mq] ||= AMQP::Channel.new
- end
-
- # Allows for calls to all MQ instance methods. This implicitly calls
- # AMQP::Channel.new so that a new channel is allocated for subsequent operations.
- # @deprecated
- def self.method_missing(meth, *args, &blk)
- self.default.__send__(meth, *args, &blk)
- end
protected
@private