lib/logstash/inputs/irc.rb in logstash-input-irc-3.0.5 vs lib/logstash/inputs/irc.rb in logstash-input-irc-3.0.6

- old
+ new

@@ -2,10 +2,13 @@ require "logstash/inputs/base" require "logstash/namespace" require "thread" require "stud/task" require "stud/interval" + +require "java" + # Read events from an IRC Server. # class LogStash::Inputs::Irc < LogStash::Inputs::Base config_name "irc" @@ -64,11 +67,11 @@ end def register require "cinch" @user_stats = Hash.new - @irc_queue = Queue.new + @irc_queue = java.util.concurrent.LinkedBlockingQueue.new @catch_all = true if @get_stats @logger.info("Connecting to irc server", :host => @host, :port => @port, :nick => @nick, :channels => @channels) @bot ||= Cinch::Bot.new @bot.loggers.clear @@ -108,17 +111,15 @@ request_names end end end while !stop? - begin - msg = @irc_queue.pop(true) - handle_response(msg, output_queue) - rescue ThreadError - # Empty queue - end + msg = @irc_queue.poll(1, java.util.concurrent.TimeUnit::SECONDS) + handle_response(msg, output_queue) unless msg.nil? end + ensure + stop rescue logger.warn("irc input failed to shutdown gracefully: #{$!.message}") end # def run RPL_NAMREPLY = "353" RPL_ENDOFNAMES = "366" @@ -147,11 +148,15 @@ event.set("user", msg.prefix.to_s) event.set("command", msg.command.to_s) event.set("channel", msg.channel.to_s) event.set("nick", msg.user.nick) event.set("server", "#{@host}:#{@port}") - event.set("host", msg.user.host) + # The user's host attribute is an optional part of the message format; + # when it is not included, `Cinch::User#host` times out waiting for it + # to be populated, raising an exception; use `Cinch::User#data` to get + # host, which includes the user attributes as-parsed. + event.set("host", msg.user.data[:host]) output_queue << event end end end @@ -165,9 +170,9 @@ @bot.irc.send("NAMES #{channel}") end end def stop - @request_names_thread.stop! if @request_names_thread - @bot_thread.stop! + @request_names_thread.stop! if @request_names_thread && !@request_names_thread.stop? + @bot_thread.stop! if @bot_thread && !@bot_thread.stop? end end # class LogStash::Inputs::Irc