lib/instrumental/agent.rb in instrumental_agent-1.0.0 vs lib/instrumental/agent.rb in instrumental_agent-1.0.1

- old
+ new

@@ -75,10 +75,12 @@ @pid = Process.pid @allow_reconnect = true @certs = certificates @dns_resolutions = 0 @last_connect_at = 0 + @start_worker_mutex = Mutex.new + @queue = Queue.new setup_cleanup_at_exit if @enabled end # Store a gauge for a metric, optionally at a specific time. @@ -285,11 +287,11 @@ def send_command(cmd, *args) cmd = "%s %s\n" % [cmd, args.collect { |a| a.to_s }.join(" ")] if enabled? - start_connection_worker if !running? + start_connection_worker if @queue && @queue.size < MAX_BUFFER @queue_full_warning = false logger.debug "Queueing: #{cmd.chomp}" queue_message(cmd, { :synchronous => @synchronous }) else @@ -363,13 +365,19 @@ # noop end end def start_connection_worker - if enabled? + # NOTE: We need a mutex around both `running?` and thread creation, + # otherwise we could create two threads. + # Return early and queue the message if another thread is + # starting the worker. + return if !@start_worker_mutex.try_lock + begin + return if running? + return unless enabled? disconnect - @queue ||= Queue.new address = ipv4_address_for_host(@host, @port) if address @pid = Process.pid @sync_mutex = Mutex.new @failures = 0 @@ -377,9 +385,11 @@ logger.info "Starting thread" @thread = Thread.new do run_worker_loop end end + ensure + @start_worker_mutex.unlock end end def send_with_reply_timeout(message) @socket.puts message