lib/instrumental/agent.rb in instrumental_agent-3.0.0.alpha vs lib/instrumental/agent.rb in instrumental_agent-3.0.0.beta
- old
+ new
@@ -1,7 +1,9 @@
require 'instrumental/version'
require 'instrumental/system_timer'
+require 'instrumental/command_structs'
+require 'instrumental/event_aggregator'
require 'logger'
require 'openssl' rescue nil
require 'resolv'
require 'thread'
require 'socket'
@@ -13,18 +15,21 @@
BACKOFF = 2.0
CONNECT_TIMEOUT = 20
EXIT_FLUSH_TIMEOUT = 5
HOSTNAME = Socket.gethostbyname(Socket.gethostname).first rescue Socket.gethostname
MAX_BUFFER = 5000
+ MAX_AGGREGATOR_SIZE = 5000
MAX_RECONNECT_DELAY = 15
REPLY_TIMEOUT = 10
RESOLUTION_FAILURES_BEFORE_WAITING = 3
RESOLUTION_WAIT = 30
RESOLVE_TIMEOUT = 1
+ DEFAULT_FREQUENCY = 0
+ VALID_FREQUENCIES = [0, 1, 2, 3, 4, 5, 6, 10, 12, 15, 20, 30, 60]
- attr_accessor :host, :port, :synchronous, :queue, :dns_resolutions, :last_connect_at
+ attr_accessor :host, :port, :synchronous, :frequency, :sender_queue, :aggregator_queue, :dns_resolutions, :last_connect_at
attr_reader :connection, :enabled, :secure
def self.logger=(l)
@logger = l
end
@@ -50,10 +55,11 @@
# defaults
# host: collector.instrumentalapp.com
# port: 8001
# enabled: true
# synchronous: false
+ # frequency: 10
# secure: true
# verify: true
@api_key = api_key
@host, @port = options[:collector].to_s.split(':')
@host ||= 'collector.instrumentalapp.com'
@@ -71,18 +77,28 @@
@verify_cert = options[:verify_cert].nil? ? true : !!options[:verify_cert]
default_port = @secure ? 8001 : 8000
@port = (@port || default_port).to_i
@enabled = options.has_key?(:enabled) ? !!options[:enabled] : true
@synchronous = !!options[:synchronous]
+
+ if options.has_key?(:frequency)
+ self.frequency = options[:frequency]
+ else
+ self.frequency = DEFAULT_FREQUENCY
+ end
+
+ @metrician = options[:metrician].nil? ? true : !!options[:metrician]
@pid = Process.pid
@allow_reconnect = true
@dns_resolutions = 0
@last_connect_at = 0
- @metrician = options[:metrician].nil? ? true : !!options[:metrician]
+
@start_worker_mutex = Mutex.new
- @queue = Queue.new
+ @aggregator_queue = Queue.new
+ @sender_queue = Queue.new
+
setup_cleanup_at_exit if @enabled
if @metrician
Metrician.activate(self)
end
@@ -91,11 +107,13 @@
# Store a gauge for a metric, optionally at a specific time.
#
# agent.gauge('load', 1.23)
def gauge(metric, value, time = Time.now, count = 1)
if valid?(metric, value, time, count) &&
- send_command("gauge", metric, value, time.to_i, count.to_i)
+ send_command(Instrumental::Command.new("gauge".freeze, metric, value, time, count))
+ # tempted to "gauge" this to a symbol? Don't. Frozen strings are very fast,
+ # and later we're going to to_s every one of these anyway.
value
else
nil
end
rescue Exception => e
@@ -139,11 +157,11 @@
# Increment a metric, optionally more than one or at a specific time.
#
# agent.increment('users')
def increment(metric, value = 1, time = Time.now, count = 1)
if valid?(metric, value, time, count) &&
- send_command("increment", metric, value, time.to_i, count.to_i)
+ send_command(Instrumental::Command.new("increment".freeze, metric, value, time, count))
value
else
nil
end
rescue Exception => e
@@ -154,11 +172,11 @@
# Send a notice to the server (deploys, downtime, etc.)
#
# agent.notice('A notice')
def notice(note, time = Time.now, duration = 0)
if valid_note?(note)
- send_command("notice", time.to_i, duration.to_i, note)
+ send_command(Instrumental::Notice.new(note, time, duration))
note
else
nil
end
rescue Exception => e
@@ -193,10 +211,26 @@
def logger
@logger || self.class.logger
end
+ def frequency=(frequency)
+ freq = frequency.to_i
+ if !VALID_FREQUENCIES.include?(freq)
+ logger.warn "Frequency must be a value that divides evenly into 60: 1, 2, 3, 4, 5, 6, 10, 12, 15, 20, 30, or 60."
+ # this will make all negative numbers and nils into 0s
+ freq = VALID_FREQUENCIES.select{ |f| f < freq }.max.to_i
+ end
+
+ @frequency = if(@synchronous)
+ logger.warn "Synchronous and Frequency should not be enabled at the same time! Defaulting to synchronous mode."
+ 0
+ else
+ freq
+ end
+ end
+
# Stopping the agent will immediately stop all communication
# to Instrumental. If you call this and submit another metric,
# the agent will start again.
#
# Calling stop will cause all metrics waiting to be sent to be
@@ -204,34 +238,48 @@
#
# agent.stop
#
def stop
disconnect
- if @thread
- @thread.kill
- @thread = nil
+ if @sender_thread
+ @sender_thread.kill
+ @sender_thread = nil
end
- if @queue
- @queue.clear
+ if @aggregator_thread
+ @aggregator_thread.kill
+ @aggregator_thread = nil
end
+ if @sender_queue
+ @sender_queue.clear
+ end
+ if @aggregator_queue
+ @aggregator_queue.clear
+ end
end
# Called when a process is exiting to give it some extra time to
# push events to the service. An at_exit handler is automatically
# registered for this method, but can be called manually in cases
# where at_exit is bypassed like Resque workers.
def cleanup
if running?
- logger.info "Cleaning up agent, queue size: #{@queue.size}, thread running: #{@thread.alive?}"
+ logger.info "Cleaning up agent, aggregator_size: #{@aggregator_queue.size}, thread_running: #{@aggregator_thread.alive?}"
+ logger.info "Cleaning up agent, queue size: #{@sender_queue.size}, thread running: #{@sender_thread.alive?}"
@allow_reconnect = false
- if @queue.size > 0
- queue_message('exit')
+ if @sender_queue.size > 0 || @aggregator_queue.size > 0
+ @sender_queue << ['exit']
+ @aggregator_queue << ['exit']
begin
- with_timeout(EXIT_FLUSH_TIMEOUT) { @thread.join }
+ with_timeout(EXIT_FLUSH_TIMEOUT) { @aggregator_thread.join }
+ with_timeout(EXIT_FLUSH_TIMEOUT) { @sender_thread.join }
rescue Timeout::Error
- if @queue.size > 0
- logger.error "Timed out working agent thread on exit, dropping #{@queue.size} metrics"
+ total_size = @sender_queue&.size.to_i +
+ @aggregator_queue&.size.to_i +
+ @event_aggregator&.size.to_i
+
+ if total_size > 0
+ logger.error "Timed out working agent thread on exit, dropping #{total_size} metrics"
else
logger.error "Timed out Instrumental Agent, exiting"
end
end
end
@@ -268,10 +316,11 @@
increment "agent.invalid_value"
logger.warn "Invalid value #{value.inspect} for #{metric}"
end
def report_exception(e)
+ # puts "--- Exception of type #{e.class} occurred:\n#{e.message}\n#{e.backtrace.join("\n")}"
logger.error "Exception of type #{e.class} occurred:\n#{e.message}\n#{e.backtrace.join("\n")}"
end
def ipv4_address_for_host(host, port, moment_to_connect = Time.now.to_i)
self.dns_resolutions = dns_resolutions + 1
@@ -288,48 +337,45 @@
logger.warn "Couldn't resolve address for #{host}:#{port}"
report_exception(e)
nil
end
- def send_command(cmd, *args)
- cmd = "%s %s\n" % [cmd, args.collect { |a| a.to_s }.join(" ")]
- if enabled?
-
- start_connection_worker
- if @queue && @queue.size < MAX_BUFFER
- @queue_full_warning = false
- logger.debug "Queueing: #{cmd.chomp}"
- queue_message(cmd, { :synchronous => @synchronous })
- else
- if !@queue_full_warning
- @queue_full_warning = true
- logger.warn "Queue full(#{@queue.size}), dropping commands..."
- end
- logger.debug "Dropping command, queue full(#{@queue.size}): #{cmd.chomp}"
- nil
- end
+ def send_command(command)
+ return logger.debug(command.to_s) unless enabled?
+ start_workers
+ critical_queue = frequency.to_i == 0 ? @sender_queue : @aggregator_queue
+ if critical_queue && critical_queue.size < MAX_BUFFER
+ @queue_full_warning = false
+ logger.debug "Queueing: #{command.to_s}"
+ queue_message(command, { :synchronous => @synchronous })
else
- logger.debug cmd.strip
+ if !@queue_full_warning
+ @queue_full_warning = true
+ logger.warn "Queue full(#{critical_queue.size}), dropping commands..."
+ end
+ logger.debug "Dropping command, queue full(#{critical_queue.size}): #{command.to_s}"
+ nil
end
end
def queue_message(message, options = {})
- if @enabled
- options ||= {}
- if options[:allow_reconnect].nil?
- options[:allow_reconnect] = @allow_reconnect
- end
- synchronous = options.delete(:synchronous)
- if synchronous
- options[:sync_resource] ||= ConditionVariable.new
- @sync_mutex.synchronize {
- @queue << [message, options]
- options[:sync_resource].wait(@sync_mutex)
- }
- else
- @queue << [message, options]
- end
+ return message unless enabled?
+
+ # imagine it's a reverse merge, but with fewer allocations
+ options[:allow_reconnect] = @allow_reconnect unless options.has_key?(:allow_reconnect)
+
+ if options.delete(:synchronous)
+ options[:sync_resource] ||= ConditionVariable.new
+ @sync_mutex.synchronize {
+ queue = message == "flush" ? @aggregator_queue : @sender_queue
+ queue << [message, options]
+ options[:sync_resource].wait(@sync_mutex)
+ }
+ elsif frequency.to_i == 0
+ @sender_queue << [message, options]
+ else
+ @aggregator_queue << [message, options]
end
message
end
def wait_exceptions
@@ -353,13 +399,13 @@
rescue *wait_exceptions
# noop
end
end
- def start_connection_worker
+ def start_workers
# NOTE: We need a mutex around both `running?` and thread creation,
- # otherwise we could create two threads.
+ # otherwise we could create too many threads.
# Return early and queue the message if another thread is
# starting the worker.
return if !@start_worker_mutex.try_lock
begin
return if running?
@@ -369,14 +415,24 @@
if address
@pid = Process.pid
@sync_mutex = Mutex.new
@failures = 0
@sockaddr_in = Socket.pack_sockaddr_in(@port, address)
- logger.info "Starting thread"
- @thread = Thread.new do
- run_worker_loop
+
+ logger.info "Starting aggregator thread"
+ if !@aggregator_thread&.alive?
+ @aggregator_thread = Thread.new do
+ run_aggregator_loop
+ end
end
+
+ if !@sender_thread&.alive?
+ logger.info "Starting sender thread"
+ @sender_thread = Thread.new do
+ run_sender_loop
+ end
+ end
end
ensure
@start_worker_mutex.unlock
end
end
@@ -407,16 +463,77 @@
sock = ssl_socket
end
sock
end
- def run_worker_loop
+ def run_aggregator_loop
+ # if the sender queue is some level of full, should we keep aggregating until it empties out?
+ # what does this mean for aggregation slices - aggregating to nearest frequency will
+ # make the object needlessly larger, when minute resolution is what we have on the server
+ begin
+ loop do
+ now = Time.now.to_i
+ time_to_wait = if frequency == 0
+ 0
+ else
+ next_frequency = (now - (now % frequency)) + frequency
+ time_to_wait = [(next_frequency - Time.now.to_f), 0].max
+ end
+
+ command_and_args, command_options = if @event_aggregator&.size.to_i > MAX_AGGREGATOR_SIZE
+ logger.info "Aggregator full, flushing early with #{MAX_AGGREGATOR_SIZE} metrics."
+ command_and_args, command_options = ['forward', {}]
+ else
+ begin
+ with_timeout(time_to_wait) do
+ @aggregator_queue.pop
+ end
+ rescue Timeout::Error
+ ['forward', {}]
+ end
+ end
+ if command_and_args
+ sync_resource = command_options && command_options[:sync_resource]
+ case command_and_args
+ when 'exit'
+ logger.info "Exiting, #{@aggregator_queue.size} commands remain"
+ return true
+ when 'flush'
+ if !@event_aggregator.nil?
+ @sender_queue << @event_aggregator
+ @event_aggregator = nil
+ end
+ @sender_queue << ['flush', command_options]
+ when 'forward'
+ if !@event_aggregator.nil?
+ next if @sender_queue.size > 0 && @sender_queue.num_waiting < 1
+ @sender_queue << @event_aggregator
+ @event_aggregator = nil
+ end
+ when Notice
+ @sender_queue << [command_and_args, command_options]
+ else
+ @event_aggregator = EventAggregator.new(frequency: @frequency) if @event_aggregator.nil?
+
+ logger.debug "Sending: #{command_and_args} to aggregator"
+ @event_aggregator.put(command_and_args)
+ end
+ command_and_args = nil
+ command_options = nil
+ end
+ end
+ rescue Exception => err
+ report_exception(err)
+ end
+ end
+
+ def run_sender_loop
@failures = 0
begin
- logger.info "connecting to collector"
- command_and_args = nil
- command_options = nil
+ logger.info "connecting to collector"
+ command_and_args = nil
+ command_options = nil
with_timeout(CONNECT_TIMEOUT) do
@socket = open_socket(@sockaddr_in, @secure, @verify_cert)
end
logger.info "connected to collector at #{host}:#{port}"
hello_options = {
@@ -429,22 +546,27 @@
send_with_reply_timeout "hello #{hello_options}"
send_with_reply_timeout "authenticate #{@api_key}"
loop do
- command_and_args, command_options = @queue.pop
+ command_and_args, command_options = @sender_queue.pop
if command_and_args
sync_resource = command_options && command_options[:sync_resource]
test_connection
case command_and_args
when 'exit'
- logger.info "Exiting, #{@queue.size} commands remain"
+ logger.info "Exiting, #{@sender_queue.size} commands remain"
return true
when 'flush'
release_resource = true
+ when EventAggregator
+ command_and_args.values.values.each do |command|
+ logger.debug "Sending: #{command}"
+ @socket.puts command
+ end
else
- logger.debug "Sending: #{command_and_args.chomp}"
+ logger.debug "Sending: #{command_and_args}"
@socket.puts command_and_args
end
command_and_args = nil
command_options = nil
if sync_resource
@@ -462,11 +584,11 @@
when Errno::ECONNREFUSED, Errno::EHOSTUNREACH, Errno::EADDRINUSE, Timeout::Error, OpenSSL::SSL::SSLError
# If the connection has been refused by Instrumental
# or we cannot reach the server
# or the connection state of this socket is in a race
# or SSL is not functioning properly for some reason
- logger.error "unable to connect to Instrumental, hanging up with #{@queue.size} messages remaining"
+ logger.error "unable to connect to Instrumental, hanging up with #{@sender_queue.size} messages remaining"
logger.debug "Exception: #{err.inspect}\n#{err.backtrace.join("\n")}"
allow_reconnect = false
else
report_exception(err)
end
@@ -476,11 +598,11 @@
@failures = 0
return
end
if command_and_args
logger.debug "requeueing: #{command_and_args}"
- @queue << command_and_args
+ @sender_queue << command_and_args
end
disconnect
@failures += 1
delay = [(@failures - 1) ** BACKOFF, MAX_RECONNECT_DELAY].min
logger.error "disconnected, #{@failures} failures in a row, reconnect in #{delay}..."
@@ -496,10 +618,14 @@
cleanup
end
end
def running?
- !@thread.nil? && @pid == Process.pid && @thread.alive?
+ !@sender_thread.nil? &&
+ !@aggregator_thread.nil? &&
+ @pid == Process.pid &&
+ @sender_thread.alive? &&
+ @aggregator_thread.alive?
end
def flush_socket(socket)
socket.flush
rescue Exception => e