lib/statsd/instrument/udp_sink.rb in statsd-instrument-3.3.0 vs lib/statsd/instrument/udp_sink.rb in statsd-instrument-3.4.0

- old
+ new

@@ -10,58 +10,65 @@ new(host, Integer(port_as_string)) end attr_reader :host, :port + FINALIZER = ->(object_id) do + Thread.list.each do |thread| + if (store = thread["StatsD::UDPSink"]) + store.delete(object_id)&.close + end + end + end + def initialize(host, port) + ObjectSpace.define_finalizer(self, FINALIZER) @host = host @port = port - @mutex = Mutex.new - @socket = nil end def sample?(sample_rate) sample_rate == 1.0 || rand < sample_rate end def <<(datagram) - with_socket { |socket| socket.send(datagram, 0) } - self - rescue SocketError, IOError, SystemCallError => error - StatsD.logger.debug do - "[StatsD::Instrument::UDPSink] Resetting connection because of #{error.class}: #{error.message}" + retried = false + begin + socket.send(datagram, 0) + rescue SocketError, IOError, SystemCallError => error + StatsD.logger.debug do + "[StatsD::Instrument::UDPSink] Resetting connection because of #{error.class}: #{error.message}" + end + invalidate_socket + if retried + StatsD.logger.warn do + "[#{self.class.name}] Events were dropped because of #{error.class}: #{error.message}" + end + else + retried = true + retry + end end - invalidate_socket self end private - def synchronize(&block) - @mutex.synchronize(&block) - rescue ThreadError - # In cases where a TERM or KILL signal has been sent, and we send stats as - # part of a signal handler, locks cannot be acquired, so we do our best - # to try and send the datagram without a lock. - yield + def invalidate_socket + socket = thread_store.delete(object_id) + socket&.close end - def with_socket - synchronize { yield(socket) } - end - def socket - @socket ||= begin + thread_store[object_id] ||= begin socket = UDPSocket.new socket.connect(@host, @port) socket end end - def invalidate_socket - synchronize do - @socket = nil - end + def thread_store + Thread.current["StatsD::UDPSink"] ||= {} end end end end