lib/logstash_writer.rb in logstash_writer-0.0.10 vs lib/logstash_writer.rb in logstash_writer-0.0.11
- old
+ new
@@ -72,23 +72,23 @@
#
def initialize(server_name:, logger: Logger.new("/dev/null"), backlog: 1_000, metrics_registry: Prometheus::Client::Registry.new, metrics_prefix: :logstash_writer)
@server_name, @logger, @backlog = server_name, logger, backlog
@metrics = {
- received: metrics_registry.counter(:"#{metrics_prefix}_events_received_total", "The number of logstash events which have been submitted for delivery"),
- sent: metrics_registry.counter(:"#{metrics_prefix}_events_written_total", "The number of logstash events which have been delivered to the logstash server"),
- queue_size: metrics_registry.gauge(:"#{metrics_prefix}_queue_size", "The number of events currently in the queue to be sent"),
- dropped: metrics_registry.counter(:"#{metrics_prefix}_events_dropped_total", "The number of events which have been dropped from the queue"),
+ received: metrics_registry.counter(:"#{metrics_prefix}_events_received_total", "The number of logstash events which have been submitted for delivery"),
+ sent: metrics_registry.counter(:"#{metrics_prefix}_events_written_total", "The number of logstash events which have been delivered to the logstash server"),
+ queue_size: metrics_registry.gauge(:"#{metrics_prefix}_queue_size", "The number of events currently in the queue to be sent"),
+ dropped: metrics_registry.counter(:"#{metrics_prefix}_events_dropped_total", "The number of events which have been dropped from the queue"),
- lag: metrics_registry.gauge(:"#{metrics_prefix}_last_sent_event_time_seconds", "When the last event successfully sent to logstash was originally received"),
+ lag: metrics_registry.gauge(:"#{metrics_prefix}_last_sent_event_time_seconds", "When the last event successfully sent to logstash was originally received"),
- connected: metrics_registry.gauge(:"#{metrics_prefix}_connected_to_server", "Boolean flag indicating whether we are currently connected to a logstash server"),
- connect_exception: metrics_registry.counter(:"#{metrics_prefix}_connect_exceptions_total", "The number of exceptions that have occurred whilst attempting to connect to a logstash server"),
- write_exception: metrics_registry.counter(:"#{metrics_prefix}_write_exceptions_total", "The number of exceptions that have occurred whilst attempting to write an event to a logstash server"),
+ connected: metrics_registry.gauge(:"#{metrics_prefix}_connected_to_server", "Boolean flag indicating whether we are currently connected to a logstash server"),
+ connect_exception: metrics_registry.counter(:"#{metrics_prefix}_connect_exceptions_total", "The number of exceptions that have occurred whilst attempting to connect to a logstash server"),
+ write_exception: metrics_registry.counter(:"#{metrics_prefix}_write_exceptions_total", "The number of exceptions that have occurred whilst attempting to write an event to a logstash server"),
write_loop_exception: metrics_registry.counter(:"#{metrics_prefix}_write_loop_exceptions_total", "The number of exceptions that have occurred in the writing loop"),
- write_loop_ok: metrics_registry.gauge(:"#{metrics_prefix}_write_loop_ok", "Boolean flag indicating whether the writing loop is currently operating correctly, or is in a post-apocalyptic hellscape of never-ending exceptions"),
+ write_loop_ok: metrics_registry.gauge(:"#{metrics_prefix}_write_loop_ok", "Boolean flag indicating whether the writing loop is currently operating correctly, or is in a post-apocalyptic hellscape of never-ending exceptions"),
}
@metrics[:lag].set({}, 0)
@metrics[:queue_size].set({}, 0)
@@ -134,22 +134,52 @@
end
nil
end
+ # Send events.
+ #
+ # Does not return until `#shutdown` is called (in another thread).
+ #
+ def run
+ @queue_mutex.synchronize do
+ @terminate = false
+ end
+
+ write_loop
+ end
+
+ # Tell the LogstashWriter to flush its queue and terminate operation.
+ #
+ # Returns immediately.
+ #
+ def shutdown
+ #:nocov:
+ @worker_mutex.synchronize do
+ @queue_mutex.synchronize do
+ @terminate = true
+ @queue_cv.signal
+ end
+ end
+ #:nocov:
+ end
+
# Start sending events.
#
# This method will return almost immediately, and actual event
# transmission will commence in a separate thread.
#
# @return [NilClass]
#
- def run
+ def start!
@worker_mutex.synchronize do
if @worker_thread.nil?
+ @queue_mutex.synchronize do
+ @terminate = false
+ end
+
@worker_thread = Thread.new do
- Thread.current.name = "LogstashWriter"
write_loop
end
end
end
@@ -162,22 +192,23 @@
# finished sending all messages that have been queued. This will
# return once the worker thread has finished.
#
# @return [NilClass]
#
- def stop
+ def stop!
@worker_mutex.synchronize do
if @worker_thread
- @terminate = true
- @queue_cv.signal
+ @queue_mutex.synchronize do
+ @terminate = true
+ @queue_cv.signal
+ end
begin
- @worker_thread.join
+ @worker_thread.join unless @worker_thread == Thread.current
rescue Exception => ex
@logger.error("LogstashWriter") { (["Worker thread terminated with exception: #{ex.message} (#{ex.class})"] + ex.backtrace).join("\n ") }
end
@worker_thread = nil
- @socket_mutex.synchronize { (@current_target.close; @current_target = nil) if @current_target }
end
end
nil
end
@@ -208,48 +239,46 @@
# The main "worker" method for getting events out of the queue and
# firing them at logstash.
#
def write_loop
error_wait = INITIAL_RETRY_WAIT
+ Thread.current.name = "LogstashWriter"
- catch :terminate do
- loop do
- event = nil
+ until @terminate do
+ event = nil
- begin
- @queue_mutex.synchronize do
- while @queue.empty? && !@terminate
- @queue_cv.wait(@queue_mutex)
- end
-
- if @queue.empty? && @terminate
- @terminate = false
- throw :terminate
- end
-
- event = @queue.shift
+ begin
+ @queue_mutex.synchronize do
+ while @queue.empty? && !@terminate
+ @queue_cv.wait(@queue_mutex)
end
+ event = @queue.shift
+ end
+
+ if event
current_target do |t|
t.socket.puts event[:content].to_json
stat_sent(t.to_s, event[:arrival_timestamp])
@metrics[:write_loop_ok].set({}, 1)
error_wait = INITIAL_RETRY_WAIT
end
- rescue StandardError => ex
- @logger.error("LogstashWriter") { (["Exception in write_loop: #{ex.message} (#{ex.class})"] + ex.backtrace).join("\n ") }
- @queue_mutex.synchronize { @queue.unshift(event) if event }
- @metrics[:write_loop_exception].increment(class: ex.class.to_s)
- @metrics[:write_loop_ok].set({}, 0)
- sleep error_wait
- # Increase the error wait timeout for next time, up to a maximum
- # interval of about 60 seconds
- error_wait *= 1.1
- error_wait = 60 if error_wait > 60
- error_wait += rand / 0.5
end
+ rescue StandardError => ex
+ @logger.error("LogstashWriter") { (["Exception in write_loop: #{ex.message} (#{ex.class})"] + ex.backtrace).join("\n ") }
+ @queue_mutex.synchronize { @queue.unshift(event) if event }
+ @metrics[:write_loop_exception].increment(class: ex.class.to_s)
+ @metrics[:write_loop_ok].set({}, 0)
+ sleep error_wait
+ # Increase the error wait timeout for next time, up to a maximum
+ # interval of about 60 seconds
+ error_wait *= 1.1
+ error_wait = 60 if error_wait > 60
+ error_wait += rand / 0.5
end
end
+
+ force_disconnect!
end
# Yield a Target connected to the server we currently believe to be
# accepting log entries, so that something can send log entries to it.
#