lib/arborist/monitor/socket.rb in arborist-0.2.0.pre20170519125456 vs lib/arborist/monitor/socket.rb in arborist-0.2.0
- old
+ new
@@ -5,80 +5,73 @@
require 'loggability'
require 'timeout'
require 'socket'
require 'arborist/monitor' unless defined?( Arborist::Monitor )
+require 'arborist/monitor/connection_batching'
-using Arborist::TimeRefinements
-
# Socket-related Arborist monitor logic
module Arborist::Monitor::Socket
+ extend Configurability
+
+ configurability( 'arborist.monitors.socket' ) do
+
+ ##
+ # The default timeout employed by the socket monitors, in floating-point
+ # seconds.
+ setting :default_timeout, default: 2.0 do |val|
+ Float( val )
+ end
+
+ end
+
+
# Arborist TCP socket monitor logic
class TCP
extend Loggability
+ include Arborist::Monitor::ConnectionBatching
+
log_to :arborist
- # Defaults for instances of this monitor
- DEFAULT_OPTIONS = {
- timeout: 2.seconds
- }
+ # Always request the node addresses and port.
+ USED_PROPERTIES = [ :addresses, :port ].freeze
### Instantiate a monitor check and run it for the specified +nodes+.
def self::run( nodes )
return self.new.run( nodes )
end
+ ### Return the properties used by this monitor.
+ def self::node_properties
+ return USED_PROPERTIES
+ end
+
+
### Create a new TCP monitor with the specified +options+. Valid options are:
###
### +:timeout+
### Set the number of seconds to wait for a connection for each node.
- def initialize( options=DEFAULT_OPTIONS )
- options = DEFAULT_OPTIONS.merge( options || {} )
-
- options.each do |name, value|
- self.public_send( "#{name}=", value )
- end
+ def initialize( timeout: Arborist::Monitor::Socket.default_timeout )
+ self.timeout = timeout
end
######
public
######
- # The timeout for connecting, in seconds.
- attr_accessor :timeout
+ ### Return an Enumerator that lazily yields Hashes of the form expected by the
+ ### ConnectionBatching mixin for each of the specified +nodes+.
+ def make_connections_enum( nodes )
+ return nodes.lazy.map do |identifier, node_data|
+ self.log.debug "Creating a socket for %s" % [ identifier ]
-
- ### Run the TCP check for each of the specified Hash of +nodes+ and return a Hash of
- ### updates for them based on trying to connect to them.
- def run( nodes )
- self.log.debug "Got nodes to TCP check: %p" % [ nodes ]
-
- connections = self.make_connections( nodes )
- return self.wait_for_connections( connections )
- end
-
-
- ### Return a clone of this object with its timeout set to +new_timeout+.
- def with_timeout( new_timeout )
- copy = self.clone
- copy.timeout = new_timeout
- return copy
- end
-
-
- ### Open a socket for each of the specified nodes using non-blocking connect(2), and
- ### return a Hash of the sockets (or the error from the connection attempt) keyed by
- ### node identifier.
- def make_connections( nodes )
- return nodes.each_with_object( {} ) do |(identifier, node_data), accum|
-
# :TODO: Should this try all the addresses? Should you be able to specify an
# address for a Service?
address = node_data['addresses'].first
port = node_data['port']
sockaddr = nil
@@ -87,199 +80,140 @@
sock = Socket.new( :INET, :STREAM )
conn = begin
sockaddr = Socket.sockaddr_in( port, address )
sock.connect_nonblock( sockaddr )
+ sock
rescue Errno::EINPROGRESS
self.log.debug " connection started"
sock
rescue => err
self.log.error " %p setting up connection: %s" % [ err.class, err.message ]
err
end
- accum[ conn ] = [ identifier, sockaddr ]
+ { conn: conn, identifier: identifier }
end
end
- ### For any elements of +connections+ that are sockets, wait on them to complete or error
- ### and then return a Hash of node updates keyed by identifier based on the results.
- def wait_for_connections( connections )
- results = {}
- start = Time.now
- timeout_at = Time.now + self.timeout
-
- # First strip out all the ones that failed in the first #connect_nonblock
- connections.delete_if do |sock, (identifier, _)|
- next false if sock.respond_to?( :connect_nonblock ) # Keep sockets
- self.log.debug " removing connect error for node %s" % [ identifier ]
- results[ identifier ] = { error: sock.message }
+ ### Build a status for the specified +conn_hash+ after its :conn has indicated
+ ### it is ready.
+ def status_for_conn( conn_hash, duration )
+ sock = conn_hash[:conn]
+ # Why getpeername? Testing socket success without read()ing, I think?
+ # FreeBSD source?
+ res = sock.getpeername
+ return {
+ tcp_socket_connect: { duration: duration }
+ }
+ rescue SocketError, SystemCallError => err
+ self.log.debug "Got %p while connecting to %s" % [ err.class, conn_hash[:identifier] ]
+ begin
+ sock.read( 1 )
+ rescue => err
+ return { error: err.message }
end
-
- # Now wait for connections to complete
- wait_seconds = timeout_at - Time.now
- until connections.empty? || wait_seconds <= 0
- self.log.debug "Waiting on %d connections for %0.3fs..." %
- [ connections.values.length, wait_seconds ]
-
- _, ready, _ = IO.select( nil, connections.keys, nil, wait_seconds )
-
- now = Time.now
- ready.each do |sock|
- identifier, sockaddr = *connections.delete( sock )
-
- begin
- res = sock.getpeername
- self.log.debug "connected to %s" % [ identifier ]
- results[ identifier ] = {
- tcp_socket_connect: { time: now.iso8601, duration: now - start }
- }
- rescue SocketError, SystemCallError => err
- begin
- sock.read( 1 )
- rescue => err
- self.log.debug "read: %p: %s" % [ err.class, err.message ]
- results[ identifier ] = { error: err.message }
- end
- ensure
- sock.close
- end
-
- end if ready
-
- wait_seconds = timeout_at - Time.now
- end
-
- # Anything left is a timeout
- connections.each do |sock, (identifier, _)|
- self.log.debug "%s: timeout (no connection in %0.3ds)" % [ identifier, self.timeout ]
- results[ identifier ] = { error: "Timeout after %0.3fs" % [self.timeout] }
- sock.close
- end
-
- return results
+ ensure
+ sock.close if sock
end
+
end # class TCP
# Arborist UDP socket monitor logic
class UDP
extend Loggability
+ include Arborist::Monitor::ConnectionBatching
+
log_to :arborist
# Defaults for instances of this monitor
DEFAULT_OPTIONS = {
timeout: 0.001
}
+ # Always request the node addresses and port.
+ USED_PROPERTIES = [ :addresses, :port ].freeze
+
### Instantiate a monitor check and run it for the specified +nodes+.
def self::run( nodes )
return self.new.run( nodes )
end
+ ### Return the properties used by this monitor.
+ def self::node_properties
+ return USED_PROPERTIES
+ end
+
+
### Create a new UDP monitor with the specified +options+. Valid options are:
###
### +:timeout+
### Set the number of seconds to wait for a connection for each node.
- def initialize( options=DEFAULT_OPTIONS )
- options = DEFAULT_OPTIONS.merge( options || {} )
-
- options.each do |name, value|
- self.public_send( "#{name}=", value )
- end
+ def initialize( timeout: Arborist::Monitor::Socket.default_timeout )
+ self.timeout = timeout
end
######
public
######
- # The timeout for connecting, in seconds.
- attr_accessor :timeout
-
-
- ### Run the UDP check for each of the specified Hash of +nodes+ and return a Hash of
- ### updates for them based on trying to connect to them.
- def run( nodes )
- self.log.debug "Got nodes to UDP check: %p" % [ nodes ]
-
- connections = self.make_connections( nodes )
- return self.wait_for_connections( connections )
- end
-
-
### Open a socket for each of the specified nodes and return a Hash of
### the sockets (or the error from the connection attempt) keyed by
### node identifier.
- def make_connections( nodes )
- return nodes.each_with_object( {} ) do |(identifier, node_data), accum|
-
+ def make_connections_enum( nodes )
+ return nodes.lazy.map do |identifier, node_data|
address = node_data['addresses'].first
port = node_data['port']
self.log.debug "Creating UDP connection for %s:%d" % [ address, port ]
sock = Socket.new( :INET, :DGRAM )
conn = begin
- sockaddr = Socket.sockaddr_in( port, address )
- sock.connect( sockaddr )
- sock.send( '', 0 )
- sock
- rescue SocketError => err
- self.log.error " %p setting up connection: %s" % [ err.class, err.message ]
- err
- end
+ sockaddr = Socket.sockaddr_in( port, address )
+ sock.connect( sockaddr )
+ sock.send( '', 0 )
+ sock.recvfrom_nonblock( 1 )
+ sock
+ rescue Errno::EAGAIN
+ self.log.debug " connection started"
+ sock
+ rescue => err
+ self.log.error " %p setting up connection: %s" % [ err.class, err.message ]
+ err
+ end
- accum[ conn ] = [ identifier, sock ]
+ self.log.debug "UDP connection object is: %p" % [ conn ]
+ { conn: conn, identifier: identifier }
end
end
- ### For any elements of +connections+ that are sockets, wait on them to complete or error
- ### and then return a Hash of node updates keyed by identifier based on the results.
- def wait_for_connections( connections )
- results = {}
- start = Time.now
+ ### Build a status for the specified +conn_hash+ after its :conn has indicated
+ ### it is ready.
+ def status_for_conn( conn_hash, duration )
+ sock = conn_hash[:conn]
+ sock.recvfrom_nonblock( 1 )
+ return {
+ udp_socket_connect: { duration: duration }
+ }
+ rescue Errno::EAGAIN
+ return {
+ udp_socket_connect: { duration: duration }
+ }
+ rescue SocketError, SystemCallError => err
+ self.log.debug "Got %p while connecting to %s" % [ err.class, conn_hash[:identifier] ]
+ return { error: err.message }
+ ensure
+ sock.close if sock
+ end
- # First strip out all the ones that failed in the first #connect
- connections.delete_if do |sock, (identifier, _)|
- next false if sock.respond_to?( :recvfrom_nonblock ) # Keep sockets
- self.log.debug " removing connect error for node %s" % [ identifier ]
- results[ identifier ] = { error: sock.message }
- end
- # Test all connections
- connections.each do |sock, (identifier, _)|
- begin
- sock.recvfrom_nonblock( 1 )
-
- rescue IO::WaitReadable
- ready, _, _ = IO.select( [sock], [], [], self.timeout )
- if ready.nil?
- now = Time.now
- results[ identifier ] = {
- udp_socket_connect: { time: now.iso8601, duration: now - start }
- }
- self.log.debug " connection successful"
- else
- retry
- end
-
- rescue SocketError, SystemCallError => err
- self.log.debug "%p during connection: %s" % [ err.class, err.message ]
- results[ identifier ] = { error: err.message }
-
- ensure
- sock.close
- end
- end
-
- return results
- end
end # class UDP
end # module Arborist::Monitor::Socket