lib/bunny/queue08.rb in celldee-bunny-0.5.1 vs lib/bunny/queue08.rb in celldee-bunny-0.5.2
- old
+ new
@@ -212,21 +212,25 @@
message from the client and will re-queue the message if it does not receive one within a time specified
by the server.
* <tt>:exclusive => true or false (_default_)</tt> - Request exclusive consumer access, meaning
only this consumer can access the queue.
* <tt>:nowait => true or false (_default_)</tt> - Ignored by Bunny, always _false_.
+* <tt>:timeout => number of seconds (default = 0 no timeout) - The subscribe loop will continue to wait for
+ messages until terminated (Ctrl-C or kill command) or this timeout interval is reached.
==== RETURNS:
If <tt>:header => true</tt> returns hash <tt>{:header, :delivery_details, :payload}</tt> for each message.
<tt>:delivery_details</tt> is a hash <tt>{:consumer_tag, :delivery_tag, :redelivered, :exchange, :routing_key}</tt>.
If <tt>:header => false</tt> only message payload is returned.
+If <tt>:timeout => > 0</tt> is reached returns :timed_out
=end
def subscribe(opts = {}, &blk)
consumer_tag = opts[:consumer_tag] || name
+ secs = opts[:timeout] || 0
# ignore the :nowait option if passed, otherwise program will hang waiting for a
# response from the server causing an error.
opts.delete(:nowait)
@@ -245,17 +249,23 @@
raise Bunny::ProtocolError,
"Error subscribing to queue #{name}" unless
client.next_method.is_a?(Qrack::Protocol::Basic::ConsumeOk)
- while true
- method = client.next_method
+ loop do
+ begin
+ Timeout::timeout(secs) do
+ @method = client.next_method
+ end
+ rescue Timeout::Error
+ return :timed_out
+ end
- break if method.is_a?(Qrack::Protocol::Basic::CancelOk)
+ break if @method.is_a?(Qrack::Protocol::Basic::CancelOk)
# get delivery tag to use for acknowledge
- self.delivery_tag = method.delivery_tag if ack
+ self.delivery_tag = @method.delivery_tag if ack
header = client.next_payload
msg = client.next_payload
raise Bunny::MessageError, 'unexpected length' if msg.length < header.size
@@ -285,9 +295,13 @@
# ignore the :nowait option if passed, otherwise program will hang waiting for a
# response from the server causing an error
opts.delete(:nowait)
client.send_frame( Qrack::Protocol::Basic::Cancel.new({ :consumer_tag => consumer_tag }.merge(opts)))
+
+ raise Bunny::ProtocolError,
+ "Error unsubscribing from queue #{name}" unless
+ client.next_method.is_a?(Qrack::Protocol::Basic::CancelOk)
end
=begin rdoc