lib/mq.rb in brontes3d-amqp-0.6.4.3 vs lib/mq.rb in brontes3d-amqp-0.6.7.1
- old
+ new
@@ -1,9 +1,8 @@
#:main: README
#
-$:.unshift File.expand_path(File.dirname(File.expand_path(__FILE__)))
require 'amqp'
class MQ
%w[ exchange queue rpc header ].each do |file|
require "mq/#{file}"
@@ -197,11 +196,11 @@
else
MQ.error "Basic.CancelOk for invalid consumer tag: #{method.consumer_tag}"
end
when Protocol::Queue::DeclareOk
- queues[ method.queue ].recieve_status method
+ queues[ method.queue ].receive_status method
when Protocol::Basic::Deliver, Protocol::Basic::GetOk
@method = method
@header = nil
@body = ''
@@ -228,10 +227,17 @@
@closing = false
conn.callback{ |c|
c.channels.delete @channel
c.close if c.channels.empty?
}
+
+ when Protocol::Basic::ConsumeOk
+ if @consumer = consumers[ method.consumer_tag ]
+ @consumer.confirm_subscribe
+ else
+ MQ.error "Basic.ConsumeOk for invalid consumer tag: #{method.consumer_tag}"
+ end
end
end
end
def send *args
@@ -731,13 +737,28 @@
@error_callback.call(msg) if @error_callback and msg
end
end
def prefetch(size)
+ @prefetch_size = size
send Protocol::Basic::Qos.new(:prefetch_size => 0, :prefetch_count => size, :global => false)
+ self
end
+ # Asks the broker to redeliver all unacknowledged messages on this
+ # channel.
+ #
+ # * requeue (default false)
+ # If this parameter is false, the message will be redelivered to the original recipient.
+ # If this flag is true, the server will attempt to requeue the message, potentially then
+ # delivering it to an alternative subscriber.
+ #
+ def recover requeue = false
+ send Protocol::Basic::Recover.new(:requeue => requeue)
+ self
+ end
+
# Returns a hash of all the exchange proxy objects.
#
# Not typically called by client code.
def exchanges
@exchanges ||= {}
@@ -784,10 +805,12 @@
exs.each{ |_,e| e.reset } if exs
qus = @queues
@queues = {}
qus.each{ |_,q| q.reset } if qus
+
+ prefetch(@prefetch_size) if @prefetch_size
end
private
def log *args
@@ -818,6 +841,6 @@
class MQ
# unique identifier
def MQ.id
Thread.current[:mq_id] ||= "#{`hostname`.strip}-#{Process.pid}-#{Thread.current.object_id}"
end
-end
\ No newline at end of file
+end