lib/syslogstash.rb in syslogstash-2.2.0 vs lib/syslogstash.rb in syslogstash-3.0.0

- old
+ new

@@ -1,55 +1,60 @@ require 'uri' require 'socket' require 'json' require 'thwait' +require 'logstash_writer' +require 'service_skeleton' # Read syslog messages from one or more sockets, and send it to a logstash # server. # -class Syslogstash - def initialize(cfg) - @cfg = cfg - @stats = PrometheusExporter.new(cfg) - @writer = LogstashWriter.new(cfg, @stats) - @reader = SyslogReader.new(cfg, @writer, @stats) - @logger = cfg.logger - end +class Syslogstash < ServiceSkeleton + string :SYSLOGSTASH_LOGSTASH_SERVER + string :SYSLOGSTASH_SYSLOG_SOCKET + string :SYSLOGSTASH_RELAY_TO_STDOUT, default: false + string :SYSLOGSTASH_DROP_REGEX, default: nil + integer :SYSLOGSTASH_BACKLOG_SIZE, default: 1_000_000, range: 0..(2**31-1) + path_list :SYSLOGSTASH_RELAY_SOCKETS, default: [] + kv_list :SYSLOGSTASH_ADD_FIELDS, default: {}, key_pattern: /\ASYSLOGSTASH_ADD_FIELD_(.*)\z/ - def run - if @cfg.stats_server - @logger.debug("main") { "Running stats server" } - @stats.run - end + def initialize(*_) + super - @writer.run - @reader.run + hook_signal("URG") do + config.relay_to_stdout = !config.relay_to_stdout + logger.info(logloc) { "SIGURG received; relay_to_stdout is now #{config.relay_to_stdout.inspect}" } + end - dead_thread = ThreadsWait.new(@reader.thread, @writer.thread).next_wait + @shutdown_reader, @shutdown_writer = IO.pipe - if dead_thread == @writer.thread - @logger.error("main") { "Writer thread crashed." } - elsif dead_thread == @reader.thread - @logger.error("main") { "Reader thread crashed." } - else - @logger.fatal("main") { "ThreadsWait#next_wait returned unexpected value #{dead_thread.inspect}" } - exit 1 - end + metrics.counter(:syslogstash_messages_received_total, "The number of syslog messages received from the log socket") + metrics.counter(:syslogstash_messages_sent_total, "The number of logstash messages sent to each logstash server") + metrics.gauge(:syslogstash_last_relayed_message_timestamp, "When the last message that was successfully relayed to logstash was originally received") + metrics.gauge(:syslogstash_queue_size, "How many messages are currently in the queue to be sent") + metrics.counter(:syslogstash_dropped_total, "Number of log entries that were not forwarded due to matching the drop regex") - begin - dead_thread.join - rescue Exception => ex - @logger.error("main") { (["Exception in crashed thread was: #{ex.message} (#{ex.class})"] + ex.backtrace).join("\n ") } - end + @writer = LogstashWriter.new(server_name: config.logstash_server, backlog: config.backlog_size, logger: config.logger, metrics_registry: metrics) + @reader = SyslogReader.new(config, @writer, metrics) + end - exit 1 - end + def run + @writer.run + @reader.start! - def force_disconnect! - @writer.force_disconnect! - end + @shutdown_reader.getc + @shutdown_reader.close + end + + def shutdown + @reader.stop! + @writer.stop + + @shutdown_writer.close + end + + def force_disconnect! + @writer.force_disconnect! + end end -require_relative 'syslogstash/config' require_relative 'syslogstash/syslog_reader' -require_relative 'syslogstash/logstash_writer' -require_relative 'syslogstash/prometheus_exporter'