lib/logstash/inputs/rss.rb in logstash-input-rss-0.1.4 vs lib/logstash/inputs/rss.rb in logstash-input-rss-2.0.1

- old
+ new

@@ -1,9 +1,12 @@ # encoding: utf-8 require "logstash/inputs/base" require "logstash/namespace" require "socket" # for Socket.gethostname +require "stud/interval" +require "faraday" +require "rss" # Run command line tools and capture the whole output as an event. # # Notes: # @@ -23,61 +26,24 @@ # Interval to run the command. Value is in seconds. config :interval, :validate => :number, :required => true public def register - require "faraday" - require "rss" @logger.info("Registering RSS Input", :url => @url, :interval => @interval) end # def register public def run(queue) - loop do + @run_thread = Thread.current + while !stop? start = Time.now @logger.info? && @logger.info("Polling RSS", :url => @url) # Pull down the RSS feed using FTW so we can make use of future cache functions response = Faraday.get @url - body = response.body - # @logger.debug("Body", :body => body) - # Parse the RSS feed - feed = RSS::Parser.parse(body) - feed.items.each do |item| - # Put each item into an event - @logger.debug("Item", :item => item.author) - case feed.feed_type - when 'rss' - @codec.decode(item.description) do |event| - event["Feed"] = @url - event["published"] = item.pubDate - event["title"] = item.title - event["link"] = item.link - event["author"] = item.author - decorate(event) - queue << event - end - when 'atom' - if ! item.content.nil? - content = item.content.content - else - content = item.summary.content - end - @codec.decode(content) do |event| - event["Feed"] = @url - event["updated"] = item.updated.content - event["title"] = item.title.content - event["link"] = item.link.href - event["author"] = item.author.name.content - unless item.published.nil? - event["published"] = item.published.content - end - decorate(event) - queue << event - end - end - end + handle_response(response) + duration = Time.now - start @logger.info? && @logger.info("Command completed", :command => @command, :duration => duration) # Sleep for the remainder of the interval, or 0 if the duration ran @@ -86,10 +52,55 @@ if sleeptime == 0 @logger.warn("Execution ran longer than the interval. Skipping sleep.", :command => @command, :duration => duration, :interval => @interval) else - sleep(sleeptime) + Stud.stoppable_sleep(sleeptime) { stop? } end end # loop - end # def run + end + + def handle_response(response) + body = response.body + # @logger.debug("Body", :body => body) + # Parse the RSS feed + feed = RSS::Parser.parse(body) + feed.items.each do |item| + # Put each item into an event + @logger.debug("Item", :item => item.author) + case feed.feed_type + when 'rss' + @codec.decode(item.description) do |event| + event["Feed"] = @url + event["published"] = item.pubDate + event["title"] = item.title + event["link"] = item.link + event["author"] = item.author + decorate(event) + queue << event + end + when 'atom' + if ! item.content.nil? + content = item.content.content + else + content = item.summary.content + end + @codec.decode(content) do |event| + event["Feed"] = @url + event["updated"] = item.updated.content + event["title"] = item.title.content + event["link"] = item.link.href + event["author"] = item.author.name.content + unless item.published.nil? + event["published"] = item.published.content + end + decorate(event) + queue << event + end + end + end + end + + def stop + Stud.stop!(@run_thread) if @run_thread + end end # class LogStash::Inputs::Exec