Sha256: 6d7ab6a04d41064271bcdd6def45da7d89826262132605423272535fd2e0b80f

Contents?: true

Size: 1.69 KB

Versions: 10

Compression:

Stored size: 1.69 KB

Contents

# Built-in "pipeline" input: receives events that other pipelines push to
# +address+ over the agent's pipeline bus and forwards them to this
# pipeline's queue.
#
# NOTE(review): the camelCase methods (+internalReceive+, +isRunning+) appear
# to be the callbacks required by the included Java PipelineInput interface --
# confirm against that interface before renaming them.
module ::LogStash; module Plugins; module Builtin; module Pipeline; class Input < ::LogStash::Inputs::Base
  include org.logstash.plugins.pipeline.PipelineInput

  config_name "pipeline"

  # Address this input listens on; +register+ fails if the bus refuses it.
  config :address, :validate => :string, :required => true

  attr_reader :pipeline_bus

  # Creates the running flag and binds this input to +address+ on the bus.
  #
  # @raise [::LogStash::ConfigurationError] when the bus reports that the
  #   listen request did not succeed
  def register
    # Create the flag before listening: as soon as we are bound, senders may
    # call internalReceive, which rejects events until #run sets it to true.
    @running = java.util.concurrent.atomic.AtomicBoolean.new(false)
    @pipeline_bus = execution_context.agent.pipeline_bus

    unless pipeline_bus.listen(self, address)
      raise ::LogStash::ConfigurationError, "Internal input at '#{@address}' already bound! Addresses must be globally unique across pipelines."
    end
  end

  # Stores the destination queue, marks the input as running, and then blocks
  # the calling thread until #stop clears the running flag.
  #
  # @param queue the queue that received events are appended to
  def run(queue)
    @queue = queue
    @running.set(true)

    # Events arrive through internalReceive, so this thread only has to stay
    # alive; poll the flag at a coarse interval.
    sleep 0.1 while @running.get
  end

  # @return [Boolean, nil] truthy only between #run starting and #stop;
  #   nil when #register has not been called yet
  def running?
    @running && @running.get
  end

  # Decorates each incoming event and pushes it onto the queue.
  #
  # Returns false if the receive failed because this input is not running
  # (stopping, or not yet started), so the sender can tell nothing was queued.
  # Note, this takes a java Stream, not a ruby array.
  #
  # @param events a Java Stream of events
  # @return [Boolean] true when the events were queued, false otherwise
  def internalReceive(events)
    # Going through running? also covers the case where register has not been
    # called, instead of raising NoMethodError on a nil flag.
    return false unless running?

    # TODO This should probably push a batch at some point in the future when doing so
    # buys us some efficiency
    events.forEach do |event|
      decorate(event)
      @queue << event
    end

    true
  end

  # Unbinds from the bus and lets #run return. Safe to call even if #register
  # never ran, in which case there is neither a bus reference nor a flag.
  def stop
    pipeline_bus.unlisten(self, address) if pipeline_bus
    # We stop receiving events _after_ we unlisten to pick up any events sent by upstream outputs that
    # have not yet stopped
    @running.set(false) if @running
  end

  # Boolean form of #running?.
  #
  # @return [Boolean] true while running; false otherwise, including before
  #   #register has been called
  def isRunning
    !!running?
  end

end; end; end; end; end

Version data entries

10 entries across 10 versions & 1 rubygem

Version Path
logstash-core-7.5.2-java lib/logstash/plugins/builtin/pipeline/input.rb
logstash-core-7.5.1-java lib/logstash/plugins/builtin/pipeline/input.rb
logstash-core-7.5.0-java lib/logstash/plugins/builtin/pipeline/input.rb
logstash-core-7.4.2-java lib/logstash/plugins/builtin/pipeline/input.rb
logstash-core-7.4.1-java lib/logstash/plugins/builtin/pipeline/input.rb
logstash-core-7.4.0-java lib/logstash/plugins/builtin/pipeline/input.rb
logstash-core-7.3.2-java lib/logstash/plugins/builtin/pipeline/input.rb
logstash-core-7.3.1-java lib/logstash/plugins/builtin/pipeline/input.rb
logstash-core-7.3.0-java lib/logstash/plugins/builtin/pipeline/input.rb
logstash-core-7.2.1-java lib/logstash/plugins/builtin/pipeline/input.rb