lib/dripdrop/node.rb in dripdrop-0.6.0 vs lib/dripdrop/node.rb in dripdrop-0.7.1
- old
+ new
@@ -11,31 +11,39 @@
require 'dripdrop/handlers/websockets'
require 'dripdrop/handlers/http'
class DripDrop
class Node
- attr_reader :zm_reactor, :routing
+ attr_reader :zm_reactor, :routing, :nodelets
attr_accessor :debug
def initialize(opts={},&block)
@zm_reactor = nil # The instance of the zmq_machine reactor
@block = block
@thread = nil # Thread containing the reactors
@routing = {} # Routing table
@debug = opts[:debug]
@recipients_for = {}
@handler_default_opts = {:debug => @debug}
+ @nodelets = {} # Cache of registered nodelets
end
# Starts the reactors and runs the block passed to initialize.
# This is non-blocking.
def start
@thread = Thread.new do
+ EM.error_handler {|e| self.error_handler e}
EM.run do
ZM::Reactor.new(:my_reactor).run do |zm_reactor|
@zm_reactor = zm_reactor
- self.instance_eval(&@block)
+ if @block
+ self.instance_eval(&@block)
+ elsif self.respond_to?(:action)
+ self.action
+ else
+ raise "Could not start, no block or action specified"
+ end
end
end
end
end
@@ -73,12 +81,18 @@
# :stats_sub # A regular zmq_subscribe handler
#
# See the docs for +routes_for+ for more info in grouping routes for
# nodelets and maintaining sanity in larger apps
def route(name,handler_type,*handler_args)
+ route_full(nil, name, handler_type, *handler_args)
+ end
+
+ # Probably not useful for most, apps. This is used internally to
+ # create a route for a given nodelet.
+ def route_full(nodelet, name, handler_type, *handler_args)
# If we're in a route_for block, prepend appropriately
- full_name = @route_prepend ? "#{@route_prepend}_#{name}".to_sym : name
+ full_name = (nodelet && nodelet.name) ? "#{nodelet.name}_#{name}".to_sym : name
handler = self.send(handler_type, *handler_args)
@routing[full_name] = handler
# Define the route name as a singleton method
@@ -87,50 +101,54 @@
end
handler
end
- # Defines a group of +route+s, to be used as the interface for a +nodelet+
- # later on.
- #
- # All routes defined with the +route_for+ block will be prepended with the
- # +nodelet_name+ and an underscore. So, the following routes:
- #
- # routes_for :forwarder do
- # route :input, :zmq_subscribe, 'tcp://127.0.0.1:2200', :bind
- # route :output, :zmq_publish, f.in.address, :connect
- # end
- #
- # Will yield the routes: +forwarder_input+ and +forwarder_output+ globally.
- # Within the block scope of the +forwarder+ nodelet however, the routes are additionally
- # available with their own short names. See the +nodelet+ method for details.
+ # DEPRECATED, will be deleted in 0.8
def routes_for(nodelet_name,&block)
- @route_prepend = nodelet_name #This feels ugly. Blech.
- block.call
- @route_prepend = nil
+ $stderr.write "routes_for is now deprecated, use nodelet instead"
+ nlet = nodelet(nodelet_name,&block)
+ block.call(nlet)
end
# Nodelets are a way of segmenting a DripDrop::Node. This can be used
# for both organization and deployment. One might want the production
# deployment of an app to be broken across multiple servers or processes
- # for instance. Additionally, by combining nodelets with +routes_for+
- # managing routes becomes a little easier.
+ # for instance:
#
- # Nodelets can be used thusly:
- # routes_for :heartbeat do
- # route :ticker, :zmq_publish, 'tcp://127.0.0.1', :bind
+ # nodelet :heartbeat do |nlet|
+ # nlet.route :ticker, :zmq_publish, 'tcp://127.0.0.1', :bind
+ # EM::PeriodicalTimer.new(1) do
+ # nlet.ticker.send_message(:name => 'tick')
+ # end
# end
#
- # nodelet :heartbeat do
- # zm_reactor.periodical_timer(500) do
- # ticker.send_message(:name => 'tick')
+ # Nodelets can also be subclassed, for instance:
+ #
+ # class SpecialNodelet < DripDrop::Node::Nodelet
+ # def action
+ # nlet.route :ticker, :zmq_publish, 'tcp://127.0.0.1', :bind
+ # EM::PeriodicalTimer.new(1) do
+ # nlet.ticker.send_message(:name => 'tick')
+ # end
+ # end
# end
- def nodelet(name,&block)
- nlet_obj = Nodelet.new(name,routing)
- block.call(nlet_obj)
+ #
+ # nodelet :heartbeat, SpecialNodelet
+ #
+ # If you specify a block, Nodelet#action will be ignored and the block
+ # will be run
+ def nodelet(name,klass=Nodelet,&block)
+ nlet = @nodelets[name] ||= klass.new(self,name,routing)
+ if block
+ block.call(nlet)
+ else
+ nlet.action
+ end
+ nlet
end
-
+
# Creates a ZMQ::SUB type socket. Can only receive messages via +on_recv+.
# zmq_subscribe sockets have a +topic_filter+ option, which restricts which
# messages they can receive. It takes a regexp as an option.
def zmq_subscribe(address,socket_ctype,opts={},&block)
zmq_handler(DripDrop::ZMQSubHandler,:sub_socket,address,socket_ctype,opts)
@@ -174,35 +192,33 @@
# Binds an EM websocket connection to +address+. takes blocks for
# +on_open+, +on_recv+, +on_close+ and +on_error+.
#
# For example +on_recv+ could be used to echo incoming messages thusly:
- # websocket(addr).on_open {|ws|
+ # websocket(addr).on_open {|conn|
# ws.send_message(:name => 'ws_open_ack')
- # }.on_recv {|msg,ws|
- # ws.send(msg)
- # }.on_close {|ws|
- # }.on_error {|ws|
+ # }.on_recv {|msg,conn|
+ # conn.send(msg)
+ # }.on_close {|conn|
+ # }.on_error {|reason,conn|
# }
#
# The +ws+ object that's passed into the handlers is not
# the +DripDrop::WebSocketHandler+ object, but an em-websocket object.
def websocket(address,opts={})
uri = URI.parse(address)
h_opts = handler_opts_given(opts)
- handler = DripDrop::WebSocketHandler.new(uri,h_opts)
- handler
+ DripDrop::WebSocketHandler.new(uri,h_opts)
end
# Starts a new Thin HTTP server listening on address.
# Can have an +on_recv+ handler that gets passed +msg+ and +response+ args.
# http_server(addr) {|msg,response| response.send_message(msg)}
def http_server(address,opts={},&block)
uri = URI.parse(address)
h_opts = handler_opts_given(opts)
- handler = DripDrop::HTTPServerHandler.new(uri, h_opts,&block)
- handler
+ DripDrop::HTTPServerHandler.new(uri, h_opts,&block)
end
# An EM HTTP client.
# Example:
# client = http_client(addr)
@@ -210,12 +226,11 @@
# puts resp_msg.inspect
# end
def http_client(address,opts={})
uri = URI.parse(address)
h_opts = handler_opts_given(opts)
- handler = DripDrop::HTTPClientHandler.new(uri, h_opts)
- handler
+ DripDrop::HTTPClientHandler.new(uri, h_opts)
end
# An inprocess pub/sub queue that works similarly to EM::Channel,
# but has manually specified identifiers for subscribers letting you
# more easily delete subscribers without crazy id tracking.
@@ -248,9 +263,14 @@
# Deletes a subscriber to the channel +dest+ previously identified by a
# reciever created with +recv_internal+
def remove_recv_internal(dest,identifier)
return false unless @recipients_for[dest]
@recipients_for[dest].delete(identifier)
+ end
+
+ # Catch all error handler
+ def error_handler(e)
+ $stderr.write "#{e.class}: #{e.message}\n\t#{e.backtrace.join("\n\t")}"
end
private
def zmq_handler(klass, zm_sock_type, address, socket_ctype, opts={})