lib/syslogstash/logstash_writer.rb in syslogstash-1.3.0 vs lib/syslogstash/logstash_writer.rb in syslogstash-2.1.0
- old
+ new
@@ -1,25 +1,24 @@
-require_relative 'worker'
+require 'resolv'
+require 'ipaddr'
-# Write messages to one of a collection of logstash servers.
+# Write messages to a logstash server.
#
class Syslogstash::LogstashWriter
- include Syslogstash::Worker
+ Target = Struct.new(:hostname, :port)
+ attr_reader :thread
+
# Create a new logstash writer.
#
- # Give it a list of servers, and your writer will be ready to go.
- # No messages will actually be *delivered*, though, until you call #run.
+ # Once the object is created, you're ready to give it messages by
+ # calling #send_entry. No messages will actually be *delivered* to
+ # logstash, though, until you call #run.
#
- def initialize(servers, backlog, metrics)
- @servers, @backlog, @metrics = servers.map { |s| URI(s) }, backlog, metrics
+ def initialize(cfg, stats)
+ @server_name, @logger, @backlog, @stats = cfg.logstash_server, cfg.logger, cfg.backlog_size, stats
- unless @servers.all? { |url| url.scheme == 'tcp' }
- raise ArgumentError,
- "Unsupported URL scheme: #{@servers.select { |url| url.scheme != 'tcp' }.join(', ')}"
- end
-
@entries = []
@entries_mutex = Mutex.new
end
# Add an entry to the list of messages to be sent to logstash. Actual
@@ -29,22 +28,23 @@
def send_entry(e)
@entries_mutex.synchronize do
@entries << { content: e, arrival_timestamp: Time.now }
while @entries.length > @backlog
@entries.shift
- @metrics.dropped
+ @stats.dropped
end
end
- @worker.run if @worker
+
+ @thread.run if @thread
end
# Start sending messages to logstash servers. This method will return
# almost immediately, and actual message sending will occur in a
- # separate worker thread.
+ # separate thread.
#
def run
- @worker = Thread.new { send_messages }
+ @thread = Thread.new { send_messages }
end
private
def send_messages
@@ -55,20 +55,18 @@
begin
entry = @entries_mutex.synchronize { @entries.shift }
current_server do |s|
s.puts entry[:content]
+ @stats.sent(server_id(s), entry[:arrival_timestamp])
end
- @metrics.sent(@servers.last, entry[:arrival_timestamp])
-
# If we got here, we sent successfully, so we don't want
# to put the entry back on the queue in the ensure block
entry = nil
rescue StandardError => ex
- log { "Unhandled exception: #{ex.message} (#{ex.class})" }
- $stderr.puts ex.backtrace.map { |l| " #{l}" }.join("\n")
+ @logger.error("writer") { (["Unhandled exception while writing entry: #{ex.message} (#{ex.class})"] + ex.backtrace).join("\n ") }
ensure
@entries_mutex.synchronize { @entries.unshift if entry }
end
end
end
@@ -89,31 +87,101 @@
done = false
until done
if @current_server
begin
- debug { "Using current server" }
+ @logger.debug("writer") { "Using current server #{server_id(@current_server)}" }
yield @current_server
done = true
rescue SystemCallError => ex
# Something went wrong during the send; disconnect from this
# server and recycle
- debug { "Error while writing to current server: #{ex.message} (#{ex.class})" }
+ @logger.debug("writer") { "Error while writing to current server: #{ex.message} (#{ex.class})" }
@current_server.close
@current_server = nil
sleep 0.1
end
else
+ candidates = resolve_server_name
+
begin
- # Pick another server to connect to at random
- next_server = @servers.sort { rand }.first
- debug { "Trying to connect to #{next_server.to_s}" }
- @current_server = TCPSocket.new(next_server.hostname, next_server.port)
+ next_server = candidates.shift
+
+ if next_server
+ @logger.debug("writer") { "Trying to connect to #{next_server.to_s}" }
+ @current_server = TCPSocket.new(next_server.hostname, next_server.port)
+ else
+ @logger.debug("writer") { "Could not connect to any server; pausing before trying again" }
+ @current_server = nil
+ sleep 5
+ end
rescue SystemCallError => ex
- # Connection failed for any number of reasons; try again
- debug { "Failed to connect to #{next_server.to_s}: #{ex.message} (#{ex.class})" }
+ # Connection failed for any number of reasons; try the next one in the list
+ @logger.warn("writer") { "Failed to connect to #{next_server.to_s}: #{ex.message} (#{ex.class})" }
sleep 0.1
retry
+ end
+ end
+ end
+ end
+
+ def server_id(s)
+ pa = s.peeraddr
+ if pa[0] == "AF_INET6"
+ "[#{pa[3]}]:#{pa[1]}"
+ else
+ "#{pa[3]}:#{pa[1]}"
+ end
+ end
+
+ def resolve_server_name
+ return [static_target] if static_target
+
+ # The IPv6 literal case should have been taken care of by
+ # static_target, so the only two cases we have to deal with
+ # here are specified-port (assume A/AAAA) or no port (assume SRV).
+ if @server_name =~ /:/
+ host, port = @server_name.split(":", 2)
+ addrs = Resolv::DNS.new.getaddresses(host)
+ if addrs.empty?
+ @logger.warn("writer") { "No addresses resolved for server_name #{host.inspect}" }
+ end
+ addrs.map { |a| Target.new(a.to_s, port.to_i) }
+ else
+ # SRV records ftw
+ [].tap do |list|
+ left = Resolv::DNS.new.getresources(@server_name, Resolv::DNS::Resource::IN::SRV)
+ if left.empty?
+ @logger.warn("writer") { "No SRV records found for server_name #{@server_name.inspect}" }
+ end
+ until left.empty?
+ prio = left.map { |rr| rr.priority }.uniq.min
+ candidates = left.select { |rr| rr.priority == prio }
+ left -= candidates
+ candidates.sort_by! { |rr| [rr.weight, rr.target.to_s] }
+ until candidates.empty?
+ selector = rand(candidates.inject(1) { |n, rr| n + rr.weight })
+ chosen = candidates.inject(0) do |n, rr|
+ break rr if n + rr.weight >= selector
+ n + rr.weight
+ end
+ candidates.delete(chosen)
+ list << Target.new(chosen.target.to_s, chosen.port)
+ end
+ end
+ end
+ end
+ end
+
+ def static_target
+ @static_target ||= begin
+ if @server_name =~ /\A(.*):(\d+)\z/
+ begin
+ Target.new(IPAddr.new($1).to_s, $2.to_i)
+ rescue ArgumentError
+ # Whatever is on the LHS isn't a recognisable address;
+ # assume hostname and continue
+ nil
end
end
end
end
end