lib/mq.rb in brontes3d-amqp-0.6.4.3 vs lib/mq.rb in brontes3d-amqp-0.6.7.1

- old
+ new

@@ -1,9 +1,8 @@ #:main: README # -$:.unshift File.expand_path(File.dirname(File.expand_path(__FILE__))) require 'amqp' class MQ %w[ exchange queue rpc header ].each do |file| require "mq/#{file}" @@ -197,11 +196,11 @@ else MQ.error "Basic.CancelOk for invalid consumer tag: #{method.consumer_tag}" end when Protocol::Queue::DeclareOk - queues[ method.queue ].recieve_status method + queues[ method.queue ].receive_status method when Protocol::Basic::Deliver, Protocol::Basic::GetOk @method = method @header = nil @body = '' @@ -228,10 +227,17 @@ @closing = false conn.callback{ |c| c.channels.delete @channel c.close if c.channels.empty? } + + when Protocol::Basic::ConsumeOk + if @consumer = consumers[ method.consumer_tag ] + @consumer.confirm_subscribe + else + MQ.error "Basic.ConsumeOk for invalid consumer tag: #{method.consumer_tag}" + end end end end def send *args @@ -731,13 +737,28 @@ @error_callback.call(msg) if @error_callback and msg end end def prefetch(size) + @prefetch_size = size send Protocol::Basic::Qos.new(:prefetch_size => 0, :prefetch_count => size, :global => false) + self end + # Asks the broker to redeliver all unacknowledged messages on this + # channel. + # + # * requeue (default false) + # If this parameter is false, the message will be redelivered to the original recipient. + # If this flag is true, the server will attempt to requeue the message, potentially then + # delivering it to an alternative subscriber. + # + def recover requeue = false + send Protocol::Basic::Recover.new(:requeue => requeue) + self + end + # Returns a hash of all the exchange proxy objects. # # Not typically called by client code. def exchanges @exchanges ||= {} @@ -784,10 +805,12 @@ exs.each{ |_,e| e.reset } if exs qus = @queues @queues = {} qus.each{ |_,q| q.reset } if qus + + prefetch(@prefetch_size) if @prefetch_size end private def log *args @@ -818,6 +841,6 @@ class MQ # unique identifier def MQ.id Thread.current[:mq_id] ||= "#{`hostname`.strip}-#{Process.pid}-#{Thread.current.object_id}" end -end \ No newline at end of file +end