Sha256: 5211fe58f2cf76346d4ba1db826b9e3f7e55f85c90ab047ce6148ea9b6b7ffb9

Contents?: true

Size: 1.86 KB

Versions: 1

Compression:

Stored size: 1.86 KB

Contents

# encoding: utf-8
#
module LogStash
  # Wraps a single filter plugin instance. Lifecycle calls listed in
  # DELEGATED_METHODS are forwarded untouched to the wrapped filter, while
  # `multi_filter` (and `flush`, when the filter implements it) additionally
  # record event counters on a metric namespace scoped to the plugin.
  class FilterDelegator
    extend Forwardable

    # Methods forwarded as-is to the wrapped filter. Frozen so the list used
    # to generate the delegators cannot be altered afterwards.
    DELEGATED_METHODS = [
      :register,
      :close,
      :threadsafe?,
      :do_close,
      :do_stop,
      :periodic_flush
    ].freeze
    def_delegators :@filter, *DELEGATED_METHODS

    # Builds the wrapped filter and wires its metrics.
    #
    # @param logger [Object] stored on the delegator; not used by the methods
    #   defined in this class
    # @param klass [Class] filter class; must respond to `config_name` and
    #   accept a single options hash in `new`
    # @param metric [Object] root metric; must respond to `namespace`
    # @param args [Array<Hash>] option hashes, merged left to right (later
    #   hashes win) before being handed to `klass.new`
    def initialize(logger, klass, metric, *args)
      options = args.reduce({}, :merge)

      @logger = logger
      @klass = klass
      @filter = klass.new(options)

      # Scope the metrics to the plugin
      namespaced_metric = metric.namespace(@filter.plugin_unique_name.to_sym)
      @filter.metric = namespaced_metric

      @metric_events = namespaced_metric.namespace(:events)
      namespaced_metric.gauge(:name, config_name)

      # Not every filter buffers events, so `flush` is only exposed on the
      # delegator when the wrapped filter implements it.
      define_flush_method if @filter.respond_to?(:flush)
    end

    # @return [Object] the `config_name` reported by the filter class
    def config_name
      @klass.config_name
    end

    # Runs the wrapped filter over a batch of events and records metrics:
    # `:in` is incremented by the batch size, the call is timed under
    # `:duration_in_millis`, and `:out` is incremented by the number of
    # returned events that are not cancelled.
    #
    # @param events [Array] batch of events handed to the filter
    # @return [Object] whatever the wrapped filter's `multi_filter` returned
    def multi_filter(events)
      @metric_events.increment(:in, events.size)

      clock = @metric_events.time(:duration_in_millis)
      new_events = @filter.multi_filter(events)
      clock.stop

      # There is no guarantee in the context of a filter that
      # EVENTS_IN == EVENTS_OUT, see the aggregate and the split filters.
      emitted = new_events.count { |event| !event.cancelled? }
      @metric_events.increment(:out, emitted) if emitted > 0

      new_events
    end

    private

    # Defines a public `flush(options = {})` singleton method on this
    # delegator that forwards to the wrapped filter and counts the flushed
    # events under `:out`.
    def define_flush_method
      define_singleton_method(:flush) do |options = {}|
        # We also need to track the number of events coming out of a
        # specific filter when it flushes.
        new_events = @filter.flush(options)

        # Filter plugins that buffer or spool events, like
        # `logstash-filter-aggregate`, can return nil and flush on a later
        # flush tick instead; nothing is counted in that case.
        @metric_events.increment(:out, new_events.size) if new_events && new_events.size > 0
        new_events
      end
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
logstash-core-5.0.0.alpha5.snapshot1-java lib/logstash/filter_delegator.rb