lib/ass.rb in hayeah-ASS-0.0.1 vs lib/ass.rb in hayeah-ASS-0.0.2
- old
+ new
@@ -1,139 +1,211 @@
require 'mq'
-class Ass
-
- attr_reader :server_exchange
- def self.declare(server_exchange)
- self.new([server_exchange,{:passive => true}])
- end
-
- # uh... we'll assume that the exchanges are all direct exchanges.
- def initialize(server_exchange)
- @server_exchange = get_exchange(server_exchange)
- end
+# TODO a way to specify serializer (json, marshal...)
+module ASS
- def client(client_exchange,*args)
- @client_exchange ||= get_exchange(client_exchange)
- q = get_queue(@client_exchange,*args)
- Client.new(@server_exchange,@client_exchange,q)
+ # non-destructive get. Fail if server's not started.
+ def self.get(name)
+ ASS::Server.new(name,:passive => true)
end
- def server(*args)
- Server.new(@server_exchange,get_queue(@server_exchange,*args))
+ def self.new(name,opts={})
+ ASS::Server.new(name)
end
- def get_exchange(arg)
- case arg
- when Array
- exchanges = exchange[0]
- opts = exchange[1]
- else
- exchange = arg
- opts = nil
- end
- opts = {} if opts.nil?
- exchange = exchange.is_a?(MQ::Exchange) ? exchange : MQ.direct(exchange,opts)
- raise "accepts only direct exchanges" unless exchange.type == :direct
- exchange
+ def self.peep(server_name,callback=nil,&block)
+ callback = block if callback.nil?
+ ASS::Peeper.new(server_name,callback)
end
-
- # can specify a key to create a queue for that subdomain
- def get_queue(exchange,*args)
- case args[0]
- when Hash
- key = nil
- opts = args[0]
- when String
- key = args[0]
- opts = args[1]
+
+ module Callback
+ module MagicMethods
+ def header
+ @__header__
+ end
+
+ def meta
+ @__meta__
+ end
+
+ def service
+ @__service__
+ end
+
+ def call(method,data=nil,meta=nil,opts={})
+ @__service__.call(method,data,meta,opts)
+ end
end
- opts = {} if opts.nil?
- if key
- name = "#{exchange.name}--#{key}"
- q = MQ.queue(name,opts)
- q.bind(exchange,{ :routing_key => key})
- else
- q = MQ.queue(exchange.name,opts)
- q.bind(exchange,{ :routing_key => exchange.name })
- end
- q
- end
- module Callback
-
- def build_callback_klass(callback)
- case callback
- when Proc
- Class.new &callback
+ # called to initiate a callback
+ def build_callback(callback)
+ c = case callback
+ when Proc
+ Class.new &callback
+ when Class
+ callback
+ when Module
+ Class.new { include callback }
+ when Object
+ callback # use singleton objcet as callback
+ end
+ case c
when Class
- callback
- when Module
- Class.new { include callback }
+ c.instance_eval { include MagicMethods }
+ else
+ c.extend MagicMethods
end
+ c
end
-
- def callback(info,payload)
+
+ # called for each request
+ def prepare_callback(callback,info,payload)
# method,data,meta
- if @callback_klass.respond_to? :version
- klass = @callback_klass.get_version(payload[:version])
+ if callback.is_a? Class
+ if callback.respond_to? :version
+ klass = callback.get_version(payload[:version])
+ else
+ klass = callback
+ end
+ obj = klass.new
else
- klass = @callback_klass
+ obj = callback
end
- obj = klass.new
- service = self
- obj.instance_variable_set("@__service__",service)
+ obj.instance_variable_set("@__service__",self)
obj.instance_variable_set("@__header__",info)
obj.instance_variable_set("@__meta__",payload[:meta])
- class << obj
- def header
- @__header__
- end
+ #p [:call,payload]
+ obj
+ end
+ end
- def meta
- @__meta__
- end
-
- def service
- @__service__
- end
+ class Server
+ include Callback
- def call(method,data=nil,meta=nil,opts={})
- @__service__.call(method,data,meta,opts)
- end
+ def initialize(name,opts={})
+ @server_exchange = MQ.fanout(name,opts)
+ end
+
+ def name
+ self.exchange.name
+ end
+
+ def exchange
+ @server_exchange
+ end
+
+ # takes options available to MQ::Exchange
+ def client(opts={})
+ ASS::Client.new(self,opts)
+ end
+
+ def client_name
+ "#{self.exchange.name}--"
+ end
+
+ # takes options available to MQ::Queue# takes options available to MQ::Queue#subscribe
+ def rpc(opts={})
+ ASS::RPC.new(self,opts)
+ end
+
+ def queue(opts={})
+ unless @queue
+ @queue ||= MQ.queue(self.name,opts)
+ @queue.bind(self.exchange)
end
- #p [:call,payload]
- obj.send(payload[:method],
- payload[:data])
+ self
end
+
+ # takes options available to MQ::Queue# takes options available to MQ::Queue#subscribe
+ def react(callback=nil,opts=nil,&block)
+ if block
+ opts = callback
+ callback = block
+ end
+ opts = {} if opts.nil?
+
+ @callback = build_callback(callback)
+ @ack = opts[:ack]
+ self.queue unless @queue
+ @queue.subscribe(opts) do |info,payload|
+ payload = ::Marshal.load(payload)
+ #p [info,info.reply_to,payload]
+ obj = prepare_callback(@callback,info,payload)
+ data2 = obj.send(payload[:method],payload[:data])
+ payload2 = payload.merge :data => data2
+ # the client MUST exist, otherwise it's an error.
+ ## FIXME it's bad if the server dies b/c
+ ## the client isn't there. It's bad that
+ ## this can cause the server to fail.
+ MQ.direct(info.reply_to,:passive => true).
+ publish(::Marshal.dump(payload2),
+ :routing_key => info.routing_key,
+ :message_id => info.message_id) if info.reply_to
+ info.ack if @ack
+ end
+ self
+ end
+
+ def inspect
+ "#<#{self.class} #{self.name}>"
+ end
end
class Client
include Callback
- def initialize(server_exchange,client_exchange,queue)
- @server_exchange = server_exchange
- @client_exchange = client_exchange
- @queue = queue
+
+ # takes options available to MQ::Exchange
+ def initialize(server,opts={})
+ @server = server
+ # the routing key is also used as the name of the client
+ @key = opts.delete :key
+ @key = @key.to_s if @key
+ @client_exchange = MQ.direct @server.client_name, opts
end
-
+
+ def name
+ self.exchange.name
+ end
+
+ def exchange
+ @client_exchange
+ end
+
+ # takes options available to MQ::Queue
+ def queue(opts={})
+ unless @queue
+ # if key is not given, the queue name is
+ # the same as the exchange name.
+ @queue ||= MQ.queue("#{self.name}#{@key}",opts)
+ @queue.bind(self.exchange,:routing_key => @key || self.name)
+ end
+ self # return self to allow chaining
+ end
+
+ # takes options available to MQ::Queue#subscribe
def react(callback=nil,opts=nil,&block)
if block
opts = callback
callback = block
end
opts = {} if opts.nil?
-
- @callback_klass = build_callback_klass(callback)
+
+ @callback = build_callback(callback)
@ack = opts[:ack]
+ # ensure queue is set
+ self.queue unless @queue
@queue.subscribe(opts) do |info,payload|
payload = ::Marshal.load(payload)
- callback(info,payload)
+ obj = prepare_callback(@callback,info,payload)
+ obj.send(payload[:method],payload[:data])
info.ack if @ack
end
self
end
-
+
+ # note that we can redirect the result to some
+ # place else by setting :key and :reply_to
def call(method,data=nil,meta=nil,opts={})
# opts passed to publish
# if no routing key is given, use receiver's name as the routing key.
version = @klass.version if @klass.respond_to? :version
payload = {
@@ -141,72 +213,201 @@
:data => data,
:meta => meta,
:version => version
}
- # set it up s.t. server would respond to
- # private queue if key is given, otherwise
- # the server would respond to public queue.
- key = opts.delete(:key)
- @server_exchange.publish Marshal.dump(payload), {
- :key => (key ? key : @server_exchange.name),
- :reply_to => @client_exchange.name
+ @server.exchange.publish Marshal.dump(payload), {
+ # opts[:routing_key] will override :key in MQ::Exchange#publish
+ :key => (@key ? @key : self.name),
+ :reply_to => self.name
}.merge(opts)
end
-
+
# for casting, just null the reply_to field, so server doesn't respond.
def cast(method,data=nil,meta=nil,opts={})
self.call(method,data,meta,opts.merge({:reply_to => nil}))
end
-
+ def inspect
+ "#<#{self.class} #{self.name}>"
+ end
end
- class Server
- include Callback
+ # assumes server initializes it with an exclusive and auto_delete queue.
+ # TODO timeout
+ class RPC
+ require 'thread'
- def initialize(server_exchange,q)
- @queue = q
- @server_exchange = server_exchange
+ # i don't want deferrable. I want actual blockage when waiting.
+ ## subscribe prolly run in a different thread.
+ # hmmm. I guess deferrable is a better idea.
+ class Future
+ attr_reader :message_id
+ attr_accessor :header, :data, :meta, :timeout
+ def initialize(rpc,message_id)
+ @message_id = message_id
+ @rpc = rpc
+ @timeout = false
+ @done = false
+ end
+
+ def wait(timeout=nil,&block)
+ # TODO timeout with eventmachine
+ @rpc.wait(self,timeout,&block) # synchronous call that will block
+ # EM.cancel_timer(ticket)
+ end
+
+ def done!
+ @done = true
+ end
+
+ def done?
+ @done
+ end
+
+ def timeout?
+ @timeout
+ end
+
+ def inspect
+ "#<#{self.class} #{message_id}>"
+ end
end
- attr_reader :queue
- def exchange
- @server_exchange
+ class Reactor
+ # want to minimize name conflicts here.
+ def initialize(rpc)
+ @rpc = rpc
+ end
+
+ def method_missing(_method,data)
+ @rpc.buffer << [header,data,meta]
+ end
end
- def react(callback=nil,opts=nil,&block)
- if block
- opts = callback
- callback = block
+ attr_reader :buffer, :futures, :ready
+ def initialize(server,opts={})
+ @server = server
+ @seq = 0
+ # queue is used be used to synchronize RPC
+ # user thread and the AMQP eventmachine thread.
+ @buffer = Queue.new
+ @ready = {} # the ready results not yet waited
+ @futures = {} # all futures not yet waited for.
+ @reactor = Reactor.new(self)
+ # Creates an exclusive queue to serve the RPC client.
+ @client = @server.client(:key => "rpc.#{rand(999_999_999_999)}").
+ queue(:exclusive => true).react(@reactor,opts)
+ end
+
+ def call(method,data,meta=nil,opts={})
+ message_id = @seq.to_s # message gotta be unique for this RPC client.
+ @client.call method, data, meta, opts.merge(:message_id => message_id)
+ @seq += 1
+ @futures[message_id] = Future.new(self,message_id)
+ end
+
+ # the idea is to block on a synchronized queue
+ # until we get the future we want.
+ #
+ # WARNING: blocks forever if the thread
+ # calling wait is the same as the EventMachine
+ # thread.
+ def wait(future,timeout=nil)
+ return future.data if future.done? # future was waited before
+ timer = nil
+ if timeout
+ timer = EM.add_timer(timeout) {
+ @buffer << [:timeout,future.message_id,nil]
+ }
end
- opts = {} if opts.nil?
-
- @callback_klass = build_callback_klass(callback)
- @ack = opts[:ack]
- @queue.subscribe(opts) do |info,payload|
- payload = ::Marshal.load(payload)
- #p [info,info.reply_to,payload]
- data2 = callback(info,payload)
- payload2 = payload.merge :data => data2
- if info.routing_key == @server_exchange.name
- # addressed to the server's public
- # queue, respond to the routing_key of
- # the client's public queue.
- key = info.reply_to
- else
- # addressed to the private queue
- key = info.routing_key
+ ready_future = nil
+ if @ready.has_key? future.message_id
+ @ready.delete future.message_id
+ ready_future = future
+ else
+ while true
+ header,data,meta = data = @buffer.pop # synchronize. like erlang's mailbox select.
+ if header == :timeout # timeout the future we are waiting for.
+ message_id = data
+ # if we got a timeout from previous wait. throw it away.
+ next if future.message_id != message_id
+ future.timeout = true
+ future.done!
+ @futures.delete future.message_id
+ return yield # return the value of timeout block
+ end
+ some_future = @futures[header.message_id]
+ # If we didn't find the future among the
+ # future, it must have timedout. Just
+ # throw result away and keep processing.
+ next unless some_future
+ some_future.header = header
+ some_future.data = data
+ some_future.meta = meta
+ if some_future == future
+ # The future we are waiting for
+ EM.cancel_timer(timer)
+ ready_future = future
+ break
+ else
+ # Ready, but we are not waiting for it. Save for later.
+ @ready[some_future.message_id] = some_future
+ end
end
- MQ.direct(info.reply_to).publish(::Marshal.dump(payload2),:routing_key => key) if info.reply_to
- info.ack if @ack
end
- self
+ ready_future.done!
+ @futures.delete ready_future.message_id
+ return ready_future.data
end
+
+ def waitall
+ @futures.values.map { |k,v|
+ wait(v)
+ }
+ end
+
+ def inspect
+ "#<#{self.class} #{self.name}>"
+ end
end
+ # TODO should prolly have the option of using
+ # non auto-delete queues. This would be useful
+ # for logger. Maybe if a peeper name is given,
+ # then create queues with options.
class Peeper
- def initialize(exchange,callback)
- # create a temporary queue that binds to an exchange
+ include Callback
+ attr_reader :server_name
+ def initialize(server_name,callback)
+ @server_name = server_name
+ @clients = {}
+ @callback = build_callback(callback)
+
+ uid = "#{@server_name}.peeper.#{rand 999_999_999_999}"
+ q = MQ.queue uid, :auto_delete => true
+ q.bind(@server_name) # messages to the server would be duplicated here.
+ q.subscribe { |info,payload|
+ payload = ::Marshal.load(payload)
+ # sets context, but doesn't make the call
+ obj = prepare_callback(@callback,info,payload)
+ # there is a specific method we want to call.
+ obj.server(payload[:method],payload[:data])
+
+ # bind to peep client message queue if we've not seen it before.
+ unless @clients.has_key? info.routing_key
+ @clients[info.routing_key] = true
+ client_q = MQ.queue "#{uid}--#{info.routing_key}",
+ :auto_delete => true
+ # messages to the client would be duplicated here.
+ client_q.bind("#{server_name}--", :routing_key => info.routing_key)
+ client_q.subscribe { |info,payload|
+ payload = ::Marshal.load(payload)
+ obj = prepare_callback(@callback,info,payload)
+ obj.client(payload[:method],payload[:data])
+ }
+ end
+ }
end
end
+
end