# Example configuration (every key is optional):
#
# {
#   "index": {
#     "stamp": "",
#     "prefix": "",
#     "suffix": "",
#     "format": "",
#     "routing": "",
#     "error_tag": ""
#   }
# }
require 'time' # provides Time#iso8601, used for the fallback timestamp

module Anschel
  class Filter
    # Compiles the "index" filter: a lambda that derives a destination index
    # name for an event from the event's timestamp field and stores it in
    # event[:_index].
    #
    # Options read from +conf+ (symbol keys). :stamp, :prefix, :suffix,
    # :format and :routing are removed from +conf+; :error_tag is read but
    # left in place.
    #
    #   :stamp     - name of the event field holding the timestamp
    #                (default '@timestamp'); converted to a Symbol.
    #   :prefix    - format string applied to the event with String#%
    #                (default 'logs-%{type}-').
    #   :suffix    - strftime pattern applied to the parsed time in UTC
    #                (default '%Y.%m.%d').
    #   :format    - one Joda-Time pattern, or an Array of patterns tried in
    #                order (default: four ISO8601 variants).
    #   :routing   - optional format string applied to the event with
    #                String#%; the result is stored in event[:_routing].
    #   :error_tag - value appended to event[:tags] when no pattern parses the
    #                timestamp (default 'index-error'). An explicit nil or
    #                false disables tagging.
    #
    # +stats+ must respond to #create and #inc; +log+ must respond to #trace
    # and #debug.
    #
    # Returns a lambda taking one event (a Hash-like object with symbol keys).
    # The lambda mutates the event and returns whatever filtered(event, conf)
    # returns.
    #
    # NOTE(review): String#% with a %{name} reference raises KeyError when the
    # event lacks that key, so an event without :type fails under the default
    # prefix - confirm upstream filters guarantee the field.
    def index(conf, stats, log)
      stamp  = conf.delete(:stamp)  || '@timestamp'
      prefix = conf.delete(:prefix) || 'logs-%{type}-'
      suffix = conf.delete(:suffix) || '%Y.%m.%d'
      format = conf.delete(:format) || %w[
        yyyy-MM-dd'T'HH:mm:ss.SSSZZ
        yyyy-MM-dd'T'HH:mm:ss.SSSZ
        yyyy-MM-dd'T'HH:mm:ssZZ
        yyyy-MM-dd'T'HH:mm:ssZ
      ] # ISO8601
      routing = conf.delete(:routing)

      # key? rather than || so that an explicit nil/false turns tagging off.
      error_tag = conf.key?(:error_tag) ? conf[:error_tag] : 'index-error'

      stamp  = stamp.to_sym
      format = [format] unless format.is_a?(Array)

      # One formatter per pattern. The default year (used by patterns that
      # carry no year) is fixed here, at compile time, not per event.
      joda = format.map do |pattern|
        formatter = org.joda.time.format.DateTimeFormat.forPattern(pattern)
        formatter.withDefaultYear(Time.new.year).withOffsetParsed
      end

      stats.create 'filter-index'
      stats.create 'filter-index-skipped' # registered, but never incremented here
      stats.create 'filter-index-error'

      log.trace(
        event: 'filter-compiled', kind: 'index',
        stamp: stamp, prefix: prefix, suffix: suffix, format: format,
        routing: routing
      )

      lambda do |event|
        idx_prefix = prefix % event
        event[:_routing] = routing % event if routing
        stamped = event.key?(stamp)

        if stamped
          # Coerce to String so that a nil or non-String value is reported by
          # the parser as "invalid format" (and handled below) instead of
          # raising something other than IllegalArgumentException.
          text = event[stamp].to_s

          parsed = joda.any? do |formatter|
            begin
              millis = formatter.parseMillis(text)
              event[:_index] = idx_prefix + Time.at(0.001 * millis).utc.strftime(suffix)
              true
            rescue java.lang.IllegalArgumentException
              false # this pattern does not match; try the next one
            end
          end

          if parsed
            stats.inc 'filter-index'
            return filtered(event, conf)
          end
        end

        # Fallback: no usable timestamp. Index by the current time and mark
        # the event so the problem is visible downstream.
        now = Time.now.utc
        event[stamp] = now.iso8601(3) unless stamped
        event[:_index] = idx_prefix + now.strftime(suffix)

        log.debug(
          event: 'filter-index-warning',
          reason: 'could not parse event',
          remediation: "sending to best-guess index '#{event[:_index]}'",
          stamp: stamp, prefix: prefix, suffix: suffix, format: format,
          raw_event: event
        )

        if error_tag
          event[:tags] ||= []
          event[:tags] << error_tag
        end

        stats.inc 'filter-index-error'
        stats.inc 'filter-index'
        filtered(event, conf)
      end
    end
  end
end