lib/mq.rb in amqp-0.6.5 vs lib/mq.rb in amqp-0.6.6
- old
+ new
@@ -197,11 +197,11 @@
else
MQ.error "Basic.CancelOk for invalid consumer tag: #{method.consumer_tag}"
end
when Protocol::Queue::DeclareOk
- queues[ method.queue ].recieve_status method
+ queues[ method.queue ].receive_status method
when Protocol::Basic::Deliver, Protocol::Basic::GetOk
@method = method
@header = nil
@body = ''
@@ -228,10 +228,17 @@
@closing = false
conn.callback{ |c|
c.channels.delete @channel
c.close if c.channels.empty?
}
+
+ when Protocol::Basic::ConsumeOk
+ if @consumer = consumers[ method.consumer_tag ]
+ @consumer.confirm_subscribe
+ else
+ MQ.error "Basic.ConsumeOk for invalid consumer tag: #{method.consumer_tag}"
+ end
end
end
end
def send *args
@@ -732,9 +739,23 @@
end
end
def prefetch(size)
send Protocol::Basic::Qos.new(:prefetch_size => 0, :prefetch_count => size, :global => false)
+ self
+ end
+
+ # Asks the broker to redeliver all unacknowledged messages on this
+ # channel.
+ #
+ # * requeue (default false)
+ # If this parameter is false, the message will be redelivered to the original recipient.
+ # If this flag is true, the server will attempt to requeue the message, potentially then
+ # delivering it to an alternative subscriber.
+ #
+ def recover requeue = false
+ send Protocol::Basic::Recover.new(:requeue => requeue)
+ self
end
# Returns a hash of all the exchange proxy objects.
#
# Not typically called by client code.
\ No newline at end of file