lib/syslogstash/logstash_writer.rb in syslogstash-2.1.0 vs lib/syslogstash/logstash_writer.rb in syslogstash-2.2.0
- old
+ new
@@ -17,10 +17,11 @@
# Create a new writer.
#
# Reads logstash_server, logger and backlog_size from cfg; stats is
# stored as given.  No worker thread is started here.
#
def initialize(cfg, stats)
@server_name, @logger, @backlog, @stats = cfg.logstash_server, cfg.logger, cfg.backlog_size, stats
# Pending entries, plus the mutex taken around access to them.
@entries = []
@entries_mutex = Mutex.new
# Taken while @current_server is being used, closed or replaced.
+ @cs_mutex = Mutex.new
end
# Add an entry to the list of messages to be sent to logstash. Actual
# message delivery will happen in a worker thread that is started with
# #run.
@@ -43,10 +44,20 @@
#
def run
# The worker thread runs send_messages; the Thread object is kept in @thread.
@thread = Thread.new { send_messages }
end
+ # Cause the writer to disconnect from the currently-active server.
+ #
+ def force_disconnect!
+ @cs_mutex.synchronize do
+ @logger.info("writer") { "Forced disconnect from #{server_id(@current_server) }" }
+ @current_server.close if @current_server
+ @current_server = nil
+ end
+ end
+
private
def send_messages
loop do
if @entries_mutex.synchronize { @entries.empty? }
@@ -85,42 +96,45 @@
# I could handle this more cleanly with recursion, but I don't want
# to fill the stack if we have to retry a lot of times
done = false
until done
- if @current_server
- begin
- @logger.debug("writer") { "Using current server #{server_id(@current_server)}" }
- yield @current_server
- done = true
- rescue SystemCallError => ex
- # Something went wrong during the send; disconnect from this
- # server and recycle
- @logger.debug("writer") { "Error while writing to current server: #{ex.message} (#{ex.class})" }
- @current_server.close
- @current_server = nil
- sleep 0.1
- end
- else
- candidates = resolve_server_name
+ @cs_mutex.synchronize do
+ if @current_server
+ begin
+ @logger.debug("writer") { "Using current server #{server_id(@current_server)}" }
+ yield @current_server
+ done = true
+ rescue SystemCallError => ex
+ # Something went wrong during the send; disconnect from this
+ # server and recycle
+ @logger.debug("writer") { "Error while writing to current server: #{ex.message} (#{ex.class})" }
+ @current_server.close
+ @current_server = nil
+ sleep 0.1
+ end
+ else
+ candidates = resolve_server_name
+ @logger.debug("writer") { "Server candidates: #{candidates.inspect}" }
- begin
- next_server = candidates.shift
+ begin
+ next_server = candidates.shift
- if next_server
- @logger.debug("writer") { "Trying to connect to #{next_server.to_s}" }
- @current_server = TCPSocket.new(next_server.hostname, next_server.port)
- else
- @logger.debug("writer") { "Could not connect to any server; pausing before trying again" }
- @current_server = nil
- sleep 5
+ if next_server
+ @logger.debug("writer") { "Trying to connect to #{next_server.to_s}" }
+ @current_server = TCPSocket.new(next_server.hostname, next_server.port)
+ else
+ @logger.debug("writer") { "Could not connect to any server; pausing before trying again" }
+ @current_server = nil
+ sleep 5
+ end
+ rescue SystemCallError => ex
+ # Connection failed for any number of reasons; try the next one in the list
+ @logger.warn("writer") { "Failed to connect to #{next_server.to_s}: #{ex.message} (#{ex.class})" }
+ sleep 0.1
+ retry
end
- rescue SystemCallError => ex
- # Connection failed for any number of reasons; try the next one in the list
- @logger.warn("writer") { "Failed to connect to #{next_server.to_s}: #{ex.message} (#{ex.class})" }
- sleep 0.1
- retry
end
end
end
end
@@ -143,10 +157,10 @@
host, port = @server_name.split(":", 2)
addrs = Resolv::DNS.new.getaddresses(host)
if addrs.empty?
@logger.warn("writer") { "No addresses resolved for server_name #{host.inspect}" }
end
- addrs.map { |a| Target.new(a.to_s, port.to_i) }
+ addrs.sort_by { rand }.map { |a| Target.new(a.to_s, port.to_i) }
else
# SRV records ftw
[].tap do |list|
left = Resolv::DNS.new.getresources(@server_name, Resolv::DNS::Resource::IN::SRV)
if left.empty?