lib/logstash/inputs/pipe.rb in logstash-input-pipe-0.1.4 vs lib/logstash/inputs/pipe.rb in logstash-input-pipe-0.1.5

- old
+ new

@@ -24,39 +24,57 @@ # Example: # [source,ruby] # command => "echo hello world" config :command, :validate => :string, :required => true + def initialize(params) + super + @shutdown_requested = false + @pipe = nil + end # def initialize + public def register @logger.info("Registering pipe input", :command => @command) end # def register public def run(queue) - loop do + while !@shutdown_requested begin - @pipe = IO.popen(@command, mode="r") + @pipe = IO.popen(@command, mode = "r") hostname = Socket.gethostname @pipe.each do |line| line = line.chomp - source = "pipe://#{hostname}/#{@command}" @logger.debug? && @logger.debug("Received line", :command => @command, :line => line) @codec.decode(line) do |event| event["host"] = hostname event["command"] = @command decorate(event) queue << event end end + @pipe.close + @pipe = nil rescue LogStash::ShutdownSignal => e break rescue Exception => e @logger.error("Exception while running command", :e => e, :backtrace => e.backtrace) end # Keep running the command forever. sleep(10) end end # def run + + def teardown + @shutdown_requested = true + if @pipe + Process.kill("KILL", @pipe.pid) rescue nil + @pipe.close rescue nil + @pipe = nil + end + finished + end + end # class LogStash::Inputs::Pipe