lib/bunny/queue08.rb in celldee-bunny-0.5.2 vs lib/bunny/queue08.rb in celldee-bunny-0.5.3
- old
+ new
@@ -7,13 +7,11 @@
Queues store and forward messages. Queues can be configured in the server or created at runtime.
Queues must be attached to at least one exchange in order to receive messages from publishers.
=end
- class Queue
- attr_reader :name, :client
- attr_accessor :delivery_tag
+ class Queue < Qrack::Queue
def initialize(client, name, opts = {})
# check connection to server
raise Bunny::ConnectionError, 'Not connected to server' if client.status == :not_connected
@@ -78,10 +76,91 @@
=begin rdoc
=== DESCRIPTION:
+Binds a queue to an exchange. Until a queue is bound it will not receive any messages. Queues are
+bound to the direct exchange '' by default. If error occurs, a _Bunny_::_ProtocolError_ is raised.
+
+* <tt>:key => 'routing key'* <tt>:key => 'routing_key'</tt> - Specifies the routing key for
+ the binding. The routing key is used for routing messages depending on the exchange configuration.
+* <tt>:nowait => true or false (_default_)</tt> - Ignored by Bunny, always _false_.
+
+==== RETURNS:
+
+<tt>:bind_ok</tt> if successful.
+
+=end
+
+ def bind(exchange, opts = {})
+ exchange = exchange.respond_to?(:name) ? exchange.name : exchange
+
+ # ignore the :nowait option if passed, otherwise program will hang waiting for a
+ # response that will not be sent by the server
+ opts.delete(:nowait)
+
+ client.send_frame(
+ Qrack::Protocol::Queue::Bind.new({ :queue => name,
+ :exchange => exchange,
+ :routing_key => opts.delete(:key),
+ :nowait => false }.merge(opts))
+ )
+
+ raise Bunny::ProtocolError,
+ "Error binding queue #{name}" unless
+ client.next_method.is_a?(Qrack::Protocol::Queue::BindOk)
+
+ # return message
+ :bind_ok
+ end
+
+=begin rdoc
+
+=== DESCRIPTION:
+
+Requests that a queue is deleted from broker/server. When a queue is deleted any pending messages
+are sent to a dead-letter queue if this is defined in the server configuration. Removes reference
+from queues if successful. If an error occurs raises _Bunny_::_ProtocolError_.
+
+==== Options:
+
+* <tt>:if_unused => true or false (_default_)</tt> - If set to _true_, the server will only
+ delete the queue if it has no consumers. If the queue has consumers the server does not
+ delete it but raises a channel exception instead.
+* <tt>:if_empty => true or false (_default_)</tt> - If set to _true_, the server will only
+ delete the queue if it has no messages. If the queue is not empty the server raises a channel
+ exception.
+* <tt>:nowait => true or false (_default_)</tt> - Ignored by Bunny, always _false_.
+
+==== Returns:
+
+<tt>:delete_ok</tt> if successful
+=end
+
+ def delete(opts = {})
+ # ignore the :nowait option if passed, otherwise program will hang waiting for a
+ # response that will not be sent by the server
+ opts.delete(:nowait)
+
+ client.send_frame(
+ Qrack::Protocol::Queue::Delete.new({ :queue => name, :nowait => false }.merge(opts))
+ )
+
+ raise Bunny::ProtocolError,
+ "Error deleting queue #{name}" unless
+ client.next_method.is_a?(Qrack::Protocol::Queue::DeleteOk)
+
+ client.queues.delete(name)
+
+ # return confirmation
+ :delete_ok
+ end
+
+=begin rdoc
+
+=== DESCRIPTION:
+
Gets a message from a queue in a synchronous way. If error occurs, raises _Bunny_::_ProtocolError_.
==== OPTIONS:
* <tt>:header => true or false (_default_)</tt> - If set to _true_,
@@ -124,58 +203,53 @@
# get delivery tag to use for acknowledge
self.delivery_tag = method.delivery_tag if ack
header = client.next_payload
- msg = client.next_payload
- raise Bunny::MessageError, 'unexpected length' if msg.length < header.size
+
+ # If maximum frame size is smaller than message payload body then message
+ # will have a message header and several message bodies
+ msg = ''
+ while msg.length < header.size
+ msg += client.next_payload
+ end
# Return message with additional info if requested
hdr ? {:header => header, :payload => msg, :delivery_details => method.arguments} : msg
end
-
+
=begin rdoc
=== DESCRIPTION:
-Publishes a message to the queue via the default nameless '' direct exchange.
+Removes all messages from a queue. It does not cancel consumers. Purged messages are deleted
+without any formal "undo" mechanism. If an error occurs raises _Bunny_::_ProtocolError_.
-==== RETURNS:
+==== Options:
-nil
+* <tt>:nowait => true or false (_default_)</tt> - Ignored by Bunny, always _false_.
-=end
+==== Returns:
- def publish(data, opts = {})
- exchange.publish(data, opts)
- end
-
-=begin rdoc
-
-=== DESCRIPTION:
-
-Returns message count from Queue#status.
-
+<tt>:purge_ok</tt> if successful
=end
- def message_count
- s = status
- s[:message_count]
- end
+ def purge(opts = {})
+ # ignore the :nowait option if passed, otherwise program will hang waiting for a
+ # response that will not be sent by the server
+ opts.delete(:nowait)
-=begin rdoc
+ client.send_frame(
+ Qrack::Protocol::Queue::Purge.new({ :queue => name, :nowait => false }.merge(opts))
+ )
-=== DESCRIPTION:
+ raise Bunny::ProtocolError, "Error purging queue #{name}" unless client.next_method.is_a?(Qrack::Protocol::Queue::PurgeOk)
-Returns consumer count from Queue#status.
+ # return confirmation
+ :purge_ok
-=end
-
- def consumer_count
- s = status
- s[:consumer_count]
end
=begin rdoc
=== DESCRIPTION:
@@ -212,25 +286,31 @@
message from the client and will re-queue the message if it does not receive one within a time specified
by the server.
* <tt>:exclusive => true or false (_default_)</tt> - Request exclusive consumer access, meaning
only this consumer can access the queue.
* <tt>:nowait => true or false (_default_)</tt> - Ignored by Bunny, always _false_.
-* <tt>:timeout => number of seconds (default = 0 no timeout) - The subscribe loop will continue to wait for
+* <tt>:timeout => number of seconds - The subscribe loop will continue to wait for
messages until terminated (Ctrl-C or kill command) or this timeout interval is reached.
+* <tt>:message_max => max number messages to process</tt> - When the required number of messages
+ is processed subscribe loop is exited.
==== RETURNS:
If <tt>:header => true</tt> returns hash <tt>{:header, :delivery_details, :payload}</tt> for each message.
<tt>:delivery_details</tt> is a hash <tt>{:consumer_tag, :delivery_tag, :redelivered, :exchange, :routing_key}</tt>.
If <tt>:header => false</tt> only message payload is returned.
-If <tt>:timeout => > 0</tt> is reached returns :timed_out
+If <tt>:timeout => > 0</tt> is reached Qrack::ClientTimeout is raised
=end
def subscribe(opts = {}, &blk)
- consumer_tag = opts[:consumer_tag] || name
- secs = opts[:timeout] || 0
+ # Get maximum amount of messages to process
+ message_max = opts[:message_max] || nil
+ return if message_max == 0
+
+ # If a consumer tag is not passed in the server will generate one
+ consumer_tag = opts[:consumer_tag] || nil
# ignore the :nowait option if passed, otherwise program will hang waiting for a
# response from the server causing an error.
opts.delete(:nowait)
@@ -249,30 +329,36 @@
raise Bunny::ProtocolError,
"Error subscribing to queue #{name}" unless
client.next_method.is_a?(Qrack::Protocol::Basic::ConsumeOk)
+ # Initialize message counter
+ counter = 0
+
loop do
- begin
- Timeout::timeout(secs) do
- @method = client.next_method
- end
- rescue Timeout::Error
- return :timed_out
- end
-
- break if @method.is_a?(Qrack::Protocol::Basic::CancelOk)
+ method = client.next_method(:timeout => opts[:timeout])
# get delivery tag to use for acknowledge
- self.delivery_tag = @method.delivery_tag if ack
+ self.delivery_tag = method.delivery_tag if ack
header = client.next_payload
- msg = client.next_payload
- raise Bunny::MessageError, 'unexpected length' if msg.length < header.size
+
+ # If maximum frame size is smaller than message payload body then message
+ # will have a message header and several message bodies
+ msg = ''
+ while msg.length < header.size
+ msg += client.next_payload
+ end
# pass the message and related info, if requested, to the block for processing
blk.call(hdr ? {:header => header, :payload => msg, :delivery_details => method.arguments} : msg)
+
+ # Increment message counter
+ counter += 1
+
+ # Exit loop if message_max condition met
+ break if !message_max.nil? and counter == message_max
end
end
=begin rdoc
@@ -285,10 +371,14 @@
==== OPTIONS:
* <tt>:consumer_tag => '_tag_'</tt> - Specifies the identifier for the consumer.
* <tt>:nowait => true or false (_default_)</tt> - Ignored by Bunny, always _false_.
+==== Returns:
+
+<tt>:unsubscribe_ok</tt> if successful
+
=end
def unsubscribe(opts = {})
consumer_tag = opts[:consumer_tag] || name
@@ -299,57 +389,19 @@
client.send_frame( Qrack::Protocol::Basic::Cancel.new({ :consumer_tag => consumer_tag }.merge(opts)))
raise Bunny::ProtocolError,
"Error unsubscribing from queue #{name}" unless
client.next_method.is_a?(Qrack::Protocol::Basic::CancelOk)
-
+
+ # return confirmation
+ :unsubscribe_ok
end
=begin rdoc
=== DESCRIPTION:
-Binds a queue to an exchange. Until a queue is bound it will not receive any messages. Queues are
-bound to the direct exchange '' by default. If error occurs, a _Bunny_::_ProtocolError_ is raised.
-
-* <tt>:key => 'routing key'* <tt>:key => 'routing_key'</tt> - Specifies the routing key for
- the binding. The routing key is used for routing messages depending on the exchange configuration.
-* <tt>:nowait => true or false (_default_)</tt> - Ignored by Bunny, always _false_.
-
-==== RETURNS:
-
-<tt>:bind_ok</tt> if successful.
-
-=end
-
- def bind(exchange, opts = {})
- exchange = exchange.respond_to?(:name) ? exchange.name : exchange
-
- # ignore the :nowait option if passed, otherwise program will hang waiting for a
- # response that will not be sent by the server
- opts.delete(:nowait)
-
- bindings[exchange] = opts
- client.send_frame(
- Qrack::Protocol::Queue::Bind.new({ :queue => name,
- :exchange => exchange,
- :routing_key => opts.delete(:key),
- :nowait => false }.merge(opts))
- )
-
- raise Bunny::ProtocolError,
- "Error binding queue #{name}" unless
- client.next_method.is_a?(Qrack::Protocol::Queue::BindOk)
-
- # return message
- :bind_ok
- end
-
-=begin rdoc
-
-=== DESCRIPTION:
-
Removes a queue binding from an exchange. If error occurs, a _Bunny_::_ProtocolError_ is raised.
==== OPTIONS:
* <tt>:key => 'routing key'* <tt>:key => 'routing_key'</tt> - Specifies the routing key for
the binding.
@@ -365,12 +417,10 @@
exchange = exchange.respond_to?(:name) ? exchange.name : exchange
# ignore the :nowait option if passed, otherwise program will hang waiting for a
# response that will not be sent by the server
opts.delete(:nowait)
-
- bindings.delete(exchange)
client.send_frame(
Qrack::Protocol::Queue::Unbind.new({ :queue => name,
:exchange => exchange,
:routing_key => opts.delete(:key),
@@ -384,90 +434,14 @@
# return message
:unbind_ok
end
-=begin rdoc
-
-=== DESCRIPTION:
-
-Requests that a queue is deleted from broker/server. When a queue is deleted any pending messages
-are sent to a dead-letter queue if this is defined in the server configuration. Removes reference
-from queues if successful. If an error occurs raises _Bunny_::_ProtocolError_.
-
-==== Options:
-
-* <tt>:if_unused => true or false (_default_)</tt> - If set to _true_, the server will only
- delete the queue if it has no consumers. If the queue has consumers the server does not
- delete it but raises a channel exception instead.
-* <tt>:if_empty => true or false (_default_)</tt> - If set to _true_, the server will only
- delete the queue if it has no messages. If the queue is not empty the server raises a channel
- exception.
-* <tt>:nowait => true or false (_default_)</tt> - Ignored by Bunny, always _false_.
-
-==== Returns:
-
-<tt>:delete_ok</tt> if successful
-=end
-
- def delete(opts = {})
- # ignore the :nowait option if passed, otherwise program will hang waiting for a
- # response that will not be sent by the server
- opts.delete(:nowait)
-
- client.send_frame(
- Qrack::Protocol::Queue::Delete.new({ :queue => name, :nowait => false }.merge(opts))
- )
-
- raise Bunny::ProtocolError,
- "Error deleting queue #{name}" unless
- client.next_method.is_a?(Qrack::Protocol::Queue::DeleteOk)
-
- client.queues.delete(name)
-
- # return confirmation
- :delete_ok
- end
-
-=begin rdoc
-
-=== DESCRIPTION:
-
-Removes all messages from a queue. It does not cancel consumers. Purged messages are deleted
-without any formal "undo" mechanism. If an error occurs raises _Bunny_::_ProtocolError_.
-
-==== Options:
-
-* <tt>:nowait => true or false (_default_)</tt> - Ignored by Bunny, always _false_.
-
-==== Returns:
-
-<tt>:purge_ok</tt> if successful
-=end
-
- def purge(opts = {})
- # ignore the :nowait option if passed, otherwise program will hang waiting for a
- # response that will not be sent by the server
- opts.delete(:nowait)
+ private
- client.send_frame(
- Qrack::Protocol::Queue::Purge.new({ :queue => name, :nowait => false }.merge(opts))
- )
-
- raise Bunny::ProtocolError, "Error purging queue #{name}" unless client.next_method.is_a?(Qrack::Protocol::Queue::PurgeOk)
-
- # return confirmation
- :purge_ok
-
- end
-
- private
- def exchange
+ def exchange
@exchange ||= Bunny::Exchange.new(client, '', {:type => :direct, :key => name})
end
-
- def bindings
- @bindings ||= {}
- end
+
end
end