lib/statsd/instrument/udp_sink.rb in statsd-instrument-3.3.0 vs lib/statsd/instrument/udp_sink.rb in statsd-instrument-3.4.0
- old
+ new
@@ -10,58 +10,65 @@
new(host, Integer(port_as_string))
end
attr_reader :host, :port
+ FINALIZER = ->(object_id) do
+ Thread.list.each do |thread|
+ if (store = thread["StatsD::UDPSink"])
+ store.delete(object_id)&.close
+ end
+ end
+ end
+
def initialize(host, port)
+ ObjectSpace.define_finalizer(self, FINALIZER)
@host = host
@port = port
- @mutex = Mutex.new
- @socket = nil
end
def sample?(sample_rate)
sample_rate == 1.0 || rand < sample_rate
end
def <<(datagram)
- with_socket { |socket| socket.send(datagram, 0) }
- self
- rescue SocketError, IOError, SystemCallError => error
- StatsD.logger.debug do
- "[StatsD::Instrument::UDPSink] Resetting connection because of #{error.class}: #{error.message}"
+ retried = false
+ begin
+ socket.send(datagram, 0)
+ rescue SocketError, IOError, SystemCallError => error
+ StatsD.logger.debug do
+ "[StatsD::Instrument::UDPSink] Resetting connection because of #{error.class}: #{error.message}"
+ end
+ invalidate_socket
+ if retried
+ StatsD.logger.warn do
+ "[#{self.class.name}] Events were dropped because of #{error.class}: #{error.message}"
+ end
+ else
+ retried = true
+ retry
+ end
end
- invalidate_socket
self
end
private
- def synchronize(&block)
- @mutex.synchronize(&block)
- rescue ThreadError
- # In cases where a TERM or KILL signal has been sent, and we send stats as
- # part of a signal handler, locks cannot be acquired, so we do our best
- # to try and send the datagram without a lock.
- yield
+ def invalidate_socket
+ socket = thread_store.delete(object_id)
+ socket&.close
end
- def with_socket
- synchronize { yield(socket) }
- end
-
def socket
- @socket ||= begin
+ thread_store[object_id] ||= begin
socket = UDPSocket.new
socket.connect(@host, @port)
socket
end
end
- def invalidate_socket
- synchronize do
- @socket = nil
- end
+ def thread_store
+ Thread.current["StatsD::UDPSink"] ||= {}
end
end
end
end