lib/dripdrop/node.rb in dripdrop-0.8.1 vs lib/dripdrop/node.rb in dripdrop-0.9.2
- old
+ new
@@ -1,8 +1,7 @@
require 'rubygems'
require 'ffi-rzmq'
-require 'zmqmachine'
require 'eventmachine'
require 'uri'
require 'resolv'
require 'ipaddr'
@@ -13,39 +12,38 @@
require 'dripdrop/handlers/websockets'
require 'dripdrop/handlers/http'
class DripDrop
class Node
+ ZCTX = ZMQ::Context.new 1
+
attr_reader :zm_reactor, :routing, :nodelets
attr_accessor :debug
def initialize(opts={},&block)
- @zm_reactor = nil # The instance of the zmq_machine reactor
@block = block
@thread = nil # Thread containing the reactors
@routing = {} # Routing table
@debug = opts[:debug]
@recipients_for = {}
@handler_default_opts = {:debug => @debug}
@nodelets = {} # Cache of registered nodelets
+ @zctx = ZCTX
end
# Starts the reactors and runs the block passed to initialize.
# This is non-blocking.
def start
@thread = Thread.new do
EM.error_handler {|e| self.error_handler e}
EM.run do
- ZM::Reactor.new(:my_reactor).run do |zm_reactor|
- @zm_reactor = zm_reactor
- if @block
- self.instance_eval(&@block)
- elsif self.respond_to?(:action)
- self.action
- else
- raise "Could not start, no block or action specified"
- end
+ if @block
+ self.instance_eval(&@block)
+ elsif self.respond_to?(:action)
+ self.action
+ else
+ raise "Could not start, no block or action specified"
end
end
end
end
@@ -66,11 +64,10 @@
self.join
end
# Stops the reactors. If you were blocked on #join, that will unblock.
def stop
- @zm_reactor.stop
EM.stop
end
# Defines a new route. Routes are the recommended way to instantiate
# handlers. For example:
@@ -151,26 +148,26 @@
# Creates a ZMQ::SUB type socket. Can only receive messages via +on_recv+.
# zmq_subscribe sockets have a +topic_filter+ option, which restricts which
# messages they can receive. It takes a regexp as an option.
def zmq_subscribe(address,socket_ctype,opts={},&block)
- zmq_handler(DripDrop::ZMQSubHandler,:sub_socket,address,socket_ctype,opts)
+ zmq_handler(DripDrop::ZMQSubHandler,ZMQ::SUB,address,socket_ctype,opts)
end
# Creates a ZMQ::PUB type socket, can only send messages via +send_message+
def zmq_publish(address,socket_ctype,opts={})
- zmq_handler(DripDrop::ZMQPubHandler,:pub_socket,address,socket_ctype,opts)
+ zmq_handler(DripDrop::ZMQPubHandler,ZMQ::PUB,address,socket_ctype,opts)
end
# Creates a ZMQ::PULL type socket. Can only receive messages via +on_recv+
def zmq_pull(address,socket_ctype,opts={},&block)
- zmq_handler(DripDrop::ZMQPullHandler,:pull_socket,address,socket_ctype,opts)
+ zmq_handler(DripDrop::ZMQPullHandler,ZMQ::PULL,address,socket_ctype,opts)
end
# Creates a ZMQ::PUSH type socket, can only send messages via +send_message+
def zmq_push(address,socket_ctype,opts={})
- zmq_handler(DripDrop::ZMQPushHandler,:push_socket,address,socket_ctype,opts)
+ zmq_handler(DripDrop::ZMQPushHandler,ZMQ::PUSH,address,socket_ctype,opts)
end
# Creates a ZMQ::XREP type socket, both sends and receivesc XREP sockets are extremely
# powerful, so their functionality is currently limited. XREP sockets in DripDrop can reply
# to the original source of the message.
@@ -182,16 +179,16 @@
# zmq_xrep(z_addr, :bind).on_recv do |message,response|
# response.send_message(message)
# end
#
def zmq_xrep(address,socket_ctype,opts={})
- zmq_handler(DripDrop::ZMQXRepHandler,:xrep_socket,address,socket_ctype,opts)
+ zmq_handler(DripDrop::ZMQXRepHandler,ZMQ::XREP,address,socket_ctype,opts)
end
# See the documentation for +zmq_xrep+ for more info
def zmq_xreq(address,socket_ctype,opts={})
- zmq_handler(DripDrop::ZMQXReqHandler,:xreq_socket,address,socket_ctype,opts)
+ zmq_handler(DripDrop::ZMQXReqHandler,ZMQ::XREQ,address,socket_ctype,opts)
end
# Binds an EM websocket connection to +address+. takes blocks for
# +on_open+, +on_recv+, +on_close+ and +on_error+.
#
@@ -273,21 +270,27 @@
def error_handler(e)
$stderr.write "#{e.class}: #{e.message}\n\t#{e.backtrace.join("\n\t")}"
end
private
-
- def zmq_handler(klass, zm_sock_type, address, socket_ctype, opts={})
+
+ def zmq_handler(klass, sock_type, address, socket_ctype, opts={})
addr_uri = URI.parse(address)
-
- host = Resolv.getaddresses(addr_uri.host).first
- host_addr = Resolv.getaddresses('localhost').map {|a| IPAddr.new(a)}.find {|a| a.ipv4?}
- host_str = host_addr.ipv6? ? "[#{host_addr.to_s}]" : host_addr.to_s
+
+ if addr_uri.scheme == 'tcp'
+ host = Resolv.getaddresses(addr_uri.host).first
+ host_addr = Resolv.getaddresses('localhost').map {|a| IPAddr.new(a)}.find {|a| a.ipv4?}
+ host_str = host_addr.ipv6? ? "[#{host_addr.to_s}]" : host_addr.to_s
+ else
+ host_str = addr_uri.host
+ end
- zm_addr = ZM::Address.new(host_str,addr_uri.port.to_i,addr_uri.scheme.to_sym)
- h_opts = handler_opts_given(opts)
- handler = klass.new(zm_addr,@zm_reactor,socket_ctype,h_opts)
- @zm_reactor.send(zm_sock_type,handler)
+ z_addr = "#{addr_uri.scheme}://#{host_str}:#{addr_uri.port.to_i}"
+ h_opts = handler_opts_given(opts)
+ connection = EM::ZeroMQ.create @zctx, sock_type, socket_ctype, address, klass.new
+ handler = connection.handler
+ handler.connection = connection
+ handler.post_setup
handler
end
def handler_opts_given(opts)
@handler_default_opts.merge(opts)