lib/dripdrop/node.rb in dripdrop-0.3.1 vs lib/dripdrop/node.rb in dripdrop-0.4.0

- old
+ new

@@ -3,27 +3,29 @@ require 'zmqmachine' require 'eventmachine' require 'uri' require 'dripdrop/message' +require 'dripdrop/node/nodelet' +require 'dripdrop/handlers/base' require 'dripdrop/handlers/zeromq' require 'dripdrop/handlers/websockets' require 'dripdrop/handlers/http' class DripDrop class Node - attr_reader :zm_reactor + attr_reader :zm_reactor, :routing attr_accessor :debug def initialize(opts={},&block) - @handlers = {} - @debug = opts[:debug] - @recipients_for = {} + @zm_reactor = nil # The instance of the zmq_machine reactor + @block = block + @thread = nil # Thread containing the reactors + @routing = {} # Routing table + @debug = opts[:debug] + @recipients_for = {} @handler_default_opts = {:debug => @debug} - @zm_reactor = nil - @block = block - @thread = nil end # Starts the reactors and runs the block passed to initialize. # This is non-blocking. def start @@ -58,67 +60,143 @@ def stop @zm_reactor.stop EM.stop end - # Creates a ZMQ::SUB type socket. Can only receive messages via +on_recv+ + # Defines a new route. Routes are the recommended way to instantiate + # handlers. For example: + # + # route :stats_pub, :zmq_publish, 'tcp://127.0.0.1:2200', :bind + # route :stats_sub, :zmq_subscribe, stats_pub.address, :connect + # + # Will make the following methods available within the reactor block: + # stats_pub # A regular zmq_publish handler + # :stats_sub # A regular zmq_subscribe handler + # + # See the docs for +routes_for+ for more info in grouping routes for + # nodelets and maintaining sanity in larger apps + def route(name,handler_type,*handler_args) + # If we're in a route_for block, prepend appropriately + full_name = @route_prepend ? "#{@route_prepend}_#{name}".to_sym : name + + handler = self.send(handler_type, *handler_args) + @routing[full_name] = handler + + # Define the route name as a singleton method + (class << self; self; end).class_eval do + define_method(full_name) { handler } + end + + handler + end + + # Defines a group of +route+s, to be used as the interface for a +nodelet+ + # later on. + # + # All routes defined with the +route_for+ block will be prepended with the + # +nodelet_name+ and an underscore. So, the following routes: + # + # routes_for :forwarder do + # route :input, :zmq_subscribe, 'tcp://127.0.0.1:2200', :bind + # route :output, :zmq_publish, f.in.address, :connect + # end + # + # Will yield the routes: +forwarder_input+ and +forwarder_output+ globally. + # Within the block scope of the +forwarder+ nodelet however, the routes are additionally + # available with their own short names. See the +nodelet+ method for details. + def routes_for(nodelet_name,&block) + @route_prepend = nodelet_name #This feels ugly. Blech. + block.call + @route_prepend = nil + end + + # Nodelets are a way of segmenting a DripDrop::Node. This can be used + # for both organization and deployment. One might want the production + # deployment of an app to be broken across multiple servers or processes + # for instance. Additionally, by combining nodelets with +routes_for+ + # managing routes becomes a little easier. + # + # Nodelets can be used thusly: + # routes_for :heartbeat do + # route :ticker, :zmq_publish, 'tcp://127.0.0.1', :bind + # end + # + # nodelet :heartbeat do + # zm_reactor.periodical_timer(500) do + # ticker.send_message(:name => 'tick') + # end + def nodelet(name,&block) + nlet_obj = Nodelet.new(name,routing) + block.call(nlet_obj) + end + + # Creates a ZMQ::SUB type socket. Can only receive messages via +on_recv+. + # zmq_subscribe sockets have a +topic_filter+ option, which restricts which + # messages they can receive. It takes a regexp as an option. def zmq_subscribe(address,socket_ctype,opts={},&block) - zmq_handler(DripDrop::ZMQSubHandler,:sub_socket,address,socket_ctype,opts={}) + zmq_handler(DripDrop::ZMQSubHandler,:sub_socket,address,socket_ctype,opts) end # Creates a ZMQ::PUB type socket, can only send messages via +send_message+ def zmq_publish(address,socket_ctype,opts={}) - zmq_handler(DripDrop::ZMQPubHandler,:pub_socket,address,socket_ctype,opts={}) + zmq_handler(DripDrop::ZMQPubHandler,:pub_socket,address,socket_ctype,opts) end # Creates a ZMQ::PULL type socket. Can only receive messages via +on_recv+ def zmq_pull(address,socket_ctype,opts={},&block) - zmq_handler(DripDrop::ZMQPullHandler,:pull_socket,address,socket_ctype,opts={}) + zmq_handler(DripDrop::ZMQPullHandler,:pull_socket,address,socket_ctype,opts) end # Creates a ZMQ::PUSH type socket, can only send messages via +send_message+ def zmq_push(address,socket_ctype,opts={}) - zmq_handler(DripDrop::ZMQPushHandler,:push_socket,address,socket_ctype,opts={}) + zmq_handler(DripDrop::ZMQPushHandler,:push_socket,address,socket_ctype,opts) end # Creates a ZMQ::XREP type socket, both sends and receivesc XREP sockets are extremely # powerful, so their functionality is currently limited. XREP sockets in DripDrop can reply # to the original source of the message. # # Receiving with XREP sockets in DripDrop is different than other types of sockets, on_recv - # passes 3 arguments to its callback, +identities+, +seq+, and +message+. Identities is the - # socket identity, seq is the sequence number of the message (all messages received at the socket - # get a monotonically incrementing +seq+, and +message+ is the message itself. - # - # To reply from an xrep handler, be sure to call send messages with the same +identities+ and +seq+ - # arguments that +on_recv+ had. So, send_message takes +identities+, +seq+, and +message+. + # passes 2 arguments to its callback, +message+, and +response+. A minimal example is shown below: + # + # + # zmq_xrep(z_addr, :bind).on_recv do |message,response| + # response.send_message(message) + # end + # def zmq_xrep(address,socket_ctype,opts={}) - zmq_handler(DripDrop::ZMQXRepHandler,:xrep_socket,address,socket_ctype,opts={}) + zmq_handler(DripDrop::ZMQXRepHandler,:xrep_socket,address,socket_ctype,opts) end # See the documentation for +zmq_xrep+ for more info def zmq_xreq(address,socket_ctype,opts={}) - zmq_handler(DripDrop::ZMQXReqHandler,:xreq_socket,address,socket_ctype,opts={}) + zmq_handler(DripDrop::ZMQXReqHandler,:xreq_socket,address,socket_ctype,opts) end # Binds an EM websocket connection to +address+. takes blocks for # +on_open+, +on_recv+, +on_close+ and +on_error+. # # For example +on_recv+ could be used to echo incoming messages thusly: - # websocket(addr).on_recv {|msg,websocket| ws.send(msg)} + # websocket(addr).on_open {|ws| + # ws.send_message(:name => 'ws_open_ack') + # }.on_recv {|msg,ws| + # ws.send(msg) + # }.on_close {|ws| + # }.on_error {|ws| + # } # - # All other events only receive the +websocket+ object, which corresponds - # not to the +DripDrop::WebSocketHandler+ object, but to an em-websocket object. + # The +ws+ object that's passed into the handlers is not + # the +DripDrop::WebSocketHandler+ object, but an em-websocket object. def websocket(address,opts={}) uri = URI.parse(address) h_opts = handler_opts_given(opts) handler = DripDrop::WebSocketHandler.new(uri,h_opts) handler end # Starts a new Thin HTTP server listening on address. - # Can have an +on_recv+ handler that gets passed a single +response+ arg. - # http_server(addr) {|response,msg| response.send_message(msg)} + # Can have an +on_recv+ handler that gets passed +msg+ and +response+ args. + # http_server(addr) {|msg,response| response.send_message(msg)} def http_server(address,opts={},&block) uri = URI.parse(address) h_opts = handler_opts_given(opts) handler = DripDrop::HTTPServerHandler.new(uri, h_opts,&block) handler