lib/mq/queue.rb in tmm1-amqp-0.5.9 vs lib/mq/queue.rb in tmm1-amqp-0.6.0

- old
+ new

@@ -1,84 +1,342 @@ class MQ class Queue include AMQP + # Queues store and forward messages. Queues can be configured in the server + # or created at runtime. Queues must be attached to at least one exchange + # in order to receive messages from publishers. + # + # Like an Exchange, queue names starting with 'amq.' are reserved for + # internal use. Attempts to create queue names in violation of this + # reservation will raise MQ:Error (ACCESS_REFUSED). + # + # When a queue is created without a name, the server will generate a + # unique name internally (not currently supported in this library). + # + # == Options + # * :passive => true | false (default false) + # If set, the server will not create the exchange if it does not + # already exist. The client can use this to check whether an exchange + # exists without modifying the server state. + # + # * :durable => true | false (default false) + # If set when creating a new queue, the queue will be marked as + # durable. Durable queues remain active when a server restarts. + # Non-durable queues (transient queues) are purged if/when a + # server restarts. Note that durable queues do not necessarily + # hold persistent messages, although it does not make sense to + # send persistent messages to a transient queue (though it is + # allowed). + # + # If the queue has already been declared, any redeclaration will + # ignore this setting. A queue may only be declared durable the + # first time when it is created. + # + # * :exclusive => true | false (default false) + # Exclusive queues may only be consumed from by the current connection. + # Setting the 'exclusive' flag always implies 'auto-delete'. Only a + # single consumer is allowed to remove messages from this queue. + # + # The default is a shared queue. Multiple clients may consume messages + # from this queue. + # + # Attempting to redeclare an already-declared queue as :exclusive => true + # will raise MQ:Error. + # + # * :auto_delete = true | false (default false) + # If set, the queue is deleted when all consumers have finished + # using it. Last consumer can be cancelled either explicitly or because + # its channel is closed. If there was no consumer ever on the queue, it + # won't be deleted. + # + # The server waits for a short period of time before + # determining the queue is unused to give time to the client code + # to bind an exchange to it. + # + # If the queue has been previously declared, this option is ignored + # on subsequent declarations. + # + # * :nowait => true | false (default true) + # If set, the server will not respond to the method. The client should + # not wait for a reply method. If the server could not complete the + # method it will raise a channel or connection exception. + # def initialize mq, name, opts = {} @mq = mq + @opts = opts + @bindings ||= {} @mq.queues[@name = name] ||= self @mq.callback{ @mq.send Protocol::Queue::Declare.new({ :queue => name, :nowait => true }.merge(opts)) } end attr_reader :name + # This method binds a queue to an exchange. Until a queue is + # bound it will not receive any messages. In a classic messaging + # model, store-and-forward queues are bound to a dest exchange + # and subscription queues are bound to a dest_wild exchange. + # + # A valid exchange name (or reference) must be passed as the first + # parameter. Both of these are valid: + # exch = MQ.direct('foo exchange') + # queue = MQ.queue('bar queue') + # queue.bind('foo.exchange') # OR + # queue.bind(exch) + # + # It is not valid to call #bind without the +exchange+ parameter. + # + # It is unnecessary to call #bind when the exchange name and queue + # name match exactly (for +direct+ and +fanout+ exchanges only). + # There is an implicit bind which will deliver the messages from + # the exchange to the queue. + # + # == Options + # * :key => 'some string' + # Specifies the routing key for the binding. The routing key is + # used for routing messages depending on the exchange configuration. + # Not all exchanges use a routing key - refer to the specific + # exchange documentation. If the routing key is empty and the queue + # name is empty, the routing key will be the current queue for the + # channel, which is the last declared queue. + # + # * :nowait => true | false (default true) + # If set, the server will not respond to the method. The client should + # not wait for a reply method. If the server could not complete the + # method it will raise a channel or connection exception. + # def bind exchange, opts = {} + exchange = exchange.respond_to?(:name) ? exchange.name : exchange + @bindings[exchange] = opts + @mq.callback{ @mq.send Protocol::Queue::Bind.new({ :queue => name, - :exchange => exchange.respond_to?(:name) ? exchange.name : exchange, + :exchange => exchange, :routing_key => opts.delete(:key), :nowait => true }.merge(opts)) } self end + # Remove the binding between the queue and exchange. The queue will + # not receive any more messages until it is bound to another + # exchange. + # + # Due to the asynchronous nature of the protocol, it is possible for + # "in flight" messages to be received after this call completes. + # Those messages will be serviced by the last block used in a + # #subscribe or #pop call. + # + # * :nowait => true | false (default true) + # If set, the server will not respond to the method. The client should + # not wait for a reply method. If the server could not complete the + # method it will raise a channel or connection exception. + # def unbind exchange, opts = {} + exchange = exchange.respond_to?(:name) ? exchange.name : exchange + @bindings.delete exchange + @mq.callback{ @mq.send Protocol::Queue::Unbind.new({ :queue => name, - :exchange => exchange.respond_to?(:name) ? exchange.name : exchange, + :exchange => exchange, :routing_key => opts.delete(:key), :nowait => true }.merge(opts)) } self end + # This method deletes a queue. When a queue is deleted any pending + # messages are sent to a dead-letter queue if this is defined in the + # server configuration, and all consumers on the queue are cancelled. + # + # == Options + # * :if_unused => true | false (default false) + # If set, the server will only delete the queue if it has no + # consumers. If the queue has consumers the server does does not + # delete it but raises a channel exception instead. + # + # * :if_empty => true | false (default false) + # If set, the server will only delete the queue if it has no + # messages. If the queue is not empty the server raises a channel + # exception. + # + # * :nowait => true | false (default true) + # If set, the server will not respond to the method. The client should + # not wait for a reply method. If the server could not complete the + # method it will raise a channel or connection exception. + # def delete opts = {} @mq.callback{ @mq.send Protocol::Queue::Delete.new({ :queue => name, :nowait => true }.merge(opts)) } @mq.queues.delete @name nil end + # This method provides a direct access to the messages in a queue + # using a synchronous dialogue that is designed for specific types of + # application where synchronous functionality is more important than + # performance. + # + # The provided block is passed a single message each time pop is called. + # + # EM.run do + # exchange = MQ.direct("foo queue") + # EM.add_periodic_timer(1) do + # exchange.publish("random number #{rand(1000)}") + # end + # + # # note that #bind is never called; it is implicit because + # # the exchange and queue names match + # queue = MQ.queue('foo queue') + # queue.pop { |body| puts "received payload [#{body}]" } + # + # EM.add_periodic_timer(1) { queue.pop } + # end + # + # If the block takes 2 parameters, both the +header+ and the +body+ will + # be passed in for processing. The header object is defined by + # AMQP::Protocol::Header. + # + # EM.run do + # exchange = MQ.direct("foo queue") + # EM.add_periodic_timer(1) do + # exchange.publish("random number #{rand(1000)}") + # end + # + # queue = MQ.queue('foo queue') + # queue.pop do |header, body| + # p header + # puts "received payload [#{body}]" + # end + # + # EM.add_periodic_timer(1) { queue.pop } + # end + # + # == Options + # * :ack => true | false (default false) + # If this field is set to false the server does not expect acknowledgments + # for messages. That is, when a message is delivered to the client + # the server automatically and silently acknowledges it on behalf + # of the client. This functionality increases performance but at + # the cost of reliability. Messages can get lost if a client dies + # before it can deliver them to the application. + # + # * :nowait => true | false (default true) + # If set, the server will not respond to the method. The client should + # not wait for a reply method. If the server could not complete the + # method it will raise a channel or connection exception. + # def pop opts = {}, &blk - @ack = opts[:no_ack] === false + if blk + @on_pop = blk + @on_pop_opts = opts + end - @on_pop = blk if blk - @mq.callback{ @mq.send Protocol::Basic::Get.new({ :queue => name, :consumer_tag => name, - :no_ack => true, + :no_ack => !opts.delete(:ack), :nowait => true }.merge(opts)) @mq.get_queue{ |q| q.push(self) } } self end + # Subscribes to asynchronous message delivery. + # + # The provided block is passed a single message each time the + # exchange matches a message to this queue. + # + # EM.run do + # exchange = MQ.direct("foo queue") + # EM.add_periodic_timer(1) do + # exchange.publish("random number #{rand(1000)}") + # end + # + # queue = MQ.queue('foo queue') + # queue.subscribe { |body| puts "received payload [#{body}]" } + # end + # + # If the block takes 2 parameters, both the +header+ and the +body+ will + # be passed in for processing. The header object is defined by + # AMQP::Protocol::Header. + # + # EM.run do + # exchange = MQ.direct("foo queue") + # EM.add_periodic_timer(1) do + # exchange.publish("random number #{rand(1000)}") + # end + # + # # note that #bind is never called; it is implicit because + # # the exchange and queue names match + # queue = MQ.queue('foo queue') + # queue.subscribe do |header, body| + # p header + # puts "received payload [#{body}]" + # end + # end + # + # == Options + # * :ack => true | false (default false) + # If this field is set to false the server does not expect acknowledgments + # for messages. That is, when a message is delivered to the client + # the server automatically and silently acknowledges it on behalf + # of the client. This functionality increases performance but at + # the cost of reliability. Messages can get lost if a client dies + # before it can deliver them to the application. + # + # * :nowait => true | false (default true) + # If set, the server will not respond to the method. The client should + # not wait for a reply method. If the server could not complete the + # method it will raise a channel or connection exception. + # def subscribe opts = {}, &blk @consumer_tag = "#{name}-#{Kernel.rand(999_999_999_999)}" @mq.consumers[@consumer_tag] = self - raise Error, 'already subscribed to the queue' if @on_msg + raise Error, 'already subscribed to the queue' if subscribed? @on_msg = blk - @ack = opts[:no_ack] === false + @on_msg_opts = opts @mq.callback{ @mq.send Protocol::Basic::Consume.new({ :queue => name, :consumer_tag => @consumer_tag, - :no_ack => true, + :no_ack => !opts.delete(:ack), :nowait => true }.merge(opts)) } self end + # Removes the subscription from the queue and cancels the consumer. + # New messages will not be received by the queue. This call is similar + # in result to calling #unbind. + # + # Due to the asynchronous nature of the protocol, it is possible for + # "in flight" messages to be received after this call completes. + # Those messages will be serviced by the last block used in a + # #subscribe or #pop call. + # + # Additionally, if the queue was created with _autodelete_ set to + # true, the server will delete the queue after its wait period + # has expired unless the queue is bound to an active exchange. + # + # The method accepts a block which will be executed when the + # unsubscription request is acknowledged as complete by the server. + # + # * :nowait => true | false (default true) + # If set, the server will not respond to the method. The client should + # not wait for a reply method. If the server could not complete the + # method it will raise a channel or connection exception. + # def unsubscribe opts = {}, &blk @on_msg = nil @on_cancel = blk @mq.callback{ @mq.send Protocol::Basic::Cancel.new({ :consumer_tag => @consumer_tag }.merge(opts)) @@ -87,32 +345,37 @@ end def publish data, opts = {} exchange.publish(data, opts) end + + # Boolean check to see if the current queue has already been subscribed + # to an exchange. + # + # Attempts to #subscribe multiple times to any exchange will raise an + # Exception. Only a single block at a time can be associated with any + # one queue for processing incoming messages. + # + def subscribed? + !!@on_msg + end + # Passes the message to the block passed to pop or subscribe. + # + # Performs an arity check on the block's parameters. If arity == 1, + # pass only the message body. If arity != 1, pass the headers and + # the body to the block. + # + # See AMQP::Protocol::Header for the hash properties available from + # the headers parameter. See #pop or #subscribe for a code example. + # def receive headers, body - if AMQP.closing - #You don't need this if your using ack, and if you aren't it doesn't do much good either - #@mq.callback{ - # @mq.send Protocol::Basic::Reject.new({ - # :delivery_tag => headers.properties[:delivery_tag], - # :requeue => true - # }) - #} - return - end + headers = MQ::Header.new(@mq, headers) if cb = (@on_msg || @on_pop) cb.call *(cb.arity == 1 ? [body] : [headers, body]) end - - if @ack && headers && !AMQP.closing - @mq.callback{ - @mq.send Protocol::Basic::Ack.new({ :delivery_tag => headers.properties[:delivery_tag] }) - } - end end def status opts = {}, &blk @on_status = blk @mq.callback{ @@ -133,9 +396,27 @@ def cancelled @on_cancel.call if @on_cancel @on_cancel = @on_msg = nil @mq.consumers.delete @consumer_tag @consumer_tag = nil + end + + def reset + @deferred_status = nil + initialize @mq, @name, @opts + + binds = @bindings + @bindings = {} + binds.each{|ex,opts| bind(ex, opts) } + + if blk = @on_msg + @on_msg = nil + subscribe @on_msg_opts, &blk + end + + if @on_pop + pop @on_pop_opts, &@on_pop + end end private def exchange \ No newline at end of file