lib/mq.rb in amqp-0.5.0 vs lib/mq.rb in amqp-0.5.1

- old
+ new

@@ -1,152 +1,34 @@ $:.unshift File.expand_path(File.dirname(File.expand_path(__FILE__))) require 'amqp' -unless defined?(BlankSlate) - class BlankSlate < BasicObject; end if defined?(BasicObject) +class MQ + %w[ exchange queue rpc ].each do |file| + require "mq/#{file}" + end - class BlankSlate - instance_methods.each { |m| undef_method m unless m =~ /^__/ } + class << self + @logging = false + attr_accessor :logging end end class MQ include AMQP include EM::Deferrable - class Exchange - include AMQP - - def initialize mq, type, name, opts = {} - if name.is_a? Hash - opts = name - name = "amq.#{type}" - end - - @mq = mq - @type, @name = type, name - @key = opts[:key] - - @mq.callback{ - @mq.send Protocol::Exchange::Declare.new({ :exchange => name, - :type => type, - :nowait => true }.merge(opts)) - } unless name == "amq.#{type}" - end - attr_reader :name, :type, :key - - def publish data, opts = {} - @mq.callback{ - @mq.send Protocol::Basic::Publish.new({ :exchange => name, - :routing_key => opts.delete(:key) || @key }.merge(opts)) - - data = data.to_s - - @mq.send Protocol::Header.new(Protocol::Basic, - data.length, { :content_type => 'application/octet-stream', - :delivery_mode => 1, - :priority => 0 }.merge(opts)) - @mq.send Frame::Body.new(data) - } - self - end - end - - class Queue - include AMQP - - def initialize mq, name, opts = {} - @mq = mq - @name = name - @mq.callback{ - @mq.send Protocol::Queue::Declare.new({ :queue => name, - :nowait => true }.merge(opts)) - } - bind(@mq.direct, :key => name) - end - attr_reader :name - - def bind exchange, opts = {} - @mq.callback{ - @mq.send Protocol::Queue::Bind.new({ :queue => name, - :exchange => exchange.respond_to?(:name) ? exchange.name : exchange, - :routing_key => opts.delete(:key), - :nowait => true }.merge(opts)) - } - self - end - - def subscribe opts = {}, &blk - @on_msg = blk - @mq.callback{ - @mq.send Protocol::Basic::Consume.new({ :queue => name, - :consumer_tag => name, - :no_ack => true, - :nowait => true }.merge(opts)) - } - self - end - - def receive headers, body - if @on_msg - @on_msg.call *(@on_msg.arity == 1 ? [body] : [headers, body]) - end - end - end - - class RPC < BlankSlate - def initialize mq, queue, obj = nil - @mq = mq - - if obj - @obj = case obj - when Class - obj.new - when Module - (::Class.new do include(obj) end).new - else - obj - end - - @mq.queue(queue).subscribe{ |info, request| - method, *args = Marshal.load(request) - ret = @obj.__send__(method, *args) - - if info.reply_to - @mq.direct.publish(Marshal.dump(ret), :key => info.reply_to, :message_id => info.message_id) - end - } - else - @callbacks ||= {} - @queue = @mq.queue(@name = 'some random identifier for me').subscribe{|info, msg| - ret = Marshal.load(msg) - if blk = @callbacks.delete(info.message_id) - blk.call(ret) - end - } - @exchange = @mq.direct(:key => queue) - end - end - - def method_missing meth, *args, &blk - message_id = "random message id #{rand(999_999_999_999)}" - @callbacks[message_id] = blk if blk - @exchange.publish(Marshal.dump([meth, *args]), :reply_to => blk ? @name : nil, :message_id => message_id) - end - end -end - -class MQ def initialize conn.callback{ |c| @channel = c.add_channel(self) send Protocol::Channel::Open.new } end attr_reader :channel def process_frame frame + log :received, frame + case frame when Frame::Header @header = frame.payload @body = '' @@ -178,10 +60,11 @@ end def send data data.ticket = @ticket if @ticket and data.respond_to? :ticket conn.callback{ |c| + log :sending, data c.send data, :channel => @channel } end %w[ direct topic fanout ].each do |type| @@ -200,10 +83,18 @@ rpcs[name] ||= RPC.new(self, name, obj) end private + def log *args + return unless MQ.logging + pp args + puts + end + + # keep track of proxy objects + def exchanges @exchanges ||= {} end def queues @@ -212,18 +103,31 @@ def rpcs @rcps ||= {} end + # create a class level connection on demand + def connection @@connection ||= AMQP.start end alias :conn :connection +end +# convenience wrapper for thread-local MQ object + +class MQ + def MQ.default + Thread.current[:mq] ||= MQ.new + end + def MQ.method_missing meth, *args, &blk MQ.default.__send__(meth, *args, &blk) end - - def MQ.default - Thread.current[:mq] ||= MQ.new +end + +# unique identifier +class MQ + def MQ.id + Thread.current[:mq_id] ||= "#{`hostname`.strip}-#{Process.pid}-#{Thread.current.object_id}" end end \ No newline at end of file