lib/mq.rb in tmm1-amqp-0.5.2 vs lib/mq.rb in tmm1-amqp-0.5.3

- old
+ new

@@ -8,10 +8,12 @@ class << self @logging = false attr_accessor :logging end + + class Error < Exception; end end class MQ include AMQP include EM::Deferrable @@ -33,10 +35,11 @@ @body = '' when Frame::Body @body << frame.payload if @body.length >= @header.size + @header.properties.update(@method.arguments) @consumer.receive @header, @body @body = '' end when Frame::Method @@ -47,16 +50,34 @@ :write => true, :active => true) when Protocol::Access::RequestOk @ticket = method.ticket + callback{ + send Protocol::Channel::Close.new(:reply_code => 200, + :reply_text => 'bye', + :method_id => 0, + :class_id => 0) + } if @closing succeed when Protocol::Basic::Deliver + @method = method @header = nil @body = '' @consumer = queues[ method.consumer_tag ] + + + when Protocol::Channel::Close + raise Error, "#{method.reply_text} in #{Protocol.classes[method.class_id].methods[method.method_id]}" + + when Protocol::Channel::CloseOk + @closing = false + conn.callback{ |c| + c.channels.delete(@channel) + c.close unless c.channels.keys.any? + } end end end def send data @@ -81,16 +102,19 @@ def rpc name, obj = nil rpcs[name] ||= RPC.new(self, name, obj) end - private - - def log *args - return unless MQ.logging - pp args - puts + def close + if @deferred_status == :succeeded + send Protocol::Channel::Close.new(:reply_code => 200, + :reply_text => 'bye', + :method_id => 0, + :class_id => 0) + else + @closing = true + end end # keep track of proxy objects def exchanges @@ -103,12 +127,21 @@ def rpcs @rcps ||= {} end + private + + def log *args + return unless MQ.logging + pp args + puts + end + # create a class level connection on demand def connection + raise 'MQ can only be used within EM.run{}' unless EM.reactor_running? @@connection ||= AMQP.start end alias :conn :connection end \ No newline at end of file