lib/dripdrop/node.rb in dripdrop-0.1.0 vs lib/dripdrop/node.rb in dripdrop-0.2.0

- old
+ new

@@ -5,10 +5,11 @@ require 'uri' require 'dripdrop/message' require 'dripdrop/handlers/zeromq' require 'dripdrop/handlers/websockets' +require 'dripdrop/handlers/http' class DripDrop class Node attr_reader :zm_reactor attr_accessor :debug @@ -17,54 +18,127 @@ @handlers = {} @debug = opts[:debug] @recipients_for = {} @handler_default_opts = {:debug => @debug} @zm_reactor = nil - - EM.run do - ZM::Reactor.new(:my_reactor).run do |zm_reactor| - @zm_reactor = zm_reactor - block.call(self) + @block = block + @thread = nil + end + + def start + @thread = Thread.new do + EM.run do + ZM::Reactor.new(:my_reactor).run do |zm_reactor| + @zm_reactor = zm_reactor + self.instance_eval(&@block) + end end end end + + def join + if @thread + @thread.join + else + raise "Can't join on a node that isn't yet started" + end + end + + #Blocking version of start, equivalent to +start+ then +join+ + def start! + self.start + self.join + end + + def stop + @zm_reactor.stop + EM.stop + end + + #TODO: All these need to be majorly DRYed up + # Creates a ZMQ::SUB type socket. Can only receive messages via +on_recv+ def zmq_subscribe(address,socket_ctype,opts={},&block) zm_addr = str_to_zm_address(address) h_opts = handler_opts_given(opts) handler = DripDrop::ZMQSubHandler.new(zm_addr,@zm_reactor,socket_ctype,h_opts) @zm_reactor.sub_socket(handler) handler end + # Creates a ZMQ::PUB type socket, can only send messages via +send_message+ def zmq_publish(address,socket_ctype,opts={}) zm_addr = str_to_zm_address(address) h_opts = handler_opts_given(opts) handler = DripDrop::ZMQPubHandler.new(zm_addr,@zm_reactor,socket_ctype,h_opts) @zm_reactor.pub_socket(handler) handler end + # Creates a ZMQ::PULL type socket. Can only receive messages via +on_recv+ def zmq_pull(address,socket_ctype,opts={},&block) zm_addr = str_to_zm_address(address) h_opts = handler_opts_given(opts) handler = DripDrop::ZMQPullHandler.new(zm_addr,@zm_reactor,socket_ctype,h_opts) @zm_reactor.pull_socket(handler) handler end + # Creates a ZMQ::PUSH type socket, can only send messages via +send_message+ def zmq_push(address,socket_ctype,opts={}) zm_addr = str_to_zm_address(address) h_opts = handler_opts_given(opts) handler = DripDrop::ZMQPushHandler.new(zm_addr,@zm_reactor,socket_ctype,h_opts) @zm_reactor.push_socket(handler) handler end + # Creates a ZMQ::XREP type socket, both sends and receivesc XREP sockets are extremely + # powerful, so their functionality is currently limited. XREP sockets in DripDrop can reply + # to the original source of the message. + # + # Receiving with XREP sockets in DripDrop is different than other types of sockets, on_recv + # passes 3 arguments to its callback, +identities+, +seq+, and +message+. Identities is the + # socket identity, seq is the sequence number of the message (all messages received at the socket + # get a monotonically incrementing +seq+, and +message+ is the message itself. + # + # To reply from an xrep handler, be sure to call send messages with the same +identities+ and +seq+ + # arguments that +on_recv+ had. So, send_message takes +identities+, +seq+, and +message+. + def zmq_xrep(address,socket_ctype,opts={}) + zm_addr = str_to_zm_address(address) + h_opts = handler_opts_given(opts) + handler = DripDrop::ZMQXRepHandler.new(zm_addr,@zm_reactor,socket_ctype,h_opts) + @zm_reactor.xrep_socket(handler) + handler + end + + # See the documentation for +zmq_xrep+ for more info + def zmq_xreq(address,socket_ctype,opts={}) + zm_addr = str_to_zm_address(address) + h_opts = handler_opts_given(opts) + handler = DripDrop::ZMQXReqHandler.new(zm_addr,@zm_reactor,socket_ctype,h_opts) + @zm_reactor.xreq_socket(handler) + handler + end + def websocket(address,opts={},&block) uri = URI.parse(address) h_opts = handler_opts_given(opts) handler = DripDrop::WebSocketHandler.new(uri,h_opts) + handler + end + + def http_server(address,opts={},&block) + uri = URI.parse(address) + h_opts = handler_opts_given(opts) + handler = DripDrop::HTTPServerHandler.new(uri, h_opts,&block) + handler + end + + def http_client(address,opts={}) + uri = URI.parse(address) + h_opts = handler_opts_given(opts) + handler = DripDrop::HTTPClientHandler.new(uri, h_opts) handler end def send_internal(dest,data) return false unless @recipients_for[dest]