lib/riemann/tools/http_check.rb in riemann-tools-1.10.0 vs lib/riemann/tools/http_check.rb in riemann-tools-1.11.0

- old
+ new

@@ -30,19 +30,28 @@ opt :http_timeout, 'Timeout (in seconds) for HTTP requests', short: :none, default: 5.0 opt :checks, 'A list of checks to run.', short: :none, type: :strings, default: %w[consistency connection-latency response-code response-latency] opt :resolvers, 'Run this number of resolver threads', short: :none, type: :integer, default: 5 opt :workers, 'Run this number of worker threads', short: :none, type: :integer, default: 20 opt :user_agent, 'User-Agent header for HTTP requests', short: :none, default: "#{File.basename($PROGRAM_NAME)}/#{Riemann::Tools::VERSION} (+https://github.com/riemann/riemann-tools)" + opt :ignored_asn, 'Ignore addresses belonging to these ASN', short: :none, type: :integers, default: [] + opt :geoip_asn_database, 'Path to the GeoIP ASN database', short: :none, default: '/usr/share/GeoIP/GeoLite2-ASN.mmdb' def initialize + super + @resolve_queue = Queue.new @work_queue = Queue.new + @resolvers = [] + @workers = [] + opts[:resolvers].times do - Thread.new do + @resolvers << Thread.new do loop do uri = @resolve_queue.pop + Thread.exit unless uri + host = uri.host addresses = Resolv::DNS.new.getaddresses(host) if addresses.empty? host = host[1...-1] if host[0] == '[' && host[-1] == ']' @@ -51,27 +60,67 @@ rescue IPAddr::InvalidAddressError # Ignore end end + if opts[:ignored_asn].any? + addresses.reject! do |address| + address_belongs_to_ignored_asn?(address) + end + end + + next if addresses.empty? + @work_queue.push([uri, addresses]) end end end opts[:workers].times do - Thread.new do + @workers << Thread.new do loop do uri, addresses = @work_queue.pop + Thread.exit unless uri + test_uri_addresses(uri, addresses) end end end + end - super + def address_belongs_to_ignored_asn?(address) + begin + require 'maxmind/geoip2' + rescue LoadError + raise StandardError, 'MaxMind::GeoIP2 is not available. Please install the maxmind-geoip2 gem for filtering by ASN.' + end + + @reader ||= MaxMind::GeoIP2::Reader.new(database: opts[:geoip_asn_database]) + asn = @reader.asn(address.to_s) + + opts[:ignored_asn].include?(asn&.autonomous_system_number) + rescue MaxMind::GeoIP2::AddressNotFoundError + false end + # Under normal operation, we have a single instance of this class for the + # lifetime of the process. But when testing, we create a new instance + # for each test, each with its resolvers and worker threads. The test + # process may end-up with a lot of running threads, hitting the OS limit + # of max threads by process and being unable to create more thread: + # + # ThreadError: can't create Thread: Resource temporarily unavailable + # + # To avoid this situation, we provide this method. + def shutdown + @resolve_queue.close + @resolvers.map(&:join) + + @work_queue.close + @workers.map(&:join) + end + def tick report( service: 'riemann http-check resolvers utilization', metric: (opts[:resolvers].to_f - @resolve_queue.num_waiting) / opts[:resolvers], state: @resolve_queue.num_waiting.positive? ? 'ok' : 'critical', @@ -102,14 +151,12 @@ end def test_uri_addresses(uri, addresses) request = get_request(uri) - responses = [] - - addresses.each do |address| - responses << test_uri_address(uri, address.to_s, request) + responses = addresses.map do |address| + test_uri_address(uri, address.to_s, request) end responses.compact! return unless opts[:checks].include?('consistency') @@ -263,12 +310,12 @@ }.merge(endpoint_report(http, uri, 'redirects')), ) end def latency_state(name, latency) - critical_threshold = opts["#{name}_latency_critical".to_sym] - warning_threshold = opts["#{name}_latency_warning".to_sym] + critical_threshold = opts[:"#{name}_latency_critical"] + warning_threshold = opts[:"#{name}_latency_warning"] return if critical_threshold.zero? || warning_threshold.zero? if latency.nil? || latency > critical_threshold 'critical' @@ -298,17 +345,9 @@ def redact_uri(uri) reported_uri = uri.dup reported_uri.password = '**redacted**' if reported_uri.password reported_uri - end - - def endpoint_name(address, port) - if address.ipv6? - "[#{address}]:#{port}" - else - "#{address}:#{port}" - end end end end end