lib/logstash_writer.rb in logstash_writer-0.0.11 vs lib/logstash_writer.rb in logstash_writer-0.0.12

- old
+ new

@@ -1,5 +1,7 @@
+# frozen_string_literal: true
+
 require 'ipaddr'
 require 'json'
 require 'resolv'
 require 'socket'
 require 'prometheus/client'
@@ -72,29 +74,29 @@
   #
   def initialize(server_name:, logger: Logger.new("/dev/null"), backlog: 1_000, metrics_registry: Prometheus::Client::Registry.new, metrics_prefix: :logstash_writer)
     @server_name, @logger, @backlog = server_name, logger, backlog

     @metrics = {
-      received: metrics_registry.counter(:"#{metrics_prefix}_events_received_total", "The number of logstash events which have been submitted for delivery"),
-      sent: metrics_registry.counter(:"#{metrics_prefix}_events_written_total", "The number of logstash events which have been delivered to the logstash server"),
-      queue_size: metrics_registry.gauge(:"#{metrics_prefix}_queue_size", "The number of events currently in the queue to be sent"),
-      dropped: metrics_registry.counter(:"#{metrics_prefix}_events_dropped_total", "The number of events which have been dropped from the queue"),
+      received: metrics_registry.counter(:"#{metrics_prefix}_events_received_total", docstring: "The number of logstash events which have been submitted for delivery"),
+      sent: metrics_registry.counter(:"#{metrics_prefix}_events_written_total", docstring: "The number of logstash events which have been delivered to the logstash server", labels: %i{server}),
+      queue_size: metrics_registry.gauge(:"#{metrics_prefix}_queue_size", docstring: "The number of events currently in the queue to be sent"),
+      dropped: metrics_registry.counter(:"#{metrics_prefix}_events_dropped_total", docstring: "The number of events which have been dropped from the queue"),

-      lag: metrics_registry.gauge(:"#{metrics_prefix}_last_sent_event_time_seconds", "When the last event successfully sent to logstash was originally received"),
+      lag: metrics_registry.gauge(:"#{metrics_prefix}_last_sent_event_time_seconds", docstring: "When the last event successfully sent to logstash was originally received"),

-      connected: metrics_registry.gauge(:"#{metrics_prefix}_connected_to_server", "Boolean flag indicating whether we are currently connected to a logstash server"),
-      connect_exception: metrics_registry.counter(:"#{metrics_prefix}_connect_exceptions_total", "The number of exceptions that have occurred whilst attempting to connect to a logstash server"),
-      write_exception: metrics_registry.counter(:"#{metrics_prefix}_write_exceptions_total", "The number of exceptions that have occurred whilst attempting to write an event to a logstash server"),
+      connected: metrics_registry.gauge(:"#{metrics_prefix}_connected_to_server", docstring: "Boolean flag indicating whether we are currently connected to a logstash server", labels: %i{server}),
+      connect_exception: metrics_registry.counter(:"#{metrics_prefix}_connect_exceptions_total", docstring: "The number of exceptions that have occurred whilst attempting to connect to a logstash server", labels: %i{server class}),
+      write_exception: metrics_registry.counter(:"#{metrics_prefix}_write_exceptions_total", docstring: "The number of exceptions that have occurred whilst attempting to write an event to a logstash server", labels: %i{server class}),

-      write_loop_exception: metrics_registry.counter(:"#{metrics_prefix}_write_loop_exceptions_total", "The number of exceptions that have occurred in the writing loop"),
-      write_loop_ok: metrics_registry.gauge(:"#{metrics_prefix}_write_loop_ok", "Boolean flag indicating whether the writing loop is currently operating correctly, or is in a post-apocalyptic hellscape of never-ending exceptions"),
+      write_loop_exception: metrics_registry.counter(:"#{metrics_prefix}_write_loop_exceptions_total", docstring: "The number of exceptions that have occurred in the writing loop", labels: %i{class}),
+      write_loop_ok: metrics_registry.gauge(:"#{metrics_prefix}_write_loop_ok", docstring: "Boolean flag indicating whether the writing loop is currently operating correctly, or is in a post-apocalyptic hellscape of never-ending exceptions"),
     }

-    @metrics[:lag].set({}, 0)
-    @metrics[:queue_size].set({}, 0)
+    @metrics[:lag].set(0)
+    @metrics[:queue_size].set(0)

-    metrics_registry.gauge(:"#{metrics_prefix}_queue_max", "The maximum size of the event queue").set({}, backlog)
+    metrics_registry.gauge(:"#{metrics_prefix}_queue_max", docstring: "The maximum size of the event queue").set(backlog)

     # We can't use a stdlib Queue object because we need to re-push items
     # onto the front of the queue in case of error
     @queue = []
     @queue_mutex = Mutex.new
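For context (not part of the diff itself): the registration changes above follow the newer prometheus-client keyword API, in which metric help text is passed as the `docstring:` keyword and any label names must be declared when the metric is created. A minimal sketch of that registration style, using made-up metric names rather than the gem's own:

    # Illustrative sketch only (hypothetical names), assuming prometheus-client >= 0.10
    require 'prometheus/client'

    registry = Prometheus::Client::Registry.new

    # Help text goes in `docstring:`; permitted label names are declared with
    # `labels:` and must be supplied on every observation.
    sent = registry.counter(
      :example_events_written_total,
      docstring: "Events delivered to the example server",
      labels: %i{server}
    )

    sent.increment(labels: { server: "logstash.example.com:5151" })
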
exceptions"), + write_loop_exception: metrics_registry.counter(:"#{metrics_prefix}_write_loop_exceptions_total", docstring: "The number of exceptions that have occurred in the writing loop", labels: %i{class}), + write_loop_ok: metrics_registry.gauge(:"#{metrics_prefix}_write_loop_ok", docstring: "Boolean flag indicating whether the writing loop is currently operating correctly, or is in a post-apocalyptic hellscape of never-ending exceptions"), } - @metrics[:lag].set({}, 0) - @metrics[:queue_size].set({}, 0) + @metrics[:lag].set(0) + @metrics[:queue_size].set(0) - metrics_registry.gauge(:"#{metrics_prefix}_queue_max", "The maximum size of the event queue").set({}, backlog) + metrics_registry.gauge(:"#{metrics_prefix}_queue_max", docstring: "The maximum size of the event queue").set(backlog) # We can't use a stdlib Queue object because we need to re-push items # onto the front of the queue in case of error @queue = [] @queue_mutex = Mutex.new @@ -257,19 +259,19 @@ if event current_target do |t| t.socket.puts event[:content].to_json stat_sent(t.to_s, event[:arrival_timestamp]) - @metrics[:write_loop_ok].set({}, 1) + @metrics[:write_loop_ok].set(1) error_wait = INITIAL_RETRY_WAIT end end rescue StandardError => ex @logger.error("LogstashWriter") { (["Exception in write_loop: #{ex.message} (#{ex.class})"] + ex.backtrace).join("\n ") } @queue_mutex.synchronize { @queue.unshift(event) if event } - @metrics[:write_loop_exception].increment(class: ex.class.to_s) - @metrics[:write_loop_ok].set({}, 0) + @metrics[:write_loop_exception].increment(labels: { class: ex.class.to_s }) + @metrics[:write_loop_ok].set(0) sleep error_wait # Increase the error wait timeout for next time, up to a maximum # interval of about 60 seconds error_wait *= 1.1 error_wait = 60 if error_wait > 60 @@ -307,17 +309,17 @@ # "Transport endpoint is not connected" seems like a suitably # appropriate error to me under the circumstances. raise Errno::ENOTCONN unless IO.select([@current_target.socket], [], [], 0).nil? 
yield @current_target - @metrics[:connected].set({ server: @current_target.describe_peer }, 1) + @metrics[:connected].set(1, labels: { server: @current_target.describe_peer }) done = true rescue SystemCallError => ex # Something went wrong during the send; disconnect from this # server and recycle - @metrics[:write_exception].increment(server: @current_target.describe_peer, class: ex.class.to_s) - @metrics[:connected].set({ server: @current_target.describe_peer }, 0) + @metrics[:write_exception].increment(labels: { server: @current_target.describe_peer, class: ex.class.to_s }) + @metrics[:connected].set(0, labels: { server: @current_target.describe_peer }) @logger.error("LogstashWriter") { "Error while writing to current server #{@current_target.describe_peer}: #{ex.message} (#{ex.class})" } @current_target.close @current_target = nil sleep INITIAL_RETRY_WAIT @@ -356,11 +358,11 @@ # amiss retry_delay += rand end rescue SystemCallError => ex # Connection failed for any number of reasons; try the next one in the list - @metrics[:connect_exception].increment(server: next_server.to_s, class: ex.class.to_s) + @metrics[:connect_exception].increment(labels: { server: next_server.to_s, class: ex.class.to_s }) @logger.error("LogstashWriter") { "Failed to connect to #{next_server.to_s}: #{ex.message} (#{ex.class})" } sleep INITIAL_RETRY_WAIT retry end end @@ -448,22 +450,22 @@ end end end def stat_received - @metrics[:received].increment({}) - @metrics[:queue_size].increment({}) + @metrics[:received].increment + @metrics[:queue_size].increment end def stat_sent(peer, arrived_time) - @metrics[:sent].increment(server: peer) - @metrics[:queue_size].decrement({}) - @metrics[:lag].set({}, arrived_time.to_f) + @metrics[:sent].increment(labels: { server: peer }) + @metrics[:queue_size].decrement + @metrics[:lag].set(arrived_time.to_f) end def stat_dropped - @metrics[:queue_size].decrement({}) - @metrics[:dropped].increment({}) + @metrics[:queue_size].decrement + @metrics[:dropped].increment end # An individual target for logstash messages # # Takes a host and port, gives back a socket to send data down.
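The observation call sites change in the same direction: `set` now takes the value as its first argument with labels passed via the `labels:` keyword, and `increment`/`decrement` no longer take a positional label hash. A minimal sketch mirroring the call-site changes above, with hypothetical metric names (not the gem's code), assuming prometheus-client >= 0.10:

    # Illustrative sketch only (hypothetical names)
    require 'prometheus/client'

    registry = Prometheus::Client::Registry.new

    queue_size = registry.gauge(:example_queue_size, docstring: "Events queued")
    connected  = registry.gauge(:example_connected, docstring: "Connection flag", labels: %i{server})

    queue_size.set(0)        # old API: queue_size.set({}, 0)
    queue_size.increment     # old API: queue_size.increment({})
    queue_size.decrement     # old API: queue_size.decrement({})

    connected.set(1, labels: { server: "logstash.example.com:5151" })
    # old API: connected.set({ server: "logstash.example.com:5151" }, 1)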