lib/logstash/inputs/tcp.rb in logstash-input-tcp-1.0.0 vs lib/logstash/inputs/tcp.rb in logstash-input-tcp-2.0.1

- old
+ new

@@ -1,8 +1,7 @@ # encoding: utf-8 require "logstash/inputs/base" -require "logstash/namespace" require "logstash/util/socket_peer" # Read events over a TCP socket. # # Like stdin and file inputs, each event is assumed to be one line of text. @@ -20,15 +19,11 @@ # When mode is `server`, the port to listen on. # When mode is `client`, the port to connect to. config :port, :validate => :number, :required => true - # The 'read' timeout in seconds. If a particular tcp connection is idle for - # more than this timeout period, we will assume it is dead and close it. - # - # If you never want to timeout, use -1. - config :data_timeout, :validate => :number, :default => -1 + config :data_timeout, :validate => :number, :default => -1, :deprecated => "This setting is not used by this plugin. It will be removed soon." # Mode to operate in. `server` listens for client connections, # `client` connects to a server. config :mode, :validate => ["server", "client"], :default => "server" @@ -51,16 +46,18 @@ # SSL key passphrase config :ssl_key_passphrase, :validate => :password, :default => nil def initialize(*args) super(*args) - @interrupted = false + + # threadsafe socket bookkeeping @server_socket = nil @client_socket = nil - end # def initialize + @connection_sockets = {} + @socket_mutex = Mutex.new + end - public def register require "socket" require "timeout" require "openssl" @@ -84,153 +81,175 @@ @cert_store.add_file(@ssl_cacert) end @ssl_context.cert_store = @cert_store @ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER|OpenSSL::SSL::VERIFY_FAIL_IF_NO_PEER_CERT end - end # @ssl_enable + end + # note that since we are opening a socket in register, we must also make sure we close it + # in the close method even if we also close it in the stop method since we could have + # a situation where register is called but not run & stop. + if server? @logger.info("Starting tcp input listener", :address => "#{@host}:#{@port}") begin - @server_socket = TCPServer.new(@host, @port) + set_server_socket(TCPServer.new(@host, @port)) rescue Errno::EADDRINUSE @logger.error("Could not start TCP server: Address in use", :host => @host, :port => @port) raise end - if @ssl_enable - @server_socket = OpenSSL::SSL::SSLServer.new(@server_socket, @ssl_context) - end # @ssl_enable + + set_server_socket(OpenSSL::SSL::SSLServer.new(server_socket, @ssl_context)) if @ssl_enable end - end # def register + end + def run(output_queue) + if server? + run_server(output_queue) + else + run_client(output_queue) + end + end + + def stop + # force close all sockets which will escape any blocking read with a IO exception + # and any thread using them will exit. + # catch all rescue nil on close to discard any close errors or invalid socket + server_socket.close rescue nil + client_socket.close rescue nil + connection_sockets.each{|socket| socket.close rescue nil} + end + + def close + # see related comment in register: we must make sure to close the server socket here + # because it is created in the register method and we could be in the context of having + # register called but never run & stop, only close. + # catch all rescue nil on close to discard any close errors or invalid socket + server_socket.close rescue nil + end + private - def handle_socket(socket, client_address, output_queue, codec) - while !@interrupted - buf = nil - # NOTE(petef): the timeout only hits after the line is read or socket dies - # TODO(sissel): Why do we have a Timeout here? What's the point? - if @data_timeout == -1 - buf = read(socket) - else - Timeout::timeout(@data_timeout) do - buf = read(socket) + + def run_server(output_queue) + while !stop? + begin + socket = add_connection_socket(server_socket.accept) + # start a new thread for each connection. + server_connection_thread(output_queue, socket) + rescue OpenSSL::SSL::SSLError => e + @logger.error("SSL Error", :exception => e, :backtrace => e.backtrace) + rescue + # if this exception occured while the plugin is stopping + # just ignore and exit + raise unless stop? + end + end + ensure + # catch all rescue nil on close to discard any close errors or invalid socket + server_socket.close rescue nil + end + + def run_client(output_queue) + while !stop? + set_client_socket(TCPSocket.new(@host, @port)) + + if @ssl_enable + set_client_socket(OpenSSL::SSL::SSLSocket.new(client_socket, @ssl_context)) + begin + client_socket.connect + rescue OpenSSL::SSL::SSLError => e + @logger.error("SSL Error", :exception => e, :backtrace => e.backtrace) + sleep(1) # prevent hammering peer + next + rescue + # if this exception occured while the plugin is stopping + # just ignore and exit + raise unless stop? end end - codec.decode(buf) do |event| + + @logger.debug? && @logger.debug("Opened connection", :client => "#{client_socket.peer}") + handle_socket(client_socket, client_socket.peeraddr[3], output_queue, @codec.clone) + end + ensure + # catch all rescue nil on close to discard any close errors or invalid socket + client_socket.close rescue nil + end + + def server_connection_thread(output_queue, socket) + Thread.new(output_queue, socket) do |q, s| + begin + @logger.debug? && @logger.debug("Accepted connection", :client => s.peer, :server => "#{@host}:#{@port}") + handle_socket(s, s.peeraddr[3], q, @codec.clone) + ensure + delete_connection_socket(s) + end + end + end + + def handle_socket(socket, client_address, output_queue, codec) + while !stop? + codec.decode(read(socket)) do |event| event["host"] ||= client_address event["sslsubject"] ||= socket.peer_cert.subject if @ssl_enable && @ssl_verify decorate(event) output_queue << event end - end # loop + end rescue EOFError @logger.debug? && @logger.debug("Connection closed", :client => socket.peer) rescue Errno::ECONNRESET @logger.debug? && @logger.debug("Connection reset by peer", :client => socket.peer) rescue => e - @logger.error("An error occurred. Closing connection", :client => socket.peer, :exception => e, :backtrace => e.backtrace) + # if plugin is stopping, don't bother logging it as an error + @logger.error("An error occurred. Closing connection", :client => socket.peer, :exception => e, :backtrace => e.backtrace) unless stop? ensure + # catch all rescue nil on close to discard any close errors or invalid socket socket.close rescue nil codec.respond_to?(:flush) && codec.flush do |event| event["host"] ||= client_address event["sslsubject"] ||= socket.peer_cert.subject if @ssl_enable && @ssl_verify decorate(event) output_queue << event end end - private - def client_thread(output_queue, socket) - Thread.new(output_queue, socket) do |q, s| - begin - @logger.debug? && @logger.debug("Accepted connection", :client => s.peer, :server => "#{@host}:#{@port}") - handle_socket(s, s.peeraddr[3], q, @codec.clone) - rescue LogStash::ShutdownSignal - @interrupted = true - s.close rescue nil - ensure - @client_threads_lock.synchronize{@client_threads.delete(Thread.current)} - end - end - end - - private def server? @mode == "server" - end # def server? + end - private def read(socket) socket.sysread(16384) - end # def readline + end - public - def run(output_queue) - if server? - run_server(output_queue) - else - run_client(output_queue) - end - end # def run + # threadsafe sockets bookkeeping - def run_server(output_queue) - @client_threads = [] - @client_threads_lock = Mutex.new + def set_client_socket(socket) + @socket_mutex.synchronize{@client_socket = socket} + end - while !@interrupted - begin - socket = @server_socket.accept - # start a new thread for each connection. - @client_threads_lock.synchronize{@client_threads << client_thread(output_queue, socket)} - rescue OpenSSL::SSL::SSLError => ssle - # NOTE(mrichar1): This doesn't return a useful error message for some reason - @logger.error("SSL Error", :exception => ssle, :backtrace => ssle.backtrace) - rescue IOError - raise unless @interrupted - end - end - rescue LogStash::ShutdownSignal - @interrupted = true - ensure - @server_socket.close rescue nil + def client_socket + @socket_mutex.synchronize{@client_socket} + end - threads = @client_threads_lock.synchronize{@client_threads.dup} - threads.each do |thread| - thread.raise(LogStash::ShutdownSignal) if thread.alive? - end - end # def run_server + def set_server_socket(socket) + @socket_mutex.synchronize{@server_socket = socket} + end - def run_client(output_queue) - while !@interrupted - @client_socket = TCPSocket.new(@host, @port) - if @ssl_enable - @client_socket = OpenSSL::SSL::SSLSocket.new(@client_socket, @ssl_context) - begin - @client_socket.connect - rescue OpenSSL::SSL::SSLError => ssle - @logger.error("SSL Error", :exception => ssle, :backtrace => ssle.backtrace) - # NOTE(mrichar1): Hack to prevent hammering peer - sleep(5) - next - end - end - @logger.debug? && @logger.debug("Opened connection", :client => "#{@client_socket.peer}") - handle_socket(@client_socket, @client_socket.peeraddr[3], output_queue, @codec.clone) - end # loop - ensure - @client_socket.close rescue nil - end # def run + def server_socket + @socket_mutex.synchronize{@server_socket} + end - public - def teardown - @interrupted = true - if @server_socket - @server_socket.close rescue nil - @server_socket = nil - end - if @client_socket - @client_socket.close rescue nil - @client_socket = nil - end - end # def teardown -end # class LogStash::Inputs::Tcp + def add_connection_socket(socket) + @socket_mutex.synchronize{@connection_sockets[socket] = true} + socket + end + + def delete_connection_socket(socket) + @socket_mutex.synchronize{@connection_sockets.delete(socket)} + end + + def connection_sockets + @socket_mutex.synchronize{@connection_sockets.keys.dup} + end +end