lib/logstash/inputs/tcp.rb in logstash-input-tcp-2.0.1 vs lib/logstash/inputs/tcp.rb in logstash-input-tcp-2.0.2
- old
+ new
@@ -1,9 +1,12 @@
# encoding: utf-8
require "logstash/inputs/base"
require "logstash/util/socket_peer"
+require "socket"
+require "openssl"
+
# Read events over a TCP socket.
#
# Like stdin and file inputs, each event is assumed to be one line of text.
#
# Can either accept connections from clients or connect to a server,
@@ -47,61 +50,31 @@
config :ssl_key_passphrase, :validate => :password, :default => nil
def initialize(*args)
super(*args)
+ # monkey patch TCPSocket and SSLSocket to include socket peer
+ TCPSocket.module_eval{include ::LogStash::Util::SocketPeer}
+ OpenSSL::SSL::SSLSocket.module_eval{include ::LogStash::Util::SocketPeer}
+
# threadsafe socket bookkeeping
@server_socket = nil
@client_socket = nil
@connection_sockets = {}
@socket_mutex = Mutex.new
+
+ @ssl_context = nil
end
def register
- require "socket"
- require "timeout"
- require "openssl"
-
- # monkey patch TCPSocket and SSLSocket to include socket peer
- TCPSocket.module_eval{include ::LogStash::Util::SocketPeer}
- OpenSSL::SSL::SSLSocket.module_eval{include ::LogStash::Util::SocketPeer}
-
fix_streaming_codecs
- if @ssl_enable
- @ssl_context = OpenSSL::SSL::SSLContext.new
- @ssl_context.cert = OpenSSL::X509::Certificate.new(File.read(@ssl_cert))
- @ssl_context.key = OpenSSL::PKey::RSA.new(File.read(@ssl_key),@ssl_key_passphrase)
- if @ssl_verify
- @cert_store = OpenSSL::X509::Store.new
- # Load the system default certificate path to the store
- @cert_store.set_default_paths
- if File.directory?(@ssl_cacert)
- @cert_store.add_path(@ssl_cacert)
- else
- @cert_store.add_file(@ssl_cacert)
- end
- @ssl_context.cert_store = @cert_store
- @ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER|OpenSSL::SSL::VERIFY_FAIL_IF_NO_PEER_CERT
- end
- end
-
# note that since we are opening a socket in register, we must also make sure we close it
# in the close method even if we also close it in the stop method since we could have
# a situation where register is called but not run & stop.
- if server?
- @logger.info("Starting tcp input listener", :address => "#{@host}:#{@port}")
- begin
- set_server_socket(TCPServer.new(@host, @port))
- rescue Errno::EADDRINUSE
- @logger.error("Could not start TCP server: Address in use", :host => @host, :port => @port)
- raise
- end
-
- set_server_socket(OpenSSL::SSL::SSLServer.new(server_socket, @ssl_context)) if @ssl_enable
- end
+ self.server_socket = new_server_socket if server?
end
def run(output_queue)
if server?
run_server(output_queue)
@@ -134,42 +107,27 @@
begin
socket = add_connection_socket(server_socket.accept)
# start a new thread for each connection.
server_connection_thread(output_queue, socket)
rescue OpenSSL::SSL::SSLError => e
+ # log error, close socket, accept next connection
@logger.error("SSL Error", :exception => e, :backtrace => e.backtrace)
- rescue
+ socket.close rescue nil
+ rescue => e
# if this exception occured while the plugin is stopping
# just ignore and exit
- raise unless stop?
+ raise e unless stop?
end
end
ensure
# catch all rescue nil on close to discard any close errors or invalid socket
server_socket.close rescue nil
end
def run_client(output_queue)
while !stop?
- set_client_socket(TCPSocket.new(@host, @port))
-
- if @ssl_enable
- set_client_socket(OpenSSL::SSL::SSLSocket.new(client_socket, @ssl_context))
- begin
- client_socket.connect
- rescue OpenSSL::SSL::SSLError => e
- @logger.error("SSL Error", :exception => e, :backtrace => e.backtrace)
- sleep(1) # prevent hammering peer
- next
- rescue
- # if this exception occured while the plugin is stopping
- # just ignore and exit
- raise unless stop?
- end
- end
-
- @logger.debug? && @logger.debug("Opened connection", :client => "#{client_socket.peer}")
+ self.client_socket = new_client_socket
handle_socket(client_socket, client_socket.peeraddr[3], output_queue, @codec.clone)
end
ensure
# catch all rescue nil on close to discard any close errors or invalid socket
client_socket.close rescue nil
@@ -199,11 +157,11 @@
@logger.debug? && @logger.debug("Connection closed", :client => socket.peer)
rescue Errno::ECONNRESET
@logger.debug? && @logger.debug("Connection reset by peer", :client => socket.peer)
rescue => e
# if plugin is stopping, don't bother logging it as an error
- @logger.error("An error occurred. Closing connection", :client => socket.peer, :exception => e, :backtrace => e.backtrace) unless stop?
+ !stop? && @logger.error("An error occurred. Closing connection", :client => socket.peer, :exception => e, :backtrace => e.backtrace)
ensure
# catch all rescue nil on close to discard any close errors or invalid socket
socket.close rescue nil
codec.respond_to?(:flush) && codec.flush do |event|
@@ -220,20 +178,83 @@
def read(socket)
socket.sysread(16384)
end
+ def ssl_context
+ return @ssl_context if @ssl_context
+
+ begin
+ @ssl_context = OpenSSL::SSL::SSLContext.new
+ @ssl_context.cert = OpenSSL::X509::Certificate.new(File.read(@ssl_cert))
+ @ssl_context.key = OpenSSL::PKey::RSA.new(File.read(@ssl_key),@ssl_key_passphrase)
+ if @ssl_verify
+ @cert_store = OpenSSL::X509::Store.new
+ # Load the system default certificate path to the store
+ @cert_store.set_default_paths
+ if File.directory?(@ssl_cacert)
+ @cert_store.add_path(@ssl_cacert)
+ else
+ @cert_store.add_file(@ssl_cacert)
+ end
+ @ssl_context.cert_store = @cert_store
+ @ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER|OpenSSL::SSL::VERIFY_FAIL_IF_NO_PEER_CERT
+ end
+ rescue => e
+ @logger.error("Could not inititalize SSL context", :exception => e, :backtrace => e.backtrace)
+ raise e
+ end
+
+ @ssl_context
+ end
+
+ def new_server_socket
+ @logger.info("Starting tcp input listener", :address => "#{@host}:#{@port}")
+
+ begin
+ socket = TCPServer.new(@host, @port)
+ rescue Errno::EADDRINUSE
+ @logger.error("Could not start TCP server: Address in use", :host => @host, :port => @port)
+ raise
+ end
+
+ @ssl_enable ? OpenSSL::SSL::SSLServer.new(socket, ssl_context) : socket
+ end
+
+ def new_client_socket
+ socket = TCPSocket.new(@host, @port)
+
+ if @ssl_enable
+ socket = OpenSSL::SSL::SSLSocket.new(socket, ssl_context)
+ socket.connect
+ end
+
+ @logger.debug? && @logger.debug("Opened connection", :client => "#{socket.peer}")
+
+ socket
+ rescue OpenSSL::SSL::SSLError => e
+ @logger.error("SSL Error", :exception => e, :backtrace => e.backtrace)
+ # catch all rescue nil on close to discard any close errors or invalid socket
+ socket.close rescue nil
+ sleep(1) # prevent hammering peer
+ retry
+ rescue
+ # if this exception occured while the plugin is stopping
+ # just ignore and exit
+ raise unless stop?
+ end
+
# threadsafe sockets bookkeeping
- def set_client_socket(socket)
+ def client_socket=(socket)
@socket_mutex.synchronize{@client_socket = socket}
end
def client_socket
@socket_mutex.synchronize{@client_socket}
end
- def set_server_socket(socket)
+ def server_socket=(socket)
@socket_mutex.synchronize{@server_socket = socket}
end
def server_socket
@socket_mutex.synchronize{@server_socket}