lib/logstash/inputs/irc.rb in logstash-input-irc-3.0.5 vs lib/logstash/inputs/irc.rb in logstash-input-irc-3.0.6
- old
+ new
@@ -2,10 +2,13 @@
require "logstash/inputs/base"
require "logstash/namespace"
require "thread"
require "stud/task"
require "stud/interval"
+
+require "java"
+
# Read events from an IRC Server.
#
class LogStash::Inputs::Irc < LogStash::Inputs::Base
config_name "irc"
@@ -64,11 +67,11 @@
end
def register
require "cinch"
@user_stats = Hash.new
- @irc_queue = Queue.new
+ @irc_queue = java.util.concurrent.LinkedBlockingQueue.new
@catch_all = true if @get_stats
@logger.info("Connecting to irc server", :host => @host, :port => @port, :nick => @nick, :channels => @channels)
@bot ||= Cinch::Bot.new
@bot.loggers.clear
@@ -108,17 +111,15 @@
request_names
end
end
end
while !stop?
- begin
- msg = @irc_queue.pop(true)
- handle_response(msg, output_queue)
- rescue ThreadError
- # Empty queue
- end
+ msg = @irc_queue.poll(1, java.util.concurrent.TimeUnit::SECONDS)
+ handle_response(msg, output_queue) unless msg.nil?
end
+ ensure
+ stop rescue logger.warn("irc input failed to shutdown gracefully: #{$!.message}")
end # def run
RPL_NAMREPLY = "353"
RPL_ENDOFNAMES = "366"
@@ -147,11 +148,15 @@
event.set("user", msg.prefix.to_s)
event.set("command", msg.command.to_s)
event.set("channel", msg.channel.to_s)
event.set("nick", msg.user.nick)
event.set("server", "#{@host}:#{@port}")
- event.set("host", msg.user.host)
+ # The user's host attribute is an optional part of the message format;
+ # when it is not included, `Cinch::User#host` times out waiting for it
+ # to be populated, raising an exception; use `Cinch::User#data` to get
+ # host, which includes the user attributes as-parsed.
+ event.set("host", msg.user.data[:host])
output_queue << event
end
end
end
@@ -165,9 +170,9 @@
@bot.irc.send("NAMES #{channel}")
end
end
def stop
- @request_names_thread.stop! if @request_names_thread
- @bot_thread.stop!
+ @request_names_thread.stop! if @request_names_thread && !@request_names_thread.stop?
+ @bot_thread.stop! if @bot_thread && !@bot_thread.stop?
end
end # class LogStash::Inputs::Irc