lib/dripdrop/node.rb in dripdrop-0.8.1 vs lib/dripdrop/node.rb in dripdrop-0.9.2

- old
+ new

@@ -1,8 +1,7 @@ require 'rubygems' require 'ffi-rzmq' -require 'zmqmachine' require 'eventmachine' require 'uri' require 'resolv' require 'ipaddr' @@ -13,39 +12,38 @@ require 'dripdrop/handlers/websockets' require 'dripdrop/handlers/http' class DripDrop class Node + ZCTX = ZMQ::Context.new 1 + attr_reader :zm_reactor, :routing, :nodelets attr_accessor :debug def initialize(opts={},&block) - @zm_reactor = nil # The instance of the zmq_machine reactor @block = block @thread = nil # Thread containing the reactors @routing = {} # Routing table @debug = opts[:debug] @recipients_for = {} @handler_default_opts = {:debug => @debug} @nodelets = {} # Cache of registered nodelets + @zctx = ZCTX end # Starts the reactors and runs the block passed to initialize. # This is non-blocking. def start @thread = Thread.new do EM.error_handler {|e| self.error_handler e} EM.run do - ZM::Reactor.new(:my_reactor).run do |zm_reactor| - @zm_reactor = zm_reactor - if @block - self.instance_eval(&@block) - elsif self.respond_to?(:action) - self.action - else - raise "Could not start, no block or action specified" - end + if @block + self.instance_eval(&@block) + elsif self.respond_to?(:action) + self.action + else + raise "Could not start, no block or action specified" end end end end @@ -66,11 +64,10 @@ self.join end # Stops the reactors. If you were blocked on #join, that will unblock. def stop - @zm_reactor.stop EM.stop end # Defines a new route. Routes are the recommended way to instantiate # handlers. For example: @@ -151,26 +148,26 @@ # Creates a ZMQ::SUB type socket. Can only receive messages via +on_recv+. # zmq_subscribe sockets have a +topic_filter+ option, which restricts which # messages they can receive. It takes a regexp as an option. def zmq_subscribe(address,socket_ctype,opts={},&block) - zmq_handler(DripDrop::ZMQSubHandler,:sub_socket,address,socket_ctype,opts) + zmq_handler(DripDrop::ZMQSubHandler,ZMQ::SUB,address,socket_ctype,opts) end # Creates a ZMQ::PUB type socket, can only send messages via +send_message+ def zmq_publish(address,socket_ctype,opts={}) - zmq_handler(DripDrop::ZMQPubHandler,:pub_socket,address,socket_ctype,opts) + zmq_handler(DripDrop::ZMQPubHandler,ZMQ::PUB,address,socket_ctype,opts) end # Creates a ZMQ::PULL type socket. Can only receive messages via +on_recv+ def zmq_pull(address,socket_ctype,opts={},&block) - zmq_handler(DripDrop::ZMQPullHandler,:pull_socket,address,socket_ctype,opts) + zmq_handler(DripDrop::ZMQPullHandler,ZMQ::PULL,address,socket_ctype,opts) end # Creates a ZMQ::PUSH type socket, can only send messages via +send_message+ def zmq_push(address,socket_ctype,opts={}) - zmq_handler(DripDrop::ZMQPushHandler,:push_socket,address,socket_ctype,opts) + zmq_handler(DripDrop::ZMQPushHandler,ZMQ::PUSH,address,socket_ctype,opts) end # Creates a ZMQ::XREP type socket, both sends and receivesc XREP sockets are extremely # powerful, so their functionality is currently limited. XREP sockets in DripDrop can reply # to the original source of the message. @@ -182,16 +179,16 @@ # zmq_xrep(z_addr, :bind).on_recv do |message,response| # response.send_message(message) # end # def zmq_xrep(address,socket_ctype,opts={}) - zmq_handler(DripDrop::ZMQXRepHandler,:xrep_socket,address,socket_ctype,opts) + zmq_handler(DripDrop::ZMQXRepHandler,ZMQ::XREP,address,socket_ctype,opts) end # See the documentation for +zmq_xrep+ for more info def zmq_xreq(address,socket_ctype,opts={}) - zmq_handler(DripDrop::ZMQXReqHandler,:xreq_socket,address,socket_ctype,opts) + zmq_handler(DripDrop::ZMQXReqHandler,ZMQ::XREQ,address,socket_ctype,opts) end # Binds an EM websocket connection to +address+. takes blocks for # +on_open+, +on_recv+, +on_close+ and +on_error+. # @@ -273,21 +270,27 @@ def error_handler(e) $stderr.write "#{e.class}: #{e.message}\n\t#{e.backtrace.join("\n\t")}" end private - - def zmq_handler(klass, zm_sock_type, address, socket_ctype, opts={}) + + def zmq_handler(klass, sock_type, address, socket_ctype, opts={}) addr_uri = URI.parse(address) - - host = Resolv.getaddresses(addr_uri.host).first - host_addr = Resolv.getaddresses('localhost').map {|a| IPAddr.new(a)}.find {|a| a.ipv4?} - host_str = host_addr.ipv6? ? "[#{host_addr.to_s}]" : host_addr.to_s + + if addr_uri.scheme == 'tcp' + host = Resolv.getaddresses(addr_uri.host).first + host_addr = Resolv.getaddresses('localhost').map {|a| IPAddr.new(a)}.find {|a| a.ipv4?} + host_str = host_addr.ipv6? ? "[#{host_addr.to_s}]" : host_addr.to_s + else + host_str = addr_uri.host + end - zm_addr = ZM::Address.new(host_str,addr_uri.port.to_i,addr_uri.scheme.to_sym) - h_opts = handler_opts_given(opts) - handler = klass.new(zm_addr,@zm_reactor,socket_ctype,h_opts) - @zm_reactor.send(zm_sock_type,handler) + z_addr = "#{addr_uri.scheme}://#{host_str}:#{addr_uri.port.to_i}" + h_opts = handler_opts_given(opts) + connection = EM::ZeroMQ.create @zctx, sock_type, socket_ctype, address, klass.new + handler = connection.handler + handler.connection = connection + handler.post_setup handler end def handler_opts_given(opts) @handler_default_opts.merge(opts)