lib/instrumental/agent.rb in instrumental_agent-0.8.1 vs lib/instrumental/agent.rb in instrumental_agent-0.8.2
- old
+ new
@@ -1,29 +1,31 @@
require 'instrumental/rack/middleware'
require 'instrumental/version'
require 'logger'
require 'thread'
require 'socket'
+require 'timeout'
# Sets up a connection to the collector.
#
# Instrumental::Agent.new(API_KEY)
module Instrumental
class Agent
BACKOFF = 2.0
MAX_RECONNECT_DELAY = 15
MAX_BUFFER = 5000
+ REPLY_TIMEOUT = 10
- attr_accessor :host, :port, :synchronous
+ attr_accessor :host, :port, :synchronous, :queue
attr_reader :connection, :enabled
def self.logger=(l)
@logger = l
end
def self.logger
- if !@logger
+ if !@logger
@logger = Logger.new(STDERR)
@logger.level = Logger::WARN
end
@logger
end
@@ -238,18 +240,28 @@
end
end
end
end
+ def send_with_reply_timeout(message)
+ @socket.puts message
+ Timeout.timeout(REPLY_TIMEOUT) do
+ response = @socket.gets
+ if response.to_s.chomp != "ok"
+ raise "Bad Response #{response.inspect} to #{message.inspect}"
+ end
+ end
+ end
+
def connection_worker
command_and_args = nil
logger.info "connecting to collector"
@socket = TCPSocket.new(host, port)
- @failures = 0
logger.info "connected to collector at #{host}:#{port}"
- @socket.puts "hello version #{Instrumental::VERSION} test_mode #{@test_mode}"
- @socket.puts "authenticate #{@api_key}"
+ send_with_reply_timeout "hello version #{Instrumental::VERSION} test_mode #{@test_mode}"
+ send_with_reply_timeout "authenticate #{@api_key}"
+ @failures = 0
loop do
command_and_args = @queue.pop
test_connection
case command_and_args
@@ -270,10 +282,10 @@
@queue << command_and_args
end
disconnect
@failures += 1
delay = [(@failures - 1) ** BACKOFF, MAX_RECONNECT_DELAY].min
- logger.info "disconnected, reconnect in #{delay}..."
+ logger.error "disconnected, #{@failures} failures in a row, reconnect in #{delay}..."
sleep delay
retry
ensure
disconnect
end