lib/logstash_writer.rb in logstash_writer-0.0.6 vs lib/logstash_writer.rb in logstash_writer-0.0.7
- old
+ new
@@ -119,11 +119,11 @@
unless e.has_key?(:@timestamp) || e.has_key?("@timestamp")
e[:@timestamp] = Time.now.utc.strftime("%FT%T.%NZ")
end
if e.has_key?("@metadata")
- e[:@metadata] = (e[:@metadata] || {}).merge(e["@metadata"])
+ e[:@metadata] = (e[:@metadata] || {}).merge(e.delete("@metadata"))
end
unless e.has_key?(:@metadata)
e[:@metadata] = {}
end
@@ -283,19 +283,23 @@
until done
@socket_mutex.synchronize do
if @current_target
begin
- # Check that our socket is still good to go; if we don't do this,
- # the other end can disconnect, and because we're never normally
- # reading from the socket, we never get the EOFError that normally
- # results, and so the socket remains in CLOSE_WAIT state *forever*.
- @current_target.socket.read_nonblock(1)
+ # Check that our socket is still good to go; if we don't do
+ # this, the other end can disconnect, and because we're never
+ # normally reading from the socket, we never get the EOFError
+ # that normally results, and so the socket remains in CLOSE_WAIT
+ # state *forever*. raising an ENOTCONN gets us into the
+ # SystemCallError rescue, which is where we want to be, and
+ # "Transport endpoint is not connected" seems like a suitably
+ # appropriate error to me under the circumstances.
+ raise Errno::ENOTCONN unless IO.select([@current_target.socket], [], [], 0).nil?
yield @current_target
@metrics[:connected].set({ server: @current_target.describe_peer }, 1)
done = true
- rescue SystemCallError, IOError => ex
+ rescue SystemCallError => ex
# Something went wrong during the send; disconnect from this
# server and recycle
@metrics[:write_exception].increment(server: @current_target.describe_peer, class: ex.class.to_s)
@metrics[:connected].set({ server: @current_target.describe_peer }, 0)
@logger.info("LogstashWriter") { "Error while writing to current server #{@current_target.describe_peer}: #{ex.message} (#{ex.class})" }