lib/mq.rb in arvicco-amqp-0.6.10 vs lib/mq.rb in arvicco-amqp-0.6.11

- old
+ new

@@ -13,11 +13,12 @@ @logging = false attr_accessor :logging end # Raised whenever an illegal operation is attempted. - class Error < StandardError; end + class Error < StandardError; + end end # The top-level class for building AMQP clients. This class contains several # convenience methods for working with queues and exchanges. Many calls # delegate/forward to subclasses, but this is the preferred API. The subclass @@ -138,17 +139,18 @@ def initialize connection = nil raise 'MQ can only be used from within EM.run{}' unless EM.reactor_running? @connection = connection || AMQP.start - conn.callback{ |c| + conn.callback { |c| @channel = c.add_channel(self) send Protocol::Channel::Open.new } end + attr_reader :channel, :connection - + # May raise a MQ::Error exception when the frame payload contains a # Protocol::Channel::Close object. # # This usually occurs when a client attempts to perform an illegal # operation. A short, and incomplete, list of potential illegal operations @@ -158,90 +160,93 @@ # def process_frame frame log :received, frame case frame - when Frame::Header - @header = frame.payload - @body = '' - check_content_completion + when Frame::Header + @header = frame.payload + @body = '' + check_content_completion - when Frame::Body - @body << frame.payload - check_content_completion + when Frame::Body + @body << frame.payload + check_content_completion - when Frame::Method - case method = frame.payload - when Protocol::Channel::OpenOk - send Protocol::Access::Request.new(:realm => '/data', - :read => true, - :write => true, - :active => true, - :passive => true) + when Frame::Method + case method = frame.payload + when Protocol::Channel::OpenOk + send Protocol::Access::Request.new(:realm => '/data', + :read => true, + :write => true, + :active => true, + :passive => true) - when Protocol::Access::RequestOk - @ticket = method.ticket - callback{ - send Protocol::Channel::Close.new(:reply_code => 200, - :reply_text => 'bye', - :method_id => 0, - :class_id => 0) - } if @closing - succeed + when Protocol::Access::RequestOk + @ticket = method.ticket + callback { + send Protocol::Channel::Close.new(:reply_code => 200, + :reply_text => 'bye', + :method_id => 0, + :class_id => 0) + } if @closing + succeed - when Protocol::Basic::CancelOk - if @consumer = consumers[ method.consumer_tag ] - @consumer.cancelled - else - MQ.error "Basic.CancelOk for invalid consumer tag: #{method.consumer_tag}" - end + when Protocol::Basic::CancelOk + if @consumer = consumers[method.consumer_tag] + @consumer.cancelled + else + MQ.error "Basic.CancelOk for invalid consumer tag: #{method.consumer_tag}" + end - when Protocol::Queue::DeclareOk - queues[ method.queue ].receive_status method + when Protocol::Queue::DeclareOk + queues[method.queue].receive_status method - when Protocol::Basic::Deliver, Protocol::Basic::GetOk - @method = method - @header = nil - @body = '' + when Protocol::Basic::GetOk + @method = method + @header = nil + @body = '' - if method.is_a? Protocol::Basic::GetOk - @consumer = get_queue{|q| q.shift } - MQ.error "No pending Basic.GetOk requests" unless @consumer - else - @consumer = consumers[ method.consumer_tag ] - MQ.error "Basic.Deliver for invalid consumer tag: #{method.consumer_tag}" unless @consumer - end + @consumer = get_queue { |q| q.shift } + MQ.error "No pending Basic.GetOk requests" unless @consumer - when Protocol::Basic::GetEmpty - if @consumer = get_queue{|q| q.shift } - @consumer.receive nil, nil - else - MQ.error "Basic.GetEmpty for invalid consumer" - end + when Protocol::Basic::Deliver + @method = method + @header = nil + @body = '' - when Protocol::Channel::Close - raise Error, "#{method.reply_text} in #{Protocol.classes[method.class_id].methods[method.method_id]} on #{@channel}" + @consumer = consumers[method.consumer_tag] + MQ.error "Basic.Deliver for invalid consumer tag: #{method.consumer_tag}" unless @consumer - when Protocol::Channel::CloseOk - @closing = false - conn.callback{ |c| - c.channels.delete @channel - c.close if c.channels.empty? - } + when Protocol::Basic::GetEmpty + if @consumer = get_queue { |q| q.shift } + @consumer.receive nil, nil + else + MQ.error "Basic.GetEmpty for invalid consumer" + end - when Protocol::Basic::ConsumeOk - if @consumer = consumers[ method.consumer_tag ] - @consumer.confirm_subscribe - else - MQ.error "Basic.ConsumeOk for invalid consumer tag: #{method.consumer_tag}" + when Protocol::Channel::Close + raise Error, "#{method.reply_text} in #{Protocol.classes[method.class_id].methods[method.method_id]} on #{@channel}" + + when Protocol::Channel::CloseOk + @closing = false + conn.callback { |c| + c.channels.delete @channel + c.close if c.channels.empty? + } + + when Protocol::Basic::ConsumeOk + if @consumer = consumers[method.consumer_tag] + @consumer.confirm_subscribe + else + MQ.error "Basic.ConsumeOk for invalid consumer tag: #{method.consumer_tag}" + end end - end end end def send *args - conn.callback{ |c| + conn.callback { |c| (@_send_mutex ||= Mutex.new).synchronize do args.each do |data| data.ticket = @ticket if @ticket and data.respond_to? :ticket= log :sending, data c.send data, :channel => @channel @@ -768,12 +773,12 @@ @queues ||= {} end def get_queue if block_given? - (@get_queue_mutex ||= Mutex.new).synchronize{ - yield( @get_queue ||= [] ) + (@get_queue_mutex ||= Mutex.new).synchronize { + yield(@get_queue ||= []) } end end # Returns a hash of all rpc proxy objects. @@ -797,15 +802,15 @@ @consumers = {} exs = @exchanges @exchanges = {} - exs.each{ |_,e| e.reset } if exs + exs.each { |_, e| e.reset } if exs qus = @queues @queues = {} - qus.each{ |_,q| q.reset } if qus + qus.each { |_, q| q.reset } if qus prefetch(@prefetch_size) if @prefetch_size end def connected? @@ -814,10 +819,10 @@ private def check_content_completion if @body.length >= @header.size - @header.properties.update(@method.arguments) + @header.properties.update(@method.arguments) if @method @consumer.receive @header, @body if @consumer @body = @header = @consumer = @method = nil end end \ No newline at end of file