lib/mq.rb in amqp-0.5.5 vs lib/mq.rb in amqp-0.5.9
- old
+ new
@@ -41,11 +41,11 @@
when Frame::Body
@body << frame.payload
if @body.length >= @header.size
@header.properties.update(@method.arguments)
@consumer.receive @header, @body
- @body = ''
+ @body = @header = @consumer = @method = nil
end
when Frame::Method
case method = frame.payload
when Protocol::Channel::OpenOk
@@ -68,29 +68,32 @@
@method = method
@header = nil
@body = ''
@consumer = queues[ method.consumer_tag ]
-
when Protocol::Channel::Close
- raise Error, "#{method.reply_text} in #{Protocol.classes[method.class_id].methods[method.method_id]}"
+ raise Error, "#{method.reply_text} in #{Protocol.classes[method.class_id].methods[method.method_id]} on #{@channel}"
when Protocol::Channel::CloseOk
@closing = false
conn.callback{ |c|
- c.channels.delete(@channel)
- c.close unless c.channels.keys.any?
+ c.channels.delete @channel
+ c.close if c.channels.empty?
}
end
end
end
- def send data
- data.ticket = @ticket if @ticket and data.respond_to? :ticket
+ def send *args
conn.callback{ |c|
- log :sending, data
- c.send data, :channel => @channel
+ (@_send_mutex ||= Mutex.new).synchronize do
+ args.each do |data|
+ data.ticket = @ticket if @ticket and data.respond_to? :ticket=
+ log :sending, data
+ c.send data, :channel => @channel
+ end
+ end
}
end
%w[ direct topic fanout ].each do |type|
class_eval %[
@@ -143,13 +146,14 @@
attr_reader :connection
alias :conn :connection
end
-# convenience wrapper for thread-local MQ object
+# convenience wrapper (read: HACK) for thread-local MQ object
class MQ
def MQ.default
+ # XXX clear this when connection is closed
Thread.current[:mq] ||= MQ.new
end
def MQ.method_missing meth, *args, &blk
MQ.default.__send__(meth, *args, &blk)
\ No newline at end of file