lib/mq.rb in amqp-0.5.5 vs lib/mq.rb in amqp-0.5.9

- old
+ new

@@ -41,11 +41,11 @@ when Frame::Body @body << frame.payload if @body.length >= @header.size @header.properties.update(@method.arguments) @consumer.receive @header, @body - @body = '' + @body = @header = @consumer = @method = nil end when Frame::Method case method = frame.payload when Protocol::Channel::OpenOk @@ -68,29 +68,32 @@ @method = method @header = nil @body = '' @consumer = queues[ method.consumer_tag ] - when Protocol::Channel::Close - raise Error, "#{method.reply_text} in #{Protocol.classes[method.class_id].methods[method.method_id]}" + raise Error, "#{method.reply_text} in #{Protocol.classes[method.class_id].methods[method.method_id]} on #{@channel}" when Protocol::Channel::CloseOk @closing = false conn.callback{ |c| - c.channels.delete(@channel) - c.close unless c.channels.keys.any? + c.channels.delete @channel + c.close if c.channels.empty? } end end end - def send data - data.ticket = @ticket if @ticket and data.respond_to? :ticket + def send *args conn.callback{ |c| - log :sending, data - c.send data, :channel => @channel + (@_send_mutex ||= Mutex.new).synchronize do + args.each do |data| + data.ticket = @ticket if @ticket and data.respond_to? :ticket= + log :sending, data + c.send data, :channel => @channel + end + end } end %w[ direct topic fanout ].each do |type| class_eval %[ @@ -143,13 +146,14 @@ attr_reader :connection alias :conn :connection end -# convenience wrapper for thread-local MQ object +# convenience wrapper (read: HACK) for thread-local MQ object class MQ def MQ.default + # XXX clear this when connection is closed Thread.current[:mq] ||= MQ.new end def MQ.method_missing meth, *args, &blk MQ.default.__send__(meth, *args, &blk) \ No newline at end of file