lib/arborist/monitor_runner.rb in arborist-0.2.0.pre20170519125456 vs lib/arborist/monitor_runner.rb in arborist-0.2.0
- old
+ new
@@ -1,8 +1,10 @@
# -*- ruby -*-
#encoding: utf-8
+require 'set'
+
require 'cztop'
require 'cztop/reactor'
require 'cztop/reactor/signal_handling'
require 'loggability'
@@ -19,21 +21,25 @@
QUEUE_SIGS = [
:INT, :TERM, :HUP, :USR1,
# :TODO: :QUIT, :WINCH, :USR2, :TTIN, :TTOU
] & Signal.list.keys.map( &:to_sym )
+ # Number of seconds between thread cleanup
+ THREAD_CLEANUP_INTERVAL = 5 # seconds
+
log_to :arborist
### Create a new Arborist::MonitorRunner
def initialize
- @monitors = []
- @handler = nil
- @reactor = CZTop::Reactor.new
- @client = Arborist::Client.new
- @request_queue = {}
+ @monitors = []
+ @handler = nil
+ @reactor = CZTop::Reactor.new
+ @client = Arborist::Client.new
+ @runner_threads = {}
+ @request_queue = {}
end
######
public
@@ -58,19 +64,29 @@
##
# The Arborist::Client that will provide the message packing and unpacking
attr_reader :client
+ ##
+ # A hash of monitor object -> thread used to contain and track running monitor threads.
+ attr_reader :runner_threads
+
### Load monitors from the specified +enumerator+.
def load_monitors( enumerator )
self.monitors.concat( enumerator.to_a )
end
### Run the specified +monitors+
def run
+ self.monitors.each do |mon|
+ self.add_timer_for( mon )
+ end
+
+ self.add_thread_cleanup_timer
+
self.with_signal_handler( self.reactor, *QUEUE_SIGS ) do
self.reactor.register( self.client.tree_api, :write, &self.method(:handle_io_event) )
self.reactor.start_polling
end
end
@@ -106,49 +122,81 @@
end
end
- ### Run the specified +monitor+ and update nodes with the results.
+ ### Update nodes with the results of a monitor's run.
def run_monitor( monitor )
positive = monitor.positive_criteria
negative = monitor.negative_criteria
- include_down = monitor.include_down?
+ exclude_down = monitor.exclude_down?
props = monitor.node_properties
- self.fetch( positive, include_down, props, negative ) do |nodes|
- results = monitor.run( nodes )
- monitor_key = monitor.key
+ self.search( positive, exclude_down, props, negative ) do |nodes|
+ self.log.info "Running %p monitor for %d node(s)" % [
+ monitor.description,
+ nodes.length
+ ]
- results.each do |ident, properties|
- properties['_monitor_key'] = monitor_key
+ unless nodes.empty?
+ self.runner_threads[ monitor ] = Thread.new do
+ Thread.current[:monitor_desc] = monitor.description
+ results = self.run_monitor_safely( monitor, nodes )
+
+ self.log.debug " updating with results: %p" % [ results ]
+ self.update( results, monitor.key ) do
+ self.log.debug "Updated %d via the '%s' monitor" %
+ [ results.length, monitor.description ]
+ end
+ end
+ self.log.debug "THREAD: Started %p for %p" % [ self.runner_threads[monitor], monitor ]
+ self.log.debug "THREAD: Runner threads have: %p" % [ self.runner_threads.to_a ]
end
+ end
+ end
- self.update( results ) do
- self.log.debug "Updated %d via the '%s' monitor" %
- [ results.length, monitor.description ]
+
+ ### Exec +monitor+ against the provided +nodes+ hash, treating
+ ### runtime exceptions as an error condition. Returns an update
+ ### hash, keyed by node identifier.
+ ###
+ def run_monitor_safely( monitor, nodes )
+ results = begin
+ monitor.run( nodes )
+ rescue => err
+ errmsg = "Exception while running %p monitor: %s: %s" % [
+ monitor.description,
+ err.class.name,
+ err.message
+ ]
+ self.log.error "%s\n%s" % [ errmsg, err.backtrace.join("\n ") ]
+ nodes.keys.each_with_object({}) do |id, results|
+ results[id] = { error: errmsg }
end
end
+
+ return results
end
- ### Create a fetch request using the runner's client, then queue the request up
+ ### Create a search request using the runner's client, then queue the request up
### with the specified +block+ as the callback.
- def fetch( criteria, include_down, properties, negative={}, &block )
- fetch = self.client.make_fetch_request( criteria,
- include_down: include_down,
+ def search( criteria, exclude_down, properties, negative={}, &block )
+ search = self.client.make_search_request( criteria,
+ exclude_down: exclude_down,
properties: properties,
exclude: negative
)
- self.queue_request( fetch, &block )
+ self.queue_request( search, &block )
end
### Create an update request using the runner's client, then queue the request up
### with the specified +block+ as the callback.
- def update( nodemap, &block )
- update = self.client.make_update_request( nodemap )
+ def update( nodemap, monitor_key, &block )
+ return if nodemap.empty?
+ update = self.client.make_update_request( nodemap, monitor_key: monitor_key )
self.queue_request( update, &block )
end
### Add the specified +event+ to the queue to be published to the console event
@@ -196,11 +244,13 @@
def add_interval_timer_for( monitor )
interval = monitor.interval
self.log.info "Creating timer for %p" % [ monitor ]
return self.reactor.add_periodic_timer( interval ) do
- self.run_monitor( monitor )
+ unless self.runner_threads.key?( monitor )
+ self.run_monitor( monitor )
+ end
end
end
### Create a one-shot ZMQ::Timer that will register the interval timer for the specified
@@ -209,9 +259,36 @@
delay = rand( monitor.splay )
self.log.debug "Splaying registration of %p for %ds" % [ monitor, delay ]
self.reactor.add_oneshot_timer( delay ) do
self.add_interval_timer_for( monitor )
+ end
+ end
+
+
+ ### Set up a timer to clean up monitor threads.
+ def add_thread_cleanup_timer
+ self.log.debug "Starting thread cleanup timer for %p." % [ self.runner_threads ]
+ self.reactor.add_periodic_timer( THREAD_CLEANUP_INTERVAL ) do
+ self.cleanup_monitor_threads
+ end
+ end
+
+
+ ### :TODO: Handle the thread-interrupt stuff?
+
+ ### Clean up any monitor runner threads that are dead.
+ def cleanup_monitor_threads
+ self.runner_threads.values.reject( &:alive? ).each do |thr|
+ monitor = self.runner_threads.key( thr )
+ self.runner_threads.delete( monitor )
+
+ begin
+ thr.join
+ rescue => err
+ self.log.error "%p while running %s: %s" %
+ [ err.class, thr[:monitor_desc], err.message ]
+ end
end
end
#