lib/syslogstash/logstash_writer.rb in syslogstash-0.3.0 vs lib/syslogstash/logstash_writer.rb in syslogstash-0.4.0

- old
+ new

@@ -8,12 +8,12 @@ # Create a new logstash writer. # # Give it a list of servers, and your writer will be ready to go. # No messages will actually be *delivered*, though, until you call #run. # - def initialize(servers, backlog) - @servers, @backlog = servers.map { |s| URI(s) }, backlog + def initialize(servers, backlog, metrics) + @servers, @backlog, @metrics = servers.map { |s| URI(s) }, backlog, metrics unless @servers.all? { |url| url.scheme == 'tcp' } raise ArgumentError, "Unsupported URL scheme: #{@servers.select { |url| url.scheme != 'tcp' }.join(', ')}" end @@ -26,11 +26,11 @@ # message delivery will happen in a worker thread that is started with # #run. # def send_entry(e) @entries_mutex.synchronize do - @entries << e + @entries << { content: e, arrival_timestamp: Time.now } @entries.shift while @entries.length > @backlog end @worker.run if @worker end @@ -51,13 +51,15 @@ else begin entry = @entries_mutex.synchronize { @entries.shift } current_server do |s| - s.puts entry + s.puts entry[:content] end + @metrics.sent(@servers.last, entry[:arrival_timestamp]) + # If we got here, we sent successfully, so we don't want # to put the entry back on the queue in the ensure block entry = nil rescue StandardError => ex $stderr.puts "Unhandled exception: #{ex.message} (#{ex.class})" @@ -73,10 +75,10 @@ # be accepting log entries, so that something can send log entries to # it. # # The yielding is very deliberate: it allows us to centralise all # error detection and handling within this one method, and retry - # sending just be calling `yield` again when we've connected to + # sending just by calling `yield` again when we've connected to # another server. # def current_server # I could handle this more cleanly with recursion, but I don't want # to fill the stack if we have to retry a lot of times