lib/logstash_writer.rb in logstash_writer-0.0.4 vs lib/logstash_writer.rb in logstash_writer-0.0.5

- old
+ new

@@ -181,11 +181,11 @@ @worker_thread.join rescue Exception => ex @logger.error("LogstashWriter") { (["Worker thread terminated with exception: #{ex.message} (#{ex.class})"] + ex.backtrace).join("\n ") } end @worker_thread = nil - @socket_mutex.synchronize { (@current_socket.close; @current_socket = nil) if @current_socket } + @socket_mutex.synchronize { (@current_target.close; @current_target = nil) if @current_target } end end nil end @@ -199,15 +199,15 @@ # # @return [NilClass] # def force_disconnect! @socket_mutex.synchronize do - return if @current_socket.nil? + return if @current_target.nil? - @logger.info("LogstashWriter") { "Forced disconnect from #{describe_peer(@current_socket) }" } - @current_socket.close if @current_socket - @current_socket = nil + @logger.info("LogstashWriter") { "Forced disconnect from #{@current_target.describe_peer}" } + @current_target.close + @current_target = nil end nil end @@ -235,24 +235,18 @@ end event = @queue.shift end - current_socket do |s| - s.puts event[:content].to_json - stat_sent(describe_peer(s), event[:arrival_timestamp]) + current_target do |t| + t.socket.puts event[:content].to_json + stat_sent(t.to_s, event[:arrival_timestamp]) @metrics[:write_loop_ok].set({}, 1) error_wait = INITIAL_RETRY_WAIT end rescue StandardError => ex @logger.error("LogstashWriter") { (["Exception in write_loop: #{ex.message} (#{ex.class})"] + ex.backtrace).join("\n ") } - # If there was some sort of error, there's a non-trivial chance the - # socket has gone *boom*, so let's invalidate it and go around again - if @current_socket - @current_socket.close - @current_socket = nil - end @queue_mutex.synchronize { @queue.unshift(event) if event } @metrics[:write_loop_exception].increment(class: ex.class.to_s) @metrics[:write_loop_ok].set({}, 0) sleep error_wait # Increase the error wait timeout for next time, up to a maximum @@ -263,39 +257,45 @@ end end end end - # Yield a TCPSocket connected to the server we currently believe to be + # Yield a Target connected to the server we currently believe to be # accepting log entries, so that something can send log entries to it. # # The yielding allows us to centralise all error detection and handling # within this one method, and retry sending just by calling `yield` again # when we've connected to another server. # - def current_socket + def current_target # This could all be handled more cleanly with recursion, but I don't # want to fill the stack if we have to retry a lot of times. Also # can't just use `retry` because not all of the "go around again" # conditions are due to exceptions. done = false until done @socket_mutex.synchronize do - if @current_socket + if @current_target begin - yield @current_socket - @metrics[:connected].set({}, 1) + # Check that our socket is still good to go; if we don't do this, + # the other end can disconnect, and because we're never normally + # reading from the socket, we never get the EOFError that normally + # results, and so the socket remains in CLOSE_WAIT state *forever*. + @current_target.socket.read_nonblock(1) + + yield @current_target + @metrics[:connected].set({ server: @current_target.describe_peer }, 1) done = true - rescue SystemCallError => ex + rescue SystemCallError, IOError => ex # Something went wrong during the send; disconnect from this # server and recycle - @metrics[:write_exception].increment(server: describe_peer(@current_socket), class: ex.class.to_s) - @logger.info("LogstashWriter") { "Error while writing to current server: #{ex.message} (#{ex.class})" } - @current_socket.close - @current_socket = nil - @metrics[:connected].set({}, 0) + @metrics[:write_exception].increment(server: @current_target.describe_peer, class: ex.class.to_s) + @metrics[:connected].set({ server: @current_target.describe_peer }, 0) + @logger.info("LogstashWriter") { "Error while writing to current server #{@current_target.describe_peer}: #{ex.message} (#{ex.class})" } + @current_target.close + @current_target = nil sleep INITIAL_RETRY_WAIT end else retry_delay = INITIAL_RETRY_WAIT * 10 @@ -311,14 +311,16 @@ begin next_server = candidates.shift if next_server @logger.debug("LogstashWriter") { "Trying to connect to #{next_server.to_s}" } - @current_socket = next_server.socket + @current_target = next_server + # Trigger a connection attempt + @current_target.socket else @logger.debug("LogstashWriter") { "Could not connect to any server; pausing before trying again" } - @current_socket = nil + @current_target = nil sleep retry_delay # Calculate a longer retry delay next time we fail to connect # to every server in the list, up to a maximum of (roughly) 60 # seconds. @@ -339,22 +341,10 @@ end end end end - # Generate a human-readable description of the remote end of the given - # socket. - # - def describe_peer(s) - pa = s.peeraddr - if pa[0] == "AF_INET6" - "[#{pa[3]}]:#{pa[1]}" - else - "#{pa[3]}:#{pa[1]}" - end - end - # Turn the server_name given in the constructor into a list of Target # objects, suitable for iterating through to find someone to talk to. # def resolve_server_name return [static_target] if static_target @@ -487,18 +477,56 @@ # # @raise [SystemCallError] if connection cannot be established # for any reason. # def socket - TCPSocket.new(@addr, @port) + @socket ||= TCPSocket.new(@addr, @port) end + # Shut down the connection. + # + # @return [NilClass] + # + def close + @socket.close if @socket + @socket = nil + @describe_peer = nil + end + # Simple string representation of the target. # # @return [String] # def to_s "#{@addr}:#{@port}" + end + + # Provide as accurate a representation of what we're *actually* connected + # to as we can, given the constraints of whether we're connected. + # + # To prevent unpleasantness when the other end disconnects but we still + # want to know who we *were* connected to, we cache the result of our + # cogitations. Just in case. + # + # @return [String] + # + def describe_peer + @describe_peer ||= begin + if @socket + pa = @socket.peeraddr + if pa[0] == "AF_INET6" + "[#{pa[3]}]:#{pa[1]}" + else + "#{pa[3]}:#{pa[1]}" + end + else + nil + end + rescue Errno::ENOTCONN + # Peer disconnected apparently means "I forgot who I was connected + # to"... ¯\_(ツ)_/¯ + nil + end || to_s end end private_constant :Target end