Sha256: b45865071c2b61511e84be73654ee3920893d913f836195747241e55c3f80c0a

Contents?: true

Size: 1.98 KB

Versions: 13

Compression:

Stored size: 1.98 KB

Contents

# encoding: utf-8
#
module LogStash
  class FilterDelegator
    extend Forwardable
    DELEGATED_METHODS = [
      :register,
      :close,
      :threadsafe?,
      :do_close,
      :do_stop,
      :periodic_flush,
      :reloadable?
    ]
    def_delegators :@filter, *DELEGATED_METHODS

    attr_reader :id

    def initialize(filter, id)
      @klass = filter.class
      @id = id
      @filter = filter

      # Scope the metrics to the plugin
      namespaced_metric = filter.metric
      @metric_events = namespaced_metric.namespace(:events)
      @metric_events_in = @metric_events.counter(:in)
      @metric_events_out = @metric_events.counter(:out)
      @metric_events_time = @metric_events.counter(:duration_in_millis)
      namespaced_metric.gauge(:name, config_name)

      # Not all the filters will do bufferings
      define_flush_method if @filter.respond_to?(:flush)
    end

    def config_name
      @klass.config_name
    end

    def multi_filter(events)
      @metric_events_in.increment(events.size)

      start_time = java.lang.System.nano_time
      new_events = @filter.multi_filter(events)
      @metric_events_time.increment((java.lang.System.nano_time - start_time) / 1_000_000)

      # There is no guarantee in the context of filter
      # that EVENTS_IN == EVENTS_OUT, see the aggregates and
      # the split filter
      c = new_events.count { |event| !event.cancelled? }
      @metric_events_out.increment(c) if c > 0
      new_events
    end

    private
    def define_flush_method
      define_singleton_method(:flush) do |options = {}|
        # we also need to trace the number of events
        # coming from a specific filters.
        new_events = @filter.flush(options)

        # Filter plugins that does buffering or spooling of events like the
        # `Logstash-filter-aggregates` can return `NIL` and will flush on the next flush ticks.
        @metric_events_out.increment(new_events.size) if new_events && new_events.size > 0
        new_events
      end
    end
  end
end

Version data entries

13 entries across 13 versions & 1 rubygems

Version Path
logstash-core-6.5.4-java lib/logstash/filter_delegator.rb
logstash-core-6.5.3-java lib/logstash/filter_delegator.rb
logstash-core-6.5.2-java lib/logstash/filter_delegator.rb
logstash-core-6.5.1-java lib/logstash/filter_delegator.rb
logstash-core-7.0.0.alpha1-java lib/logstash/filter_delegator.rb
logstash-core-6.5.0-java lib/logstash/filter_delegator.rb
logstash-core-6.4.3-java lib/logstash/filter_delegator.rb
logstash-core-6.4.2-java lib/logstash/filter_delegator.rb
logstash-core-6.4.1-java lib/logstash/filter_delegator.rb
logstash-core-6.4.0-java lib/logstash/filter_delegator.rb
logstash-core-6.3.2-java lib/logstash/filter_delegator.rb
logstash-core-6.3.1-java lib/logstash/filter_delegator.rb
logstash-core-6.3.0-java lib/logstash/filter_delegator.rb