lib/logstash_writer.rb in logstash_writer-0.0.11 vs lib/logstash_writer.rb in logstash_writer-0.0.12
- old
+ new
@@ -1,5 +1,7 @@
+# frozen_string_literal: true
+
require 'ipaddr'
require 'json'
require 'resolv'
require 'socket'
require 'prometheus/client'
@@ -72,29 +74,29 @@
#
def initialize(server_name:, logger: Logger.new("/dev/null"), backlog: 1_000, metrics_registry: Prometheus::Client::Registry.new, metrics_prefix: :logstash_writer)
@server_name, @logger, @backlog = server_name, logger, backlog
@metrics = {
- received: metrics_registry.counter(:"#{metrics_prefix}_events_received_total", "The number of logstash events which have been submitted for delivery"),
- sent: metrics_registry.counter(:"#{metrics_prefix}_events_written_total", "The number of logstash events which have been delivered to the logstash server"),
- queue_size: metrics_registry.gauge(:"#{metrics_prefix}_queue_size", "The number of events currently in the queue to be sent"),
- dropped: metrics_registry.counter(:"#{metrics_prefix}_events_dropped_total", "The number of events which have been dropped from the queue"),
+ received: metrics_registry.counter(:"#{metrics_prefix}_events_received_total", docstring: "The number of logstash events which have been submitted for delivery"),
+ sent: metrics_registry.counter(:"#{metrics_prefix}_events_written_total", docstring: "The number of logstash events which have been delivered to the logstash server", labels: %i{server}),
+ queue_size: metrics_registry.gauge(:"#{metrics_prefix}_queue_size", docstring: "The number of events currently in the queue to be sent"),
+ dropped: metrics_registry.counter(:"#{metrics_prefix}_events_dropped_total", docstring: "The number of events which have been dropped from the queue"),
- lag: metrics_registry.gauge(:"#{metrics_prefix}_last_sent_event_time_seconds", "When the last event successfully sent to logstash was originally received"),
+ lag: metrics_registry.gauge(:"#{metrics_prefix}_last_sent_event_time_seconds", docstring: "When the last event successfully sent to logstash was originally received"),
- connected: metrics_registry.gauge(:"#{metrics_prefix}_connected_to_server", "Boolean flag indicating whether we are currently connected to a logstash server"),
- connect_exception: metrics_registry.counter(:"#{metrics_prefix}_connect_exceptions_total", "The number of exceptions that have occurred whilst attempting to connect to a logstash server"),
- write_exception: metrics_registry.counter(:"#{metrics_prefix}_write_exceptions_total", "The number of exceptions that have occurred whilst attempting to write an event to a logstash server"),
+ connected: metrics_registry.gauge(:"#{metrics_prefix}_connected_to_server", docstring: "Boolean flag indicating whether we are currently connected to a logstash server", labels: %i{server}),
+ connect_exception: metrics_registry.counter(:"#{metrics_prefix}_connect_exceptions_total", docstring: "The number of exceptions that have occurred whilst attempting to connect to a logstash server", labels: %i{server class}),
+ write_exception: metrics_registry.counter(:"#{metrics_prefix}_write_exceptions_total", docstring: "The number of exceptions that have occurred whilst attempting to write an event to a logstash server", labels: %i{server class}),
- write_loop_exception: metrics_registry.counter(:"#{metrics_prefix}_write_loop_exceptions_total", "The number of exceptions that have occurred in the writing loop"),
- write_loop_ok: metrics_registry.gauge(:"#{metrics_prefix}_write_loop_ok", "Boolean flag indicating whether the writing loop is currently operating correctly, or is in a post-apocalyptic hellscape of never-ending exceptions"),
+ write_loop_exception: metrics_registry.counter(:"#{metrics_prefix}_write_loop_exceptions_total", docstring: "The number of exceptions that have occurred in the writing loop", labels: %i{class}),
+ write_loop_ok: metrics_registry.gauge(:"#{metrics_prefix}_write_loop_ok", docstring: "Boolean flag indicating whether the writing loop is currently operating correctly, or is in a post-apocalyptic hellscape of never-ending exceptions"),
}
- @metrics[:lag].set({}, 0)
- @metrics[:queue_size].set({}, 0)
+ @metrics[:lag].set(0)
+ @metrics[:queue_size].set(0)
- metrics_registry.gauge(:"#{metrics_prefix}_queue_max", "The maximum size of the event queue").set({}, backlog)
+ metrics_registry.gauge(:"#{metrics_prefix}_queue_max", docstring: "The maximum size of the event queue").set(backlog)
# We can't use a stdlib Queue object because we need to re-push items
# onto the front of the queue in case of error
@queue = []
@queue_mutex = Mutex.new
@@ -257,19 +259,19 @@
if event
current_target do |t|
t.socket.puts event[:content].to_json
stat_sent(t.to_s, event[:arrival_timestamp])
- @metrics[:write_loop_ok].set({}, 1)
+ @metrics[:write_loop_ok].set(1)
error_wait = INITIAL_RETRY_WAIT
end
end
rescue StandardError => ex
@logger.error("LogstashWriter") { (["Exception in write_loop: #{ex.message} (#{ex.class})"] + ex.backtrace).join("\n ") }
@queue_mutex.synchronize { @queue.unshift(event) if event }
- @metrics[:write_loop_exception].increment(class: ex.class.to_s)
- @metrics[:write_loop_ok].set({}, 0)
+ @metrics[:write_loop_exception].increment(labels: { class: ex.class.to_s })
+ @metrics[:write_loop_ok].set(0)
sleep error_wait
# Increase the error wait timeout for next time, up to a maximum
# interval of about 60 seconds
error_wait *= 1.1
error_wait = 60 if error_wait > 60
@@ -307,17 +309,17 @@
# "Transport endpoint is not connected" seems like a suitably
# appropriate error to me under the circumstances.
raise Errno::ENOTCONN unless IO.select([@current_target.socket], [], [], 0).nil?
yield @current_target
- @metrics[:connected].set({ server: @current_target.describe_peer }, 1)
+ @metrics[:connected].set(1, labels: { server: @current_target.describe_peer })
done = true
rescue SystemCallError => ex
# Something went wrong during the send; disconnect from this
# server and recycle
- @metrics[:write_exception].increment(server: @current_target.describe_peer, class: ex.class.to_s)
- @metrics[:connected].set({ server: @current_target.describe_peer }, 0)
+ @metrics[:write_exception].increment(labels: { server: @current_target.describe_peer, class: ex.class.to_s })
+ @metrics[:connected].set(0, labels: { server: @current_target.describe_peer })
@logger.error("LogstashWriter") { "Error while writing to current server #{@current_target.describe_peer}: #{ex.message} (#{ex.class})" }
@current_target.close
@current_target = nil
sleep INITIAL_RETRY_WAIT
@@ -356,11 +358,11 @@
# amiss
retry_delay += rand
end
rescue SystemCallError => ex
# Connection failed for any number of reasons; try the next one in the list
- @metrics[:connect_exception].increment(server: next_server.to_s, class: ex.class.to_s)
+ @metrics[:connect_exception].increment(labels: { server: next_server.to_s, class: ex.class.to_s })
@logger.error("LogstashWriter") { "Failed to connect to #{next_server.to_s}: #{ex.message} (#{ex.class})" }
sleep INITIAL_RETRY_WAIT
retry
end
end
@@ -448,22 +450,22 @@
end
end
end
# Record that one event has been received: bumps both the `:received`
# metric and the `:queue_size` metric by one.
#
# The `- ` lines are the previous call form (an empty labels hash passed
# positionally); the `+ ` lines are the replacement form with no arguments.
# Both forms touch the same two metrics.
def stat_received
- @metrics[:received].increment({})
- @metrics[:queue_size].increment({})
+ @metrics[:received].increment
+ @metrics[:queue_size].increment
end
# Record that one event has been sent: increments the `:sent` metric
# labelled with the given server, decrements `:queue_size`, and stores the
# event's arrival time in the `:lag` metric.
#
# @param peer the value used for the `server` label on the `:sent` metric.
#
# @param arrived_time the event's arrival time; converted with `#to_f`
#   before being stored in `:lag`.
#
# The `- ` lines are the previous call form (labels hash passed
# positionally); the `+ ` lines are the replacement form using the
# `labels:` keyword.
def stat_sent(peer, arrived_time)
- @metrics[:sent].increment(server: peer)
- @metrics[:queue_size].decrement({})
- @metrics[:lag].set({}, arrived_time.to_f)
+ @metrics[:sent].increment(labels: { server: peer })
+ @metrics[:queue_size].decrement
+ @metrics[:lag].set(arrived_time.to_f)
end
# Record that one event has been dropped: decrements the `:queue_size`
# metric and increments the `:dropped` metric.
#
# The `- ` lines are the previous call form (an empty labels hash passed
# positionally); the `+ ` lines are the replacement form with no arguments.
def stat_dropped
- @metrics[:queue_size].decrement({})
- @metrics[:dropped].increment({})
+ @metrics[:queue_size].decrement
+ @metrics[:dropped].increment
end
# An individual target for logstash messages
#
# Takes a host and port, gives back a socket to send data down.