lib/mq.rb in tmm1-amqp-0.5.5 vs lib/mq.rb in tmm1-amqp-0.5.9
- old
+ new
@@ -9,11 +9,11 @@
class << self
@logging = false
attr_accessor :logging
end
- class Error < Exception; end
+ class Error < StandardError; end
end
class MQ
include AMQP
include EM::Deferrable
@@ -40,21 +40,22 @@
when Frame::Body
@body << frame.payload
if @body.length >= @header.size
@header.properties.update(@method.arguments)
- @consumer.receive @header, @body
- @body = ''
+ @consumer.receive @header, @body if @consumer
+ @body = @header = @consumer = @method = nil
end
when Frame::Method
case method = frame.payload
when Protocol::Channel::OpenOk
send Protocol::Access::Request.new(:realm => '/data',
:read => true,
:write => true,
- :active => true)
+ :active => true,
+ :passive => true)
when Protocol::Access::RequestOk
@ticket = method.ticket
callback{
send Protocol::Channel::Close.new(:reply_code => 200,
@@ -62,35 +63,59 @@
:method_id => 0,
:class_id => 0)
} if @closing
succeed
- when Protocol::Basic::Deliver
+ when Protocol::Basic::CancelOk
+ if @consumer = consumers[ method.consumer_tag ]
+ @consumer.cancelled
+ else
+ MQ.error "Basic.CancelOk for invalid consumer tag: #{method.consumer_tag}"
+ end
+
+ when Protocol::Queue::DeclareOk
+ queues[ method.queue ].recieve_status method
+
+ when Protocol::Basic::Deliver, Protocol::Basic::GetOk
@method = method
@header = nil
@body = ''
- @consumer = queues[ method.consumer_tag ]
+ if method.is_a? Protocol::Basic::GetOk
+ @consumer = get_queue{|q| q.shift }
+ MQ.error "No pending Basic.GetOk requests" unless @consumer
+ else
+ @consumer = consumers[ method.consumer_tag ]
+ MQ.error "Basic.Deliver for invalid consumer tag: #{method.consumer_tag}" unless @consumer
+ end
+ when Protocol::Basic::GetEmpty
+ @consumer = get_queue{|q| q.shift }
+ @consumer.receive nil, nil
+
when Protocol::Channel::Close
- raise Error, "#{method.reply_text} in #{Protocol.classes[method.class_id].methods[method.method_id]}"
+ raise Error, "#{method.reply_text} in #{Protocol.classes[method.class_id].methods[method.method_id]} on #{@channel}"
when Protocol::Channel::CloseOk
@closing = false
conn.callback{ |c|
- c.channels.delete(@channel)
- c.close unless c.channels.keys.any?
+ c.channels.delete @channel
+ c.close if c.channels.empty?
}
end
end
end
- def send data
- data.ticket = @ticket if @ticket and data.respond_to? :ticket
+ def send *args
conn.callback{ |c|
- log :sending, data
- c.send data, :channel => @channel
+ (@_send_mutex ||= Mutex.new).synchronize do
+ args.each do |data|
+ data.ticket = @ticket if @ticket and data.respond_to? :ticket=
+ log :sending, data
+ c.send data, :channel => @channel
+ end
+ end
}
end
%w[ direct topic fanout ].each do |type|
class_eval %[
@@ -117,24 +142,48 @@
else
@closing = true
end
end
+ # error callback
+
+ def self.error msg = nil, &blk
+ if blk
+ @error_callback = blk
+ else
+ @error_callback.call(msg) if @error_callback and msg
+ end
+ end
+
# keep track of proxy objects
def exchanges
@exchanges ||= {}
end
def queues
@queues ||= {}
end
+ def get_queue
+ if block_given?
+ (@get_queue_mutex ||= Mutex.new).synchronize{
+ yield( @get_queue ||= [] )
+ }
+ end
+ end
+
def rpcs
@rcps ||= {}
end
+ # queue objects keyed on their consumer tags
+
+ def consumers
+ @consumers ||= {}
+ end
+
private
def log *args
return unless MQ.logging
pp args
@@ -143,13 +192,14 @@
attr_reader :connection
alias :conn :connection
end
-# convenience wrapper for thread-local MQ object
+# convenience wrapper (read: HACK) for thread-local MQ object
class MQ
def MQ.default
+ # XXX clear this when connection is closed
Thread.current[:mq] ||= MQ.new
end
def MQ.method_missing meth, *args, &blk
MQ.default.__send__(meth, *args, &blk)
\ No newline at end of file