lib/instrumental/agent.rb in instrumental_agent-1.0.0 vs lib/instrumental/agent.rb in instrumental_agent-1.0.1
- old
+ new
@@ -75,10 +75,12 @@
@pid = Process.pid
@allow_reconnect = true
@certs = certificates
@dns_resolutions = 0
@last_connect_at = 0
+ @start_worker_mutex = Mutex.new
+ @queue = Queue.new
setup_cleanup_at_exit if @enabled
end
# Store a gauge for a metric, optionally at a specific time.
@@ -285,11 +287,11 @@
def send_command(cmd, *args)
cmd = "%s %s\n" % [cmd, args.collect { |a| a.to_s }.join(" ")]
if enabled?
- start_connection_worker if !running?
+ start_connection_worker
if @queue && @queue.size < MAX_BUFFER
@queue_full_warning = false
logger.debug "Queueing: #{cmd.chomp}"
queue_message(cmd, { :synchronous => @synchronous })
else
@@ -363,13 +365,19 @@
# noop
end
end
def start_connection_worker
- if enabled?
+ # NOTE: We need a mutex around both `running?` and thread creation,
+ # otherwise we could create two threads.
+ # Return early and queue the message if another thread is
+ # starting the worker.
+ return if !@start_worker_mutex.try_lock
+ begin
+ return if running?
+ return unless enabled?
disconnect
- @queue ||= Queue.new
address = ipv4_address_for_host(@host, @port)
if address
@pid = Process.pid
@sync_mutex = Mutex.new
@failures = 0
@@ -377,9 +385,11 @@
logger.info "Starting thread"
@thread = Thread.new do
run_worker_loop
end
end
+ ensure
+ @start_worker_mutex.unlock
end
end
def send_with_reply_timeout(message)
@socket.puts message