Sha256: d195b47f15a1fb8459b1531d5df97c6e76cb0b02c519c89f36700c3c84b64af3

Contents?: true

Size: 1.49 KB

Versions: 3

Compression:

Stored size: 1.49 KB

Contents

require "logstash/output_delegator_strategy_registry"

require "logstash/output_delegator_strategies/shared"
require "logstash/output_delegator_strategies/single"
require "logstash/output_delegator_strategies/legacy"

module LogStash class OutputDelegator
  attr_reader :metric, :metric_events, :strategy, :namespaced_metric, :metric_events, :id

  def initialize(logger, output_class, metric, strategy_registry, plugin_args)
    @logger = logger
    @output_class = output_class
    @metric = metric
    @id = plugin_args["id"]

    raise ArgumentError, "No strategy registry specified" unless strategy_registry
    raise ArgumentError, "No ID specified! Got args #{plugin_args}" unless id

    @namespaced_metric = metric.namespace(id.to_sym)
    @namespaced_metric.gauge(:name, config_name)
    @metric_events = @namespaced_metric.namespace(:events)

    @strategy = strategy_registry.
                  class_for(self.concurrency).
                  new(@logger, @output_class, @namespaced_metric, plugin_args)
  end

  def config_name
    @output_class.config_name
  end

  def reloadable?
    @output_class.reloadable?
  end

  def concurrency
    @output_class.concurrency
  end

  def register
    @strategy.register
  end

  def multi_receive(events)
    @metric_events.increment(:in, events.length)
    clock = @metric_events.time(:duration_in_millis)
    @strategy.multi_receive(events)
    clock.stop
    @metric_events.increment(:out, events.length)
  end

  def do_close
    @strategy.do_close
  end
end; end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
logstash-core-5.3.3-java lib/logstash/output_delegator.rb
logstash-core-5.3.2-java lib/logstash/output_delegator.rb
logstash-core-5.3.1-java lib/logstash/output_delegator.rb