lib/instrumental/agent.rb in instrumental_agent-3.0.0.alpha vs lib/instrumental/agent.rb in instrumental_agent-3.0.0.beta

- old
+ new

@@ -1,7 +1,9 @@
 require 'instrumental/version'
 require 'instrumental/system_timer'
+require 'instrumental/command_structs'
+require 'instrumental/event_aggregator'
 require 'logger'
 require 'openssl' rescue nil
 require 'resolv'
 require 'thread'
 require 'socket'
@@ -13,18 +15,21 @@
     BACKOFF = 2.0
     CONNECT_TIMEOUT = 20
     EXIT_FLUSH_TIMEOUT = 5
     HOSTNAME = Socket.gethostbyname(Socket.gethostname).first rescue Socket.gethostname
     MAX_BUFFER = 5000
+    MAX_AGGREGATOR_SIZE = 5000
     MAX_RECONNECT_DELAY = 15
     REPLY_TIMEOUT = 10
     RESOLUTION_FAILURES_BEFORE_WAITING = 3
     RESOLUTION_WAIT = 30
     RESOLVE_TIMEOUT = 1
+    DEFAULT_FREQUENCY = 0
+    VALID_FREQUENCIES = [0, 1, 2, 3, 4, 5, 6, 10, 12, 15, 20, 30, 60]

-    attr_accessor :host, :port, :synchronous, :queue, :dns_resolutions, :last_connect_at
+    attr_accessor :host, :port, :synchronous, :frequency, :sender_queue, :aggregator_queue, :dns_resolutions, :last_connect_at
     attr_reader :connection, :enabled, :secure

     def self.logger=(l)
       @logger = l
     end
@@ -50,10 +55,11 @@
       # defaults
       # host: collector.instrumentalapp.com
       # port: 8001
       # enabled: true
       # synchronous: false
+      # frequency: 10
       # secure: true
       # verify: true
       @api_key = api_key
       @host, @port = options[:collector].to_s.split(':')
       @host ||= 'collector.instrumentalapp.com'
@@ -71,18 +77,28 @@
       @verify_cert = options[:verify_cert].nil? ? true : !!options[:verify_cert]
       default_port = @secure ? 8001 : 8000
       @port = (@port || default_port).to_i
       @enabled = options.has_key?(:enabled) ? !!options[:enabled] : true
       @synchronous = !!options[:synchronous]
+
+      if options.has_key?(:frequency)
+        self.frequency = options[:frequency]
+      else
+        self.frequency = DEFAULT_FREQUENCY
+      end
+
+      @metrician = options[:metrician].nil? ? true : !!options[:metrician]
       @pid = Process.pid
       @allow_reconnect = true
       @dns_resolutions = 0
       @last_connect_at = 0
-      @metrician = options[:metrician].nil? ? true : !!options[:metrician]
+      @start_worker_mutex = Mutex.new
-      @queue = Queue.new
+      @aggregator_queue = Queue.new
+      @sender_queue = Queue.new
+
       setup_cleanup_at_exit if @enabled
       if @metrician
         Metrician.activate(self)
       end
@@ -91,11 +107,13 @@
     # Store a gauge for a metric, optionally at a specific time.
     #
     #  agent.gauge('load', 1.23)
     def gauge(metric, value, time = Time.now, count = 1)
       if valid?(metric, value, time, count) &&
-          send_command("gauge", metric, value, time.to_i, count.to_i)
+          send_command(Instrumental::Command.new("gauge".freeze, metric, value, time, count))
+          # tempted to "gauge" this to a symbol? Don't. Frozen strings are very fast,
+          # and later we're going to to_s every one of these anyway.
         value
       else
         nil
       end
     rescue Exception => e
@@ -139,11 +157,11 @@
     # Increment a metric, optionally more than one or at a specific time.
     #
     #  agent.increment('users')
     def increment(metric, value = 1, time = Time.now, count = 1)
       if valid?(metric, value, time, count) &&
-          send_command("increment", metric, value, time.to_i, count.to_i)
+          send_command(Instrumental::Command.new("increment".freeze, metric, value, time, count))
         value
       else
         nil
       end
     rescue Exception => e
@@ -154,11 +172,11 @@
     # Send a notice to the server (deploys, downtime, etc.)
     #
     #  agent.notice('A notice')
     def notice(note, time = Time.now, duration = 0)
       if valid_note?(note)
-        send_command("notice", time.to_i, duration.to_i, note)
+        send_command(Instrumental::Notice.new(note, time, duration))
         note
       else
         nil
       end
     rescue Exception => e
@@ -193,10 +211,26 @@
     def logger
       @logger || self.class.logger
     end

+    def frequency=(frequency)
+      freq = frequency.to_i
+      if !VALID_FREQUENCIES.include?(freq)
+        logger.warn "Frequency must be a value that divides evenly into 60: 1, 2, 3, 4, 5, 6, 10, 12, 15, 20, 30, or 60."
+        # this will make all negative numbers and nils into 0s
+        freq = VALID_FREQUENCIES.select{ |f| f < freq }.max.to_i
+      end
+
+      @frequency = if(@synchronous)
+        logger.warn "Synchronous and Frequency should not be enabled at the same time! Defaulting to synchronous mode."
+        0
+      else
+        freq
+      end
+    end
+
     # Stopping the agent will immediately stop all communication
     # to Instrumental. If you call this and submit another metric,
     # the agent will start again.
     #
     # Calling stop will cause all metrics waiting to be sent to be
@@ -204,34 +238,48 @@
     #
     #  agent.stop
     #
     def stop
       disconnect
-      if @thread
-        @thread.kill
-        @thread = nil
+      if @sender_thread
+        @sender_thread.kill
+        @sender_thread = nil
       end
-      if @queue
-        @queue.clear
+      if @aggregator_thread
+        @aggregator_thread.kill
+        @aggregator_thread = nil
       end
+      if @sender_queue
+        @sender_queue.clear
+      end
+      if @aggregator_queue
+        @aggregator_queue.clear
+      end
     end

     # Called when a process is exiting to give it some extra time to
     # push events to the service. An at_exit handler is automatically
     # registered for this method, but can be called manually in cases
     # where at_exit is bypassed like Resque workers.
     def cleanup
       if running?
-        logger.info "Cleaning up agent, queue size: #{@queue.size}, thread running: #{@thread.alive?}"
+        logger.info "Cleaning up agent, aggregator_size: #{@aggregator_queue.size}, thread_running: #{@aggregator_thread.alive?}"
+        logger.info "Cleaning up agent, queue size: #{@sender_queue.size}, thread running: #{@sender_thread.alive?}"
         @allow_reconnect = false
-        if @queue.size > 0
-          queue_message('exit')
+        if @sender_queue.size > 0 || @aggregator_queue.size > 0
+          @sender_queue << ['exit']
+          @aggregator_queue << ['exit']
           begin
-            with_timeout(EXIT_FLUSH_TIMEOUT) { @thread.join }
+            with_timeout(EXIT_FLUSH_TIMEOUT) { @aggregator_thread.join }
+            with_timeout(EXIT_FLUSH_TIMEOUT) { @sender_thread.join }
           rescue Timeout::Error
-            if @queue.size > 0
-              logger.error "Timed out working agent thread on exit, dropping #{@queue.size} metrics"
+            total_size = @sender_queue&.size.to_i +
+                         @aggregator_queue&.size.to_i +
+                         @event_aggregator&.size.to_i
+
+            if total_size > 0
+              logger.error "Timed out working agent thread on exit, dropping #{total_size} metrics"
             else
               logger.error "Timed out Instrumental Agent, exiting"
             end
           end
         end
@@ -268,10 +316,11 @@
       increment "agent.invalid_value"
       logger.warn "Invalid value #{value.inspect} for #{metric}"
     end

     def report_exception(e)
+      # puts "--- Exception of type #{e.class} occurred:\n#{e.message}\n#{e.backtrace.join("\n")}"
       logger.error "Exception of type #{e.class} occurred:\n#{e.message}\n#{e.backtrace.join("\n")}"
     end

     def ipv4_address_for_host(host, port, moment_to_connect = Time.now.to_i)
       self.dns_resolutions = dns_resolutions + 1
@@ -288,48 +337,45 @@
       logger.warn "Couldn't resolve address for #{host}:#{port}"
       report_exception(e)
       nil
     end

-    def send_command(cmd, *args)
-      cmd = "%s %s\n" % [cmd, args.collect { |a| a.to_s }.join(" ")]
-      if enabled?
-
-        start_connection_worker
-        if @queue && @queue.size < MAX_BUFFER
-          @queue_full_warning = false
-          logger.debug "Queueing: #{cmd.chomp}"
-          queue_message(cmd, { :synchronous => @synchronous })
-        else
-          if !@queue_full_warning
-            @queue_full_warning = true
-            logger.warn "Queue full(#{@queue.size}), dropping commands..."
-          end
-          logger.debug "Dropping command, queue full(#{@queue.size}): #{cmd.chomp}"
-          nil
-        end
+    def send_command(command)
+      return logger.debug(command.to_s) unless enabled?
+      start_workers
+      critical_queue = frequency.to_i == 0 ? @sender_queue : @aggregator_queue
+      if critical_queue && critical_queue.size < MAX_BUFFER
+        @queue_full_warning = false
+        logger.debug "Queueing: #{command.to_s}"
+        queue_message(command, { :synchronous => @synchronous })
       else
-        logger.debug cmd.strip
+        if !@queue_full_warning
+          @queue_full_warning = true
+          logger.warn "Queue full(#{critical_queue.size}), dropping commands..."
+        end
+        logger.debug "Dropping command, queue full(#{critical_queue.size}): #{command.to_s}"
+        nil
       end
     end

     def queue_message(message, options = {})
-      if @enabled
-        options ||= {}
-        if options[:allow_reconnect].nil?
-          options[:allow_reconnect] = @allow_reconnect
-        end
-        synchronous = options.delete(:synchronous)
-        if synchronous
-          options[:sync_resource] ||= ConditionVariable.new
-          @sync_mutex.synchronize {
-            @queue << [message, options]
-            options[:sync_resource].wait(@sync_mutex)
-          }
-        else
-          @queue << [message, options]
-        end
+      return message unless enabled?
+
+      # imagine it's a reverse merge, but with fewer allocations
+      options[:allow_reconnect] = @allow_reconnect unless options.has_key?(:allow_reconnect)
+
+      if options.delete(:synchronous)
+        options[:sync_resource] ||= ConditionVariable.new
+        @sync_mutex.synchronize {
+          queue = message == "flush" ? @aggregator_queue : @sender_queue
+          queue << [message, options]
+          options[:sync_resource].wait(@sync_mutex)
+        }
+      elsif frequency.to_i == 0
+        @sender_queue << [message, options]
+      else
+        @aggregator_queue << [message, options]
       end
       message
     end

     def wait_exceptions
@@ -353,13 +399,13 @@
         rescue *wait_exceptions
           # noop
         end
       end
     end

-    def start_connection_worker
+    def start_workers
       # NOTE: We need a mutex around both `running?` and thread creation,
-      # otherwise we could create two threads.
+      # otherwise we could create too many threads.
       # Return early and queue the message if another thread is
       # starting the worker.
       return if !@start_worker_mutex.try_lock
       begin
         return if running?
@@ -369,14 +415,24 @@
         if address
           @pid = Process.pid
           @sync_mutex = Mutex.new
           @failures = 0
           @sockaddr_in = Socket.pack_sockaddr_in(@port, address)
-          logger.info "Starting thread"
-          @thread = Thread.new do
-            run_worker_loop
+
+          logger.info "Starting aggregator thread"
+          if !@aggregator_thread&.alive?
+            @aggregator_thread = Thread.new do
+              run_aggregator_loop
+            end
           end
+
+          if !@sender_thread&.alive?
+            logger.info "Starting sender thread"
+            @sender_thread = Thread.new do
+              run_sender_loop
+            end
+          end
         end
       ensure
         @start_worker_mutex.unlock
       end
     end
@@ -407,16 +463,77 @@
         sock = ssl_socket
       end
       sock
     end

-    def run_worker_loop
+    def run_aggregator_loop
+      # if the sender queue is some level of full, should we keep aggregating until it empties out?
+      # what does this mean for aggregation slices - aggregating to nearest frequency will
+      # make the object needlessly larger, when minute resolution is what we have on the server
+      begin
+        loop do
+          now = Time.now.to_i
+          time_to_wait = if frequency == 0
+            0
+          else
+            next_frequency = (now - (now % frequency)) + frequency
+            time_to_wait = [(next_frequency - Time.now.to_f), 0].max
+          end
+
+          command_and_args, command_options = if @event_aggregator&.size.to_i > MAX_AGGREGATOR_SIZE
+            logger.info "Aggregator full, flushing early with #{MAX_AGGREGATOR_SIZE} metrics."
+            command_and_args, command_options = ['forward', {}]
+          else
+            begin
+              with_timeout(time_to_wait) do
+                @aggregator_queue.pop
+              end
+            rescue Timeout::Error
+              ['forward', {}]
+            end
+          end
+          if command_and_args
+            sync_resource = command_options && command_options[:sync_resource]
+            case command_and_args
+            when 'exit'
+              logger.info "Exiting, #{@aggregator_queue.size} commands remain"
+              return true
+            when 'flush'
+              if !@event_aggregator.nil?
+                @sender_queue << @event_aggregator
+                @event_aggregator = nil
+              end
+              @sender_queue << ['flush', command_options]
+            when 'forward'
+              if !@event_aggregator.nil?
+                next if @sender_queue.size > 0 && @sender_queue.num_waiting < 1
+                @sender_queue << @event_aggregator
+                @event_aggregator = nil
+              end
+            when Notice
+              @sender_queue << [command_and_args, command_options]
+            else
+              @event_aggregator = EventAggregator.new(frequency: @frequency) if @event_aggregator.nil?
+
+              logger.debug "Sending: #{command_and_args} to aggregator"
+              @event_aggregator.put(command_and_args)
+            end
+            command_and_args = nil
+            command_options = nil
+          end
+        end
+      rescue Exception => err
+        report_exception(err)
+      end
+    end
+
+    def run_sender_loop
       @failures = 0
       begin
-        logger.info "connecting to collector"
-        command_and_args = nil
-        command_options = nil
+        logger.info "connecting to collector"
+        command_and_args = nil
+        command_options = nil
         with_timeout(CONNECT_TIMEOUT) do
           @socket = open_socket(@sockaddr_in, @secure, @verify_cert)
         end
         logger.info "connected to collector at #{host}:#{port}"
         hello_options = {
@@ -429,22 +546,27 @@
         send_with_reply_timeout "hello #{hello_options}"
         send_with_reply_timeout "authenticate #{@api_key}"

         loop do
-          command_and_args, command_options = @queue.pop
+          command_and_args, command_options = @sender_queue.pop
           if command_and_args
             sync_resource = command_options && command_options[:sync_resource]
             test_connection
             case command_and_args
             when 'exit'
-              logger.info "Exiting, #{@queue.size} commands remain"
+              logger.info "Exiting, #{@sender_queue.size} commands remain"
               return true
             when 'flush'
               release_resource = true
+            when EventAggregator
+              command_and_args.values.values.each do |command|
+                logger.debug "Sending: #{command}"
+                @socket.puts command
+              end
             else
-              logger.debug "Sending: #{command_and_args.chomp}"
+              logger.debug "Sending: #{command_and_args}"
               @socket.puts command_and_args
             end
             command_and_args = nil
             command_options = nil
             if sync_resource
@@ -462,11 +584,11 @@
         when Errno::ECONNREFUSED, Errno::EHOSTUNREACH, Errno::EADDRINUSE, Timeout::Error, OpenSSL::SSL::SSLError
           # If the connection has been refused by Instrumental
           # or we cannot reach the server
           # or the connection state of this socket is in a race
           # or SSL is not functioning properly for some reason
-          logger.error "unable to connect to Instrumental, hanging up with #{@queue.size} messages remaining"
+          logger.error "unable to connect to Instrumental, hanging up with #{@sender_queue.size} messages remaining"
           logger.debug "Exception: #{err.inspect}\n#{err.backtrace.join("\n")}"
           allow_reconnect = false
         else
           report_exception(err)
         end
@@ -476,11 +598,11 @@
           @failures = 0
           return
         end
         if command_and_args
           logger.debug "requeueing: #{command_and_args}"
-          @queue << command_and_args
+          @sender_queue << command_and_args
         end
         disconnect
         @failures += 1
         delay = [(@failures - 1) ** BACKOFF, MAX_RECONNECT_DELAY].min
         logger.error "disconnected, #{@failures} failures in a row, reconnect in #{delay}..."
@@ -496,10 +618,14 @@
         cleanup
       end
     end

     def running?
-      !@thread.nil? && @pid == Process.pid && @thread.alive?
+      !@sender_thread.nil? &&
+        !@aggregator_thread.nil? &&
+        @pid == Process.pid &&
+        @sender_thread.alive? &&
+        @aggregator_thread.alive?
     end

     def flush_socket(socket)
       socket.flush
     rescue Exception => e
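
Usage sketch (not part of the diff above): the snippet below illustrates how the new :frequency option appears to fit together with the gauge/increment/notice API, based on the option parsing, send_command routing, and aggregator loop shown in the beta version. The require path, API key, metric names, and the chosen frequency value are illustrative assumptions, not values taken from the diff.

    require 'instrumental_agent'  # assumed entry point for the instrumental_agent gem

    # Placeholder API key. :frequency must divide evenly into 60 (see VALID_FREQUENCIES);
    # invalid values trigger a warning and are coerced to a valid frequency.
    agent = Instrumental::Agent.new('YOUR_API_KEY', :frequency => 15)

    agent.gauge('load', 1.23)      # frequency > 0, so this is queued on @aggregator_queue
    agent.increment('users')       # aggregated, then forwarded to @sender_queue at the next 15s boundary
    agent.notice('Deployed v1.2')  # Notice commands pass through the aggregator without being aggregated

    # Passing :synchronous => true as well causes frequency= to warn and force the
    # frequency back to 0, so every command goes straight to @sender_queue.

Note that the coercion in frequency= picks the largest valid frequency below the requested one (VALID_FREQUENCIES.select { |f| f < freq }.max.to_i), so a requested frequency of 7 becomes 6, and negative values fall back to 0, meaning no aggregation.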