lib/ass.rb in hayeah-ASS-0.0.1 vs lib/ass.rb in hayeah-ASS-0.0.2

- old
+ new

@@ -1,139 +1,211 @@ require 'mq' -class Ass - - attr_reader :server_exchange - def self.declare(server_exchange) - self.new([server_exchange,{:passive => true}]) - end - - # uh... we'll assume that the exchanges are all direct exchanges. - def initialize(server_exchange) - @server_exchange = get_exchange(server_exchange) - end +# TODO a way to specify serializer (json, marshal...) +module ASS - def client(client_exchange,*args) - @client_exchange ||= get_exchange(client_exchange) - q = get_queue(@client_exchange,*args) - Client.new(@server_exchange,@client_exchange,q) + # non-destructive get. Fail if server's not started. + def self.get(name) + ASS::Server.new(name,:passive => true) end - def server(*args) - Server.new(@server_exchange,get_queue(@server_exchange,*args)) + def self.new(name,opts={}) + ASS::Server.new(name) end - def get_exchange(arg) - case arg - when Array - exchanges = exchange[0] - opts = exchange[1] - else - exchange = arg - opts = nil - end - opts = {} if opts.nil? - exchange = exchange.is_a?(MQ::Exchange) ? exchange : MQ.direct(exchange,opts) - raise "accepts only direct exchanges" unless exchange.type == :direct - exchange + def self.peep(server_name,callback=nil,&block) + callback = block if callback.nil? + ASS::Peeper.new(server_name,callback) end - - # can specify a key to create a queue for that subdomain - def get_queue(exchange,*args) - case args[0] - when Hash - key = nil - opts = args[0] - when String - key = args[0] - opts = args[1] + + module Callback + module MagicMethods + def header + @__header__ + end + + def meta + @__meta__ + end + + def service + @__service__ + end + + def call(method,data=nil,meta=nil,opts={}) + @__service__.call(method,data,meta,opts) + end end - opts = {} if opts.nil? - if key - name = "#{exchange.name}--#{key}" - q = MQ.queue(name,opts) - q.bind(exchange,{ :routing_key => key}) - else - q = MQ.queue(exchange.name,opts) - q.bind(exchange,{ :routing_key => exchange.name }) - end - q - end - module Callback - - def build_callback_klass(callback) - case callback - when Proc - Class.new &callback + # called to initiate a callback + def build_callback(callback) + c = case callback + when Proc + Class.new &callback + when Class + callback + when Module + Class.new { include callback } + when Object + callback # use singleton objcet as callback + end + case c when Class - callback - when Module - Class.new { include callback } + c.instance_eval { include MagicMethods } + else + c.extend MagicMethods end + c end - - def callback(info,payload) + + # called for each request + def prepare_callback(callback,info,payload) # method,data,meta - if @callback_klass.respond_to? :version - klass = @callback_klass.get_version(payload[:version]) + if callback.is_a? Class + if callback.respond_to? :version + klass = callback.get_version(payload[:version]) + else + klass = callback + end + obj = klass.new else - klass = @callback_klass + obj = callback end - obj = klass.new - service = self - obj.instance_variable_set("@__service__",service) + obj.instance_variable_set("@__service__",self) obj.instance_variable_set("@__header__",info) obj.instance_variable_set("@__meta__",payload[:meta]) - class << obj - def header - @__header__ - end + #p [:call,payload] + obj + end + end - def meta - @__meta__ - end - - def service - @__service__ - end + class Server + include Callback - def call(method,data=nil,meta=nil,opts={}) - @__service__.call(method,data,meta,opts) - end + def initialize(name,opts={}) + @server_exchange = MQ.fanout(name,opts) + end + + def name + self.exchange.name + end + + def exchange + @server_exchange + end + + # takes options available to MQ::Exchange + def client(opts={}) + ASS::Client.new(self,opts) + end + + def client_name + "#{self.exchange.name}--" + end + + # takes options available to MQ::Queue# takes options available to MQ::Queue#subscribe + def rpc(opts={}) + ASS::RPC.new(self,opts) + end + + def queue(opts={}) + unless @queue + @queue ||= MQ.queue(self.name,opts) + @queue.bind(self.exchange) end - #p [:call,payload] - obj.send(payload[:method], - payload[:data]) + self end + + # takes options available to MQ::Queue# takes options available to MQ::Queue#subscribe + def react(callback=nil,opts=nil,&block) + if block + opts = callback + callback = block + end + opts = {} if opts.nil? + + @callback = build_callback(callback) + @ack = opts[:ack] + self.queue unless @queue + @queue.subscribe(opts) do |info,payload| + payload = ::Marshal.load(payload) + #p [info,info.reply_to,payload] + obj = prepare_callback(@callback,info,payload) + data2 = obj.send(payload[:method],payload[:data]) + payload2 = payload.merge :data => data2 + # the client MUST exist, otherwise it's an error. + ## FIXME it's bad if the server dies b/c + ## the client isn't there. It's bad that + ## this can cause the server to fail. + MQ.direct(info.reply_to,:passive => true). + publish(::Marshal.dump(payload2), + :routing_key => info.routing_key, + :message_id => info.message_id) if info.reply_to + info.ack if @ack + end + self + end + + def inspect + "#<#{self.class} #{self.name}>" + end end class Client include Callback - def initialize(server_exchange,client_exchange,queue) - @server_exchange = server_exchange - @client_exchange = client_exchange - @queue = queue + + # takes options available to MQ::Exchange + def initialize(server,opts={}) + @server = server + # the routing key is also used as the name of the client + @key = opts.delete :key + @key = @key.to_s if @key + @client_exchange = MQ.direct @server.client_name, opts end - + + def name + self.exchange.name + end + + def exchange + @client_exchange + end + + # takes options available to MQ::Queue + def queue(opts={}) + unless @queue + # if key is not given, the queue name is + # the same as the exchange name. + @queue ||= MQ.queue("#{self.name}#{@key}",opts) + @queue.bind(self.exchange,:routing_key => @key || self.name) + end + self # return self to allow chaining + end + + # takes options available to MQ::Queue#subscribe def react(callback=nil,opts=nil,&block) if block opts = callback callback = block end opts = {} if opts.nil? - - @callback_klass = build_callback_klass(callback) + + @callback = build_callback(callback) @ack = opts[:ack] + # ensure queue is set + self.queue unless @queue @queue.subscribe(opts) do |info,payload| payload = ::Marshal.load(payload) - callback(info,payload) + obj = prepare_callback(@callback,info,payload) + obj.send(payload[:method],payload[:data]) info.ack if @ack end self end - + + # note that we can redirect the result to some + # place else by setting :key and :reply_to def call(method,data=nil,meta=nil,opts={}) # opts passed to publish # if no routing key is given, use receiver's name as the routing key. version = @klass.version if @klass.respond_to? :version payload = { @@ -141,72 +213,201 @@ :data => data, :meta => meta, :version => version } - # set it up s.t. server would respond to - # private queue if key is given, otherwise - # the server would respond to public queue. - key = opts.delete(:key) - @server_exchange.publish Marshal.dump(payload), { - :key => (key ? key : @server_exchange.name), - :reply_to => @client_exchange.name + @server.exchange.publish Marshal.dump(payload), { + # opts[:routing_key] will override :key in MQ::Exchange#publish + :key => (@key ? @key : self.name), + :reply_to => self.name }.merge(opts) end - + # for casting, just null the reply_to field, so server doesn't respond. def cast(method,data=nil,meta=nil,opts={}) self.call(method,data,meta,opts.merge({:reply_to => nil})) end - + def inspect + "#<#{self.class} #{self.name}>" + end end - class Server - include Callback + # assumes server initializes it with an exclusive and auto_delete queue. + # TODO timeout + class RPC + require 'thread' - def initialize(server_exchange,q) - @queue = q - @server_exchange = server_exchange + # i don't want deferrable. I want actual blockage when waiting. + ## subscribe prolly run in a different thread. + # hmmm. I guess deferrable is a better idea. + class Future + attr_reader :message_id + attr_accessor :header, :data, :meta, :timeout + def initialize(rpc,message_id) + @message_id = message_id + @rpc = rpc + @timeout = false + @done = false + end + + def wait(timeout=nil,&block) + # TODO timeout with eventmachine + @rpc.wait(self,timeout,&block) # synchronous call that will block + # EM.cancel_timer(ticket) + end + + def done! + @done = true + end + + def done? + @done + end + + def timeout? + @timeout + end + + def inspect + "#<#{self.class} #{message_id}>" + end end - attr_reader :queue - def exchange - @server_exchange + class Reactor + # want to minimize name conflicts here. + def initialize(rpc) + @rpc = rpc + end + + def method_missing(_method,data) + @rpc.buffer << [header,data,meta] + end end - def react(callback=nil,opts=nil,&block) - if block - opts = callback - callback = block + attr_reader :buffer, :futures, :ready + def initialize(server,opts={}) + @server = server + @seq = 0 + # queue is used be used to synchronize RPC + # user thread and the AMQP eventmachine thread. + @buffer = Queue.new + @ready = {} # the ready results not yet waited + @futures = {} # all futures not yet waited for. + @reactor = Reactor.new(self) + # Creates an exclusive queue to serve the RPC client. + @client = @server.client(:key => "rpc.#{rand(999_999_999_999)}"). + queue(:exclusive => true).react(@reactor,opts) + end + + def call(method,data,meta=nil,opts={}) + message_id = @seq.to_s # message gotta be unique for this RPC client. + @client.call method, data, meta, opts.merge(:message_id => message_id) + @seq += 1 + @futures[message_id] = Future.new(self,message_id) + end + + # the idea is to block on a synchronized queue + # until we get the future we want. + # + # WARNING: blocks forever if the thread + # calling wait is the same as the EventMachine + # thread. + def wait(future,timeout=nil) + return future.data if future.done? # future was waited before + timer = nil + if timeout + timer = EM.add_timer(timeout) { + @buffer << [:timeout,future.message_id,nil] + } end - opts = {} if opts.nil? - - @callback_klass = build_callback_klass(callback) - @ack = opts[:ack] - @queue.subscribe(opts) do |info,payload| - payload = ::Marshal.load(payload) - #p [info,info.reply_to,payload] - data2 = callback(info,payload) - payload2 = payload.merge :data => data2 - if info.routing_key == @server_exchange.name - # addressed to the server's public - # queue, respond to the routing_key of - # the client's public queue. - key = info.reply_to - else - # addressed to the private queue - key = info.routing_key + ready_future = nil + if @ready.has_key? future.message_id + @ready.delete future.message_id + ready_future = future + else + while true + header,data,meta = data = @buffer.pop # synchronize. like erlang's mailbox select. + if header == :timeout # timeout the future we are waiting for. + message_id = data + # if we got a timeout from previous wait. throw it away. + next if future.message_id != message_id + future.timeout = true + future.done! + @futures.delete future.message_id + return yield # return the value of timeout block + end + some_future = @futures[header.message_id] + # If we didn't find the future among the + # future, it must have timedout. Just + # throw result away and keep processing. + next unless some_future + some_future.header = header + some_future.data = data + some_future.meta = meta + if some_future == future + # The future we are waiting for + EM.cancel_timer(timer) + ready_future = future + break + else + # Ready, but we are not waiting for it. Save for later. + @ready[some_future.message_id] = some_future + end end - MQ.direct(info.reply_to).publish(::Marshal.dump(payload2),:routing_key => key) if info.reply_to - info.ack if @ack end - self + ready_future.done! + @futures.delete ready_future.message_id + return ready_future.data end + + def waitall + @futures.values.map { |k,v| + wait(v) + } + end + + def inspect + "#<#{self.class} #{self.name}>" + end end + # TODO should prolly have the option of using + # non auto-delete queues. This would be useful + # for logger. Maybe if a peeper name is given, + # then create queues with options. class Peeper - def initialize(exchange,callback) - # create a temporary queue that binds to an exchange + include Callback + attr_reader :server_name + def initialize(server_name,callback) + @server_name = server_name + @clients = {} + @callback = build_callback(callback) + + uid = "#{@server_name}.peeper.#{rand 999_999_999_999}" + q = MQ.queue uid, :auto_delete => true + q.bind(@server_name) # messages to the server would be duplicated here. + q.subscribe { |info,payload| + payload = ::Marshal.load(payload) + # sets context, but doesn't make the call + obj = prepare_callback(@callback,info,payload) + # there is a specific method we want to call. + obj.server(payload[:method],payload[:data]) + + # bind to peep client message queue if we've not seen it before. + unless @clients.has_key? info.routing_key + @clients[info.routing_key] = true + client_q = MQ.queue "#{uid}--#{info.routing_key}", + :auto_delete => true + # messages to the client would be duplicated here. + client_q.bind("#{server_name}--", :routing_key => info.routing_key) + client_q.subscribe { |info,payload| + payload = ::Marshal.load(payload) + obj = prepare_callback(@callback,info,payload) + obj.client(payload[:method],payload[:data]) + } + end + } end end + end