lib/dripdrop/node.rb in dripdrop-0.9.10 vs lib/dripdrop/node.rb in dripdrop-0.10.0.beta1

- old
+ new

@@ -7,32 +7,33 @@ require 'dripdrop/message' require 'dripdrop/node/nodelet' require 'dripdrop/handlers/base' require 'dripdrop/handlers/zeromq' -require 'dripdrop/handlers/websockets' +require 'dripdrop/handlers/websocket_server' +require 'dripdrop/handlers/mongrel2' require 'dripdrop/handlers/http_client' begin require 'dripdrop/handlers/http_server' rescue LoadError => e - $stderr.write "Could not load http server, your probably don't have eventmachine_httpserver installed\n" - $stderr.write e.message + "\n" - $stderr.write e.backtrace.join("\t\n") + $stderr.write "Warning, could not load http server, your probably don't have eventmachine_httpserver installed\n" end class DripDrop class Node ZCTX = ZMQ::Context.new 1 - attr_reader :zm_reactor, :routing, :nodelets + attr_reader :zm_reactor, :routing, :nodelets, :run_list attr_accessor :debug def initialize(opts={},&block) @block = block @thread = nil # Thread containing the reactors @routing = {} # Routing table + @run_list = opts['run_list'] || opts[:run_list] || nil #List of nodelets to run + @run_list = @run_list.map(&:to_sym) if @run_list @debug = opts[:debug] @recipients_for = {} @handler_default_opts = {:debug => @debug} @nodelets = {} # Cache of registered nodelets @zctx = ZCTX @@ -40,11 +41,11 @@ # Starts the reactors and runs the block passed to initialize. # This is non-blocking. def start @thread = Thread.new do - EM.error_handler {|e| self.error_handler e} + EM.error_handler {|e| self.class.error_handler e} EM.run { action } end end # Blocking version of start, equivalent to +start+ then +join+ @@ -144,20 +145,42 @@ # # nodelet :heartbeat, SpecialNodelet # # If you specify a block, Nodelet#action will be ignored and the block # will be run + # + # Nodelets are made available as instance methods on the current DripDrop::Nodelet + # Object, so the following works as well: + # + # nodelet :mynodelet + # + # mynodelet.route :route_name, :zmq_xreq, 'tcp://127.0.0.1:2000', ;bind def nodelet(name,klass=Nodelet,*configure_args,&block) + # If there's a run list, only run nodes in that list + return nil if @run_list && !@run_list.include?(name.to_sym) + nlet = @nodelets[name] ||= klass.new(self,name,*configure_args) + + # Define a method returning the nodelet in the current node + unless respond_to?(name) + (class << self; self; end).class_eval do + define_method(name) { nlet } + end + end + if block block.call(nlet) else nlet.action end nlet end + def zmq_m2(addresses, opts={}, &block) + zmq_handler(DripDrop::Mongrel2Handler, [ZMQ::PULL, ZMQ::PUB], addresses, [:connect, :connect], opts) + end + # Creates a ZMQ::SUB type socket. Can only receive messages via +on_recv+. # zmq_subscribe sockets have a +topic_filter+ option, which restricts which # messages they can receive. It takes a regexp as an option. def zmq_subscribe(address,socket_ctype,opts={},&block) zmq_handler(DripDrop::ZMQSubHandler,ZMQ::SUB,address,socket_ctype,opts) @@ -197,30 +220,36 @@ # See the documentation for +zmq_xrep+ for more info def zmq_xreq(address,socket_ctype,opts={}) zmq_handler(DripDrop::ZMQXReqHandler,ZMQ::XREQ,address,socket_ctype,opts) end - # Binds an EM websocket connection to +address+. takes blocks for + # Binds an EM websocket server connection to +address+. takes blocks for # +on_open+, +on_recv+, +on_close+ and +on_error+. # # For example +on_recv+ could be used to echo incoming messages thusly: - # websocket(addr).on_open {|conn| + # websocket_server(addr).on_open {|conn| # ws.send_message(:name => 'ws_open_ack') # }.on_recv {|msg,conn| # conn.send(msg) # }.on_close {|conn| # }.on_error {|reason,conn| # } # # The +ws+ object that's passed into the handlers is not # the +DripDrop::WebSocketHandler+ object, but an em-websocket object. - def websocket(address,opts={}) + def websocket_server(address,opts={}) uri = URI.parse(address) h_opts = handler_opts_given(opts) DripDrop::WebSocketHandler.new(uri,h_opts) end + # DEPRECATED: Use websocket_server + def websocket(*args) + $stderr.write "DripDrop#websocket handler is deprecated, use DripDrop#websocket_server" + websocket_server(*args) + end + # Starts a new Thin HTTP server listening on address. # Can have an +on_recv+ handler that gets passed +msg+ and +response+ args. # http_server(addr) {|msg,response| response.send_message(msg)} def http_server(address,opts={},&block) uri = URI.parse(address) @@ -283,25 +312,35 @@ end private def zmq_handler(klass, sock_type, address, socket_ctype, opts={}) - addr_uri = URI.parse(address) - - host_str = addr_uri.host - #if addr_uri.scheme == 'tcp' - # host = Resolv.getaddresses(addr_uri.host).first - # host_addr = Resolv.getaddresses('localhost').map {|a| IPAddr.new(a)}.find {|a| a.ipv4?} - # host_str = host_addr.ipv6? ? "[#{host_addr.to_s}]" : host_addr.to_s - #else - # host_str = addr_uri.host - #end + h_opts = handler_opts_given(opts) - z_addr = "#{addr_uri.scheme}://#{host_str}:#{addr_uri.port.to_i}" - h_opts = handler_opts_given(opts) - connection = EM::ZeroMQ.create @zctx, sock_type, socket_ctype, address, klass.new(h_opts) - handler = connection.handler - handler.connection = connection + sock_type = [sock_type].flatten + address = [address].flatten + socket_ctype = [socket_ctype].flatten + + handler = klass.new(h_opts) + + sock_type.length.times do |index| + addr_uri = URI.parse(address[index]) + + host_str = addr_uri.host + #if addr_uri.scheme == 'tcp' + # host = Resolv.getaddresses(addr_uri.host).first + # host_addr = Resolv.getaddresses('localhost').map {|a| IPAddr.new(a)}.find {|a| a.ipv4?} + # host_str = host_addr.ipv6? ? "[#{host_addr.to_s}]" : host_addr.to_s + #else + # host_str = addr_uri.host + #end + + z_addr = "#{addr_uri.scheme}://#{host_str}:#{addr_uri.port.to_i}" + + connection = EM::ZeroMQ::Context.new(@zctx).create sock_type[index], socket_ctype[index], z_addr, handler + handler.add_connection connection + end + handler.post_setup handler end def handler_opts_given(opts)