lib/syslogstash/logstash_writer.rb in syslogstash-0.3.0 vs lib/syslogstash/logstash_writer.rb in syslogstash-0.4.0
- old
+ new
@@ -8,12 +8,12 @@
# Create a new logstash writer.
#
# Give it a list of servers, and your writer will be ready to go.
# No messages will actually be *delivered*, though, until you call #run.
#
- def initialize(servers, backlog)
- @servers, @backlog = servers.map { |s| URI(s) }, backlog
+ def initialize(servers, backlog, metrics)
+ @servers, @backlog, @metrics = servers.map { |s| URI(s) }, backlog, metrics
unless @servers.all? { |url| url.scheme == 'tcp' }
raise ArgumentError,
"Unsupported URL scheme: #{@servers.select { |url| url.scheme != 'tcp' }.join(', ')}"
end
@@ -26,11 +26,11 @@
# message delivery will happen in a worker thread that is started with
# #run.
#
def send_entry(e)
@entries_mutex.synchronize do
- @entries << e
+ @entries << { content: e, arrival_timestamp: Time.now }
@entries.shift while @entries.length > @backlog
end
@worker.run if @worker
end
@@ -51,13 +51,15 @@
else
begin
entry = @entries_mutex.synchronize { @entries.shift }
current_server do |s|
- s.puts entry
+ s.puts entry[:content]
end
+ @metrics.sent(@servers.last, entry[:arrival_timestamp])
+
# If we got here, we sent successfully, so we don't want
# to put the entry back on the queue in the ensure block
entry = nil
rescue StandardError => ex
$stderr.puts "Unhandled exception: #{ex.message} (#{ex.class})"
@@ -73,10 +75,10 @@
# be accepting log entries, so that something can send log entries to
# it.
#
# The yielding is very deliberate: it allows us to centralise all
# error detection and handling within this one method, and retry
- # sending just be calling `yield` again when we've connected to
+ # sending just by calling `yield` again when we've connected to
# another server.
#
def current_server
# I could handle this more cleanly with recursion, but I don't want
# to fill the stack if we have to retry a lot of times