lib/instrumental/agent.rb in instrumental_agent-0.8.1 vs lib/instrumental/agent.rb in instrumental_agent-0.8.2

- old
+ new

@@ -1,29 +1,31 @@ require 'instrumental/rack/middleware' require 'instrumental/version' require 'logger' require 'thread' require 'socket' +require 'timeout' # Sets up a connection to the collector. # # Instrumental::Agent.new(API_KEY) module Instrumental class Agent BACKOFF = 2.0 MAX_RECONNECT_DELAY = 15 MAX_BUFFER = 5000 + REPLY_TIMEOUT = 10 - attr_accessor :host, :port, :synchronous + attr_accessor :host, :port, :synchronous, :queue attr_reader :connection, :enabled def self.logger=(l) @logger = l end def self.logger - if !@logger + if !@logger @logger = Logger.new(STDERR) @logger.level = Logger::WARN end @logger end @@ -238,18 +240,28 @@ end end end end + def send_with_reply_timeout(message) + @socket.puts message + Timeout.timeout(REPLY_TIMEOUT) do + response = @socket.gets + if response.to_s.chomp != "ok" + raise "Bad Response #{response.inspect} to #{message.inspect}" + end + end + end + def connection_worker command_and_args = nil logger.info "connecting to collector" @socket = TCPSocket.new(host, port) - @failures = 0 logger.info "connected to collector at #{host}:#{port}" - @socket.puts "hello version #{Instrumental::VERSION} test_mode #{@test_mode}" - @socket.puts "authenticate #{@api_key}" + send_with_reply_timeout "hello version #{Instrumental::VERSION} test_mode #{@test_mode}" + send_with_reply_timeout "authenticate #{@api_key}" + @failures = 0 loop do command_and_args = @queue.pop test_connection case command_and_args @@ -270,10 +282,10 @@ @queue << command_and_args end disconnect @failures += 1 delay = [(@failures - 1) ** BACKOFF, MAX_RECONNECT_DELAY].min - logger.info "disconnected, reconnect in #{delay}..." + logger.error "disconnected, #{@failures} failures in a row, reconnect in #{delay}..." sleep delay retry ensure disconnect end