lib/pylon/node.rb in pylon-0.2.2 vs lib/pylon/node.rb in pylon-0.2.3
- old
+ new
@@ -76,22 +76,68 @@
def random_uuid
UUIDTools::UUID.timestamp_create
end
+ # Expects a two element tuple containing a string command and a
+ # hash of params:
+ # ["command", :params => {}]
+ def handle_command string
+ command, params = JSON.parse(string)
+ case command
+ when "sync_time"
+ Log.info "handle_command: sync time received, running ntpdate"
+ ["sync_time", %x{ntpdate -u pool.ntp.org}]
+ when "add"
+ Log.info "handle_command: add message received, params: #{params.inspect}"
+ when "new_leader"
+ Log.info "handle_command: new_leader message received"
+ new_leader = params
+ new_leader.send "add", self
+ self
+ when "status"
+ Log.info "handle_command: status message received, sending node back"
+ self
+ when "ping"
+ timestamp = Time.now.to_i
+ Log.info "handle_command: ping requested, sending pong with timestamp: #{timestamp}"
+ ["pong", timestamp]
+ when "exit"
+ error = "handle_command: exit command received"
+ error << " with message: #{params["message"]}" if params.has_key? "message"
+ Pylon::Application.fatal! error, 1
+ else
+ Pylon::Application.fatal! "handle_command: unrecognized command '#{command.inspect}' (params: #{params.inspect}), exiting!", -99
+ end
+ end
+
+ def send(command = "status", params = {})
+ Thread.new do
+ req_socket = context.socket ZMQ::REQ
+ req_socket.setsockopt ZMQ::LINGER, 0
+ req_socket.connect unicast_endpoint
+ if req_socket.send_string "command", ZMQ::SNDMORE
+ Log.debug "connect_node: sent command protocol initiator"
+ if req_socket.send_string([command, params].to_json)
+ response = JSON.parse(req_socket.recv_string)
+ end
+ end
+ response
+ end.value
+ end
+
def unicast_announcer
Thread.new do
Log.debug "unicast_announcer: zeromq pub socket announcer starting up on #{unicast_endpoint}"
- pub_socket = context.socket ZMQ::PUB
- pub_socket.setsockopt ZMQ::IDENTITY, "node"
- pub_socket.bind unicast_endpoint
+ rep_socket = context.socket ZMQ::REP
+ rep_socket.bind unicast_endpoint
loop do
+ if rep_socket.recv_string == "command"
+ rep_socket.send_string handle_command(rep_socket.recv_string).to_json if rep_socket.more_parts?
+ end
sleep_after_announce = Pylon::Config[:sleep_after_announce]
- Log.debug "#{self}: unicast announcing then sleeping #{sleep_after_announce} secs"
- pub_socket.send_string uuid.to_s, ZMQ::SNDMORE
- pub_socket.send_string self.to_json
- #Thread.pass
+ Thread.pass
sleep sleep_after_announce
end
end
end
@@ -107,11 +153,11 @@
loop do
sleep_after_announce = Pylon::Config[:sleep_after_announce]
Log.debug "#{self}: announcing then sleeping #{sleep_after_announce} secs"
pub_socket.send_string uuid.to_s, ZMQ::SNDMORE
pub_socket.send_string self.to_json
- #Thread.pass
+ Thread.pass
sleep sleep_after_announce
end
end
end
@@ -141,7 +187,9 @@
node.unicast_endpoint json["unicast_endpoint"]
node.timestamp json["timestamp"]
Log.debug "#{node}: created from json succesfully"
node
end
+
end
end
+