lib/logstash/inputs/tcp.rb in logstash-input-tcp-2.0.5 vs lib/logstash/inputs/tcp.rb in logstash-input-tcp-3.0.0

- old
+ new

@@ -33,24 +33,28 @@ # Enable SSL (must be set for other `ssl_` options to take effect). config :ssl_enable, :validate => :boolean, :default => false # Verify the identity of the other end of the SSL connection against the CA. # For input, sets the field `sslsubject` to that of the client certificate. - config :ssl_verify, :validate => :boolean, :default => false + config :ssl_verify, :validate => :boolean, :default => true # The SSL CA certificate, chainfile or CA path. The system CA path is automatically included. - config :ssl_cacert, :validate => :path + config :ssl_cacert, :validate => :path, :deprecated => "This setting is deprecated in favor of extra_chain_cert as it sets a more clear expectation to add more X509 certificates to the store" # SSL certificate path config :ssl_cert, :validate => :path # SSL key path config :ssl_key, :validate => :path # SSL key passphrase config :ssl_key_passphrase, :validate => :password, :default => nil + # An Array of extra X509 certificates to be added to the certificate chain. + # Useful when the CA chain is not necessary in the system store. + config :ssl_extra_chain_certs, :validate => :array, :default => [] + def initialize(*args) super(*args) # monkey patch TCPSocket and SSLSocket to include socket peer TCPSocket.module_eval{include ::LogStash::Util::SocketPeer} @@ -124,32 +128,33 @@ end def run_client(output_queue) while !stop? self.client_socket = new_client_socket - handle_socket(client_socket, client_socket.peeraddr[3], output_queue, @codec.clone) + handle_socket(client_socket, client_socket.peeraddr[3], client_socket.peeraddr[1], output_queue, @codec.clone) end ensure # catch all rescue nil on close to discard any close errors or invalid socket client_socket.close rescue nil end def server_connection_thread(output_queue, socket) Thread.new(output_queue, socket) do |q, s| begin @logger.debug? && @logger.debug("Accepted connection", :client => s.peer, :server => "#{@host}:#{@port}") - handle_socket(s, s.peeraddr[3], q, @codec.clone) + handle_socket(s, s.peeraddr[3], s.peeraddr[1], q, @codec.clone) ensure delete_connection_socket(s) end end end - def handle_socket(socket, client_address, output_queue, codec) + def handle_socket(socket, client_address, client_port, output_queue, codec) while !stop? codec.decode(read(socket)) do |event| event["host"] ||= client_address + event["port"] ||= client_port event["sslsubject"] ||= socket.peer_cert.subject if @ssl_enable && @ssl_verify decorate(event) output_queue << event end end @@ -168,16 +173,32 @@ # catch all rescue nil on close to discard any close errors or invalid socket socket.close rescue nil codec.respond_to?(:flush) && codec.flush do |event| event["host"] ||= client_address + event["port"] ||= client_port event["sslsubject"] ||= socket.peer_cert.subject if @ssl_enable && @ssl_verify decorate(event) output_queue << event end end + private + def client_thread(output_queue, socket) + Thread.new(output_queue, socket) do |q, s| + begin + @logger.debug? && @logger.debug("Accepted connection", :client => s.peer, :server => "#{@host}:#{@port}") + handle_socket(s, s.peeraddr[3], s.peeraddr[1], q, @codec.clone) + rescue Interrupted + s.close rescue nil + ensure + @client_threads_lock.synchronize{@client_threads.delete(Thread.current)} + end + end + end + + private def server? @mode == "server" end def read(socket) @@ -190,31 +211,36 @@ begin @ssl_context = OpenSSL::SSL::SSLContext.new @ssl_context.cert = OpenSSL::X509::Certificate.new(File.read(@ssl_cert)) @ssl_context.key = OpenSSL::PKey::RSA.new(File.read(@ssl_key),@ssl_key_passphrase) if @ssl_verify - @cert_store = OpenSSL::X509::Store.new - # Load the system default certificate path to the store - @cert_store.set_default_paths - if File.directory?(@ssl_cacert) - @cert_store.add_path(@ssl_cacert) - else - @cert_store.add_file(@ssl_cacert) - end - @ssl_context.cert_store = @cert_store + @ssl_context.cert_store = load_cert_store @ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER|OpenSSL::SSL::VERIFY_FAIL_IF_NO_PEER_CERT end rescue => e @logger.error("Could not inititalize SSL context", :exception => e, :backtrace => e.backtrace) raise e end @ssl_context end + def load_cert_store + cert_store = OpenSSL::X509::Store.new + cert_store.set_default_paths + if File.directory?(@ssl_cacert) + cert_store.add_path(@ssl_cacert) + else + cert_store.add_file(@ssl_cacert) + end if @ssl_cacert + @ssl_extra_chain_certs.each do |cert| + cert_store.add_file(cert) + end + cert_store + end + def new_server_socket @logger.info("Starting tcp input listener", :address => "#{@host}:#{@port}") - begin socket = TCPServer.new(@host, @port) rescue Errno::EADDRINUSE @logger.error("Could not start TCP server: Address in use", :host => @host, :port => @port) raise