lib/mq.rb in arvicco-amqp-0.6.11 vs lib/mq.rb in arvicco-amqp-0.6.13

- old
+ new

@@ -146,10 +146,11 @@ send Protocol::Channel::Open.new } end attr_reader :channel, :connection + alias :conn :connection # May raise a MQ::Error exception when the frame payload contains a # Protocol::Channel::Close object. # # This usually occurs when a client attempts to perform an illegal @@ -241,10 +242,13 @@ end end end end + # Sends each argument through @connection, setting its *ticket* property to the @ticket + # received in most recent Protocol::Access::RequestOk. This operation is Mutex-synchronized. + # def send *args conn.callback { |c| (@_send_mutex ||= Mutex.new).synchronize do args.each do |data| data.ticket = @ticket if @ticket and data.respond_to? :ticket= @@ -675,14 +679,15 @@ # If set, the server will not respond to the method. The client should # not wait for a reply method. If the server could not complete the # method it will raise a channel or connection exception. # def queue name, opts = {} + #noinspection RubyArgCount queues[name] ||= Queue.new(self, name, opts) end - # Takes a channel, queue and optional object. + # Takes a queue name and optional object. # # The optional object may be a class name, module name or object # instance. When given a class or module name, the object is instantiated # during this setup. The passed queue is automatically subscribed to so # it passes all messages (and their arguments) to the object. @@ -718,10 +723,13 @@ # def rpc name, obj = nil rpcs[name] ||= RPC.new(self, name, obj) end + # Schedules the request to close the channel to be sent. Actual closing of + # the channels happens when Protocol::Channel::CloseOk is received from broker. + # def close if @deferred_status == :succeeded send Protocol::Channel::Close.new(:reply_code => 200, :reply_text => 'bye', :method_id => 0, @@ -730,28 +738,32 @@ @closing = true end end # Define a message and callback block to be executed on all errors. + # def self.error msg = nil, &blk if blk @error_callback = blk else @error_callback.call(msg) if @error_callback and msg end end - def prefetch(size) - @prefetch_size = size - send Protocol::Basic::Qos.new(:prefetch_size => 0, :prefetch_count => size, :global => false) + # Asks the broker to set prefetch_count (size of the prefetch buffer) that the broker + # will maintain for outstanding unacknowledged messages on a this channel. This is + # Applications typically set the prefetch count to 1, which means the processing speed + # of the consumer exerts complete backpressure on the flow of messages in that channel. + # + def prefetch(count) + @prefetch_count = count + send Protocol::Basic::Qos.new(:prefetch_size => 0, :prefetch_count => count, :global => false) self end - # Asks the broker to redeliver all unacknowledged messages on this - # channel. - # - # * requeue (default false) + # Asks the broker to redeliver all unacknowledged messages on this channel. + # * :requeue - (default false) # If this parameter is false, the message will be redelivered to the original recipient. # If this flag is true, the server will attempt to requeue the message, potentially then # delivering it to an alternative subscriber. # def recover requeue = false @@ -771,10 +783,14 @@ # Not typically called by client code. def queues @queues ||= {} end + # Yields a (Mutex-synchronized) FIFO queue of consumers that issued + # Protocol::Basic::Get requests (that is, called Queue#pop) + # + # Not typically called by client code. def get_queue if block_given? (@get_queue_mutex ||= Mutex.new).synchronize { yield(@get_queue ||= []) } @@ -793,10 +809,12 @@ # Not typically called by client code. def consumers @consumers ||= {} end + # Resets and reinitializes the channel and its queues/exchanges + # def reset @deferred_status = nil @channel = nil initialize @connection @@ -808,15 +826,17 @@ qus = @queues @queues = {} qus.each { |_, q| q.reset } if qus - prefetch(@prefetch_size) if @prefetch_size + prefetch(@prefetch_count) if @prefetch_count end + # Tests connection status of associated AMQP connection + # def connected? - connection.connected? + @connection.connected? end private def check_content_completion @@ -828,33 +848,28 @@ end def log *args return unless MQ.logging pp args - puts + puts '' end - - attr_reader :connection - alias :conn :connection end #-- convenience wrapper (read: HACK) for thread-local MQ object class MQ + # unique identifier + def MQ.id + Thread.current[:mq_id] ||= "#{`hostname`.strip}-#{Process.pid}-#{Thread.current.object_id}" + end + def MQ.default - #-- XXX clear this when connection is closed + # TODO: clear this when connection is closed Thread.current[:mq] ||= MQ.new end # Allows for calls to all MQ instance methods. This implicitly calls # MQ.new so that a new channel is allocated for subsequent operations. def MQ.method_missing meth, *args, &blk MQ.default.__send__(meth, *args, &blk) end end - -class MQ - # unique identifier - def MQ.id - Thread.current[:mq_id] ||= "#{`hostname`.strip}-#{Process.pid}-#{Thread.current.object_id}" - end -end \ No newline at end of file