lib/amqp/queue.rb in amqp-0.8.0.rc12 vs lib/amqp/queue.rb in amqp-0.8.0.rc13
- old
+ new
@@ -168,15 +168,20 @@
# @yieldparam [Queue] queue Queue that is successfully declared and is ready to be used.
# @yieldparam [AMQP::Protocol::Queue::DeclareOk] declare_ok AMQP queue.declare-ok) instance.
#
# @api public
def initialize(channel, name = AMQ::Protocol::EMPTY_STRING, opts = {}, &block)
+ raise ArgumentError.new("queue name must not be nil; if you want broker to generate queue name for you, pass an empty string") if name.nil?
+
@channel = channel
name = AMQ::Protocol::EMPTY_STRING if name.nil?
@name = name unless name.empty?
@server_named = name.empty?
@opts = self.class.add_default_options(name, opts, block)
+
+ raise ArgumentError.new("server-named queues (name = '') declaration with :nowait => true makes no sense. If you are not sure what that means, simply drop :nowait => true from opts.") if @server_named && @opts[:nowait]
+
@bindings = Hash.new
# a deferrable that we use to delay operations until this queue is actually declared.
# one reason for this is to support a case when a server-named queue is immediately bound.
# it's crazy, but 0.7.x supports it, so... MK.
@@ -252,10 +257,14 @@
# Not all exchanges use a routing key! Refer to the specific
# exchange documentation. If the routing key is empty and the queue
# name is empty, the routing key will be the current queue for the
# channel, which is the last declared queue.
#
+ # @option opts [Hash] :arguments (nil) A hash of optional arguments with the declaration. Headers exchange type uses these metadata
+ # attributes for routing matching.
+ # In addition, brokers may implement AMQP extensions using x-prefixed declaration arguments.
+ #
# @option opts [Boolean] :nowait (true) If set, the server will not respond to the method. The client should
# not wait for a reply method. If the server could not complete the
# method it will raise a channel or connection exception.
# @return [Queue] Self
#
@@ -470,11 +479,11 @@
# exchange matches a message to this queue.
#
#
# @example Use of callback with a single argument
#
- # EM.run do
+ # EventMachine.run do
# exchange = AMQP::Channel.direct("foo queue")
# EM.add_periodic_timer(1) do
# exchange.publish("random number #{rand(1000)}")
# end
#
@@ -486,23 +495,59 @@
# be passed in for processing. The header object is defined by
# AMQP::Protocol::Header.
#
# @example Use of callback with two arguments
#
- # EM.run do
- # exchange = AMQP::Channel.direct("foo queue")
- # EM.add_periodic_timer(1) do
- # exchange.publish("random number #{rand(1000)}")
+ # EventMachine.run do
+ # connection = AMQP.connect(:host => '127.0.0.1')
+ # puts "Connected to AMQP broker. Running #{AMQP::VERSION} version of the gem..."
+ #
+ # channel = AMQP::Channel.new(connection)
+ # queue = channel.queue("amqpgem.examples.hello_world", :auto_delete => true)
+ # exchange = channel.direct("amq.direct")
+ #
+ # queue.bind(exchange)
+ #
+ # channel.on_error do |ch, channel_close|
+ # puts channel_close.reply_text
+ # connection.close { EventMachine.stop }
# end
#
- # # note that #bind is never called; it is implicit because
- # # the exchange and queue names match
- # queue = AMQP::Channel.queue('foo queue')
- # queue.subscribe do |header, body|
- # p header
- # puts "received payload [#{body}]"
+ # queue.subscribe do |metadata, payload|
+ # puts "metadata.routing_key : #{metadata.routing_key}"
+ # puts "metadata.content_type: #{metadata.content_type}"
+ # puts "metadata.priority : #{metadata.priority}"
+ # puts "metadata.headers : #{metadata.headers.inspect}"
+ # puts "metadata.timestamp : #{metadata.timestamp.inspect}"
+ # puts "metadata.type : #{metadata.type}"
+ # puts "metadata.delivery_tag: #{metadata.delivery_tag}"
+ # puts "metadata.redelivered : #{metadata.redelivered}"
+ #
+ # puts "metadata.app_id : #{metadata.app_id}"
+ # puts "metadata.exchange : #{metadata.exchange}"
+ # puts
+ # puts "Received a message: #{payload}. Disconnecting..."
+ #
+ # connection.close {
+ # EventMachine.stop { exit }
+ # }
# end
+ #
+ # exchange.publish("Hello, world!",
+ # :app_id => "amqpgem.example",
+ # :priority => 8,
+ # :type => "kinda.checkin",
+ # # headers table keys can be anything
+ # :headers => {
+ # :coordinates => {
+ # :latitude => 59.35,
+ # :longitude => 18.066667
+ # },
+ # :participants => 11,
+ # :venue => "Stockholm"
+ # },
+ # :timestamp => Time.now.to_i)
# end
#
#
# @option opts [Boolean ]:ack (false) If this field is set to false the server does not expect acknowledgments
# for messages. That is, when a message is delivered to the client
@@ -514,21 +559,29 @@
# @option opts [Boolean] :nowait (false) If set, the server will not respond to the method. The client should
# not wait for a reply method. If the server could not complete the
# method it will raise a channel or connection exception.
#
# @option opts [#call] :confirm (nil) If set, this proc will be called when the server confirms subscription
- # to the queue with a ConsumeOk message. Setting this option will
+ # to the queue with a basic.consume-ok message. Setting this option will
# automatically set :nowait => false. This is required for the server
# to send a confirmation.
#
+ # @option opts [Boolean] :exclusive (false) Request exclusive consumer access, meaning only this consumer can access the queue.
+ # This is useful when you want a long-lived shared queue to be temporarily accessible by just
+ # one application (or thread, or process). If application exclusive consumer is part of crashes
+ # or loses network connection to the broker, channel is closed and exclusive consumer is thus cancelled.
#
+ #
# @yield [headers, payload] When block only takes one argument, yields payload to it. In case of two arguments, yields headers and payload.
# @yieldparam [AMQP::Header] headers Headers (metadata) associated with this message (for example, routing key).
# @yieldparam [String] payload Message body (content). On Ruby 1.9, you may want to check or enforce content encoding.
#
# @return [Queue] Self
# @api public
+ #
+ # @see file:docs/Queues.textile Documentation guide on queues
+ # @see #unsubscribe
def subscribe(opts = {}, &block)
raise Error, 'already subscribed to the queue' if @consumer_tag
# having initial value for @consumer_tag makes a lot of obscure issues
# go away. It is set to real value once we receive consume-ok (it is handled by
@@ -660,10 +713,10 @@
protected
# @private
def self.add_default_options(name, opts, block)
- { :queue => name, :nowait => block.nil? }.merge(opts)
+ { :queue => name, :nowait => (block.nil? && !name.empty?) }.merge(opts)
end
private
# Default direct exchange that we use to publish messages directly to this queue.