lib/mq.rb in tmm1-amqp-0.5.5 vs lib/mq.rb in tmm1-amqp-0.5.9

- old
+ new

@@ -9,11 +9,11 @@ class << self @logging = false attr_accessor :logging end - class Error < Exception; end + class Error < StandardError; end end class MQ include AMQP include EM::Deferrable @@ -40,21 +40,22 @@ when Frame::Body @body << frame.payload if @body.length >= @header.size @header.properties.update(@method.arguments) - @consumer.receive @header, @body - @body = '' + @consumer.receive @header, @body if @consumer + @body = @header = @consumer = @method = nil end when Frame::Method case method = frame.payload when Protocol::Channel::OpenOk send Protocol::Access::Request.new(:realm => '/data', :read => true, :write => true, - :active => true) + :active => true, + :passive => true) when Protocol::Access::RequestOk @ticket = method.ticket callback{ send Protocol::Channel::Close.new(:reply_code => 200, @@ -62,35 +63,59 @@ :method_id => 0, :class_id => 0) } if @closing succeed - when Protocol::Basic::Deliver + when Protocol::Basic::CancelOk + if @consumer = consumers[ method.consumer_tag ] + @consumer.cancelled + else + MQ.error "Basic.CancelOk for invalid consumer tag: #{method.consumer_tag}" + end + + when Protocol::Queue::DeclareOk + queues[ method.queue ].recieve_status method + + when Protocol::Basic::Deliver, Protocol::Basic::GetOk @method = method @header = nil @body = '' - @consumer = queues[ method.consumer_tag ] + if method.is_a? Protocol::Basic::GetOk + @consumer = get_queue{|q| q.shift } + MQ.error "No pending Basic.GetOk requests" unless @consumer + else + @consumer = consumers[ method.consumer_tag ] + MQ.error "Basic.Deliver for invalid consumer tag: #{method.consumer_tag}" unless @consumer + end + when Protocol::Basic::GetEmpty + @consumer = get_queue{|q| q.shift } + @consumer.receive nil, nil + when Protocol::Channel::Close - raise Error, "#{method.reply_text} in #{Protocol.classes[method.class_id].methods[method.method_id]}" + raise Error, "#{method.reply_text} in #{Protocol.classes[method.class_id].methods[method.method_id]} on #{@channel}" when Protocol::Channel::CloseOk @closing = false conn.callback{ |c| - c.channels.delete(@channel) - c.close unless c.channels.keys.any? + c.channels.delete @channel + c.close if c.channels.empty? } end end end - def send data - data.ticket = @ticket if @ticket and data.respond_to? :ticket + def send *args conn.callback{ |c| - log :sending, data - c.send data, :channel => @channel + (@_send_mutex ||= Mutex.new).synchronize do + args.each do |data| + data.ticket = @ticket if @ticket and data.respond_to? :ticket= + log :sending, data + c.send data, :channel => @channel + end + end } end %w[ direct topic fanout ].each do |type| class_eval %[ @@ -117,24 +142,48 @@ else @closing = true end end + # error callback + + def self.error msg = nil, &blk + if blk + @error_callback = blk + else + @error_callback.call(msg) if @error_callback and msg + end + end + # keep track of proxy objects def exchanges @exchanges ||= {} end def queues @queues ||= {} end + def get_queue + if block_given? + (@get_queue_mutex ||= Mutex.new).synchronize{ + yield( @get_queue ||= [] ) + } + end + end + def rpcs @rcps ||= {} end + # queue objects keyed on their consumer tags + + def consumers + @consumers ||= {} + end + private def log *args return unless MQ.logging pp args @@ -143,13 +192,14 @@ attr_reader :connection alias :conn :connection end -# convenience wrapper for thread-local MQ object +# convenience wrapper (read: HACK) for thread-local MQ object class MQ def MQ.default + # XXX clear this when connection is closed Thread.current[:mq] ||= MQ.new end def MQ.method_missing meth, *args, &blk MQ.default.__send__(meth, *args, &blk) \ No newline at end of file