lib/arborist/monitor/socket.rb in arborist-0.2.0.pre20170519125456 vs lib/arborist/monitor/socket.rb in arborist-0.2.0

- old
+ new

@@ -5,80 +5,73 @@ require 'loggability' require 'timeout' require 'socket' require 'arborist/monitor' unless defined?( Arborist::Monitor ) +require 'arborist/monitor/connection_batching' -using Arborist::TimeRefinements - # Socket-related Arborist monitor logic module Arborist::Monitor::Socket + extend Configurability + + configurability( 'arborist.monitors.socket' ) do + + ## + # The default timeout employed by the socket monitors, in floating-point + # seconds. + setting :default_timeout, default: 2.0 do |val| + Float( val ) + end + + end + + # Arborist TCP socket monitor logic class TCP extend Loggability + include Arborist::Monitor::ConnectionBatching + log_to :arborist - # Defaults for instances of this monitor - DEFAULT_OPTIONS = { - timeout: 2.seconds - } + # Always request the node addresses and port. + USED_PROPERTIES = [ :addresses, :port ].freeze ### Instantiate a monitor check and run it for the specified +nodes+. def self::run( nodes ) return self.new.run( nodes ) end + ### Return the properties used by this monitor. + def self::node_properties + return USED_PROPERTIES + end + + ### Create a new TCP monitor with the specified +options+. Valid options are: ### ### +:timeout+ ### Set the number of seconds to wait for a connection for each node. - def initialize( options=DEFAULT_OPTIONS ) - options = DEFAULT_OPTIONS.merge( options || {} ) - - options.each do |name, value| - self.public_send( "#{name}=", value ) - end + def initialize( timeout: Arborist::Monitor::Socket.default_timeout ) + self.timeout = timeout end ###### public ###### - # The timeout for connecting, in seconds. - attr_accessor :timeout + ### Return an Enumerator that lazily yields Hashes of the form expected by the + ### ConnectionBatching mixin for each of the specified +nodes+. + def make_connections_enum( nodes ) + return nodes.lazy.map do |identifier, node_data| + self.log.debug "Creating a socket for %s" % [ identifier ] - - ### Run the TCP check for each of the specified Hash of +nodes+ and return a Hash of - ### updates for them based on trying to connect to them. - def run( nodes ) - self.log.debug "Got nodes to TCP check: %p" % [ nodes ] - - connections = self.make_connections( nodes ) - return self.wait_for_connections( connections ) - end - - - ### Return a clone of this object with its timeout set to +new_timeout+. - def with_timeout( new_timeout ) - copy = self.clone - copy.timeout = new_timeout - return copy - end - - - ### Open a socket for each of the specified nodes using non-blocking connect(2), and - ### return a Hash of the sockets (or the error from the connection attempt) keyed by - ### node identifier. - def make_connections( nodes ) - return nodes.each_with_object( {} ) do |(identifier, node_data), accum| - # :TODO: Should this try all the addresses? Should you be able to specify an # address for a Service? address = node_data['addresses'].first port = node_data['port'] sockaddr = nil @@ -87,199 +80,140 @@ sock = Socket.new( :INET, :STREAM ) conn = begin sockaddr = Socket.sockaddr_in( port, address ) sock.connect_nonblock( sockaddr ) + sock rescue Errno::EINPROGRESS self.log.debug " connection started" sock rescue => err self.log.error " %p setting up connection: %s" % [ err.class, err.message ] err end - accum[ conn ] = [ identifier, sockaddr ] + { conn: conn, identifier: identifier } end end - ### For any elements of +connections+ that are sockets, wait on them to complete or error - ### and then return a Hash of node updates keyed by identifier based on the results. - def wait_for_connections( connections ) - results = {} - start = Time.now - timeout_at = Time.now + self.timeout - - # First strip out all the ones that failed in the first #connect_nonblock - connections.delete_if do |sock, (identifier, _)| - next false if sock.respond_to?( :connect_nonblock ) # Keep sockets - self.log.debug " removing connect error for node %s" % [ identifier ] - results[ identifier ] = { error: sock.message } + ### Build a status for the specified +conn_hash+ after its :conn has indicated + ### it is ready. + def status_for_conn( conn_hash, duration ) + sock = conn_hash[:conn] + # Why getpeername? Testing socket success without read()ing, I think? + # FreeBSD source? + res = sock.getpeername + return { + tcp_socket_connect: { duration: duration } + } + rescue SocketError, SystemCallError => err + self.log.debug "Got %p while connecting to %s" % [ err.class, conn_hash[:identifier] ] + begin + sock.read( 1 ) + rescue => err + return { error: err.message } end - - # Now wait for connections to complete - wait_seconds = timeout_at - Time.now - until connections.empty? || wait_seconds <= 0 - self.log.debug "Waiting on %d connections for %0.3fs..." % - [ connections.values.length, wait_seconds ] - - _, ready, _ = IO.select( nil, connections.keys, nil, wait_seconds ) - - now = Time.now - ready.each do |sock| - identifier, sockaddr = *connections.delete( sock ) - - begin - res = sock.getpeername - self.log.debug "connected to %s" % [ identifier ] - results[ identifier ] = { - tcp_socket_connect: { time: now.iso8601, duration: now - start } - } - rescue SocketError, SystemCallError => err - begin - sock.read( 1 ) - rescue => err - self.log.debug "read: %p: %s" % [ err.class, err.message ] - results[ identifier ] = { error: err.message } - end - ensure - sock.close - end - - end if ready - - wait_seconds = timeout_at - Time.now - end - - # Anything left is a timeout - connections.each do |sock, (identifier, _)| - self.log.debug "%s: timeout (no connection in %0.3ds)" % [ identifier, self.timeout ] - results[ identifier ] = { error: "Timeout after %0.3fs" % [self.timeout] } - sock.close - end - - return results + ensure + sock.close if sock end + end # class TCP # Arborist UDP socket monitor logic class UDP extend Loggability + include Arborist::Monitor::ConnectionBatching + log_to :arborist # Defaults for instances of this monitor DEFAULT_OPTIONS = { timeout: 0.001 } + # Always request the node addresses and port. + USED_PROPERTIES = [ :addresses, :port ].freeze + ### Instantiate a monitor check and run it for the specified +nodes+. def self::run( nodes ) return self.new.run( nodes ) end + ### Return the properties used by this monitor. + def self::node_properties + return USED_PROPERTIES + end + + ### Create a new UDP monitor with the specified +options+. Valid options are: ### ### +:timeout+ ### Set the number of seconds to wait for a connection for each node. - def initialize( options=DEFAULT_OPTIONS ) - options = DEFAULT_OPTIONS.merge( options || {} ) - - options.each do |name, value| - self.public_send( "#{name}=", value ) - end + def initialize( timeout: Arborist::Monitor::Socket.default_timeout ) + self.timeout = timeout end ###### public ###### - # The timeout for connecting, in seconds. - attr_accessor :timeout - - - ### Run the UDP check for each of the specified Hash of +nodes+ and return a Hash of - ### updates for them based on trying to connect to them. - def run( nodes ) - self.log.debug "Got nodes to UDP check: %p" % [ nodes ] - - connections = self.make_connections( nodes ) - return self.wait_for_connections( connections ) - end - - ### Open a socket for each of the specified nodes and return a Hash of ### the sockets (or the error from the connection attempt) keyed by ### node identifier. - def make_connections( nodes ) - return nodes.each_with_object( {} ) do |(identifier, node_data), accum| - + def make_connections_enum( nodes ) + return nodes.lazy.map do |identifier, node_data| address = node_data['addresses'].first port = node_data['port'] self.log.debug "Creating UDP connection for %s:%d" % [ address, port ] sock = Socket.new( :INET, :DGRAM ) conn = begin - sockaddr = Socket.sockaddr_in( port, address ) - sock.connect( sockaddr ) - sock.send( '', 0 ) - sock - rescue SocketError => err - self.log.error " %p setting up connection: %s" % [ err.class, err.message ] - err - end + sockaddr = Socket.sockaddr_in( port, address ) + sock.connect( sockaddr ) + sock.send( '', 0 ) + sock.recvfrom_nonblock( 1 ) + sock + rescue Errno::EAGAIN + self.log.debug " connection started" + sock + rescue => err + self.log.error " %p setting up connection: %s" % [ err.class, err.message ] + err + end - accum[ conn ] = [ identifier, sock ] + self.log.debug "UDP connection object is: %p" % [ conn ] + { conn: conn, identifier: identifier } end end - ### For any elements of +connections+ that are sockets, wait on them to complete or error - ### and then return a Hash of node updates keyed by identifier based on the results. - def wait_for_connections( connections ) - results = {} - start = Time.now + ### Build a status for the specified +conn_hash+ after its :conn has indicated + ### it is ready. + def status_for_conn( conn_hash, duration ) + sock = conn_hash[:conn] + sock.recvfrom_nonblock( 1 ) + return { + udp_socket_connect: { duration: duration } + } + rescue Errno::EAGAIN + return { + udp_socket_connect: { duration: duration } + } + rescue SocketError, SystemCallError => err + self.log.debug "Got %p while connecting to %s" % [ err.class, conn_hash[:identifier] ] + return { error: err.message } + ensure + sock.close if sock + end - # First strip out all the ones that failed in the first #connect - connections.delete_if do |sock, (identifier, _)| - next false if sock.respond_to?( :recvfrom_nonblock ) # Keep sockets - self.log.debug " removing connect error for node %s" % [ identifier ] - results[ identifier ] = { error: sock.message } - end - # Test all connections - connections.each do |sock, (identifier, _)| - begin - sock.recvfrom_nonblock( 1 ) - - rescue IO::WaitReadable - ready, _, _ = IO.select( [sock], [], [], self.timeout ) - if ready.nil? - now = Time.now - results[ identifier ] = { - udp_socket_connect: { time: now.iso8601, duration: now - start } - } - self.log.debug " connection successful" - else - retry - end - - rescue SocketError, SystemCallError => err - self.log.debug "%p during connection: %s" % [ err.class, err.message ] - results[ identifier ] = { error: err.message } - - ensure - sock.close - end - end - - return results - end end # class UDP end # module Arborist::Monitor::Socket