Sha256: 16217c920b7cd449992b07aa7f5891996d7c741cc794494d89cdbd5b18de0a99

Contents?: true

Size: 1.88 KB

Versions: 2

Compression:

Stored size: 1.88 KB

Contents

require 'fluent/plugin/filter'

module Fluent
  class Plugin::InfluxdbDeduplicationFilter < Plugin::Filter
    Fluent::Plugin.register_filter('influxdb_deduplication', self)

    config_param :time_key, :string, default: nil,
                 desc: <<-DESC
The output time key to use.
    DESC

    config_param :out_of_order, :string, default: nil,
                 desc: <<-DESC
If not nil, the field takes the value true if the record arrives in order and false otherwise
    DESC

    def configure(conf)
      super

      unless @time_key
        raise Fluent::ConfigError, "a time key must be set"
      end
    end

    def start
      super

      @last_timestamp = 0
      @sequence = 0
    end

    def filter(tag, time, record)
      if time.is_a?(Integer)
        input_time = Fluent::EventTime.new(time)
      elsif time.is_a?(Fluent::EventTime)
        input_time = time
      else
        @log.error("unreadable time")
        return nil
      end

      nano_time = input_time.sec * 1000000000

      if input_time.sec < @last_timestamp
        @log.debug("out of sequence timestamp")
        if @out_of_order
          record[@out_of_order] = true
          record[@time_key] = nano_time
        else
          @log.debug("out of order record dropped")
          return nil
        end
      elsif input_time.sec == @last_timestamp && @sequence < 999999999
        @sequence = @sequence + 1
        record[@time_key] = nano_time + @sequence
        if @out_of_order
          record[@out_of_order] = false
        end
      elsif input_time.sec == @last_timestamp && @sequence == 999999999
        @log.error("received more then 999999999 records in a second")
        return nil
      else
        @sequence = 0
        @last_timestamp = input_time.sec
        record[@time_key] = nano_time
        if @out_of_order
          record[@out_of_order] = false
        end
      end

      record
    end

  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
fluent-plugin-influxdb-deduplication-0.1.1 lib/fluent/plugin/filter_influxdb_deduplication.rb
fluent-plugin-influxdb-deduplication-0.1.0 lib/fluent/plugin/filter_influxdb_deduplication.rb