lib/mq.rb in arvicco-amqp-0.6.11 vs lib/mq.rb in arvicco-amqp-0.6.13
- old
+ new
@@ -146,10 +146,11 @@
send Protocol::Channel::Open.new
}
end
attr_reader :channel, :connection
+ alias :conn :connection
# May raise a MQ::Error exception when the frame payload contains a
# Protocol::Channel::Close object.
#
# This usually occurs when a client attempts to perform an illegal
@@ -241,10 +242,13 @@
end
end
end
end
+ # Sends each argument through @connection, setting its *ticket* property to the @ticket
+ # received in most recent Protocol::Access::RequestOk. This operation is Mutex-synchronized.
+ #
def send *args
conn.callback { |c|
(@_send_mutex ||= Mutex.new).synchronize do
args.each do |data|
data.ticket = @ticket if @ticket and data.respond_to? :ticket=
@@ -675,14 +679,15 @@
# If set, the server will not respond to the method. The client should
# not wait for a reply method. If the server could not complete the
# method it will raise a channel or connection exception.
#
def queue name, opts = {}
+ #noinspection RubyArgCount
queues[name] ||= Queue.new(self, name, opts)
end
- # Takes a channel, queue and optional object.
+ # Takes a queue name and optional object.
#
# The optional object may be a class name, module name or object
# instance. When given a class or module name, the object is instantiated
# during this setup. The passed queue is automatically subscribed to so
# it passes all messages (and their arguments) to the object.
@@ -718,10 +723,13 @@
#
def rpc name, obj = nil
rpcs[name] ||= RPC.new(self, name, obj)
end
+ # Schedules the request to close the channel to be sent. Actual closing of
+ # the channels happens when Protocol::Channel::CloseOk is received from broker.
+ #
def close
if @deferred_status == :succeeded
send Protocol::Channel::Close.new(:reply_code => 200,
:reply_text => 'bye',
:method_id => 0,
@@ -730,28 +738,32 @@
@closing = true
end
end
# Define a message and callback block to be executed on all errors.
+ #
def self.error msg = nil, &blk
if blk
@error_callback = blk
else
@error_callback.call(msg) if @error_callback and msg
end
end
- def prefetch(size)
- @prefetch_size = size
- send Protocol::Basic::Qos.new(:prefetch_size => 0, :prefetch_count => size, :global => false)
+ # Asks the broker to set prefetch_count (size of the prefetch buffer) that the broker
+ # will maintain for outstanding unacknowledged messages on a this channel. This is
+ # Applications typically set the prefetch count to 1, which means the processing speed
+ # of the consumer exerts complete backpressure on the flow of messages in that channel.
+ #
+ def prefetch(count)
+ @prefetch_count = count
+ send Protocol::Basic::Qos.new(:prefetch_size => 0, :prefetch_count => count, :global => false)
self
end
- # Asks the broker to redeliver all unacknowledged messages on this
- # channel.
- #
- # * requeue (default false)
+ # Asks the broker to redeliver all unacknowledged messages on this channel.
+ # * :requeue - (default false)
# If this parameter is false, the message will be redelivered to the original recipient.
# If this flag is true, the server will attempt to requeue the message, potentially then
# delivering it to an alternative subscriber.
#
def recover requeue = false
@@ -771,10 +783,14 @@
# Not typically called by client code.
def queues
@queues ||= {}
end
+ # Yields a (Mutex-synchronized) FIFO queue of consumers that issued
+ # Protocol::Basic::Get requests (that is, called Queue#pop)
+ #
+ # Not typically called by client code.
def get_queue
if block_given?
(@get_queue_mutex ||= Mutex.new).synchronize {
yield(@get_queue ||= [])
}
@@ -793,10 +809,12 @@
# Not typically called by client code.
def consumers
@consumers ||= {}
end
+ # Resets and reinitializes the channel and its queues/exchanges
+ #
def reset
@deferred_status = nil
@channel = nil
initialize @connection
@@ -808,15 +826,17 @@
qus = @queues
@queues = {}
qus.each { |_, q| q.reset } if qus
- prefetch(@prefetch_size) if @prefetch_size
+ prefetch(@prefetch_count) if @prefetch_count
end
+ # Tests connection status of associated AMQP connection
+ #
def connected?
- connection.connected?
+ @connection.connected?
end
private
def check_content_completion
@@ -828,33 +848,28 @@
end
def log *args
return unless MQ.logging
pp args
- puts
+ puts ''
end
-
- attr_reader :connection
- alias :conn :connection
end
#-- convenience wrapper (read: HACK) for thread-local MQ object
class MQ
+ # unique identifier
+ def MQ.id
+ Thread.current[:mq_id] ||= "#{`hostname`.strip}-#{Process.pid}-#{Thread.current.object_id}"
+ end
+
def MQ.default
- #-- XXX clear this when connection is closed
+ # TODO: clear this when connection is closed
Thread.current[:mq] ||= MQ.new
end
# Allows for calls to all MQ instance methods. This implicitly calls
# MQ.new so that a new channel is allocated for subsequent operations.
def MQ.method_missing meth, *args, &blk
MQ.default.__send__(meth, *args, &blk)
end
end
-
-class MQ
- # unique identifier
- def MQ.id
- Thread.current[:mq_id] ||= "#{`hostname`.strip}-#{Process.pid}-#{Thread.current.object_id}"
- end
-end
\ No newline at end of file