Sha256: 3784992bf93c07df881b925b32a3de5857ff7e951c5da29e724751667fb24a54

Contents?: true

Size: 1.69 KB

Versions: 52

Compression:

Stored size: 1.69 KB

Contents

# Internal "pipeline" input: binds to an address on the agent's pipeline bus
# and pushes every event handed to it onto this pipeline's queue.
module ::LogStash; module Plugins; module Builtin; module Pipeline; class Input < ::LogStash::Inputs::Base
  include org.logstash.plugins.pipeline.PipelineInput

  config_name "pipeline"

  # Address this input listens on; register raises if it is already bound.
  config :address, :validate => :string, :required => true

  attr_reader :pipeline_bus

  # Binds this input to its address on the pipeline bus.
  #
  # Raises ::LogStash::ConfigurationError when the bus reports that the
  # listen call did not succeed.
  def register
    # May as well set this up here, writers won't do anything until
    # @running is set to true (internalReceive refuses events before that)
    @running = java.util.concurrent.atomic.AtomicBoolean.new(false)
    @pipeline_bus = execution_context.agent.pipeline_bus
    listen_successful = pipeline_bus.listen(self, address)
    unless listen_successful
      raise ::LogStash::ConfigurationError, "Internal input at '#{@address}' already bound! Addresses must be globally unique across pipelines."
    end
  end

  # Stores the target queue, marks the input as running and then blocks,
  # polling every 0.1s, until stop clears the running flag.
  def run(queue)
    @queue = queue
    @running.set(true)

    while @running.get()
      sleep 0.1
    end
  end

  # Truthy only once register has created the flag and run has set it.
  def running?
    @running && @running.get()
  end

  # Returns false if the receive failed due to a stopping input
  # To understand why this value is useful see Internal.send_to
  # Note, this takes a java Stream, not a ruby array
  #
  # Any error raised while decorating or enqueueing an event propagates to
  # the caller; it is no longer intercepted by an interactive debugger
  # session and then reported as a successful receive.
  def internalReceive(events)
    # running? also covers the case where register has not been called yet.
    return false unless running?

    # TODO This should probably push a batch at some point in the future when doing so
    # buys us some efficiency
    events.forEach do |event|
      decorate(event)
      @queue << event
    end

    true
  end

  # Stops accepting events and releases the bus address.
  def stop
    # We stop receiving events before we unlisten to prevent races
    @running.set(false) if @running # If register wasn't yet called, no @running!
    # Likewise, without register there is no bus to unlisten from.
    pipeline_bus.unlisten(self, address) if pipeline_bus
  end

  # Camel-case counterpart of running?; always returns a strict boolean,
  # including false when register has not been called yet.
  def isRunning
    @running ? @running.get : false
  end

end; end; end; end; end

Version data entries

52 entries across 52 versions & 1 rubygems

Version Path
logstash-core-6.8.23-java lib/logstash/plugins/builtin/pipeline/input.rb
logstash-core-6.8.22-java lib/logstash/plugins/builtin/pipeline/input.rb
logstash-core-6.8.21-java lib/logstash/plugins/builtin/pipeline/input.rb
logstash-core-6.8.20-java lib/logstash/plugins/builtin/pipeline/input.rb
logstash-core-6.8.19-java lib/logstash/plugins/builtin/pipeline/input.rb
logstash-core-6.8.18-java lib/logstash/plugins/builtin/pipeline/input.rb
logstash-core-6.8.17-java lib/logstash/plugins/builtin/pipeline/input.rb
logstash-core-6.8.16-java lib/logstash/plugins/builtin/pipeline/input.rb
logstash-core-6.8.15-java lib/logstash/plugins/builtin/pipeline/input.rb
logstash-core-6.8.14-java lib/logstash/plugins/builtin/pipeline/input.rb
logstash-core-6.8.13-java lib/logstash/plugins/builtin/pipeline/input.rb
logstash-core-6.8.12-java lib/logstash/plugins/builtin/pipeline/input.rb
logstash-core-6.8.11-java lib/logstash/plugins/builtin/pipeline/input.rb
logstash-core-6.8.10-java lib/logstash/plugins/builtin/pipeline/input.rb
logstash-core-6.8.9-java lib/logstash/plugins/builtin/pipeline/input.rb
logstash-core-6.8.8-java lib/logstash/plugins/builtin/pipeline/input.rb
logstash-core-6.8.7-java lib/logstash/plugins/builtin/pipeline/input.rb
logstash-core-6.8.6-java lib/logstash/plugins/builtin/pipeline/input.rb
logstash-core-6.8.5-java lib/logstash/plugins/builtin/pipeline/input.rb
logstash-core-6.8.4-java lib/logstash/plugins/builtin/pipeline/input.rb