Sha256: f8a710c6394ab60cb95871f9f1bc43a0757345842c1405144e924c1ed9f1a7cf

Contents?: true

Size: 1.73 KB

Versions: 16

Compression:

Stored size: 1.73 KB

Contents

# encoding: utf-8
#
module LogStash
  class FilterDelegator
    extend Forwardable
    DELEGATED_METHODS = [
      :register,
      :close,
      :threadsafe?,
      :do_close,
      :do_stop,
      :periodic_flush
    ]
    def_delegators :@filter, *DELEGATED_METHODS

    def initialize(logger, klass, metric, *args)
      options = args.reduce({}, :merge)

      @logger = logger
      @klass = klass
      @filter = klass.new(options)

      # Scope the metrics to the plugin
      namespaced_metric = metric.namespace(@filter.plugin_unique_name.to_sym)
      @filter.metric = metric

      @metric_events = namespaced_metric.namespace(:events)

      # Not all the filters will do bufferings
      define_flush_method if @filter.respond_to?(:flush)
    end

    def config_name
      @klass.config_name
    end

    def multi_filter(events)
      @metric_events.increment(:in, events.size)

      new_events = @filter.multi_filter(events)

      # There is no garantee in the context of filter
      # that EVENTS_INT == EVENTS_OUT, see the aggregates and
      # the split filter
      c = new_events.count { |event| !event.cancelled? }
      @metric_events.increment(:out, c) if c > 0

      return new_events
    end

    private
    def define_flush_method
      define_singleton_method(:flush) do |options = {}|
        # we also need to trace the number of events
        # coming from a specific filters.
        new_events = @filter.flush(options)

        # Filter plugins that does buffering or spooling of events like the
        # `Logstash-filter-aggregates` can return `NIL` and will flush on the next flush ticks.
        @metric_events.increment(:out, new_events.size) if new_events && new_events.size > 0
        new_events
      end
    end
  end
end

Version data entries

16 entries across 16 versions & 1 rubygems

Version Path
logstash-core-5.0.0.alpha4.snapshot2-java lib/logstash/filter_delegator.rb
logstash-core-5.0.0.alpha4.snapshot1-java lib/logstash/filter_delegator.rb
logstash-core-5.0.0.alpha3-java lib/logstash/filter_delegator.rb
logstash-core-5.0.0.alpha3.snapshot8-java lib/logstash/filter_delegator.rb
logstash-core-5.0.0.alpha3.snapshot7-java lib/logstash/filter_delegator.rb
logstash-core-5.0.0.alpha3.snapshot6-java lib/logstash/filter_delegator.rb
logstash-core-5.0.0.alpha3.snapshot5-java lib/logstash/filter_delegator.rb
logstash-core-5.0.0.alpha3.snapshot4-java lib/logstash/filter_delegator.rb
logstash-core-5.0.0.alpha3.snapshot2-java lib/logstash/filter_delegator.rb
logstash-core-5.0.0.alpha3.snapshot1-java lib/logstash/filter_delegator.rb
logstash-core-5.0.0.alpha2-java lib/logstash/filter_delegator.rb
logstash-core-5.0.0.alpha2.snapshot2-java lib/logstash/filter_delegator.rb
logstash-core-5.0.0.alpha2.snapshot1-java lib/logstash/filter_delegator.rb
logstash-core-5.0.0.alpha1-java lib/logstash/filter_delegator.rb
logstash-core-5.0.0.alpha1.snapshot2-java lib/logstash/filter_delegator.rb
logstash-core-5.0.0.alpha1.snapshot1-java lib/logstash/filter_delegator.rb