lib/logstash/inputs/rss.rb in logstash-input-rss-0.1.4 vs lib/logstash/inputs/rss.rb in logstash-input-rss-2.0.1
- old
+ new
@@ -1,9 +1,12 @@
# encoding: utf-8
require "logstash/inputs/base"
require "logstash/namespace"
require "socket" # for Socket.gethostname
+require "stud/interval"
+require "faraday"
+require "rss"
# Run command line tools and capture the whole output as an event.
#
# Notes:
#
@@ -23,61 +26,24 @@
# Interval to run the command. Value is in seconds.
config :interval, :validate => :number, :required => true
public
def register
- require "faraday"
- require "rss"
@logger.info("Registering RSS Input", :url => @url, :interval => @interval)
end # def register
public
def run(queue)
- loop do
+ @run_thread = Thread.current
+ while !stop?
start = Time.now
@logger.info? && @logger.info("Polling RSS", :url => @url)
# Pull down the RSS feed using FTW so we can make use of future cache functions
response = Faraday.get @url
- body = response.body
- # @logger.debug("Body", :body => body)
- # Parse the RSS feed
- feed = RSS::Parser.parse(body)
- feed.items.each do |item|
- # Put each item into an event
- @logger.debug("Item", :item => item.author)
- case feed.feed_type
- when 'rss'
- @codec.decode(item.description) do |event|
- event["Feed"] = @url
- event["published"] = item.pubDate
- event["title"] = item.title
- event["link"] = item.link
- event["author"] = item.author
- decorate(event)
- queue << event
- end
- when 'atom'
- if ! item.content.nil?
- content = item.content.content
- else
- content = item.summary.content
- end
- @codec.decode(content) do |event|
- event["Feed"] = @url
- event["updated"] = item.updated.content
- event["title"] = item.title.content
- event["link"] = item.link.href
- event["author"] = item.author.name.content
- unless item.published.nil?
- event["published"] = item.published.content
- end
- decorate(event)
- queue << event
- end
- end
- end
+ handle_response(response)
+
duration = Time.now - start
@logger.info? && @logger.info("Command completed", :command => @command,
:duration => duration)
# Sleep for the remainder of the interval, or 0 if the duration ran
@@ -86,10 +52,55 @@
if sleeptime == 0
@logger.warn("Execution ran longer than the interval. Skipping sleep.",
:command => @command, :duration => duration,
:interval => @interval)
else
- sleep(sleeptime)
+ Stud.stoppable_sleep(sleeptime) { stop? }
end
end # loop
- end # def run
+ end
+
+ def handle_response(response)
+ body = response.body
+ # @logger.debug("Body", :body => body)
+ # Parse the RSS feed
+ feed = RSS::Parser.parse(body)
+ feed.items.each do |item|
+ # Put each item into an event
+ @logger.debug("Item", :item => item.author)
+ case feed.feed_type
+ when 'rss'
+ @codec.decode(item.description) do |event|
+ event["Feed"] = @url
+ event["published"] = item.pubDate
+ event["title"] = item.title
+ event["link"] = item.link
+ event["author"] = item.author
+ decorate(event)
+ queue << event
+ end
+ when 'atom'
+ if ! item.content.nil?
+ content = item.content.content
+ else
+ content = item.summary.content
+ end
+ @codec.decode(content) do |event|
+ event["Feed"] = @url
+ event["updated"] = item.updated.content
+ event["title"] = item.title.content
+ event["link"] = item.link.href
+ event["author"] = item.author.name.content
+ unless item.published.nil?
+ event["published"] = item.published.content
+ end
+ decorate(event)
+ queue << event
+ end
+ end
+ end
+ end
+
+ def stop
+ Stud.stop!(@run_thread) if @run_thread
+ end
end # class LogStash::Inputs::Exec