lib/marionette/master.rb in marionette-0.0.8 vs lib/marionette/master.rb in marionette-0.0.9

- old
+ new

@@ -16,30 +16,89 @@ end # Sends a msg to puppet node # and stands by for a reply # and processes reply - def talk(msg) + def talk(msg, send_guarantee = false, poll_guarantee = false) # Initial connection to socket @socket = socket_connect + @poll = true - # Repeat until talk succeeds. - while true do + # Initiate send + begin + + # if send successful start polling + @socket.send_string Marshal.dump(msg), ZMQ::NOBLOCK + + rescue + + if send_guarantee + + # Keep sending till successful + while true do + + begin + + @socket.send_string Marshal.dump(msg), ZMQ::NOBLOCK + break + + rescue + + # sleep half a second before next attempt + sleep 500 + + end - # Send the message - @socket.send_string Marshal.dump(msg) + end + + else + + # don't poll if send failed and no guarantee + @poll = false + + end + + end - # Poll server until it is receive-able and re-poll if necessary - poller = Poller.new @socket, {:max => 10, :interval => 500} - break if poller.pull? - @socket = socket_reconnect(@socket) + if poll? + options = {:max => 10, :interval => 500} + poller = Poller.new @socket, options + + if not poller.pull? and poll_guarantee + + # Repeat poll till reply receive-able + while true do + + @socket = socket_reconnect(@socket) + poller = Poller.new @socket, options + break if poller.pull? + + end + + end + + if poller.pull? + + # Fetch reply + @reply = process_message(@socket.recv_string) + + else + + # Polled but no reply + @reply = "Polled #{options[:max]} times every #{options[:interval]} milliseconds but no reply." + + end + + else + + # Send failed + @reply = "Send failed!" + end - @reply = process_message(@socket.recv_string) - end private # Re-connect master to puppet socket @@ -48,10 +107,14 @@ socket.close socket_connect end + def poll? + @poll + end + # Connect master to puppet socket def socket_connect(uri = @uri) # Set ZMQ context context = ZMQ::Context.new(1) @@ -77,9 +140,25 @@ msg = response end end + + def send_successful + + while true do + + # Send the message and catch send failure + begin + + @socket.send_string Marshal.dump(msg), ZMQ::NOBLOCK + + rescue + + end + end + + end class Poller < ZMQ::Poller attr_accessor :pull, :reconnect def initialize(socket, options)