Sha256: 3784992bf93c07df881b925b32a3de5857ff7e951c5da29e724751667fb24a54
Contents?: true
Size: 1.69 KB
Versions: 52
Compression:
Stored size: 1.69 KB
Contents
module ::LogStash; module Plugins; module Builtin; module Pipeline; class Input < ::LogStash::Inputs::Base include org.logstash.plugins.pipeline.PipelineInput config_name "pipeline" config :address, :validate => :string, :required => true attr_reader :pipeline_bus def register # May as well set this up here, writers won't do anything until # @running is set to false @running = java.util.concurrent.atomic.AtomicBoolean.new(false) @pipeline_bus = execution_context.agent.pipeline_bus listen_successful = pipeline_bus.listen(self, address) if !listen_successful raise ::LogStash::ConfigurationError, "Internal input at '#{@address}' already bound! Addresses must be globally unique across pipelines." end end def run(queue) @queue = queue @running.set(true) while @running.get() sleep 0.1 end end def running? @running && @running.get() end # Returns false if the receive failed due to a stopping input # To understand why this value is useful see Internal.send_to # Note, this takes a java Stream, not a ruby array def internalReceive(events) return false if !@running.get() # TODO This should probably push a batch at some point in the future when doing so # buys us some efficiency events.forEach do |event| decorate(event) @queue << event end return true rescue => e require 'pry'; binding.pry return true end def stop # We stop receiving events before we unlisten to prevent races @running.set(false) if @running # If register wasn't yet called, no @running! pipeline_bus.unlisten(self, address) end def isRunning @running.get end end; end; end; end; end
Version data entries
52 entries across 52 versions & 1 rubygems