lib/logstash/inputs/tcp.rb in logstash-input-tcp-2.0.1 vs lib/logstash/inputs/tcp.rb in logstash-input-tcp-2.0.2

- old
+ new

@@ -1,9 +1,12 @@ # encoding: utf-8 require "logstash/inputs/base" require "logstash/util/socket_peer" +require "socket" +require "openssl" + # Read events over a TCP socket. # # Like stdin and file inputs, each event is assumed to be one line of text. # # Can either accept connections from clients or connect to a server, @@ -47,61 +50,31 @@ config :ssl_key_passphrase, :validate => :password, :default => nil def initialize(*args) super(*args) + # monkey patch TCPSocket and SSLSocket to include socket peer + TCPSocket.module_eval{include ::LogStash::Util::SocketPeer} + OpenSSL::SSL::SSLSocket.module_eval{include ::LogStash::Util::SocketPeer} + # threadsafe socket bookkeeping @server_socket = nil @client_socket = nil @connection_sockets = {} @socket_mutex = Mutex.new + + @ssl_context = nil end def register - require "socket" - require "timeout" - require "openssl" - - # monkey patch TCPSocket and SSLSocket to include socket peer - TCPSocket.module_eval{include ::LogStash::Util::SocketPeer} - OpenSSL::SSL::SSLSocket.module_eval{include ::LogStash::Util::SocketPeer} - fix_streaming_codecs - if @ssl_enable - @ssl_context = OpenSSL::SSL::SSLContext.new - @ssl_context.cert = OpenSSL::X509::Certificate.new(File.read(@ssl_cert)) - @ssl_context.key = OpenSSL::PKey::RSA.new(File.read(@ssl_key),@ssl_key_passphrase) - if @ssl_verify - @cert_store = OpenSSL::X509::Store.new - # Load the system default certificate path to the store - @cert_store.set_default_paths - if File.directory?(@ssl_cacert) - @cert_store.add_path(@ssl_cacert) - else - @cert_store.add_file(@ssl_cacert) - end - @ssl_context.cert_store = @cert_store - @ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER|OpenSSL::SSL::VERIFY_FAIL_IF_NO_PEER_CERT - end - end - # note that since we are opening a socket in register, we must also make sure we close it # in the close method even if we also close it in the stop method since we could have # a situation where register is called but not run & stop. - if server? - @logger.info("Starting tcp input listener", :address => "#{@host}:#{@port}") - begin - set_server_socket(TCPServer.new(@host, @port)) - rescue Errno::EADDRINUSE - @logger.error("Could not start TCP server: Address in use", :host => @host, :port => @port) - raise - end - - set_server_socket(OpenSSL::SSL::SSLServer.new(server_socket, @ssl_context)) if @ssl_enable - end + self.server_socket = new_server_socket if server? end def run(output_queue) if server? run_server(output_queue) @@ -134,42 +107,27 @@ begin socket = add_connection_socket(server_socket.accept) # start a new thread for each connection. server_connection_thread(output_queue, socket) rescue OpenSSL::SSL::SSLError => e + # log error, close socket, accept next connection @logger.error("SSL Error", :exception => e, :backtrace => e.backtrace) - rescue + socket.close rescue nil + rescue => e # if this exception occured while the plugin is stopping # just ignore and exit - raise unless stop? + raise e unless stop? end end ensure # catch all rescue nil on close to discard any close errors or invalid socket server_socket.close rescue nil end def run_client(output_queue) while !stop? - set_client_socket(TCPSocket.new(@host, @port)) - - if @ssl_enable - set_client_socket(OpenSSL::SSL::SSLSocket.new(client_socket, @ssl_context)) - begin - client_socket.connect - rescue OpenSSL::SSL::SSLError => e - @logger.error("SSL Error", :exception => e, :backtrace => e.backtrace) - sleep(1) # prevent hammering peer - next - rescue - # if this exception occured while the plugin is stopping - # just ignore and exit - raise unless stop? - end - end - - @logger.debug? && @logger.debug("Opened connection", :client => "#{client_socket.peer}") + self.client_socket = new_client_socket handle_socket(client_socket, client_socket.peeraddr[3], output_queue, @codec.clone) end ensure # catch all rescue nil on close to discard any close errors or invalid socket client_socket.close rescue nil @@ -199,11 +157,11 @@ @logger.debug? && @logger.debug("Connection closed", :client => socket.peer) rescue Errno::ECONNRESET @logger.debug? && @logger.debug("Connection reset by peer", :client => socket.peer) rescue => e # if plugin is stopping, don't bother logging it as an error - @logger.error("An error occurred. Closing connection", :client => socket.peer, :exception => e, :backtrace => e.backtrace) unless stop? + !stop? && @logger.error("An error occurred. Closing connection", :client => socket.peer, :exception => e, :backtrace => e.backtrace) ensure # catch all rescue nil on close to discard any close errors or invalid socket socket.close rescue nil codec.respond_to?(:flush) && codec.flush do |event| @@ -220,20 +178,83 @@ def read(socket) socket.sysread(16384) end + def ssl_context + return @ssl_context if @ssl_context + + begin + @ssl_context = OpenSSL::SSL::SSLContext.new + @ssl_context.cert = OpenSSL::X509::Certificate.new(File.read(@ssl_cert)) + @ssl_context.key = OpenSSL::PKey::RSA.new(File.read(@ssl_key),@ssl_key_passphrase) + if @ssl_verify + @cert_store = OpenSSL::X509::Store.new + # Load the system default certificate path to the store + @cert_store.set_default_paths + if File.directory?(@ssl_cacert) + @cert_store.add_path(@ssl_cacert) + else + @cert_store.add_file(@ssl_cacert) + end + @ssl_context.cert_store = @cert_store + @ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER|OpenSSL::SSL::VERIFY_FAIL_IF_NO_PEER_CERT + end + rescue => e + @logger.error("Could not inititalize SSL context", :exception => e, :backtrace => e.backtrace) + raise e + end + + @ssl_context + end + + def new_server_socket + @logger.info("Starting tcp input listener", :address => "#{@host}:#{@port}") + + begin + socket = TCPServer.new(@host, @port) + rescue Errno::EADDRINUSE + @logger.error("Could not start TCP server: Address in use", :host => @host, :port => @port) + raise + end + + @ssl_enable ? OpenSSL::SSL::SSLServer.new(socket, ssl_context) : socket + end + + def new_client_socket + socket = TCPSocket.new(@host, @port) + + if @ssl_enable + socket = OpenSSL::SSL::SSLSocket.new(socket, ssl_context) + socket.connect + end + + @logger.debug? && @logger.debug("Opened connection", :client => "#{socket.peer}") + + socket + rescue OpenSSL::SSL::SSLError => e + @logger.error("SSL Error", :exception => e, :backtrace => e.backtrace) + # catch all rescue nil on close to discard any close errors or invalid socket + socket.close rescue nil + sleep(1) # prevent hammering peer + retry + rescue + # if this exception occured while the plugin is stopping + # just ignore and exit + raise unless stop? + end + # threadsafe sockets bookkeeping - def set_client_socket(socket) + def client_socket=(socket) @socket_mutex.synchronize{@client_socket = socket} end def client_socket @socket_mutex.synchronize{@client_socket} end - def set_server_socket(socket) + def server_socket=(socket) @socket_mutex.synchronize{@server_socket = socket} end def server_socket @socket_mutex.synchronize{@server_socket}