lib/logstash/inputs/tcp.rb in logstash-input-tcp-0.1.3 vs lib/logstash/inputs/tcp.rb in logstash-input-tcp-0.1.4
- old
+ new
@@ -8,11 +8,10 @@
# Like stdin and file inputs, each event is assumed to be one line of text.
#
# Can either accept connections from clients or connect to a server,
# depending on `mode`.
class LogStash::Inputs::Tcp < LogStash::Inputs::Base
- class Interrupted < StandardError; end
config_name "tcp"
default :codec, "line"
# When mode is `server`, the address to listen on.
@@ -52,10 +51,13 @@
# SSL key passphrase
config :ssl_key_passphrase, :validate => :password, :default => nil
def initialize(*args)
super(*args)
+ @interrupted = false
+ @server_socket = nil
+ @client_socket = nil
end # def initialize
public
def register
require "socket"
@@ -100,14 +102,14 @@
end
end # def register
private
def handle_socket(socket, client_address, output_queue, codec)
- while true
+ while !@interrupted
buf = nil
# NOTE(petef): the timeout only hits after the line is read or socket dies
- # TODO(sissel): Why do we have a timeout here? What's the point?
+ # TODO(sissel): Why do we have a Timeout here? What's the point?
if @data_timeout == -1
buf = read(socket)
else
Timeout::timeout(@data_timeout) do
buf = read(socket)
@@ -141,11 +143,12 @@
def client_thread(output_queue, socket)
Thread.new(output_queue, socket) do |q, s|
begin
@logger.debug? && @logger.debug("Accepted connection", :client => s.peer, :server => "#{@host}:#{@port}")
handle_socket(s, s.peeraddr[3], q, @codec.clone)
- rescue Interrupted
+ rescue LogStash::ShutdownSignal
+ @interrupted = true
s.close rescue nil
ensure
@client_threads_lock.synchronize{@client_threads.delete(Thread.current)}
end
end
@@ -156,11 +159,11 @@
@mode == "server"
end # def server?
private
def read(socket)
- return socket.sysread(16384)
+ socket.sysread(16384)
end # def readline
public
def run(output_queue)
if server?
@@ -169,69 +172,65 @@
run_client(output_queue)
end
end # def run
def run_server(output_queue)
- @thread = Thread.current
@client_threads = []
@client_threads_lock = Mutex.new
- while true
+ while !@interrupted
begin
socket = @server_socket.accept
# start a new thread for each connection.
@client_threads_lock.synchronize{@client_threads << client_thread(output_queue, socket)}
rescue OpenSSL::SSL::SSLError => ssle
# NOTE(mrichar1): This doesn't return a useful error message for some reason
@logger.error("SSL Error", :exception => ssle, :backtrace => ssle.backtrace)
- rescue IOError, LogStash::ShutdownSignal
- if @interrupted
- @server_socket.close rescue nil
-
- threads = @client_threads_lock.synchronize{@client_threads.dup}
- threads.each do |thread|
- thread.raise(LogStash::ShutdownSignal) if thread.alive?
- end
-
- # intended shutdown, get out of the loop
- break
- else
- # it was a genuine IOError, propagate it up
- raise
- end
+ rescue IOError
+ raise unless @interrupted
end
- end # loop
+ end
rescue LogStash::ShutdownSignal
- # nothing to do
+ @interrupted = true
ensure
@server_socket.close rescue nil
+
+ threads = @client_threads_lock.synchronize{@client_threads.dup}
+ threads.each do |thread|
+ thread.raise(LogStash::ShutdownSignal) if thread.alive?
+ end
end # def run_server
def run_client(output_queue)
- @thread = Thread.current
- while true
- client_socket = TCPSocket.new(@host, @port)
+ while !@interrupted
+ @client_socket = TCPSocket.new(@host, @port)
if @ssl_enable
- client_socket = OpenSSL::SSL::SSLSocket.new(client_socket, @ssl_context)
+ @client_socket = OpenSSL::SSL::SSLSocket.new(@client_socket, @ssl_context)
begin
- client_socket.connect
+ @client_socket.connect
rescue OpenSSL::SSL::SSLError => ssle
@logger.error("SSL Error", :exception => ssle, :backtrace => ssle.backtrace)
# NOTE(mrichar1): Hack to prevent hammering peer
sleep(5)
next
end
end
- @logger.debug("Opened connection", :client => "#{client_socket.peer}")
- handle_socket(client_socket, client_socket.peeraddr[3], output_queue, @codec.clone)
+ @logger.debug? && @logger.debug("Opened connection", :client => "#{@client_socket.peer}")
+ handle_socket(@client_socket, @client_socket.peeraddr[3], output_queue, @codec.clone)
end # loop
ensure
- client_socket.close rescue nil
+ @client_socket.close rescue nil
end # def run
public
def teardown
- if server?
- @interrupted = true
+ @interrupted = true
+ if @server_socket
+ @server_socket.close rescue nil
+ @server_socket = nil
+ end
+ if @client_socket
+ @client_socket.close rescue nil
+ @client_socket = nil
end
end # def teardown
end # class LogStash::Inputs::Tcp