lib/pylon/node.rb in pylon-0.2.2 vs lib/pylon/node.rb in pylon-0.2.3

- old
+ new

@@ -76,22 +76,68 @@ def random_uuid UUIDTools::UUID.timestamp_create end + # Expects a two element tuple containing a string command and a + # hash of params: + # ["command", :params => {}] + def handle_command string + command, params = JSON.parse(string) + case command + when "sync_time" + Log.info "handle_command: sync time received, running ntpdate" + ["sync_time", %x{ntpdate -u pool.ntp.org}] + when "add" + Log.info "handle_command: add message received, params: #{params.inspect}" + when "new_leader" + Log.info "handle_command: new_leader message received" + new_leader = params + new_leader.send "add", self + self + when "status" + Log.info "handle_command: status message received, sending node back" + self + when "ping" + timestamp = Time.now.to_i + Log.info "handle_command: ping requested, sending pong with timestamp: #{timestamp}" + ["pong", timestamp] + when "exit" + error = "handle_command: exit command received" + error << " with message: #{params["message"]}" if params.has_key? "message" + Pylon::Application.fatal! error, 1 + else + Pylon::Application.fatal! "handle_command: unrecognized command '#{command.inspect}' (params: #{params.inspect}), exiting!", -99 + end + end + + def send(command = "status", params = {}) + Thread.new do + req_socket = context.socket ZMQ::REQ + req_socket.setsockopt ZMQ::LINGER, 0 + req_socket.connect unicast_endpoint + if req_socket.send_string "command", ZMQ::SNDMORE + Log.debug "connect_node: sent command protocol initiator" + if req_socket.send_string([command, params].to_json) + response = JSON.parse(req_socket.recv_string) + end + end + response + end.value + end + def unicast_announcer Thread.new do Log.debug "unicast_announcer: zeromq pub socket announcer starting up on #{unicast_endpoint}" - pub_socket = context.socket ZMQ::PUB - pub_socket.setsockopt ZMQ::IDENTITY, "node" - pub_socket.bind unicast_endpoint + rep_socket = context.socket ZMQ::REP + rep_socket.bind unicast_endpoint loop do + if rep_socket.recv_string == "command" + rep_socket.send_string handle_command(rep_socket.recv_string).to_json if rep_socket.more_parts? + end sleep_after_announce = Pylon::Config[:sleep_after_announce] - Log.debug "#{self}: unicast announcing then sleeping #{sleep_after_announce} secs" - pub_socket.send_string uuid.to_s, ZMQ::SNDMORE - pub_socket.send_string self.to_json - #Thread.pass + Thread.pass sleep sleep_after_announce end end end @@ -107,11 +153,11 @@ loop do sleep_after_announce = Pylon::Config[:sleep_after_announce] Log.debug "#{self}: announcing then sleeping #{sleep_after_announce} secs" pub_socket.send_string uuid.to_s, ZMQ::SNDMORE pub_socket.send_string self.to_json - #Thread.pass + Thread.pass sleep sleep_after_announce end end end @@ -141,7 +187,9 @@ node.unicast_endpoint json["unicast_endpoint"] node.timestamp json["timestamp"] Log.debug "#{node}: created from json succesfully" node end + end end +