lib/logstash_writer.rb in logstash_writer-0.0.4 vs lib/logstash_writer.rb in logstash_writer-0.0.5
- old
+ new
@@ -181,11 +181,11 @@
@worker_thread.join
rescue Exception => ex
@logger.error("LogstashWriter") { (["Worker thread terminated with exception: #{ex.message} (#{ex.class})"] + ex.backtrace).join("\n ") }
end
@worker_thread = nil
- @socket_mutex.synchronize { (@current_socket.close; @current_socket = nil) if @current_socket }
+ @socket_mutex.synchronize { (@current_target.close; @current_target = nil) if @current_target }
end
end
nil
end
@@ -199,15 +199,15 @@
#
# @return [NilClass]
#
def force_disconnect!
@socket_mutex.synchronize do
- return if @current_socket.nil?
+ return if @current_target.nil?
- @logger.info("LogstashWriter") { "Forced disconnect from #{describe_peer(@current_socket) }" }
- @current_socket.close if @current_socket
- @current_socket = nil
+ @logger.info("LogstashWriter") { "Forced disconnect from #{@current_target.describe_peer}" }
+ @current_target.close
+ @current_target = nil
end
nil
end
@@ -235,24 +235,18 @@
end
event = @queue.shift
end
- current_socket do |s|
- s.puts event[:content].to_json
- stat_sent(describe_peer(s), event[:arrival_timestamp])
+ current_target do |t|
+ t.socket.puts event[:content].to_json
+ stat_sent(t.to_s, event[:arrival_timestamp])
@metrics[:write_loop_ok].set({}, 1)
error_wait = INITIAL_RETRY_WAIT
end
rescue StandardError => ex
@logger.error("LogstashWriter") { (["Exception in write_loop: #{ex.message} (#{ex.class})"] + ex.backtrace).join("\n ") }
- # If there was some sort of error, there's a non-trivial chance the
- # socket has gone *boom*, so let's invalidate it and go around again
- if @current_socket
- @current_socket.close
- @current_socket = nil
- end
@queue_mutex.synchronize { @queue.unshift(event) if event }
@metrics[:write_loop_exception].increment(class: ex.class.to_s)
@metrics[:write_loop_ok].set({}, 0)
sleep error_wait
# Increase the error wait timeout for next time, up to a maximum
@@ -263,39 +257,45 @@
end
end
end
end
- # Yield a TCPSocket connected to the server we currently believe to be
+ # Yield a Target connected to the server we currently believe to be
# accepting log entries, so that something can send log entries to it.
#
# The yielding allows us to centralise all error detection and handling
# within this one method, and retry sending just by calling `yield` again
# when we've connected to another server.
#
- def current_socket
+ def current_target
# This could all be handled more cleanly with recursion, but I don't
# want to fill the stack if we have to retry a lot of times. Also
# can't just use `retry` because not all of the "go around again"
# conditions are due to exceptions.
done = false
until done
@socket_mutex.synchronize do
- if @current_socket
+ if @current_target
begin
- yield @current_socket
- @metrics[:connected].set({}, 1)
+ # Check that our socket is still good to go; if we don't do this,
+ # the other end can disconnect, and because we're never normally
+ # reading from the socket, we never get the EOFError that normally
+ # results, and so the socket remains in CLOSE_WAIT state *forever*.
+ @current_target.socket.read_nonblock(1)
+
+ yield @current_target
+ @metrics[:connected].set({ server: @current_target.describe_peer }, 1)
done = true
- rescue SystemCallError => ex
+ rescue SystemCallError, IOError => ex
# Something went wrong during the send; disconnect from this
# server and recycle
- @metrics[:write_exception].increment(server: describe_peer(@current_socket), class: ex.class.to_s)
- @logger.info("LogstashWriter") { "Error while writing to current server: #{ex.message} (#{ex.class})" }
- @current_socket.close
- @current_socket = nil
- @metrics[:connected].set({}, 0)
+ @metrics[:write_exception].increment(server: @current_target.describe_peer, class: ex.class.to_s)
+ @metrics[:connected].set({ server: @current_target.describe_peer }, 0)
+ @logger.info("LogstashWriter") { "Error while writing to current server #{@current_target.describe_peer}: #{ex.message} (#{ex.class})" }
+ @current_target.close
+ @current_target = nil
sleep INITIAL_RETRY_WAIT
end
else
retry_delay = INITIAL_RETRY_WAIT * 10
@@ -311,14 +311,16 @@
begin
next_server = candidates.shift
if next_server
@logger.debug("LogstashWriter") { "Trying to connect to #{next_server.to_s}" }
- @current_socket = next_server.socket
+ @current_target = next_server
+ # Trigger a connection attempt
+ @current_target.socket
else
@logger.debug("LogstashWriter") { "Could not connect to any server; pausing before trying again" }
- @current_socket = nil
+ @current_target = nil
sleep retry_delay
# Calculate a longer retry delay next time we fail to connect
# to every server in the list, up to a maximum of (roughly) 60
# seconds.
@@ -339,22 +341,10 @@
end
end
end
end
- # Generate a human-readable description of the remote end of the given
- # socket.
- #
- def describe_peer(s)
- pa = s.peeraddr
- if pa[0] == "AF_INET6"
- "[#{pa[3]}]:#{pa[1]}"
- else
- "#{pa[3]}:#{pa[1]}"
- end
- end
-
# Turn the server_name given in the constructor into a list of Target
# objects, suitable for iterating through to find someone to talk to.
#
def resolve_server_name
return [static_target] if static_target
@@ -487,18 +477,56 @@
#
# @raise [SystemCallError] if connection cannot be established
# for any reason.
#
def socket
- TCPSocket.new(@addr, @port)
+ @socket ||= TCPSocket.new(@addr, @port)
end
+ # Shut down the connection.
+ #
+ # @return [NilClass]
+ #
+ def close
+ @socket.close if @socket
+ @socket = nil
+ @describe_peer = nil
+ end
+
# Simple string representation of the target.
#
# @return [String]
#
def to_s
"#{@addr}:#{@port}"
+ end
+
+ # Provide as accurate a representation of what we're *actually* connected
+ # to as we can, given the constraints of whether we're connected.
+ #
+ # To prevent unpleasantness when the other end disconnects but we still
+ # want to know who we *were* connected to, we cache the result of our
+ # cogitations. Just in case.
+ #
+ # @return [String]
+ #
+ def describe_peer
+ @describe_peer ||= begin
+ if @socket
+ pa = @socket.peeraddr
+ if pa[0] == "AF_INET6"
+ "[#{pa[3]}]:#{pa[1]}"
+ else
+ "#{pa[3]}:#{pa[1]}"
+ end
+ else
+ nil
+ end
+ rescue Errno::ENOTCONN
+ # Peer disconnected apparently means "I forgot who I was connected
+ # to"... ¯\_(ツ)_/¯
+ nil
+ end || to_s
end
end
private_constant :Target
end