lib/syslogstash.rb in syslogstash-2.2.0 vs lib/syslogstash.rb in syslogstash-3.0.0
- old
+ new
@@ -1,55 +1,60 @@
require 'uri'
require 'socket'
require 'json'
require 'thwait'
+require 'logstash_writer'
+require 'service_skeleton'
# Read syslog messages from one or more sockets, and send it to a logstash
# server.
#
-class Syslogstash
- def initialize(cfg)
- @cfg = cfg
- @stats = PrometheusExporter.new(cfg)
- @writer = LogstashWriter.new(cfg, @stats)
- @reader = SyslogReader.new(cfg, @writer, @stats)
- @logger = cfg.logger
- end
+class Syslogstash < ServiceSkeleton
+ string :SYSLOGSTASH_LOGSTASH_SERVER
+ string :SYSLOGSTASH_SYSLOG_SOCKET
+ string :SYSLOGSTASH_RELAY_TO_STDOUT, default: false
+ string :SYSLOGSTASH_DROP_REGEX, default: nil
+ integer :SYSLOGSTASH_BACKLOG_SIZE, default: 1_000_000, range: 0..(2**31-1)
+ path_list :SYSLOGSTASH_RELAY_SOCKETS, default: []
+ kv_list :SYSLOGSTASH_ADD_FIELDS, default: {}, key_pattern: /\ASYSLOGSTASH_ADD_FIELD_(.*)\z/
- def run
- if @cfg.stats_server
- @logger.debug("main") { "Running stats server" }
- @stats.run
- end
+ def initialize(*_)
+ super
- @writer.run
- @reader.run
+ hook_signal("URG") do
+ config.relay_to_stdout = !config.relay_to_stdout
+ logger.info(logloc) { "SIGURG received; relay_to_stdout is now #{config.relay_to_stdout.inspect}" }
+ end
- dead_thread = ThreadsWait.new(@reader.thread, @writer.thread).next_wait
+ @shutdown_reader, @shutdown_writer = IO.pipe
- if dead_thread == @writer.thread
- @logger.error("main") { "Writer thread crashed." }
- elsif dead_thread == @reader.thread
- @logger.error("main") { "Reader thread crashed." }
- else
- @logger.fatal("main") { "ThreadsWait#next_wait returned unexpected value #{dead_thread.inspect}" }
- exit 1
- end
+ metrics.counter(:syslogstash_messages_received_total, "The number of syslog messages received from the log socket")
+ metrics.counter(:syslogstash_messages_sent_total, "The number of logstash messages sent to each logstash server")
+ metrics.gauge(:syslogstash_last_relayed_message_timestamp, "When the last message that was successfully relayed to logstash was originally received")
+ metrics.gauge(:syslogstash_queue_size, "How many messages are currently in the queue to be sent")
+ metrics.counter(:syslogstash_dropped_total, "Number of log entries that were not forwarded due to matching the drop regex")
- begin
- dead_thread.join
- rescue Exception => ex
- @logger.error("main") { (["Exception in crashed thread was: #{ex.message} (#{ex.class})"] + ex.backtrace).join("\n ") }
- end
+ @writer = LogstashWriter.new(server_name: config.logstash_server, backlog: config.backlog_size, logger: config.logger, metrics_registry: metrics)
+ @reader = SyslogReader.new(config, @writer, metrics)
+ end
- exit 1
- end
+ def run
+ @writer.run
+ @reader.start!
- def force_disconnect!
- @writer.force_disconnect!
- end
+ @shutdown_reader.getc
+ @shutdown_reader.close
+ end
+
+ def shutdown
+ @reader.stop!
+ @writer.stop
+
+ @shutdown_writer.close
+ end
+
+ def force_disconnect!
+ @writer.force_disconnect!
+ end
end
-require_relative 'syslogstash/config'
require_relative 'syslogstash/syslog_reader'
-require_relative 'syslogstash/logstash_writer'
-require_relative 'syslogstash/prometheus_exporter'