lib/syslogstash/logstash_writer.rb in syslogstash-2.1.0 vs lib/syslogstash/logstash_writer.rb in syslogstash-2.2.0

- old
+ new

@@ -17,10 +17,11 @@ def initialize(cfg, stats) @server_name, @logger, @backlog, @stats = cfg.logstash_server, cfg.logger, cfg.backlog_size, stats @entries = [] @entries_mutex = Mutex.new + @cs_mutex = Mutex.new end # Add an entry to the list of messages to be sent to logstash. Actual # message delivery will happen in a worker thread that is started with # #run. @@ -43,10 +44,20 @@ # def run @thread = Thread.new { send_messages } end + # Cause the writer to disconnect from the currently-active server. + # + def force_disconnect! + @cs_mutex.synchronize do + @logger.info("writer") { "Forced disconnect from #{server_id(@current_server) }" } + @current_server.close if @current_server + @current_server = nil + end + end + private def send_messages loop do if @entries_mutex.synchronize { @entries.empty? } @@ -85,42 +96,45 @@ # I could handle this more cleanly with recursion, but I don't want # to fill the stack if we have to retry a lot of times done = false until done - if @current_server - begin - @logger.debug("writer") { "Using current server #{server_id(@current_server)}" } - yield @current_server - done = true - rescue SystemCallError => ex - # Something went wrong during the send; disconnect from this - # server and recycle - @logger.debug("writer") { "Error while writing to current server: #{ex.message} (#{ex.class})" } - @current_server.close - @current_server = nil - sleep 0.1 - end - else - candidates = resolve_server_name + @cs_mutex.synchronize do + if @current_server + begin + @logger.debug("writer") { "Using current server #{server_id(@current_server)}" } + yield @current_server + done = true + rescue SystemCallError => ex + # Something went wrong during the send; disconnect from this + # server and recycle + @logger.debug("writer") { "Error while writing to current server: #{ex.message} (#{ex.class})" } + @current_server.close + @current_server = nil + sleep 0.1 + end + else + candidates = resolve_server_name + @logger.debug("writer") { "Server candidates: #{candidates.inspect}" } - begin - next_server = candidates.shift + begin + next_server = candidates.shift - if next_server - @logger.debug("writer") { "Trying to connect to #{next_server.to_s}" } - @current_server = TCPSocket.new(next_server.hostname, next_server.port) - else - @logger.debug("writer") { "Could not connect to any server; pausing before trying again" } - @current_server = nil - sleep 5 + if next_server + @logger.debug("writer") { "Trying to connect to #{next_server.to_s}" } + @current_server = TCPSocket.new(next_server.hostname, next_server.port) + else + @logger.debug("writer") { "Could not connect to any server; pausing before trying again" } + @current_server = nil + sleep 5 + end + rescue SystemCallError => ex + # Connection failed for any number of reasons; try the next one in the list + @logger.warn("writer") { "Failed to connect to #{next_server.to_s}: #{ex.message} (#{ex.class})" } + sleep 0.1 + retry end - rescue SystemCallError => ex - # Connection failed for any number of reasons; try the next one in the list - @logger.warn("writer") { "Failed to connect to #{next_server.to_s}: #{ex.message} (#{ex.class})" } - sleep 0.1 - retry end end end end @@ -143,10 +157,10 @@ host, port = @server_name.split(":", 2) addrs = Resolv::DNS.new.getaddresses(host) if addrs.empty? @logger.warn("writer") { "No addresses resolved for server_name #{host.inspect}" } end - addrs.map { |a| Target.new(a.to_s, port.to_i) } + addrs.sort_by { rand }.map { |a| Target.new(a.to_s, port.to_i) } else # SRV records ftw [].tap do |list| left = Resolv::DNS.new.getresources(@server_name, Resolv::DNS::Resource::IN::SRV) if left.empty?