# encoding: utf-8
require "logstash/filters/base"
require "logstash/namespace"
require "lru_redux"

# The DNS filter performs a lookup (either an A record/CNAME record lookup
# or a reverse lookup at the PTR record) on records specified under the
# `reverse` and `resolve` arrays.
#
# The config should look like this:
# [source,ruby]
#     filter {
#       dns {
#         type => 'type'
#         reverse => [ "source_host", "field_with_address" ]
#         resolve => [ "field_with_fqdn" ]
#         action => "replace"
#       }
#     }
#
# Caveats: the resolver itself is not configured with a timeout; instead each
# lookup is wrapped in a `Timeout` block governed by the `timeout` setting and
# retried up to `max_retries` times.
#
# This filter, like all filters, only processes 1 event at a time, so the use
# of this plugin can significantly slow down your pipeline's throughput if you
# have a high latency network. By way of example, if each DNS lookup takes 2
# milliseconds, the maximum throughput you can achieve with a single filter
# worker is 500 events per second (1000 milliseconds / 2 milliseconds).
class LogStash::Filters::DNS < LogStash::Filters::Base
  # TODO(sissel): The timeout limitation does seem to be fixed in here: http://redmine.ruby-lang.org/issues/5100
  # but isn't currently in JRuby.
  # TODO(sissel): make `action` required? This was always the intent, but
  # due to a typo it was never enforced. Thus the default behavior in past
  # versions was `append` by accident.

  config_name "dns"

  # Reverse resolve one or more fields.
  config :reverse, :validate => :array

  # Forward resolve one or more fields.
  config :resolve, :validate => :array

  # Determine what action to do: append or replace the values in the fields
  # specified under `reverse` and `resolve`.
  config :action, :validate => [ "append", "replace" ], :default => "append"

  # Use custom nameserver(s). For example: `["8.8.8.8", "8.8.4.4"]`
  config :nameserver, :validate => :array

  # `resolv` calls will be wrapped in a timeout instance
  config :timeout, :validate => :number, :default => 0.5

  # number of times to retry a failed resolve/reverse
  config :max_retries, :validate => :number, :default => 2

  # set the size of cache for successful requests (0 disables the cache)
  config :hit_cache_size, :validate => :number, :default => 0

  # how long to cache successful requests (in seconds)
  config :hit_cache_ttl, :validate => :number, :default => 60

  # cache size for failed requests, i.e. lookups that raised
  # `Resolv::ResolvError` (0 disables the cache)
  config :failed_cache_size, :validate => :number, :default => 0

  # how long to cache failed requests (in seconds)
  config :failed_cache_ttl, :validate => :number, :default => 5

  public

  # Builds the resolver and the optional hit/failed caches.
  #
  # When `nameserver` is set, lookups consult the hosts file first and then
  # only the listed nameservers, with no search domains.
  def register
    require "resolv"
    require "timeout"

    if @nameserver.nil?
      @resolv = Resolv.new
    else
      custom_dns = ::Resolv::DNS.new(:nameserver => @nameserver, :search => [], :ndots => 1)
      @resolv = Resolv.new([::Resolv::Hosts.new, custom_dns])
    end

    # Caches are only created when a positive size is configured; every
    # later use is guarded by a nil check on the instance variable.
    if @hit_cache_size > 0
      @hit_cache = LruRedux::ThreadSafeCache.new(@hit_cache_size, @hit_cache_ttl)
    end

    if @failed_cache_size > 0
      @failed_cache = LruRedux::ThreadSafeCache.new(@failed_cache_size, @failed_cache_ttl)
    end

    @ip_validator = Resolv::AddressRegex
  end # def register

  # Runs the configured forward and reverse lookups against the event.
  #
  # `filter_matched` is only invoked when every configured lookup succeeded;
  # `resolve`/`reverse` return nil as soon as one field cannot be processed.
  def filter(event)
    if @resolve
      return if resolve(event).nil?
    end

    if @reverse
      return if reverse(event).nil?
    end

    filter_matched(event)
  end

  private

  # Forward-resolves each field listed in `resolve`.
  #
  # Returns nil (aborting the remaining fields) when a field is missing,
  # holds more than one value, recently failed, or the lookup errors out;
  # returns the non-nil result of `each` otherwise.
  def resolve(event)
    @resolve.each do |field|
      is_array = false
      raw = event[field]
      if raw.is_a?(Array)
        is_array = true
        if raw.length > 1
          @logger.warn("DNS: skipping resolve, can't deal with multiple values", :field => field, :value => raw)
          return
        end
        raw = raw.first
      end

      # A missing field (or an empty array) yields nil, which the resolver
      # cannot interpret as a name; skip instead of letting it raise.
      if raw.nil?
        @logger.debug("DNS: skipping resolve, field has no value", :field => field)
        return
      end

      return if recently_failed?(raw)

      begin
        if @hit_cache
          address = @hit_cache.getset(raw) { retriable_getaddress(raw) }
        else
          address = retriable_getaddress(raw)
        end
      rescue Resolv::ResolvError
        @failed_cache[raw] = true if @failed_cache
        @logger.debug("DNS: couldn't resolve the hostname.", :field => field, :value => raw)
        return
      rescue Resolv::ResolvTimeout, Timeout::Error
        @logger.error("DNS: timeout on resolving the hostname.", :field => field, :value => raw)
        return
      rescue SocketError => e
        @logger.error("DNS: Encountered SocketError.", :field => field, :value => raw, :message => e.message)
        return
      end

      store_result(event, field, is_array, address)
    end
  end

  # Reverse-resolves each field listed in `reverse`.
  #
  # Returns nil (aborting the remaining fields) when a field holds more than
  # one value, is not an IP address, recently failed, or the lookup errors
  # out; returns the non-nil result of `each` otherwise.
  def reverse(event)
    @reverse.each do |field|
      raw = event[field]
      is_array = false
      if raw.is_a?(Array)
        is_array = true
        if raw.length > 1
          @logger.warn("DNS: skipping reverse, can't deal with multiple values", :field => field, :value => raw)
          return
        end
        raw = raw.first
      end

      if ! @ip_validator.match(raw)
        @logger.debug("DNS: not an address", :field => field, :value => event[field])
        return
      end

      return if recently_failed?(raw)

      begin
        if @hit_cache
          hostname = @hit_cache.getset(raw) { retriable_getname(raw) }
        else
          hostname = retriable_getname(raw)
        end
      rescue Resolv::ResolvError
        @failed_cache[raw] = true if @failed_cache
        @logger.debug("DNS: couldn't resolve the address.", :field => field, :value => raw)
        return
      rescue Resolv::ResolvTimeout, Timeout::Error
        @logger.error("DNS: timeout on resolving address.", :field => field, :value => raw)
        return
      rescue SocketError => e
        @logger.error("DNS: Encountered SocketError.", :field => field, :value => raw, :message => e.message)
        return
      end

      store_result(event, field, is_array, hostname)
    end
  end

  # True when the failed cache is enabled and still holds an entry for `key`.
  def recently_failed?(key)
    !@failed_cache.nil? && @failed_cache.key?(key)
  end

  # Writes a lookup result back to the event according to `action`.
  #
  # "replace" overwrites the field, keeping it an array if it was one;
  # any other action appends the result next to the original value.
  def store_result(event, field, is_array, value)
    if @action == "replace"
      event[field] = is_array ? [value] : value
    elsif is_array
      event[field] << value
    else
      event[field] = [event[field], value]
    end
  end

  # Runs the block under `timeout`, retrying on Timeout::Error or SocketError
  # up to `max_retries` times before re-raising the last error.
  def retriable_request(&block)
    tries = 0
    begin
      Timeout::timeout(@timeout) do
        block.call
      end
    rescue Timeout::Error, SocketError
      if tries < @max_retries
        tries += 1
        retry
      else
        raise
      end
    end
  end

  # Reverse lookup with timeout and retry handling.
  def retriable_getname(address)
    retriable_request do
      getname(address)
    end
  end

  # Forward lookup with timeout and retry handling.
  def retriable_getaddress(name)
    retriable_request do
      getaddress(name)
    end
  end

  # Returns the hostname for `address`, tagged as UTF-8.
  def getname(address)
    @resolv.getname(address).force_encoding(Encoding::UTF_8)
  end

  # Returns the address for `name`, tagged as UTF-8.
  def getaddress(name)
    @resolv.getaddress(name).force_encoding(Encoding::UTF_8)
  end
end # class LogStash::Filters::DNS