lib/ass.rb in hayeah-ASS-0.0.2 vs lib/ass.rb in hayeah-ASS-0.1.0

- old
+ new

@@ -1,211 +1,139 @@ require 'mq' +class Ass + + attr_reader :server_exchange -# TODO a way to specify serializer (json, marshal...) -module ASS + def self.declare(server_exchange) + self.new([server_exchange,{:passive => true}]) + end + + # uh... we'll assume that the exchanges are all direct exchanges. + def initialize(server_exchange) + @server_exchange = get_exchange(server_exchange) + end - # non-destructive get. Fail if server's not started. - def self.get(name) - ASS::Server.new(name,:passive => true) + def client(client_exchange,*args) + @client_exchange ||= get_exchange(client_exchange) + q = get_queue(@client_exchange,*args) + Client.new(@server_exchange,@client_exchange,q) end - def self.new(name,opts={}) - ASS::Server.new(name) + def server(*args) + Server.new(@server_exchange,get_queue(@server_exchange,*args)) end - def self.peep(server_name,callback=nil,&block) - callback = block if callback.nil? - ASS::Peeper.new(server_name,callback) + def get_exchange(arg) + case arg + when Array + exchanges = exchange[0] + opts = exchange[1] + else + exchange = arg + opts = nil + end + opts = {} if opts.nil? + exchange = exchange.is_a?(MQ::Exchange) ? exchange : MQ.direct(exchange,opts) + raise "accepts only direct exchanges" unless exchange.type == :direct + exchange end - - module Callback - module MagicMethods - def header - @__header__ - end - - def meta - @__meta__ - end - - def service - @__service__ - end - - def call(method,data=nil,meta=nil,opts={}) - @__service__.call(method,data,meta,opts) - end + + # can specify a key to create a queue for that subdomain + def get_queue(exchange,*args) + case args[0] + when Hash + key = nil + opts = args[0] + when String + key = args[0] + opts = args[1] end + opts = {} if opts.nil? + if key + name = "#{exchange.name}--#{key}" + q = MQ.queue(name,opts) + q.bind(exchange,{ :routing_key => key}) + else + q = MQ.queue(exchange.name,opts) + q.bind(exchange,{ :routing_key => exchange.name }) + end + q + end - # called to initiate a callback - def build_callback(callback) - c = case callback - when Proc - Class.new &callback - when Class - callback - when Module - Class.new { include callback } - when Object - callback # use singleton objcet as callback - end - case c + module Callback + + def build_callback_klass(callback) + case callback + when Proc + Class.new &callback when Class - c.instance_eval { include MagicMethods } - else - c.extend MagicMethods + callback + when Module + Class.new { include callback } end - c end - - # called for each request - def prepare_callback(callback,info,payload) + + def callback(info,payload) # method,data,meta - if callback.is_a? Class - if callback.respond_to? :version - klass = callback.get_version(payload[:version]) - else - klass = callback - end - obj = klass.new + if @callback_klass.respond_to? :version + klass = @callback_klass.get_version(payload[:version]) else - obj = callback + klass = @callback_klass end - obj.instance_variable_set("@__service__",self) + obj = klass.new + service = self + obj.instance_variable_set("@__service__",service) obj.instance_variable_set("@__header__",info) obj.instance_variable_set("@__meta__",payload[:meta]) - #p [:call,payload] - obj - end - end + class << obj + def header + @__header__ + end - class Server - include Callback + def meta + @__meta__ + end + + def service + @__service__ + end - def initialize(name,opts={}) - @server_exchange = MQ.fanout(name,opts) - end - - def name - self.exchange.name - end - - def exchange - @server_exchange - end - - # takes options available to MQ::Exchange - def client(opts={}) - ASS::Client.new(self,opts) - end - - def client_name - "#{self.exchange.name}--" - end - - # takes options available to MQ::Queue# takes options available to MQ::Queue#subscribe - def rpc(opts={}) - ASS::RPC.new(self,opts) - end - - def queue(opts={}) - unless @queue - @queue ||= MQ.queue(self.name,opts) - @queue.bind(self.exchange) + def call(method,data=nil,meta=nil,opts={}) + @__service__.call(method,data,meta,opts) + end end - self + #p [:call,payload] + obj.send(payload[:method], + payload[:data]) end - - # takes options available to MQ::Queue# takes options available to MQ::Queue#subscribe - def react(callback=nil,opts=nil,&block) - if block - opts = callback - callback = block - end - opts = {} if opts.nil? - - @callback = build_callback(callback) - @ack = opts[:ack] - self.queue unless @queue - @queue.subscribe(opts) do |info,payload| - payload = ::Marshal.load(payload) - #p [info,info.reply_to,payload] - obj = prepare_callback(@callback,info,payload) - data2 = obj.send(payload[:method],payload[:data]) - payload2 = payload.merge :data => data2 - # the client MUST exist, otherwise it's an error. - ## FIXME it's bad if the server dies b/c - ## the client isn't there. It's bad that - ## this can cause the server to fail. - MQ.direct(info.reply_to,:passive => true). - publish(::Marshal.dump(payload2), - :routing_key => info.routing_key, - :message_id => info.message_id) if info.reply_to - info.ack if @ack - end - self - end - - def inspect - "#<#{self.class} #{self.name}>" - end end class Client include Callback - - # takes options available to MQ::Exchange - def initialize(server,opts={}) - @server = server - # the routing key is also used as the name of the client - @key = opts.delete :key - @key = @key.to_s if @key - @client_exchange = MQ.direct @server.client_name, opts + def initialize(server_exchange,client_exchange,queue) + @server_exchange = server_exchange + @client_exchange = client_exchange + @queue = queue end - - def name - self.exchange.name - end - - def exchange - @client_exchange - end - - # takes options available to MQ::Queue - def queue(opts={}) - unless @queue - # if key is not given, the queue name is - # the same as the exchange name. - @queue ||= MQ.queue("#{self.name}#{@key}",opts) - @queue.bind(self.exchange,:routing_key => @key || self.name) - end - self # return self to allow chaining - end - - # takes options available to MQ::Queue#subscribe + def react(callback=nil,opts=nil,&block) if block opts = callback callback = block end opts = {} if opts.nil? - - @callback = build_callback(callback) + + @callback_klass = build_callback_klass(callback) @ack = opts[:ack] - # ensure queue is set - self.queue unless @queue @queue.subscribe(opts) do |info,payload| payload = ::Marshal.load(payload) - obj = prepare_callback(@callback,info,payload) - obj.send(payload[:method],payload[:data]) + callback(info,payload) info.ack if @ack end self end - - # note that we can redirect the result to some - # place else by setting :key and :reply_to + def call(method,data=nil,meta=nil,opts={}) # opts passed to publish # if no routing key is given, use receiver's name as the routing key. version = @klass.version if @klass.respond_to? :version payload = { @@ -213,201 +141,72 @@ :data => data, :meta => meta, :version => version } - @server.exchange.publish Marshal.dump(payload), { - # opts[:routing_key] will override :key in MQ::Exchange#publish - :key => (@key ? @key : self.name), - :reply_to => self.name + # set it up s.t. server would respond to + # private queue if key is given, otherwise + # the server would respond to public queue. + key = opts.delete(:key) + @server_exchange.publish Marshal.dump(payload), { + :key => (key ? key : @server_exchange.name), + :reply_to => @client_exchange.name }.merge(opts) end - + # for casting, just null the reply_to field, so server doesn't respond. def cast(method,data=nil,meta=nil,opts={}) self.call(method,data,meta,opts.merge({:reply_to => nil})) end - def inspect - "#<#{self.class} #{self.name}>" - end + end - # assumes server initializes it with an exclusive and auto_delete queue. - # TODO timeout - class RPC - require 'thread' + class Server + include Callback - # i don't want deferrable. I want actual blockage when waiting. - ## subscribe prolly run in a different thread. - # hmmm. I guess deferrable is a better idea. - class Future - attr_reader :message_id - attr_accessor :header, :data, :meta, :timeout - def initialize(rpc,message_id) - @message_id = message_id - @rpc = rpc - @timeout = false - @done = false - end - - def wait(timeout=nil,&block) - # TODO timeout with eventmachine - @rpc.wait(self,timeout,&block) # synchronous call that will block - # EM.cancel_timer(ticket) - end - - def done! - @done = true - end - - def done? - @done - end - - def timeout? - @timeout - end - - def inspect - "#<#{self.class} #{message_id}>" - end + def initialize(server_exchange,q) + @queue = q + @server_exchange = server_exchange end - class Reactor - # want to minimize name conflicts here. - def initialize(rpc) - @rpc = rpc - end - - def method_missing(_method,data) - @rpc.buffer << [header,data,meta] - end + attr_reader :queue + def exchange + @server_exchange end - attr_reader :buffer, :futures, :ready - def initialize(server,opts={}) - @server = server - @seq = 0 - # queue is used be used to synchronize RPC - # user thread and the AMQP eventmachine thread. - @buffer = Queue.new - @ready = {} # the ready results not yet waited - @futures = {} # all futures not yet waited for. - @reactor = Reactor.new(self) - # Creates an exclusive queue to serve the RPC client. - @client = @server.client(:key => "rpc.#{rand(999_999_999_999)}"). - queue(:exclusive => true).react(@reactor,opts) - end - - def call(method,data,meta=nil,opts={}) - message_id = @seq.to_s # message gotta be unique for this RPC client. - @client.call method, data, meta, opts.merge(:message_id => message_id) - @seq += 1 - @futures[message_id] = Future.new(self,message_id) - end - - # the idea is to block on a synchronized queue - # until we get the future we want. - # - # WARNING: blocks forever if the thread - # calling wait is the same as the EventMachine - # thread. - def wait(future,timeout=nil) - return future.data if future.done? # future was waited before - timer = nil - if timeout - timer = EM.add_timer(timeout) { - @buffer << [:timeout,future.message_id,nil] - } + def react(callback=nil,opts=nil,&block) + if block + opts = callback + callback = block end - ready_future = nil - if @ready.has_key? future.message_id - @ready.delete future.message_id - ready_future = future - else - while true - header,data,meta = data = @buffer.pop # synchronize. like erlang's mailbox select. - if header == :timeout # timeout the future we are waiting for. - message_id = data - # if we got a timeout from previous wait. throw it away. - next if future.message_id != message_id - future.timeout = true - future.done! - @futures.delete future.message_id - return yield # return the value of timeout block - end - some_future = @futures[header.message_id] - # If we didn't find the future among the - # future, it must have timedout. Just - # throw result away and keep processing. - next unless some_future - some_future.header = header - some_future.data = data - some_future.meta = meta - if some_future == future - # The future we are waiting for - EM.cancel_timer(timer) - ready_future = future - break - else - # Ready, but we are not waiting for it. Save for later. - @ready[some_future.message_id] = some_future - end + opts = {} if opts.nil? + + @callback_klass = build_callback_klass(callback) + @ack = opts[:ack] + @queue.subscribe(opts) do |info,payload| + payload = ::Marshal.load(payload) + #p [info,info.reply_to,payload] + data2 = callback(info,payload) + payload2 = payload.merge :data => data2 + if info.routing_key == @server_exchange.name + # addressed to the server's public + # queue, respond to the routing_key of + # the client's public queue. + key = info.reply_to + else + # addressed to the private queue + key = info.routing_key end + MQ.direct(info.reply_to).publish(::Marshal.dump(payload2),:routing_key => key) if info.reply_to + info.ack if @ack end - ready_future.done! - @futures.delete ready_future.message_id - return ready_future.data + self end - - def waitall - @futures.values.map { |k,v| - wait(v) - } - end - - def inspect - "#<#{self.class} #{self.name}>" - end end - # TODO should prolly have the option of using - # non auto-delete queues. This would be useful - # for logger. Maybe if a peeper name is given, - # then create queues with options. class Peeper - include Callback - attr_reader :server_name - def initialize(server_name,callback) - @server_name = server_name - @clients = {} - @callback = build_callback(callback) - - uid = "#{@server_name}.peeper.#{rand 999_999_999_999}" - q = MQ.queue uid, :auto_delete => true - q.bind(@server_name) # messages to the server would be duplicated here. - q.subscribe { |info,payload| - payload = ::Marshal.load(payload) - # sets context, but doesn't make the call - obj = prepare_callback(@callback,info,payload) - # there is a specific method we want to call. - obj.server(payload[:method],payload[:data]) - - # bind to peep client message queue if we've not seen it before. - unless @clients.has_key? info.routing_key - @clients[info.routing_key] = true - client_q = MQ.queue "#{uid}--#{info.routing_key}", - :auto_delete => true - # messages to the client would be duplicated here. - client_q.bind("#{server_name}--", :routing_key => info.routing_key) - client_q.subscribe { |info,payload| - payload = ::Marshal.load(payload) - obj = prepare_callback(@callback,info,payload) - obj.client(payload[:method],payload[:data]) - } - end - } + def initialize(exchange,callback) + # create a temporary queue that binds to an exchange end end - end