lib/mq.rb in tmm1-amqp-0.5.2 vs lib/mq.rb in tmm1-amqp-0.5.3
- old
+ new
@@ -8,10 +8,12 @@
class << self
@logging = false
attr_accessor :logging
end
+
+ class Error < Exception; end
end
class MQ
include AMQP
include EM::Deferrable
@@ -33,10 +35,11 @@
@body = ''
when Frame::Body
@body << frame.payload
if @body.length >= @header.size
+ @header.properties.update(@method.arguments)
@consumer.receive @header, @body
@body = ''
end
when Frame::Method
@@ -47,16 +50,34 @@
:write => true,
:active => true)
when Protocol::Access::RequestOk
@ticket = method.ticket
+ callback{
+ send Protocol::Channel::Close.new(:reply_code => 200,
+ :reply_text => 'bye',
+ :method_id => 0,
+ :class_id => 0)
+ } if @closing
succeed
when Protocol::Basic::Deliver
+ @method = method
@header = nil
@body = ''
@consumer = queues[ method.consumer_tag ]
+
+
+ when Protocol::Channel::Close
+ raise Error, "#{method.reply_text} in #{Protocol.classes[method.class_id].methods[method.method_id]}"
+
+ when Protocol::Channel::CloseOk
+ @closing = false
+ conn.callback{ |c|
+ c.channels.delete(@channel)
+ c.close unless c.channels.keys.any?
+ }
end
end
end
def send data
@@ -81,16 +102,19 @@
def rpc name, obj = nil
rpcs[name] ||= RPC.new(self, name, obj)
end
- private
-
- def log *args
- return unless MQ.logging
- pp args
- puts
+ def close
+ if @deferred_status == :succeeded
+ send Protocol::Channel::Close.new(:reply_code => 200,
+ :reply_text => 'bye',
+ :method_id => 0,
+ :class_id => 0)
+ else
+ @closing = true
+ end
end
# keep track of proxy objects
def exchanges
@@ -103,12 +127,21 @@
def rpcs
@rcps ||= {}
end
+ private
+
+ def log *args
+ return unless MQ.logging
+ pp args
+ puts
+ end
+
# create a class level connection on demand
def connection
+ raise 'MQ can only be used within EM.run{}' unless EM.reactor_running?
@@connection ||= AMQP.start
end
alias :conn :connection
end
\ No newline at end of file