lib/logstash/inputs/tcp.rb in logstash-input-tcp-0.1.3 vs lib/logstash/inputs/tcp.rb in logstash-input-tcp-0.1.4

- old
+ new

@@ -8,11 +8,10 @@ # Like stdin and file inputs, each event is assumed to be one line of text. # # Can either accept connections from clients or connect to a server, # depending on `mode`. class LogStash::Inputs::Tcp < LogStash::Inputs::Base - class Interrupted < StandardError; end config_name "tcp" default :codec, "line" # When mode is `server`, the address to listen on. @@ -52,10 +51,13 @@ # SSL key passphrase config :ssl_key_passphrase, :validate => :password, :default => nil def initialize(*args) super(*args) + @interrupted = false + @server_socket = nil + @client_socket = nil end # def initialize public def register require "socket" @@ -100,14 +102,14 @@ end end # def register private def handle_socket(socket, client_address, output_queue, codec) - while true + while !@interrupted buf = nil # NOTE(petef): the timeout only hits after the line is read or socket dies - # TODO(sissel): Why do we have a timeout here? What's the point? + # TODO(sissel): Why do we have a Timeout here? What's the point? if @data_timeout == -1 buf = read(socket) else Timeout::timeout(@data_timeout) do buf = read(socket) @@ -141,11 +143,12 @@ def client_thread(output_queue, socket) Thread.new(output_queue, socket) do |q, s| begin @logger.debug? && @logger.debug("Accepted connection", :client => s.peer, :server => "#{@host}:#{@port}") handle_socket(s, s.peeraddr[3], q, @codec.clone) - rescue Interrupted + rescue LogStash::ShutdownSignal + @interrupted = true s.close rescue nil ensure @client_threads_lock.synchronize{@client_threads.delete(Thread.current)} end end @@ -156,11 +159,11 @@ @mode == "server" end # def server? private def read(socket) - return socket.sysread(16384) + socket.sysread(16384) end # def readline public def run(output_queue) if server? @@ -169,69 +172,65 @@ run_client(output_queue) end end # def run def run_server(output_queue) - @thread = Thread.current @client_threads = [] @client_threads_lock = Mutex.new - while true + while !@interrupted begin socket = @server_socket.accept # start a new thread for each connection. @client_threads_lock.synchronize{@client_threads << client_thread(output_queue, socket)} rescue OpenSSL::SSL::SSLError => ssle # NOTE(mrichar1): This doesn't return a useful error message for some reason @logger.error("SSL Error", :exception => ssle, :backtrace => ssle.backtrace) - rescue IOError, LogStash::ShutdownSignal - if @interrupted - @server_socket.close rescue nil - - threads = @client_threads_lock.synchronize{@client_threads.dup} - threads.each do |thread| - thread.raise(LogStash::ShutdownSignal) if thread.alive? - end - - # intended shutdown, get out of the loop - break - else - # it was a genuine IOError, propagate it up - raise - end + rescue IOError + raise unless @interrupted end - end # loop + end rescue LogStash::ShutdownSignal - # nothing to do + @interrupted = true ensure @server_socket.close rescue nil + + threads = @client_threads_lock.synchronize{@client_threads.dup} + threads.each do |thread| + thread.raise(LogStash::ShutdownSignal) if thread.alive? + end end # def run_server def run_client(output_queue) - @thread = Thread.current - while true - client_socket = TCPSocket.new(@host, @port) + while !@interrupted + @client_socket = TCPSocket.new(@host, @port) if @ssl_enable - client_socket = OpenSSL::SSL::SSLSocket.new(client_socket, @ssl_context) + @client_socket = OpenSSL::SSL::SSLSocket.new(@client_socket, @ssl_context) begin - client_socket.connect + @client_socket.connect rescue OpenSSL::SSL::SSLError => ssle @logger.error("SSL Error", :exception => ssle, :backtrace => ssle.backtrace) # NOTE(mrichar1): Hack to prevent hammering peer sleep(5) next end end - @logger.debug("Opened connection", :client => "#{client_socket.peer}") - handle_socket(client_socket, client_socket.peeraddr[3], output_queue, @codec.clone) + @logger.debug? && @logger.debug("Opened connection", :client => "#{@client_socket.peer}") + handle_socket(@client_socket, @client_socket.peeraddr[3], output_queue, @codec.clone) end # loop ensure - client_socket.close rescue nil + @client_socket.close rescue nil end # def run public def teardown - if server? - @interrupted = true + @interrupted = true + if @server_socket + @server_socket.close rescue nil + @server_socket = nil + end + if @client_socket + @client_socket.close rescue nil + @client_socket = nil end end # def teardown end # class LogStash::Inputs::Tcp