lib/bunny/queue08.rb in celldee-bunny-0.5.1 vs lib/bunny/queue08.rb in celldee-bunny-0.5.2

- old
+ new

@@ -212,21 +212,25 @@ message from the client and will re-queue the message if it does not receive one within a time specified by the server. * <tt>:exclusive => true or false (_default_)</tt> - Request exclusive consumer access, meaning only this consumer can access the queue. * <tt>:nowait => true or false (_default_)</tt> - Ignored by Bunny, always _false_. +* <tt>:timeout => number of seconds (default = 0 no timeout) - The subscribe loop will continue to wait for + messages until terminated (Ctrl-C or kill command) or this timeout interval is reached. ==== RETURNS: If <tt>:header => true</tt> returns hash <tt>{:header, :delivery_details, :payload}</tt> for each message. <tt>:delivery_details</tt> is a hash <tt>{:consumer_tag, :delivery_tag, :redelivered, :exchange, :routing_key}</tt>. If <tt>:header => false</tt> only message payload is returned. +If <tt>:timeout => > 0</tt> is reached returns :timed_out =end def subscribe(opts = {}, &blk) consumer_tag = opts[:consumer_tag] || name + secs = opts[:timeout] || 0 # ignore the :nowait option if passed, otherwise program will hang waiting for a # response from the server causing an error. opts.delete(:nowait) @@ -245,17 +249,23 @@ raise Bunny::ProtocolError, "Error subscribing to queue #{name}" unless client.next_method.is_a?(Qrack::Protocol::Basic::ConsumeOk) - while true - method = client.next_method + loop do + begin + Timeout::timeout(secs) do + @method = client.next_method + end + rescue Timeout::Error + return :timed_out + end - break if method.is_a?(Qrack::Protocol::Basic::CancelOk) + break if @method.is_a?(Qrack::Protocol::Basic::CancelOk) # get delivery tag to use for acknowledge - self.delivery_tag = method.delivery_tag if ack + self.delivery_tag = @method.delivery_tag if ack header = client.next_payload msg = client.next_payload raise Bunny::MessageError, 'unexpected length' if msg.length < header.size @@ -285,9 +295,13 @@ # ignore the :nowait option if passed, otherwise program will hang waiting for a # response from the server causing an error opts.delete(:nowait) client.send_frame( Qrack::Protocol::Basic::Cancel.new({ :consumer_tag => consumer_tag }.merge(opts))) + + raise Bunny::ProtocolError, + "Error unsubscribing from queue #{name}" unless + client.next_method.is_a?(Qrack::Protocol::Basic::CancelOk) end =begin rdoc