lib/syslogstash/logstash_writer.rb in syslogstash-1.3.0 vs lib/syslogstash/logstash_writer.rb in syslogstash-2.1.0

- old
+ new

@@ -1,25 +1,24 @@ -require_relative 'worker' +require 'resolv' +require 'ipaddr' -# Write messages to one of a collection of logstash servers. +# Write messages to a logstash server. # class Syslogstash::LogstashWriter - include Syslogstash::Worker + Target = Struct.new(:hostname, :port) + attr_reader :thread + # Create a new logstash writer. # - # Give it a list of servers, and your writer will be ready to go. - # No messages will actually be *delivered*, though, until you call #run. + # Once the object is created, you're ready to give it messages by + # calling #send_entry. No messages will actually be *delivered* to + # logstash, though, until you call #run. # - def initialize(servers, backlog, metrics) - @servers, @backlog, @metrics = servers.map { |s| URI(s) }, backlog, metrics + def initialize(cfg, stats) + @server_name, @logger, @backlog, @stats = cfg.logstash_server, cfg.logger, cfg.backlog_size, stats - unless @servers.all? { |url| url.scheme == 'tcp' } - raise ArgumentError, - "Unsupported URL scheme: #{@servers.select { |url| url.scheme != 'tcp' }.join(', ')}" - end - @entries = [] @entries_mutex = Mutex.new end # Add an entry to the list of messages to be sent to logstash. Actual @@ -29,22 +28,23 @@ def send_entry(e) @entries_mutex.synchronize do @entries << { content: e, arrival_timestamp: Time.now } while @entries.length > @backlog @entries.shift - @metrics.dropped + @stats.dropped end end - @worker.run if @worker + + @thread.run if @thread end # Start sending messages to logstash servers. This method will return # almost immediately, and actual message sending will occur in a - # separate worker thread. + # separate thread. # def run - @worker = Thread.new { send_messages } + @thread = Thread.new { send_messages } end private def send_messages @@ -55,20 +55,18 @@ begin entry = @entries_mutex.synchronize { @entries.shift } current_server do |s| s.puts entry[:content] + @stats.sent(server_id(s), entry[:arrival_timestamp]) end - @metrics.sent(@servers.last, entry[:arrival_timestamp]) - # If we got here, we sent successfully, so we don't want # to put the entry back on the queue in the ensure block entry = nil rescue StandardError => ex - log { "Unhandled exception: #{ex.message} (#{ex.class})" } - $stderr.puts ex.backtrace.map { |l| " #{l}" }.join("\n") + @logger.error("writer") { (["Unhandled exception while writing entry: #{ex.message} (#{ex.class})"] + ex.backtrace).join("\n ") } ensure @entries_mutex.synchronize { @entries.unshift if entry } end end end @@ -89,31 +87,101 @@ done = false until done if @current_server begin - debug { "Using current server" } + @logger.debug("writer") { "Using current server #{server_id(@current_server)}" } yield @current_server done = true rescue SystemCallError => ex # Something went wrong during the send; disconnect from this # server and recycle - debug { "Error while writing to current server: #{ex.message} (#{ex.class})" } + @logger.debug("writer") { "Error while writing to current server: #{ex.message} (#{ex.class})" } @current_server.close @current_server = nil sleep 0.1 end else + candidates = resolve_server_name + begin - # Pick another server to connect to at random - next_server = @servers.sort { rand }.first - debug { "Trying to connect to #{next_server.to_s}" } - @current_server = TCPSocket.new(next_server.hostname, next_server.port) + next_server = candidates.shift + + if next_server + @logger.debug("writer") { "Trying to connect to #{next_server.to_s}" } + @current_server = TCPSocket.new(next_server.hostname, next_server.port) + else + @logger.debug("writer") { "Could not connect to any server; pausing before trying again" } + @current_server = nil + sleep 5 + end rescue SystemCallError => ex - # Connection failed for any number of reasons; try again - debug { "Failed to connect to #{next_server.to_s}: #{ex.message} (#{ex.class})" } + # Connection failed for any number of reasons; try the next one in the list + @logger.warn("writer") { "Failed to connect to #{next_server.to_s}: #{ex.message} (#{ex.class})" } sleep 0.1 retry + end + end + end + end + + def server_id(s) + pa = s.peeraddr + if pa[0] == "AF_INET6" + "[#{pa[3]}]:#{pa[1]}" + else + "#{pa[3]}:#{pa[1]}" + end + end + + def resolve_server_name + return [static_target] if static_target + + # The IPv6 literal case should have been taken care of by + # static_target, so the only two cases we have to deal with + # here are specified-port (assume A/AAAA) or no port (assume SRV). + if @server_name =~ /:/ + host, port = @server_name.split(":", 2) + addrs = Resolv::DNS.new.getaddresses(host) + if addrs.empty? + @logger.warn("writer") { "No addresses resolved for server_name #{host.inspect}" } + end + addrs.map { |a| Target.new(a.to_s, port.to_i) } + else + # SRV records ftw + [].tap do |list| + left = Resolv::DNS.new.getresources(@server_name, Resolv::DNS::Resource::IN::SRV) + if left.empty? + @logger.warn("writer") { "No SRV records found for server_name #{@server_name.inspect}" } + end + until left.empty? + prio = left.map { |rr| rr.priority }.uniq.min + candidates = left.select { |rr| rr.priority == prio } + left -= candidates + candidates.sort_by! { |rr| [rr.weight, rr.target.to_s] } + until candidates.empty? + selector = rand(candidates.inject(1) { |n, rr| n + rr.weight }) + chosen = candidates.inject(0) do |n, rr| + break rr if n + rr.weight >= selector + n + rr.weight + end + candidates.delete(chosen) + list << Target.new(chosen.target.to_s, chosen.port) + end + end + end + end + end + + def static_target + @static_target ||= begin + if @server_name =~ /\A(.*):(\d+)\z/ + begin + Target.new(IPAddr.new($1).to_s, $2.to_i) + rescue ArgumentError + # Whatever is on the LHS isn't a recognisable address; + # assume hostname and continue + nil end end end end end