lib/amqp/queue.rb in amqp-0.8.0.rc12 vs lib/amqp/queue.rb in amqp-0.8.0.rc13

- old
+ new

@@ -168,15 +168,20 @@ # @yieldparam [Queue] queue Queue that is successfully declared and is ready to be used. # @yieldparam [AMQP::Protocol::Queue::DeclareOk] declare_ok AMQP queue.declare-ok) instance. # # @api public def initialize(channel, name = AMQ::Protocol::EMPTY_STRING, opts = {}, &block) + raise ArgumentError.new("queue name must not be nil; if you want broker to generate queue name for you, pass an empty string") if name.nil? + @channel = channel name = AMQ::Protocol::EMPTY_STRING if name.nil? @name = name unless name.empty? @server_named = name.empty? @opts = self.class.add_default_options(name, opts, block) + + raise ArgumentError.new("server-named queues (name = '') declaration with :nowait => true makes no sense. If you are not sure what that means, simply drop :nowait => true from opts.") if @server_named && @opts[:nowait] + @bindings = Hash.new # a deferrable that we use to delay operations until this queue is actually declared. # one reason for this is to support a case when a server-named queue is immediately bound. # it's crazy, but 0.7.x supports it, so... MK. @@ -252,10 +257,14 @@ # Not all exchanges use a routing key! Refer to the specific # exchange documentation. If the routing key is empty and the queue # name is empty, the routing key will be the current queue for the # channel, which is the last declared queue. # + # @option opts [Hash] :arguments (nil) A hash of optional arguments with the declaration. Headers exchange type uses these metadata + # attributes for routing matching. + # In addition, brokers may implement AMQP extensions using x-prefixed declaration arguments. + # # @option opts [Boolean] :nowait (true) If set, the server will not respond to the method. The client should # not wait for a reply method. If the server could not complete the # method it will raise a channel or connection exception. # @return [Queue] Self # @@ -470,11 +479,11 @@ # exchange matches a message to this queue. # # # @example Use of callback with a single argument # - # EM.run do + # EventMachine.run do # exchange = AMQP::Channel.direct("foo queue") # EM.add_periodic_timer(1) do # exchange.publish("random number #{rand(1000)}") # end # @@ -486,23 +495,59 @@ # be passed in for processing. The header object is defined by # AMQP::Protocol::Header. # # @example Use of callback with two arguments # - # EM.run do - # exchange = AMQP::Channel.direct("foo queue") - # EM.add_periodic_timer(1) do - # exchange.publish("random number #{rand(1000)}") + # EventMachine.run do + # connection = AMQP.connect(:host => '127.0.0.1') + # puts "Connected to AMQP broker. Running #{AMQP::VERSION} version of the gem..." + # + # channel = AMQP::Channel.new(connection) + # queue = channel.queue("amqpgem.examples.hello_world", :auto_delete => true) + # exchange = channel.direct("amq.direct") + # + # queue.bind(exchange) + # + # channel.on_error do |ch, channel_close| + # puts channel_close.reply_text + # connection.close { EventMachine.stop } # end # - # # note that #bind is never called; it is implicit because - # # the exchange and queue names match - # queue = AMQP::Channel.queue('foo queue') - # queue.subscribe do |header, body| - # p header - # puts "received payload [#{body}]" + # queue.subscribe do |metadata, payload| + # puts "metadata.routing_key : #{metadata.routing_key}" + # puts "metadata.content_type: #{metadata.content_type}" + # puts "metadata.priority : #{metadata.priority}" + # puts "metadata.headers : #{metadata.headers.inspect}" + # puts "metadata.timestamp : #{metadata.timestamp.inspect}" + # puts "metadata.type : #{metadata.type}" + # puts "metadata.delivery_tag: #{metadata.delivery_tag}" + # puts "metadata.redelivered : #{metadata.redelivered}" + # + # puts "metadata.app_id : #{metadata.app_id}" + # puts "metadata.exchange : #{metadata.exchange}" + # puts + # puts "Received a message: #{payload}. Disconnecting..." + # + # connection.close { + # EventMachine.stop { exit } + # } # end + # + # exchange.publish("Hello, world!", + # :app_id => "amqpgem.example", + # :priority => 8, + # :type => "kinda.checkin", + # # headers table keys can be anything + # :headers => { + # :coordinates => { + # :latitude => 59.35, + # :longitude => 18.066667 + # }, + # :participants => 11, + # :venue => "Stockholm" + # }, + # :timestamp => Time.now.to_i) # end # # # @option opts [Boolean ]:ack (false) If this field is set to false the server does not expect acknowledgments # for messages. That is, when a message is delivered to the client @@ -514,21 +559,29 @@ # @option opts [Boolean] :nowait (false) If set, the server will not respond to the method. The client should # not wait for a reply method. If the server could not complete the # method it will raise a channel or connection exception. # # @option opts [#call] :confirm (nil) If set, this proc will be called when the server confirms subscription - # to the queue with a ConsumeOk message. Setting this option will + # to the queue with a basic.consume-ok message. Setting this option will # automatically set :nowait => false. This is required for the server # to send a confirmation. # + # @option opts [Boolean] :exclusive (false) Request exclusive consumer access, meaning only this consumer can access the queue. + # This is useful when you want a long-lived shared queue to be temporarily accessible by just + # one application (or thread, or process). If application exclusive consumer is part of crashes + # or loses network connection to the broker, channel is closed and exclusive consumer is thus cancelled. # + # # @yield [headers, payload] When block only takes one argument, yields payload to it. In case of two arguments, yields headers and payload. # @yieldparam [AMQP::Header] headers Headers (metadata) associated with this message (for example, routing key). # @yieldparam [String] payload Message body (content). On Ruby 1.9, you may want to check or enforce content encoding. # # @return [Queue] Self # @api public + # + # @see file:docs/Queues.textile Documentation guide on queues + # @see #unsubscribe def subscribe(opts = {}, &block) raise Error, 'already subscribed to the queue' if @consumer_tag # having initial value for @consumer_tag makes a lot of obscure issues # go away. It is set to real value once we receive consume-ok (it is handled by @@ -660,10 +713,10 @@ protected # @private def self.add_default_options(name, opts, block) - { :queue => name, :nowait => block.nil? }.merge(opts) + { :queue => name, :nowait => (block.nil? && !name.empty?) }.merge(opts) end private # Default direct exchange that we use to publish messages directly to this queue.