lib/logstash/inputs/tcp.rb in logstash-input-tcp-2.0.5 vs lib/logstash/inputs/tcp.rb in logstash-input-tcp-3.0.0
- old
+ new
@@ -33,24 +33,28 @@
# Enable SSL (must be set for other `ssl_` options to take effect).
config :ssl_enable, :validate => :boolean, :default => false
# Verify the identity of the other end of the SSL connection against the CA.
# For input, sets the field `sslsubject` to that of the client certificate.
- config :ssl_verify, :validate => :boolean, :default => false
+ config :ssl_verify, :validate => :boolean, :default => true
# The SSL CA certificate, chainfile or CA path. The system CA path is automatically included.
- config :ssl_cacert, :validate => :path
+ config :ssl_cacert, :validate => :path, :deprecated => "This setting is deprecated in favor of extra_chain_cert as it sets a more clear expectation to add more X509 certificates to the store"
# SSL certificate path
config :ssl_cert, :validate => :path
# SSL key path
config :ssl_key, :validate => :path
# SSL key passphrase
config :ssl_key_passphrase, :validate => :password, :default => nil
+ # An Array of extra X509 certificates to be added to the certificate chain.
+ # Useful when the CA chain is not necessary in the system store.
+ config :ssl_extra_chain_certs, :validate => :array, :default => []
+
def initialize(*args)
super(*args)
# monkey patch TCPSocket and SSLSocket to include socket peer
TCPSocket.module_eval{include ::LogStash::Util::SocketPeer}
@@ -124,32 +128,33 @@
end
def run_client(output_queue)
while !stop?
self.client_socket = new_client_socket
- handle_socket(client_socket, client_socket.peeraddr[3], output_queue, @codec.clone)
+ handle_socket(client_socket, client_socket.peeraddr[3], client_socket.peeraddr[1], output_queue, @codec.clone)
end
ensure
# catch all rescue nil on close to discard any close errors or invalid socket
client_socket.close rescue nil
end
def server_connection_thread(output_queue, socket)
Thread.new(output_queue, socket) do |q, s|
begin
@logger.debug? && @logger.debug("Accepted connection", :client => s.peer, :server => "#{@host}:#{@port}")
- handle_socket(s, s.peeraddr[3], q, @codec.clone)
+ handle_socket(s, s.peeraddr[3], s.peeraddr[1], q, @codec.clone)
ensure
delete_connection_socket(s)
end
end
end
- def handle_socket(socket, client_address, output_queue, codec)
+ def handle_socket(socket, client_address, client_port, output_queue, codec)
while !stop?
codec.decode(read(socket)) do |event|
event["host"] ||= client_address
+ event["port"] ||= client_port
event["sslsubject"] ||= socket.peer_cert.subject if @ssl_enable && @ssl_verify
decorate(event)
output_queue << event
end
end
@@ -168,16 +173,32 @@
# catch all rescue nil on close to discard any close errors or invalid socket
socket.close rescue nil
codec.respond_to?(:flush) && codec.flush do |event|
event["host"] ||= client_address
+ event["port"] ||= client_port
event["sslsubject"] ||= socket.peer_cert.subject if @ssl_enable && @ssl_verify
decorate(event)
output_queue << event
end
end
+ private
+ def client_thread(output_queue, socket)
+ Thread.new(output_queue, socket) do |q, s|
+ begin
+ @logger.debug? && @logger.debug("Accepted connection", :client => s.peer, :server => "#{@host}:#{@port}")
+ handle_socket(s, s.peeraddr[3], s.peeraddr[1], q, @codec.clone)
+ rescue Interrupted
+ s.close rescue nil
+ ensure
+ @client_threads_lock.synchronize{@client_threads.delete(Thread.current)}
+ end
+ end
+ end
+
+ private
def server?
@mode == "server"
end
def read(socket)
@@ -190,31 +211,36 @@
begin
@ssl_context = OpenSSL::SSL::SSLContext.new
@ssl_context.cert = OpenSSL::X509::Certificate.new(File.read(@ssl_cert))
@ssl_context.key = OpenSSL::PKey::RSA.new(File.read(@ssl_key),@ssl_key_passphrase)
if @ssl_verify
- @cert_store = OpenSSL::X509::Store.new
- # Load the system default certificate path to the store
- @cert_store.set_default_paths
- if File.directory?(@ssl_cacert)
- @cert_store.add_path(@ssl_cacert)
- else
- @cert_store.add_file(@ssl_cacert)
- end
- @ssl_context.cert_store = @cert_store
+ @ssl_context.cert_store = load_cert_store
@ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER|OpenSSL::SSL::VERIFY_FAIL_IF_NO_PEER_CERT
end
rescue => e
@logger.error("Could not inititalize SSL context", :exception => e, :backtrace => e.backtrace)
raise e
end
@ssl_context
end
+ def load_cert_store
+ cert_store = OpenSSL::X509::Store.new
+ cert_store.set_default_paths
+ if File.directory?(@ssl_cacert)
+ cert_store.add_path(@ssl_cacert)
+ else
+ cert_store.add_file(@ssl_cacert)
+ end if @ssl_cacert
+ @ssl_extra_chain_certs.each do |cert|
+ cert_store.add_file(cert)
+ end
+ cert_store
+ end
+
def new_server_socket
@logger.info("Starting tcp input listener", :address => "#{@host}:#{@port}")
-
begin
socket = TCPServer.new(@host, @port)
rescue Errno::EADDRINUSE
@logger.error("Could not start TCP server: Address in use", :host => @host, :port => @port)
raise