lib/arborist/monitor/socket.rb in arborist-0.0.1.pre20160128152542 vs lib/arborist/monitor/socket.rb in arborist-0.0.1.pre20160606141735
- old
+ new
@@ -1,20 +1,20 @@
# -*- ruby -*-
#encoding: utf-8
require 'loggability'
require 'timeout'
+require 'socket'
require 'arborist/monitor' unless defined?( Arborist::Monitor )
using Arborist::TimeRefinements
# Socket-related Arborist monitor logic
module Arborist::Monitor::Socket
-
# Arborist TCP socket monitor logic
class TCP
extend Loggability
log_to :arborist
@@ -117,10 +117,12 @@
# Now wait for connections to complete
until connections.empty? || timeout_at.past?
self.log.debug "Waiting on %d connections for %0.3ds..." %
[ connections.values.length, timeout_at - Time.now ]
+
+ # :FIXME: Race condition: errors if timeout_at - Time.now is 0
_, ready, _ = IO.select( nil, connections.keys, nil, timeout_at - Time.now )
self.log.debug " select returned: %p" % [ ready ]
ready.each do |sock|
self.log.debug " %p is ready" % [ sock ]
@@ -152,11 +154,130 @@
sock.close
end
return results
end
-
end # class TCP
+
+
+ # Arborist UDP socket monitor logic
+ class UDP
+ extend Loggability
+ log_to :arborist
+
+
+ # Defaults for instances of this monitor
+ DEFAULT_OPTIONS = {
+ timeout: 0.001
+ }
+
+
+ ### Instantiate a monitor check and run it for the specified +nodes+.
+ def self::run( nodes )
+ return self.new.run( nodes )
+ end
+
+
+ ### Create a new UDP monitor with the specified +options+. Valid options are:
+ ###
+ ### +:timeout+
+ ### Set the number of seconds to wait for a connection for each node.
+ def initialize( options=DEFAULT_OPTIONS )
+ options = DEFAULT_OPTIONS.merge( options || {} )
+
+ options.each do |name, value|
+ self.public_send( "#{name}=", value )
+ end
+ end
+
+
+ ######
+ public
+ ######
+
+ # The timeout for connecting, in seconds.
+ attr_accessor :timeout
+
+
+ ### Run the UDP check for each of the specified Hash of +nodes+ and return a Hash of
+ ### updates for them based on trying to connect to them.
+ def run( nodes )
+ self.log.debug "Got nodes to UDP check: %p" % [ nodes ]
+
+ connections = self.make_connections( nodes )
+ return self.wait_for_connections( connections )
+ end
+
+
+ ### Open a socket for each of the specified nodes and return a Hash of
+ ### the sockets (or the error from the connection attempt) keyed by
+ ### node identifier.
+ def make_connections( nodes )
+ return nodes.each_with_object( {} ) do |(identifier, node_data), accum|
+
+ address = node_data['addresses'].first
+ port = node_data['port']
+
+ self.log.debug "Creating UDP connection for %s:%d" % [ address, port ]
+ sock = Socket.new( :INET, :DGRAM )
+
+ conn = begin
+ sockaddr = Socket.sockaddr_in( port, address )
+ sock.connect( sockaddr )
+ sock.send( '', 0 )
+ sock
+ rescue SocketError => err
+ self.log.error " %p setting up connection: %s" % [ err.class, err.message ]
+ err
+ end
+
+ accum[ conn ] = [ identifier, sock ]
+ end
+ end
+
+
+ ### For any elements of +connections+ that are sockets, wait on them to complete or error
+ ### and then return a Hash of node updates keyed by identifier based on the results.
+ def wait_for_connections( connections )
+ results = {}
+ start = Time.now
+
+ # First strip out all the ones that failed in the first #connect
+ connections.delete_if do |sock, (identifier, _)|
+ next false if sock.respond_to?( :recvfrom_nonblock ) # Keep sockets
+ self.log.debug " removing connect error for node %s" % [ identifier ]
+ results[ identifier ] = { error: sock.message }
+ end
+
+ # Test all connections
+ connections.each do |sock, (identifier, _)|
+ begin
+ sock.recvfrom_nonblock( 1 )
+
+ rescue IO::WaitReadable
+ ready, _, _ = IO.select( [sock], [], [], self.timeout )
+ if ready.nil?
+ now = Time.now
+ results[ identifier ] = {
+ udp_socket_connect: { time: now.to_s, duration: now - start }
+ }
+ self.log.debug " connection successful"
+ else
+ retry
+ end
+
+ rescue SocketError, SystemCallError => err
+ self.log.debug "%p during connection: %s" % [ err.class, err.message ]
+ results[ identifier ] = { error: err.message }
+
+ ensure
+ sock.close
+ end
+ end
+
+ return results
+ end
+ end # class UDP
end # module Arborist::Monitor::Socket