Sha256: 1c466fd2c14183565b5def26aa8830f49ca43f1766e4200b87aa64cced08a544

Contents?: true

Size: 1.71 KB

Versions: 1

Compression:

Stored size: 1.71 KB

Contents

require 'fluent/plugin/filter'

module Fluent::Plugin
  # Fluentd filter that makes sure every record carries an '@timestamp'
  # field formatted as '%Y-%m-%dT%H:%M:%S.%L%z'.
  #
  # The fields listed in TIMESTAMP_FIELDS are inspected in order. The first
  # one that can be interpreted (as epoch seconds, epoch milliseconds, or a
  # date/time string) is written to both '@timestamp' and
  # 'fluent_converted_timestamp'. When none can be interpreted and the record
  # has no 'fluent_converted_timestamp' yet, the current time is written to
  # '@timestamp' and 'fluent_added_timestamp'.
  class ElasticsearchTimestampCheckFilter < Filter
    Fluent::Plugin.register_filter('elasticsearch_timestamp_check', self)

    # Record fields inspected for an existing timestamp, in priority order.
    TIMESTAMP_FIELDS = %w[@timestamp timestamp time syslog_timestamp].freeze

    # strftime layout used for every timestamp this filter writes.
    TIMESTAMP_FORMAT = '%Y-%m-%dT%H:%M:%S.%L%z'.freeze

    # An optionally signed, all-digit string: treated as an epoch candidate.
    EPOCH_PATTERN = /\A[-+]?\d+\z/.freeze

    # Accepted digit counts mapped to the divisor that yields seconds.
    # Epoch seconds have 10 digits and epoch milliseconds 13; any other
    # length is considered invalid (until the next digit rollover at
    # 2286-11-20 17:46:40 Z).
    EPOCH_DIVISORS = { 10 => 1, 13 => 1000 }.freeze

    # @param conf [Object] plugin configuration, passed through to super
    def configure(conf)
      super
      # Loaded here (as the original did) so DateTime is available to #filter.
      require 'date'
    end

    # Normalizes the timestamp of a single record. Mutates and returns
    # +record+.
    #
    # @param tag [String] event tag (unused)
    # @param time [Object] event time (unused)
    # @param record [Hash] event record
    # @return [Hash] the same record with '@timestamp' set
    def filter(tag, time, record)
      converted = nil
      TIMESTAMP_FIELDS.each do |field|
        converted = convert_timestamp(record[field])
        break if converted
      end

      if converted
        record['@timestamp'] = record['fluent_converted_timestamp'] = converted
        log.debug("Timestamp parsed: #{record['@timestamp']}")
      end

      unless record['fluent_converted_timestamp']
        record['@timestamp'] = record['fluent_added_timestamp'] =
          Time.now.strftime(TIMESTAMP_FORMAT)
        log.debug("Timestamp added: #{record['@timestamp']}")
      end

      record
    end

    private

    # Turns one candidate field value into a formatted timestamp string.
    #
    # Only String and Integer values are considered. Anything else (nil,
    # Float, Hash, ...) is skipped instead of raising, so a numeric epoch
    # delivered as a JSON number no longer aborts the filter.
    #
    # @param value [Object] raw field value taken from the record
    # @return [String, nil] formatted timestamp, or nil when unusable
    def convert_timestamp(value)
      text =
        case value
        when String then value
        when Integer then value.to_s
        end
      return nil unless text

      moment =
        if EPOCH_PATTERN =~ text
          # All-digit entries are epoch values and never fall through to
          # free-form date parsing, even when their length is rejected.
          epoch_to_time(text.to_i)
        else
          DateTime.parse(text)
        end
      moment && moment.strftime(TIMESTAMP_FORMAT)
    rescue ArgumentError
      # Unparseable date string: let the caller try the next field.
      nil
    end

    # Interprets an integer as epoch seconds (10 digits) or epoch
    # milliseconds (13 digits).
    #
    # @param num [Integer] epoch value
    # @return [Time, nil] the corresponding time, or nil for other lengths
    def epoch_to_time(num)
      # Count digits on the absolute value so a sign is not counted.
      divisor = EPOCH_DIVISORS[num.abs.to_s.length]
      return nil unless divisor

      # Rational keeps the millisecond part exact; integer division would
      # truncate it and '%L' would always render '000'.
      Time.at(Rational(num, divisor))
    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
fluent-plugin-elasticsearch-timestamp-check-0.2.4 lib/fluent/plugin/filter_elasticsearch_timestamp_check.rb