lib/mq.rb in arvicco-amqp-0.6.10 vs lib/mq.rb in arvicco-amqp-0.6.11
- old
+ new
@@ -13,11 +13,12 @@
@logging = false
attr_accessor :logging
end
# Raised whenever an illegal operation is attempted.
- class Error < StandardError; end
+ class Error < StandardError;
+ end
end
# The top-level class for building AMQP clients. This class contains several
# convenience methods for working with queues and exchanges. Many calls
# delegate/forward to subclasses, but this is the preferred API. The subclass
@@ -138,17 +139,18 @@
def initialize connection = nil
raise 'MQ can only be used from within EM.run{}' unless EM.reactor_running?
@connection = connection || AMQP.start
- conn.callback{ |c|
+ conn.callback { |c|
@channel = c.add_channel(self)
send Protocol::Channel::Open.new
}
end
+
attr_reader :channel, :connection
-
+
# May raise a MQ::Error exception when the frame payload contains a
# Protocol::Channel::Close object.
#
# This usually occurs when a client attempts to perform an illegal
# operation. A short, and incomplete, list of potential illegal operations
@@ -158,90 +160,93 @@
#
def process_frame frame
log :received, frame
case frame
- when Frame::Header
- @header = frame.payload
- @body = ''
- check_content_completion
+ when Frame::Header
+ @header = frame.payload
+ @body = ''
+ check_content_completion
- when Frame::Body
- @body << frame.payload
- check_content_completion
+ when Frame::Body
+ @body << frame.payload
+ check_content_completion
- when Frame::Method
- case method = frame.payload
- when Protocol::Channel::OpenOk
- send Protocol::Access::Request.new(:realm => '/data',
- :read => true,
- :write => true,
- :active => true,
- :passive => true)
+ when Frame::Method
+ case method = frame.payload
+ when Protocol::Channel::OpenOk
+ send Protocol::Access::Request.new(:realm => '/data',
+ :read => true,
+ :write => true,
+ :active => true,
+ :passive => true)
- when Protocol::Access::RequestOk
- @ticket = method.ticket
- callback{
- send Protocol::Channel::Close.new(:reply_code => 200,
- :reply_text => 'bye',
- :method_id => 0,
- :class_id => 0)
- } if @closing
- succeed
+ when Protocol::Access::RequestOk
+ @ticket = method.ticket
+ callback {
+ send Protocol::Channel::Close.new(:reply_code => 200,
+ :reply_text => 'bye',
+ :method_id => 0,
+ :class_id => 0)
+ } if @closing
+ succeed
- when Protocol::Basic::CancelOk
- if @consumer = consumers[ method.consumer_tag ]
- @consumer.cancelled
- else
- MQ.error "Basic.CancelOk for invalid consumer tag: #{method.consumer_tag}"
- end
+ when Protocol::Basic::CancelOk
+ if @consumer = consumers[method.consumer_tag]
+ @consumer.cancelled
+ else
+ MQ.error "Basic.CancelOk for invalid consumer tag: #{method.consumer_tag}"
+ end
- when Protocol::Queue::DeclareOk
- queues[ method.queue ].receive_status method
+ when Protocol::Queue::DeclareOk
+ queues[method.queue].receive_status method
- when Protocol::Basic::Deliver, Protocol::Basic::GetOk
- @method = method
- @header = nil
- @body = ''
+ when Protocol::Basic::GetOk
+ @method = method
+ @header = nil
+ @body = ''
- if method.is_a? Protocol::Basic::GetOk
- @consumer = get_queue{|q| q.shift }
- MQ.error "No pending Basic.GetOk requests" unless @consumer
- else
- @consumer = consumers[ method.consumer_tag ]
- MQ.error "Basic.Deliver for invalid consumer tag: #{method.consumer_tag}" unless @consumer
- end
+ @consumer = get_queue { |q| q.shift }
+ MQ.error "No pending Basic.GetOk requests" unless @consumer
- when Protocol::Basic::GetEmpty
- if @consumer = get_queue{|q| q.shift }
- @consumer.receive nil, nil
- else
- MQ.error "Basic.GetEmpty for invalid consumer"
- end
+ when Protocol::Basic::Deliver
+ @method = method
+ @header = nil
+ @body = ''
- when Protocol::Channel::Close
- raise Error, "#{method.reply_text} in #{Protocol.classes[method.class_id].methods[method.method_id]} on #{@channel}"
+ @consumer = consumers[method.consumer_tag]
+ MQ.error "Basic.Deliver for invalid consumer tag: #{method.consumer_tag}" unless @consumer
- when Protocol::Channel::CloseOk
- @closing = false
- conn.callback{ |c|
- c.channels.delete @channel
- c.close if c.channels.empty?
- }
+ when Protocol::Basic::GetEmpty
+ if @consumer = get_queue { |q| q.shift }
+ @consumer.receive nil, nil
+ else
+ MQ.error "Basic.GetEmpty for invalid consumer"
+ end
- when Protocol::Basic::ConsumeOk
- if @consumer = consumers[ method.consumer_tag ]
- @consumer.confirm_subscribe
- else
- MQ.error "Basic.ConsumeOk for invalid consumer tag: #{method.consumer_tag}"
+ when Protocol::Channel::Close
+ raise Error, "#{method.reply_text} in #{Protocol.classes[method.class_id].methods[method.method_id]} on #{@channel}"
+
+ when Protocol::Channel::CloseOk
+ @closing = false
+ conn.callback { |c|
+ c.channels.delete @channel
+ c.close if c.channels.empty?
+ }
+
+ when Protocol::Basic::ConsumeOk
+ if @consumer = consumers[method.consumer_tag]
+ @consumer.confirm_subscribe
+ else
+ MQ.error "Basic.ConsumeOk for invalid consumer tag: #{method.consumer_tag}"
+ end
end
- end
end
end
def send *args
- conn.callback{ |c|
+ conn.callback { |c|
(@_send_mutex ||= Mutex.new).synchronize do
args.each do |data|
data.ticket = @ticket if @ticket and data.respond_to? :ticket=
log :sending, data
c.send data, :channel => @channel
@@ -768,12 +773,12 @@
@queues ||= {}
end
def get_queue
if block_given?
- (@get_queue_mutex ||= Mutex.new).synchronize{
- yield( @get_queue ||= [] )
+ (@get_queue_mutex ||= Mutex.new).synchronize {
+ yield(@get_queue ||= [])
}
end
end
# Returns a hash of all rpc proxy objects.
@@ -797,15 +802,15 @@
@consumers = {}
exs = @exchanges
@exchanges = {}
- exs.each{ |_,e| e.reset } if exs
+ exs.each { |_, e| e.reset } if exs
qus = @queues
@queues = {}
- qus.each{ |_,q| q.reset } if qus
+ qus.each { |_, q| q.reset } if qus
prefetch(@prefetch_size) if @prefetch_size
end
def connected?
@@ -814,10 +819,10 @@
private
def check_content_completion
if @body.length >= @header.size
- @header.properties.update(@method.arguments)
+ @header.properties.update(@method.arguments) if @method
@consumer.receive @header, @body if @consumer
@body = @header = @consumer = @method = nil
end
end
\ No newline at end of file