lib/riemann/tools/http_check.rb in riemann-tools-1.10.0 vs lib/riemann/tools/http_check.rb in riemann-tools-1.11.0
- old
+ new
@@ -30,19 +30,28 @@
opt :http_timeout, 'Timeout (in seconds) for HTTP requests', short: :none, default: 5.0
opt :checks, 'A list of checks to run.', short: :none, type: :strings, default: %w[consistency connection-latency response-code response-latency]
opt :resolvers, 'Run this number of resolver threads', short: :none, type: :integer, default: 5
opt :workers, 'Run this number of worker threads', short: :none, type: :integer, default: 20
opt :user_agent, 'User-Agent header for HTTP requests', short: :none, default: "#{File.basename($PROGRAM_NAME)}/#{Riemann::Tools::VERSION} (+https://github.com/riemann/riemann-tools)"
+ opt :ignored_asn, 'Ignore addresses belonging to these ASN', short: :none, type: :integers, default: []
+ opt :geoip_asn_database, 'Path to the GeoIP ASN database', short: :none, default: '/usr/share/GeoIP/GeoLite2-ASN.mmdb'
def initialize
+ super
+
@resolve_queue = Queue.new
@work_queue = Queue.new
+ @resolvers = []
+ @workers = []
+
opts[:resolvers].times do
- Thread.new do
+ @resolvers << Thread.new do
loop do
uri = @resolve_queue.pop
+ Thread.exit unless uri
+
host = uri.host
addresses = Resolv::DNS.new.getaddresses(host)
if addresses.empty?
host = host[1...-1] if host[0] == '[' && host[-1] == ']'
@@ -51,27 +60,67 @@
rescue IPAddr::InvalidAddressError
# Ignore
end
end
+ if opts[:ignored_asn].any?
+ addresses.reject! do |address|
+ address_belongs_to_ignored_asn?(address)
+ end
+ end
+
+ next if addresses.empty?
+
@work_queue.push([uri, addresses])
end
end
end
opts[:workers].times do
- Thread.new do
+ @workers << Thread.new do
loop do
uri, addresses = @work_queue.pop
+ Thread.exit unless uri
+
test_uri_addresses(uri, addresses)
end
end
end
+ end
- super
+ def address_belongs_to_ignored_asn?(address)
+ begin
+ require 'maxmind/geoip2'
+ rescue LoadError
+ raise StandardError, 'MaxMind::GeoIP2 is not available. Please install the maxmind-geoip2 gem for filtering by ASN.'
+ end
+
+ @reader ||= MaxMind::GeoIP2::Reader.new(database: opts[:geoip_asn_database])
+ asn = @reader.asn(address.to_s)
+
+ opts[:ignored_asn].include?(asn&.autonomous_system_number)
+ rescue MaxMind::GeoIP2::AddressNotFoundError
+ false
end
+ # Under normal operation, we have a single instance of this class for the
+ # lifetime of the process. But when testing, we create a new instance
+ # for each test, each with its resolvers and worker threads. The test
+ # process may end-up with a lot of running threads, hitting the OS limit
+ # of max threads by process and being unable to create more thread:
+ #
+ # ThreadError: can't create Thread: Resource temporarily unavailable
+ #
+ # To avoid this situation, we provide this method.
+ def shutdown
+ @resolve_queue.close
+ @resolvers.map(&:join)
+
+ @work_queue.close
+ @workers.map(&:join)
+ end
+
def tick
report(
service: 'riemann http-check resolvers utilization',
metric: (opts[:resolvers].to_f - @resolve_queue.num_waiting) / opts[:resolvers],
state: @resolve_queue.num_waiting.positive? ? 'ok' : 'critical',
@@ -102,14 +151,12 @@
end
def test_uri_addresses(uri, addresses)
request = get_request(uri)
- responses = []
-
- addresses.each do |address|
- responses << test_uri_address(uri, address.to_s, request)
+ responses = addresses.map do |address|
+ test_uri_address(uri, address.to_s, request)
end
responses.compact!
return unless opts[:checks].include?('consistency')
@@ -263,12 +310,12 @@
}.merge(endpoint_report(http, uri, 'redirects')),
)
end
def latency_state(name, latency)
- critical_threshold = opts["#{name}_latency_critical".to_sym]
- warning_threshold = opts["#{name}_latency_warning".to_sym]
+ critical_threshold = opts[:"#{name}_latency_critical"]
+ warning_threshold = opts[:"#{name}_latency_warning"]
return if critical_threshold.zero? || warning_threshold.zero?
if latency.nil? || latency > critical_threshold
'critical'
@@ -298,17 +345,9 @@
def redact_uri(uri)
reported_uri = uri.dup
reported_uri.password = '**redacted**' if reported_uri.password
reported_uri
- end
-
- def endpoint_name(address, port)
- if address.ipv6?
- "[#{address}]:#{port}"
- else
- "#{address}:#{port}"
- end
end
end
end
end