Sha256: 64f868c94a0792aa0ca6f4f1533b98137a14ea72d452ca825fb92d9a9ba84310

Contents?: true

Size: 1.93 KB

Versions: 17

Compression:

Stored size: 1.93 KB

Contents

# encoding: utf-8
#
require "forwardable"

module LogStash
  # Wraps a single filter plugin instance and records event metrics around it.
  #
  # The delegator instantiates the filter class it is given, hands the filter a
  # metric namespace keyed by the plugin id plus the execution context, and
  # forwards the lifecycle methods listed in DELEGATED_METHODS directly to the
  # filter. Calls to #multi_filter (and #flush, when the filter supports it)
  # go through the delegator so that the `:in`, `:out` and
  # `:duration_in_millis` entries of the `:events` namespace are updated.
  class FilterDelegator
    extend Forwardable

    # Methods forwarded as-is to the wrapped filter instance.
    DELEGATED_METHODS = [
      :register,
      :close,
      :threadsafe?,
      :do_close,
      :do_stop,
      :periodic_flush,
      :reloadable?
    ].freeze
    def_delegators :@filter, *DELEGATED_METHODS

    # @param logger [Object] stored on the delegator; not used by the methods
    #   defined in this class
    # @param klass [Class] filter class; must respond to `new(plugin_args)` and
    #   `config_name`
    # @param metric [Object] root metric; must respond to `namespace`
    # @param execution_context [Object] assigned to the filter unchanged
    # @param plugin_args [Hash] plugin settings; `plugin_args["id"]` must
    #   respond to `to_sym` because it names the metric namespace
    def initialize(logger, klass, metric, execution_context, plugin_args)
      @logger = logger
      @klass = klass
      @id = plugin_args["id"]
      @filter = klass.new(plugin_args)

      # Scope the metrics to the plugin
      namespaced_metric = metric.namespace(@id.to_sym)
      @filter.metric = namespaced_metric
      @filter.execution_context = execution_context

      @metric_events = namespaced_metric.namespace(:events)
      namespaced_metric.gauge(:name, config_name)

      # Not all filters buffer events, so #flush is only exposed when the
      # wrapped filter implements it.
      define_flush_method if @filter.respond_to?(:flush)
    end

    # @return [Object] whatever the filter class reports as its `config_name`
    def config_name
      @klass.config_name
    end

    # Runs the wrapped filter over a batch of events while recording metrics.
    #
    # @param events [#size] batch handed to the filter's `multi_filter`
    # @return [Object] the collection returned by the filter's `multi_filter`
    def multi_filter(events)
      @metric_events.increment(:in, events.size)

      clock = @metric_events.time(:duration_in_millis)
      new_events =
        begin
          @filter.multi_filter(events)
        ensure
          # Stop the timer even when the filter raises, so a failing batch
          # does not leave a started-but-never-stopped timer behind.
          clock.stop
        end

      # There is no guarantee in the context of a filter that
      # EVENTS_IN == EVENTS_OUT; see the aggregate and split filters.
      # Cancelled events are not counted as output.
      c = new_events.count { |event| !event.cancelled? }
      @metric_events.increment(:out, c) if c > 0

      new_events
    end

    private

    # Defines #flush on this instance only; called from #initialize when the
    # wrapped filter responds to `flush`.
    def define_flush_method
      define_singleton_method(:flush) do |options = {}|
        # We also need to track the number of events coming out of a
        # specific filter on flush.
        new_events = @filter.flush(options)

        # Filter plugins that buffer or spool events, like
        # `logstash-filter-aggregate`, can return nil and will flush on a
        # later flush tick; nil and empty results leave the counter untouched.
        @metric_events.increment(:out, new_events.size) if new_events && new_events.size > 0
        new_events
      end
    end
  end
end

Version data entries

17 entries across 17 versions & 3 rubygems

Version Path
logstash-core-5.5.3-java lib/logstash/filter_delegator.rb
logstash-core-5.5.2-java lib/logstash/filter_delegator.rb
logstash-core-6.0.0.beta1-java lib/logstash/filter_delegator.rb
logstash-core-5.5.1-java lib/logstash/filter_delegator.rb
logstash-filter-cache-redis-0.3.1 vendor/bundle/jruby/1.9/gems/logstash-core-5.5.1.snapshot1-java/lib/logstash/filter_delegator.rb
logstash-filter-cache-redis-0.3.0 vendor/bundle/jruby/1.9/gems/logstash-core-5.5.1.snapshot1-java/lib/logstash/filter_delegator.rb
logstash-core-5.5.1.snapshot1-java lib/logstash/filter_delegator.rb
logstash-filter-cache-redis-0.2.0 vendor/bundle/jruby/1.9/gems/logstash-core-5.5.0-java/lib/logstash/filter_delegator.rb
logstash-filter-cache-redis-0.1.0 vendor/bundle/jruby/1.9/gems/logstash-core-5.4.0-java/lib/logstash/filter_delegator.rb
logstash-core-5.5.0-java lib/logstash/filter_delegator.rb
logstash-core-5.4.3-java lib/logstash/filter_delegator.rb
logstash-core-5.4.2-java lib/logstash/filter_delegator.rb
logstash-core-6.0.0.alpha2-java lib/logstash/filter_delegator.rb
logstash-core-5.4.1-java lib/logstash/filter_delegator.rb
logstash-filter-htmlentities-0.1.0 vendor/bundle/jruby/1.9/gems/logstash-core-5.4.0-java/lib/logstash/filter_delegator.rb
logstash-core-6.0.0.alpha1-java lib/logstash/filter_delegator.rb
logstash-core-5.4.0-java lib/logstash/filter_delegator.rb