lib/logstash_writer.rb in logstash_writer-0.0.10 vs lib/logstash_writer.rb in logstash_writer-0.0.11

- old
+ new

@@ -72,23 +72,23 @@ # def initialize(server_name:, logger: Logger.new("/dev/null"), backlog: 1_000, metrics_registry: Prometheus::Client::Registry.new, metrics_prefix: :logstash_writer) @server_name, @logger, @backlog = server_name, logger, backlog @metrics = { - received: metrics_registry.counter(:"#{metrics_prefix}_events_received_total", "The number of logstash events which have been submitted for delivery"), - sent: metrics_registry.counter(:"#{metrics_prefix}_events_written_total", "The number of logstash events which have been delivered to the logstash server"), - queue_size: metrics_registry.gauge(:"#{metrics_prefix}_queue_size", "The number of events currently in the queue to be sent"), - dropped: metrics_registry.counter(:"#{metrics_prefix}_events_dropped_total", "The number of events which have been dropped from the queue"), + received: metrics_registry.counter(:"#{metrics_prefix}_events_received_total", "The number of logstash events which have been submitted for delivery"), + sent: metrics_registry.counter(:"#{metrics_prefix}_events_written_total", "The number of logstash events which have been delivered to the logstash server"), + queue_size: metrics_registry.gauge(:"#{metrics_prefix}_queue_size", "The number of events currently in the queue to be sent"), + dropped: metrics_registry.counter(:"#{metrics_prefix}_events_dropped_total", "The number of events which have been dropped from the queue"), - lag: metrics_registry.gauge(:"#{metrics_prefix}_last_sent_event_time_seconds", "When the last event successfully sent to logstash was originally received"), + lag: metrics_registry.gauge(:"#{metrics_prefix}_last_sent_event_time_seconds", "When the last event successfully sent to logstash was originally received"), - connected: metrics_registry.gauge(:"#{metrics_prefix}_connected_to_server", "Boolean flag indicating whether we are currently connected to a logstash server"), - connect_exception: metrics_registry.counter(:"#{metrics_prefix}_connect_exceptions_total", "The number of exceptions that have occurred whilst attempting to connect to a logstash server"), - write_exception: metrics_registry.counter(:"#{metrics_prefix}_write_exceptions_total", "The number of exceptions that have occurred whilst attempting to write an event to a logstash server"), + connected: metrics_registry.gauge(:"#{metrics_prefix}_connected_to_server", "Boolean flag indicating whether we are currently connected to a logstash server"), + connect_exception: metrics_registry.counter(:"#{metrics_prefix}_connect_exceptions_total", "The number of exceptions that have occurred whilst attempting to connect to a logstash server"), + write_exception: metrics_registry.counter(:"#{metrics_prefix}_write_exceptions_total", "The number of exceptions that have occurred whilst attempting to write an event to a logstash server"), write_loop_exception: metrics_registry.counter(:"#{metrics_prefix}_write_loop_exceptions_total", "The number of exceptions that have occurred in the writing loop"), - write_loop_ok: metrics_registry.gauge(:"#{metrics_prefix}_write_loop_ok", "Boolean flag indicating whether the writing loop is currently operating correctly, or is in a post-apocalyptic hellscape of never-ending exceptions"), + write_loop_ok: metrics_registry.gauge(:"#{metrics_prefix}_write_loop_ok", "Boolean flag indicating whether the writing loop is currently operating correctly, or is in a post-apocalyptic hellscape of never-ending exceptions"), } @metrics[:lag].set({}, 0) @metrics[:queue_size].set({}, 0) @@ -134,22 +134,52 @@ end nil end + # Send events. + # + # Does not return until `#shutdown` is called (in another thread). + # + def run + @queue_mutex.synchronize do + @terminate = false + end + + write_loop + end + + # Tell the LogstashWriter to flush its queue and terminate operation. + # + # Returns immediately. + # + def shutdown + #:nocov: + @worker_mutex.synchronize do + @queue_mutex.synchronize do + @terminate = true + @queue_cv.signal + end + end + #:nocov: + end + # Start sending events. # # This method will return almost immediately, and actual event # transmission will commence in a separate thread. # # @return [NilClass] # - def run + def start! @worker_mutex.synchronize do if @worker_thread.nil? + @queue_mutex.synchronize do + @terminate = false + end + @worker_thread = Thread.new do - Thread.current.name = "LogstashWriter" write_loop end end end @@ -162,22 +192,23 @@ # finished sending all messages that have been queued. This will # return once the worker thread has finished. # # @return [NilClass] # - def stop + def stop! @worker_mutex.synchronize do if @worker_thread - @terminate = true - @queue_cv.signal + @queue_mutex.synchronize do + @terminate = true + @queue_cv.signal + end begin - @worker_thread.join + @worker_thread.join unless @worker_thread == Thread.current rescue Exception => ex @logger.error("LogstashWriter") { (["Worker thread terminated with exception: #{ex.message} (#{ex.class})"] + ex.backtrace).join("\n ") } end @worker_thread = nil - @socket_mutex.synchronize { (@current_target.close; @current_target = nil) if @current_target } end end nil end @@ -208,48 +239,46 @@ # The main "worker" method for getting events out of the queue and # firing them at logstash. # def write_loop error_wait = INITIAL_RETRY_WAIT + Thread.current.name = "LogstashWriter" - catch :terminate do - loop do - event = nil + until @terminate do + event = nil - begin - @queue_mutex.synchronize do - while @queue.empty? && !@terminate - @queue_cv.wait(@queue_mutex) - end - - if @queue.empty? && @terminate - @terminate = false - throw :terminate - end - - event = @queue.shift + begin + @queue_mutex.synchronize do + while @queue.empty? && !@terminate + @queue_cv.wait(@queue_mutex) end + event = @queue.shift + end + + if event current_target do |t| t.socket.puts event[:content].to_json stat_sent(t.to_s, event[:arrival_timestamp]) @metrics[:write_loop_ok].set({}, 1) error_wait = INITIAL_RETRY_WAIT end - rescue StandardError => ex - @logger.error("LogstashWriter") { (["Exception in write_loop: #{ex.message} (#{ex.class})"] + ex.backtrace).join("\n ") } - @queue_mutex.synchronize { @queue.unshift(event) if event } - @metrics[:write_loop_exception].increment(class: ex.class.to_s) - @metrics[:write_loop_ok].set({}, 0) - sleep error_wait - # Increase the error wait timeout for next time, up to a maximum - # interval of about 60 seconds - error_wait *= 1.1 - error_wait = 60 if error_wait > 60 - error_wait += rand / 0.5 end + rescue StandardError => ex + @logger.error("LogstashWriter") { (["Exception in write_loop: #{ex.message} (#{ex.class})"] + ex.backtrace).join("\n ") } + @queue_mutex.synchronize { @queue.unshift(event) if event } + @metrics[:write_loop_exception].increment(class: ex.class.to_s) + @metrics[:write_loop_ok].set({}, 0) + sleep error_wait + # Increase the error wait timeout for next time, up to a maximum + # interval of about 60 seconds + error_wait *= 1.1 + error_wait = 60 if error_wait > 60 + error_wait += rand / 0.5 end end + + force_disconnect! end # Yield a Target connected to the server we currently believe to be # accepting log entries, so that something can send log entries to it. #