lib/logstash/inputs/tcp.rb in logstash-input-tcp-1.0.0 vs lib/logstash/inputs/tcp.rb in logstash-input-tcp-2.0.1
- old
+ new
@@ -1,8 +1,7 @@
# encoding: utf-8
require "logstash/inputs/base"
-require "logstash/namespace"
require "logstash/util/socket_peer"
# Read events over a TCP socket.
#
# Like stdin and file inputs, each event is assumed to be one line of text.
@@ -20,15 +19,11 @@
# When mode is `server`, the port to listen on.
# When mode is `client`, the port to connect to.
config :port, :validate => :number, :required => true
- # The 'read' timeout in seconds. If a particular tcp connection is idle for
- # more than this timeout period, we will assume it is dead and close it.
- #
- # If you never want to timeout, use -1.
- config :data_timeout, :validate => :number, :default => -1
+ config :data_timeout, :validate => :number, :default => -1, :deprecated => "This setting is not used by this plugin. It will be removed soon."
# Mode to operate in. `server` listens for client connections,
# `client` connects to a server.
config :mode, :validate => ["server", "client"], :default => "server"
@@ -51,16 +46,18 @@
# SSL key passphrase
config :ssl_key_passphrase, :validate => :password, :default => nil
def initialize(*args)
super(*args)
- @interrupted = false
+
+ # threadsafe socket bookkeeping
@server_socket = nil
@client_socket = nil
- end # def initialize
+ @connection_sockets = {}
+ @socket_mutex = Mutex.new
+ end
- public
def register
require "socket"
require "timeout"
require "openssl"
@@ -84,153 +81,175 @@
@cert_store.add_file(@ssl_cacert)
end
@ssl_context.cert_store = @cert_store
@ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER|OpenSSL::SSL::VERIFY_FAIL_IF_NO_PEER_CERT
end
- end # @ssl_enable
+ end
+ # note that since we are opening a socket in register, we must also make sure we close it
+ # in the close method even if we also close it in the stop method since we could have
+ # a situation where register is called but not run & stop.
+
if server?
@logger.info("Starting tcp input listener", :address => "#{@host}:#{@port}")
begin
- @server_socket = TCPServer.new(@host, @port)
+ set_server_socket(TCPServer.new(@host, @port))
rescue Errno::EADDRINUSE
@logger.error("Could not start TCP server: Address in use", :host => @host, :port => @port)
raise
end
- if @ssl_enable
- @server_socket = OpenSSL::SSL::SSLServer.new(@server_socket, @ssl_context)
- end # @ssl_enable
+
+ set_server_socket(OpenSSL::SSL::SSLServer.new(server_socket, @ssl_context)) if @ssl_enable
end
- end # def register
+ end
+ def run(output_queue)
+ if server?
+ run_server(output_queue)
+ else
+ run_client(output_queue)
+ end
+ end
+
+ def stop
+ # force close all sockets which will escape any blocking read with a IO exception
+ # and any thread using them will exit.
+ # catch all rescue nil on close to discard any close errors or invalid socket
+ server_socket.close rescue nil
+ client_socket.close rescue nil
+ connection_sockets.each{|socket| socket.close rescue nil}
+ end
+
+ def close
+ # see related comment in register: we must make sure to close the server socket here
+ # because it is created in the register method and we could be in the context of having
+ # register called but never run & stop, only close.
+ # catch all rescue nil on close to discard any close errors or invalid socket
+ server_socket.close rescue nil
+ end
+
private
- def handle_socket(socket, client_address, output_queue, codec)
- while !@interrupted
- buf = nil
- # NOTE(petef): the timeout only hits after the line is read or socket dies
- # TODO(sissel): Why do we have a Timeout here? What's the point?
- if @data_timeout == -1
- buf = read(socket)
- else
- Timeout::timeout(@data_timeout) do
- buf = read(socket)
+
+ def run_server(output_queue)
+ while !stop?
+ begin
+ socket = add_connection_socket(server_socket.accept)
+ # start a new thread for each connection.
+ server_connection_thread(output_queue, socket)
+ rescue OpenSSL::SSL::SSLError => e
+ @logger.error("SSL Error", :exception => e, :backtrace => e.backtrace)
+ rescue
+ # if this exception occured while the plugin is stopping
+ # just ignore and exit
+ raise unless stop?
+ end
+ end
+ ensure
+ # catch all rescue nil on close to discard any close errors or invalid socket
+ server_socket.close rescue nil
+ end
+
+ def run_client(output_queue)
+ while !stop?
+ set_client_socket(TCPSocket.new(@host, @port))
+
+ if @ssl_enable
+ set_client_socket(OpenSSL::SSL::SSLSocket.new(client_socket, @ssl_context))
+ begin
+ client_socket.connect
+ rescue OpenSSL::SSL::SSLError => e
+ @logger.error("SSL Error", :exception => e, :backtrace => e.backtrace)
+ sleep(1) # prevent hammering peer
+ next
+ rescue
+ # if this exception occured while the plugin is stopping
+ # just ignore and exit
+ raise unless stop?
end
end
- codec.decode(buf) do |event|
+
+ @logger.debug? && @logger.debug("Opened connection", :client => "#{client_socket.peer}")
+ handle_socket(client_socket, client_socket.peeraddr[3], output_queue, @codec.clone)
+ end
+ ensure
+ # catch all rescue nil on close to discard any close errors or invalid socket
+ client_socket.close rescue nil
+ end
+
+ def server_connection_thread(output_queue, socket)
+ Thread.new(output_queue, socket) do |q, s|
+ begin
+ @logger.debug? && @logger.debug("Accepted connection", :client => s.peer, :server => "#{@host}:#{@port}")
+ handle_socket(s, s.peeraddr[3], q, @codec.clone)
+ ensure
+ delete_connection_socket(s)
+ end
+ end
+ end
+
+ def handle_socket(socket, client_address, output_queue, codec)
+ while !stop?
+ codec.decode(read(socket)) do |event|
event["host"] ||= client_address
event["sslsubject"] ||= socket.peer_cert.subject if @ssl_enable && @ssl_verify
decorate(event)
output_queue << event
end
- end # loop
+ end
rescue EOFError
@logger.debug? && @logger.debug("Connection closed", :client => socket.peer)
rescue Errno::ECONNRESET
@logger.debug? && @logger.debug("Connection reset by peer", :client => socket.peer)
rescue => e
- @logger.error("An error occurred. Closing connection", :client => socket.peer, :exception => e, :backtrace => e.backtrace)
+ # if plugin is stopping, don't bother logging it as an error
+ @logger.error("An error occurred. Closing connection", :client => socket.peer, :exception => e, :backtrace => e.backtrace) unless stop?
ensure
+ # catch all rescue nil on close to discard any close errors or invalid socket
socket.close rescue nil
codec.respond_to?(:flush) && codec.flush do |event|
event["host"] ||= client_address
event["sslsubject"] ||= socket.peer_cert.subject if @ssl_enable && @ssl_verify
decorate(event)
output_queue << event
end
end
- private
- def client_thread(output_queue, socket)
- Thread.new(output_queue, socket) do |q, s|
- begin
- @logger.debug? && @logger.debug("Accepted connection", :client => s.peer, :server => "#{@host}:#{@port}")
- handle_socket(s, s.peeraddr[3], q, @codec.clone)
- rescue LogStash::ShutdownSignal
- @interrupted = true
- s.close rescue nil
- ensure
- @client_threads_lock.synchronize{@client_threads.delete(Thread.current)}
- end
- end
- end
-
- private
def server?
@mode == "server"
- end # def server?
+ end
- private
def read(socket)
socket.sysread(16384)
- end # def readline
+ end
- public
- def run(output_queue)
- if server?
- run_server(output_queue)
- else
- run_client(output_queue)
- end
- end # def run
+ # threadsafe sockets bookkeeping
- def run_server(output_queue)
- @client_threads = []
- @client_threads_lock = Mutex.new
+ def set_client_socket(socket)
+ @socket_mutex.synchronize{@client_socket = socket}
+ end
- while !@interrupted
- begin
- socket = @server_socket.accept
- # start a new thread for each connection.
- @client_threads_lock.synchronize{@client_threads << client_thread(output_queue, socket)}
- rescue OpenSSL::SSL::SSLError => ssle
- # NOTE(mrichar1): This doesn't return a useful error message for some reason
- @logger.error("SSL Error", :exception => ssle, :backtrace => ssle.backtrace)
- rescue IOError
- raise unless @interrupted
- end
- end
- rescue LogStash::ShutdownSignal
- @interrupted = true
- ensure
- @server_socket.close rescue nil
+ def client_socket
+ @socket_mutex.synchronize{@client_socket}
+ end
- threads = @client_threads_lock.synchronize{@client_threads.dup}
- threads.each do |thread|
- thread.raise(LogStash::ShutdownSignal) if thread.alive?
- end
- end # def run_server
+ def set_server_socket(socket)
+ @socket_mutex.synchronize{@server_socket = socket}
+ end
- def run_client(output_queue)
- while !@interrupted
- @client_socket = TCPSocket.new(@host, @port)
- if @ssl_enable
- @client_socket = OpenSSL::SSL::SSLSocket.new(@client_socket, @ssl_context)
- begin
- @client_socket.connect
- rescue OpenSSL::SSL::SSLError => ssle
- @logger.error("SSL Error", :exception => ssle, :backtrace => ssle.backtrace)
- # NOTE(mrichar1): Hack to prevent hammering peer
- sleep(5)
- next
- end
- end
- @logger.debug? && @logger.debug("Opened connection", :client => "#{@client_socket.peer}")
- handle_socket(@client_socket, @client_socket.peeraddr[3], output_queue, @codec.clone)
- end # loop
- ensure
- @client_socket.close rescue nil
- end # def run
+ def server_socket
+ @socket_mutex.synchronize{@server_socket}
+ end
- public
- def teardown
- @interrupted = true
- if @server_socket
- @server_socket.close rescue nil
- @server_socket = nil
- end
- if @client_socket
- @client_socket.close rescue nil
- @client_socket = nil
- end
- end # def teardown
-end # class LogStash::Inputs::Tcp
+ def add_connection_socket(socket)
+ @socket_mutex.synchronize{@connection_sockets[socket] = true}
+ socket
+ end
+
+ def delete_connection_socket(socket)
+ @socket_mutex.synchronize{@connection_sockets.delete(socket)}
+ end
+
+ def connection_sockets
+ @socket_mutex.synchronize{@connection_sockets.keys.dup}
+ end
+end