lib/mq.rb in amqp-0.5.0 vs lib/mq.rb in amqp-0.5.1
- old
+ new
@@ -1,152 +1,34 @@
$:.unshift File.expand_path(File.dirname(File.expand_path(__FILE__)))
require 'amqp'
-unless defined?(BlankSlate)
- class BlankSlate < BasicObject; end if defined?(BasicObject)
+class MQ
+ %w[ exchange queue rpc ].each do |file|
+ require "mq/#{file}"
+ end
- class BlankSlate
- instance_methods.each { |m| undef_method m unless m =~ /^__/ }
+ class << self
+ @logging = false
+ attr_accessor :logging
end
end
class MQ
include AMQP
include EM::Deferrable
- class Exchange
- include AMQP
-
- def initialize mq, type, name, opts = {}
- if name.is_a? Hash
- opts = name
- name = "amq.#{type}"
- end
-
- @mq = mq
- @type, @name = type, name
- @key = opts[:key]
-
- @mq.callback{
- @mq.send Protocol::Exchange::Declare.new({ :exchange => name,
- :type => type,
- :nowait => true }.merge(opts))
- } unless name == "amq.#{type}"
- end
- attr_reader :name, :type, :key
-
- def publish data, opts = {}
- @mq.callback{
- @mq.send Protocol::Basic::Publish.new({ :exchange => name,
- :routing_key => opts.delete(:key) || @key }.merge(opts))
-
- data = data.to_s
-
- @mq.send Protocol::Header.new(Protocol::Basic,
- data.length, { :content_type => 'application/octet-stream',
- :delivery_mode => 1,
- :priority => 0 }.merge(opts))
- @mq.send Frame::Body.new(data)
- }
- self
- end
- end
-
- class Queue
- include AMQP
-
- def initialize mq, name, opts = {}
- @mq = mq
- @name = name
- @mq.callback{
- @mq.send Protocol::Queue::Declare.new({ :queue => name,
- :nowait => true }.merge(opts))
- }
- bind(@mq.direct, :key => name)
- end
- attr_reader :name
-
- def bind exchange, opts = {}
- @mq.callback{
- @mq.send Protocol::Queue::Bind.new({ :queue => name,
- :exchange => exchange.respond_to?(:name) ? exchange.name : exchange,
- :routing_key => opts.delete(:key),
- :nowait => true }.merge(opts))
- }
- self
- end
-
- def subscribe opts = {}, &blk
- @on_msg = blk
- @mq.callback{
- @mq.send Protocol::Basic::Consume.new({ :queue => name,
- :consumer_tag => name,
- :no_ack => true,
- :nowait => true }.merge(opts))
- }
- self
- end
-
- def receive headers, body
- if @on_msg
- @on_msg.call *(@on_msg.arity == 1 ? [body] : [headers, body])
- end
- end
- end
-
- class RPC < BlankSlate
- def initialize mq, queue, obj = nil
- @mq = mq
-
- if obj
- @obj = case obj
- when Class
- obj.new
- when Module
- (::Class.new do include(obj) end).new
- else
- obj
- end
-
- @mq.queue(queue).subscribe{ |info, request|
- method, *args = Marshal.load(request)
- ret = @obj.__send__(method, *args)
-
- if info.reply_to
- @mq.direct.publish(Marshal.dump(ret), :key => info.reply_to, :message_id => info.message_id)
- end
- }
- else
- @callbacks ||= {}
- @queue = @mq.queue(@name = 'some random identifier for me').subscribe{|info, msg|
- ret = Marshal.load(msg)
- if blk = @callbacks.delete(info.message_id)
- blk.call(ret)
- end
- }
- @exchange = @mq.direct(:key => queue)
- end
- end
-
- def method_missing meth, *args, &blk
- message_id = "random message id #{rand(999_999_999_999)}"
- @callbacks[message_id] = blk if blk
- @exchange.publish(Marshal.dump([meth, *args]), :reply_to => blk ? @name : nil, :message_id => message_id)
- end
- end
-end
-
-class MQ
def initialize
conn.callback{ |c|
@channel = c.add_channel(self)
send Protocol::Channel::Open.new
}
end
attr_reader :channel
def process_frame frame
+ log :received, frame
+
case frame
when Frame::Header
@header = frame.payload
@body = ''
@@ -178,10 +60,11 @@
end
def send data
data.ticket = @ticket if @ticket and data.respond_to? :ticket
conn.callback{ |c|
+ log :sending, data
c.send data, :channel => @channel
}
end
%w[ direct topic fanout ].each do |type|
@@ -200,10 +83,18 @@
rpcs[name] ||= RPC.new(self, name, obj)
end
private
+ def log *args
+ return unless MQ.logging
+ pp args
+ puts
+ end
+
+ # keep track of proxy objects
+
def exchanges
@exchanges ||= {}
end
def queues
@@ -212,18 +103,31 @@
def rpcs
@rcps ||= {}
end
+ # create a class level connection on demand
+
def connection
@@connection ||= AMQP.start
end
alias :conn :connection
+end
+# convenience wrapper for thread-local MQ object
+
+class MQ
+ def MQ.default
+ Thread.current[:mq] ||= MQ.new
+ end
+
def MQ.method_missing meth, *args, &blk
MQ.default.__send__(meth, *args, &blk)
end
-
- def MQ.default
- Thread.current[:mq] ||= MQ.new
+end
+
+# unique identifier
+class MQ
+ def MQ.id
+ Thread.current[:mq_id] ||= "#{`hostname`.strip}-#{Process.pid}-#{Thread.current.object_id}"
end
end
\ No newline at end of file