Sha256: 1f2297379e6978cdfd16326fbeeb28f82d854aab038b7faa92183cde0a59d14d

Contents?: true

Size: 1.85 KB

Versions: 17

Compression:

Stored size: 1.85 KB

Contents

# encoding: utf-8
module LogStash
  module Instrument
    # Decorates a write client: every push is forwarded to the wrapped client
    # while event counters and the push duration are recorded in three metric
    # namespaces (global events, per-pipeline events, per-plugin events).
    class WrappedWriteClient
      # write_client - object receiving get_new_batch / push / push_batch
      # pipeline     - must respond to pipeline_id
      # metric       - must respond to namespace(path)
      # plugin       - must respond to id; its class must respond to plugin_type
      def initialize(write_client, pipeline, metric, plugin)
        @write_client = write_client

        pipeline_id = pipeline.pipeline_id.to_s.to_sym
        # Pluralized plugin type, e.g. a plugin_type of "input" becomes :inputs.
        plugin_type = :"#{plugin.class.plugin_type}s"
        pipeline_path = [:stats, :pipelines, pipeline_id]

        @events_metrics = metric.namespace([:stats, :events])
        @pipeline_metrics = metric.namespace(pipeline_path + [:events])
        @plugin_events_metrics = metric.namespace(
          pipeline_path + [:plugins, plugin_type, plugin.id.to_sym, :events]
        )

        define_initial_metrics_values
      end

      # Plain delegation; nothing is recorded for this call.
      def get_new_batch
        @write_client.get_new_batch
      end

      # Pushes one event through the wrapped client, counting it as a single
      # event. Returns whatever the wrapped client's push returns.
      def push(event)
        record_metric do
          @write_client.push(event)
        end
      end
      alias_method :<<, :push

      # Pushes a whole batch through the wrapped client, counting batch.size
      # events. Returns whatever the wrapped client's push_batch returns.
      def push_batch(batch)
        record_metric(batch.size) do
          @write_client.push_batch(batch)
        end
      end

      private

      # Bumps the counters by +size+, runs the given block while timing it, and
      # hands the block's value back to the caller.
      def record_metric(size = 1)
        increment_event_counters(size)

        timer = @events_metrics.time(:queue_push_duration_in_millis)
        outcome = yield

        # The single measurement taken here is reported to the other two
        # namespaces so all of them show the same duration (no skew).
        elapsed = timer.stop
        [@pipeline_metrics, @plugin_events_metrics].each do |namespace|
          namespace.report_time(:queue_push_duration_in_millis, elapsed)
        end

        outcome
      end

      # Registers every counter and timer with a zero value at construction.
      def define_initial_metrics_values
        increment_event_counters(0)

        [@events_metrics, @pipeline_metrics, @plugin_events_metrics].each do |namespace|
          namespace.report_time(:queue_push_duration_in_millis, 0)
        end
      end

      # Adds +size+ to the :in counter of the global and pipeline namespaces
      # and to the :out counter of the plugin namespace.
      def increment_event_counters(size)
        @events_metrics.increment(:in, size)
        @pipeline_metrics.increment(:in, size)
        @plugin_events_metrics.increment(:out, size)
      end
    end
  end
end

Version data entries

17 entries across 17 versions & 3 rubygems

Version Path
logstash-core-5.5.3-java lib/logstash/instrument/wrapped_write_client.rb
logstash-core-5.5.2-java lib/logstash/instrument/wrapped_write_client.rb
logstash-core-6.0.0.beta1-java lib/logstash/instrument/wrapped_write_client.rb
logstash-core-5.5.1-java lib/logstash/instrument/wrapped_write_client.rb
logstash-filter-cache-redis-0.3.1 vendor/bundle/jruby/1.9/gems/logstash-core-5.5.1.snapshot1-java/lib/logstash/instrument/wrapped_write_client.rb
logstash-filter-cache-redis-0.3.0 vendor/bundle/jruby/1.9/gems/logstash-core-5.5.1.snapshot1-java/lib/logstash/instrument/wrapped_write_client.rb
logstash-core-5.5.1.snapshot1-java lib/logstash/instrument/wrapped_write_client.rb
logstash-filter-cache-redis-0.2.0 vendor/bundle/jruby/1.9/gems/logstash-core-5.5.0-java/lib/logstash/instrument/wrapped_write_client.rb
logstash-filter-cache-redis-0.1.0 vendor/bundle/jruby/1.9/gems/logstash-core-5.4.0-java/lib/logstash/instrument/wrapped_write_client.rb
logstash-core-5.5.0-java lib/logstash/instrument/wrapped_write_client.rb
logstash-core-5.4.3-java lib/logstash/instrument/wrapped_write_client.rb
logstash-core-5.4.2-java lib/logstash/instrument/wrapped_write_client.rb
logstash-core-6.0.0.alpha2-java lib/logstash/instrument/wrapped_write_client.rb
logstash-core-5.4.1-java lib/logstash/instrument/wrapped_write_client.rb
logstash-filter-htmlentities-0.1.0 vendor/bundle/jruby/1.9/gems/logstash-core-5.4.0-java/lib/logstash/instrument/wrapped_write_client.rb
logstash-core-6.0.0.alpha1-java lib/logstash/instrument/wrapped_write_client.rb
logstash-core-5.4.0-java lib/logstash/instrument/wrapped_write_client.rb