lib/logstash/inputs/pipe.rb in logstash-input-pipe-0.1.4 vs lib/logstash/inputs/pipe.rb in logstash-input-pipe-0.1.5
- old
+ new
@@ -24,39 +24,57 @@
# Example:
# [source,ruby]
# command => "echo hello world"
config :command, :validate => :string, :required => true
+ def initialize(params)
+ super
+ @shutdown_requested = false
+ @pipe = nil
+ end # def initialize
+
public
def register
@logger.info("Registering pipe input", :command => @command)
end # def register
public
def run(queue)
- loop do
+ while !@shutdown_requested
begin
- @pipe = IO.popen(@command, mode="r")
+ @pipe = IO.popen(@command, mode = "r")
hostname = Socket.gethostname
@pipe.each do |line|
line = line.chomp
- source = "pipe://#{hostname}/#{@command}"
@logger.debug? && @logger.debug("Received line", :command => @command, :line => line)
@codec.decode(line) do |event|
event["host"] = hostname
event["command"] = @command
decorate(event)
queue << event
end
end
+ @pipe.close
+ @pipe = nil
rescue LogStash::ShutdownSignal => e
break
rescue Exception => e
@logger.error("Exception while running command", :e => e, :backtrace => e.backtrace)
end
# Keep running the command forever.
sleep(10)
end
end # def run
+
+ def teardown
+ @shutdown_requested = true
+ if @pipe
+ Process.kill("KILL", @pipe.pid) rescue nil
+ @pipe.close rescue nil
+ @pipe = nil
+ end
+ finished
+ end
+
end # class LogStash::Inputs::Pipe