lib/mq.rb in amqp-0.6.5 vs lib/mq.rb in amqp-0.6.6

- old
+ new

@@ -197,11 +197,11 @@ else MQ.error "Basic.CancelOk for invalid consumer tag: #{method.consumer_tag}" end when Protocol::Queue::DeclareOk - queues[ method.queue ].recieve_status method + queues[ method.queue ].receive_status method when Protocol::Basic::Deliver, Protocol::Basic::GetOk @method = method @header = nil @body = '' @@ -228,10 +228,17 @@ @closing = false conn.callback{ |c| c.channels.delete @channel c.close if c.channels.empty? } + + when Protocol::Basic::ConsumeOk + if @consumer = consumers[ method.consumer_tag ] + @consumer.confirm_subscribe + else + MQ.error "Basic.ConsumeOk for invalid consumer tag: #{method.consumer_tag}" + end end end end def send *args @@ -732,9 +739,23 @@ end end def prefetch(size) send Protocol::Basic::Qos.new(:prefetch_size => 0, :prefetch_count => size, :global => false) + self + end + + # Asks the broker to redeliver all unacknowledged messages on this + # channel. + # + # * requeue (default false) + # If this parameter is false, the message will be redelivered to the original recipient. + # If this flag is true, the server will attempt to requeue the message, potentially then + # delivering it to an alternative subscriber. + # + def recover requeue = false + send Protocol::Basic::Recover.new(:requeue => requeue) + self end # Returns a hash of all the exchange proxy objects. # # Not typically called by client code. \ No newline at end of file