lib/dripdrop/node.rb in dripdrop-0.6.0 vs lib/dripdrop/node.rb in dripdrop-0.7.1

- old
+ new

@@ -11,31 +11,39 @@ require 'dripdrop/handlers/websockets' require 'dripdrop/handlers/http' class DripDrop class Node - attr_reader :zm_reactor, :routing + attr_reader :zm_reactor, :routing, :nodelets attr_accessor :debug def initialize(opts={},&block) @zm_reactor = nil # The instance of the zmq_machine reactor @block = block @thread = nil # Thread containing the reactors @routing = {} # Routing table @debug = opts[:debug] @recipients_for = {} @handler_default_opts = {:debug => @debug} + @nodelets = {} # Cache of registered nodelets end # Starts the reactors and runs the block passed to initialize. # This is non-blocking. def start @thread = Thread.new do + EM.error_handler {|e| self.error_handler e} EM.run do ZM::Reactor.new(:my_reactor).run do |zm_reactor| @zm_reactor = zm_reactor - self.instance_eval(&@block) + if @block + self.instance_eval(&@block) + elsif self.respond_to?(:action) + self.action + else + raise "Could not start, no block or action specified" + end end end end end @@ -73,12 +81,18 @@ # :stats_sub # A regular zmq_subscribe handler # # See the docs for +routes_for+ for more info in grouping routes for # nodelets and maintaining sanity in larger apps def route(name,handler_type,*handler_args) + route_full(nil, name, handler_type, *handler_args) + end + + # Probably not useful for most, apps. This is used internally to + # create a route for a given nodelet. + def route_full(nodelet, name, handler_type, *handler_args) # If we're in a route_for block, prepend appropriately - full_name = @route_prepend ? "#{@route_prepend}_#{name}".to_sym : name + full_name = (nodelet && nodelet.name) ? "#{nodelet.name}_#{name}".to_sym : name handler = self.send(handler_type, *handler_args) @routing[full_name] = handler # Define the route name as a singleton method @@ -87,50 +101,54 @@ end handler end - # Defines a group of +route+s, to be used as the interface for a +nodelet+ - # later on. - # - # All routes defined with the +route_for+ block will be prepended with the - # +nodelet_name+ and an underscore. So, the following routes: - # - # routes_for :forwarder do - # route :input, :zmq_subscribe, 'tcp://127.0.0.1:2200', :bind - # route :output, :zmq_publish, f.in.address, :connect - # end - # - # Will yield the routes: +forwarder_input+ and +forwarder_output+ globally. - # Within the block scope of the +forwarder+ nodelet however, the routes are additionally - # available with their own short names. See the +nodelet+ method for details. + # DEPRECATED, will be deleted in 0.8 def routes_for(nodelet_name,&block) - @route_prepend = nodelet_name #This feels ugly. Blech. - block.call - @route_prepend = nil + $stderr.write "routes_for is now deprecated, use nodelet instead" + nlet = nodelet(nodelet_name,&block) + block.call(nlet) end # Nodelets are a way of segmenting a DripDrop::Node. This can be used # for both organization and deployment. One might want the production # deployment of an app to be broken across multiple servers or processes - # for instance. Additionally, by combining nodelets with +routes_for+ - # managing routes becomes a little easier. + # for instance: # - # Nodelets can be used thusly: - # routes_for :heartbeat do - # route :ticker, :zmq_publish, 'tcp://127.0.0.1', :bind + # nodelet :heartbeat do |nlet| + # nlet.route :ticker, :zmq_publish, 'tcp://127.0.0.1', :bind + # EM::PeriodicalTimer.new(1) do + # nlet.ticker.send_message(:name => 'tick') + # end # end # - # nodelet :heartbeat do - # zm_reactor.periodical_timer(500) do - # ticker.send_message(:name => 'tick') + # Nodelets can also be subclassed, for instance: + # + # class SpecialNodelet < DripDrop::Node::Nodelet + # def action + # nlet.route :ticker, :zmq_publish, 'tcp://127.0.0.1', :bind + # EM::PeriodicalTimer.new(1) do + # nlet.ticker.send_message(:name => 'tick') + # end + # end # end - def nodelet(name,&block) - nlet_obj = Nodelet.new(name,routing) - block.call(nlet_obj) + # + # nodelet :heartbeat, SpecialNodelet + # + # If you specify a block, Nodelet#action will be ignored and the block + # will be run + def nodelet(name,klass=Nodelet,&block) + nlet = @nodelets[name] ||= klass.new(self,name,routing) + if block + block.call(nlet) + else + nlet.action + end + nlet end - + # Creates a ZMQ::SUB type socket. Can only receive messages via +on_recv+. # zmq_subscribe sockets have a +topic_filter+ option, which restricts which # messages they can receive. It takes a regexp as an option. def zmq_subscribe(address,socket_ctype,opts={},&block) zmq_handler(DripDrop::ZMQSubHandler,:sub_socket,address,socket_ctype,opts) @@ -174,35 +192,33 @@ # Binds an EM websocket connection to +address+. takes blocks for # +on_open+, +on_recv+, +on_close+ and +on_error+. # # For example +on_recv+ could be used to echo incoming messages thusly: - # websocket(addr).on_open {|ws| + # websocket(addr).on_open {|conn| # ws.send_message(:name => 'ws_open_ack') - # }.on_recv {|msg,ws| - # ws.send(msg) - # }.on_close {|ws| - # }.on_error {|ws| + # }.on_recv {|msg,conn| + # conn.send(msg) + # }.on_close {|conn| + # }.on_error {|reason,conn| # } # # The +ws+ object that's passed into the handlers is not # the +DripDrop::WebSocketHandler+ object, but an em-websocket object. def websocket(address,opts={}) uri = URI.parse(address) h_opts = handler_opts_given(opts) - handler = DripDrop::WebSocketHandler.new(uri,h_opts) - handler + DripDrop::WebSocketHandler.new(uri,h_opts) end # Starts a new Thin HTTP server listening on address. # Can have an +on_recv+ handler that gets passed +msg+ and +response+ args. # http_server(addr) {|msg,response| response.send_message(msg)} def http_server(address,opts={},&block) uri = URI.parse(address) h_opts = handler_opts_given(opts) - handler = DripDrop::HTTPServerHandler.new(uri, h_opts,&block) - handler + DripDrop::HTTPServerHandler.new(uri, h_opts,&block) end # An EM HTTP client. # Example: # client = http_client(addr) @@ -210,12 +226,11 @@ # puts resp_msg.inspect # end def http_client(address,opts={}) uri = URI.parse(address) h_opts = handler_opts_given(opts) - handler = DripDrop::HTTPClientHandler.new(uri, h_opts) - handler + DripDrop::HTTPClientHandler.new(uri, h_opts) end # An inprocess pub/sub queue that works similarly to EM::Channel, # but has manually specified identifiers for subscribers letting you # more easily delete subscribers without crazy id tracking. @@ -248,9 +263,14 @@ # Deletes a subscriber to the channel +dest+ previously identified by a # reciever created with +recv_internal+ def remove_recv_internal(dest,identifier) return false unless @recipients_for[dest] @recipients_for[dest].delete(identifier) + end + + # Catch all error handler + def error_handler(e) + $stderr.write "#{e.class}: #{e.message}\n\t#{e.backtrace.join("\n\t")}" end private def zmq_handler(klass, zm_sock_type, address, socket_ctype, opts={})