lib/arborist/monitor_runner.rb in arborist-0.2.0.pre20170519125456 vs lib/arborist/monitor_runner.rb in arborist-0.2.0

- old
+ new

@@ -1,8 +1,10 @@ # -*- ruby -*- #encoding: utf-8 +require 'set' + require 'cztop' require 'cztop/reactor' require 'cztop/reactor/signal_handling' require 'loggability' @@ -19,21 +21,25 @@ QUEUE_SIGS = [ :INT, :TERM, :HUP, :USR1, # :TODO: :QUIT, :WINCH, :USR2, :TTIN, :TTOU ] & Signal.list.keys.map( &:to_sym ) + # Number of seconds between thread cleanup + THREAD_CLEANUP_INTERVAL = 5 # seconds + log_to :arborist ### Create a new Arborist::MonitorRunner def initialize - @monitors = [] - @handler = nil - @reactor = CZTop::Reactor.new - @client = Arborist::Client.new - @request_queue = {} + @monitors = [] + @handler = nil + @reactor = CZTop::Reactor.new + @client = Arborist::Client.new + @runner_threads = {} + @request_queue = {} end ###### public @@ -58,19 +64,29 @@ ## # The Arborist::Client that will provide the message packing and unpacking attr_reader :client + ## + # A hash of monitor object -> thread used to contain and track running monitor threads. + attr_reader :runner_threads + ### Load monitors from the specified +enumerator+. def load_monitors( enumerator ) self.monitors.concat( enumerator.to_a ) end ### Run the specified +monitors+ def run + self.monitors.each do |mon| + self.add_timer_for( mon ) + end + + self.add_thread_cleanup_timer + self.with_signal_handler( self.reactor, *QUEUE_SIGS ) do self.reactor.register( self.client.tree_api, :write, &self.method(:handle_io_event) ) self.reactor.start_polling end end @@ -106,49 +122,81 @@ end end - ### Run the specified +monitor+ and update nodes with the results. + ### Update nodes with the results of a monitor's run. def run_monitor( monitor ) positive = monitor.positive_criteria negative = monitor.negative_criteria - include_down = monitor.include_down? + exclude_down = monitor.exclude_down? props = monitor.node_properties - self.fetch( positive, include_down, props, negative ) do |nodes| - results = monitor.run( nodes ) - monitor_key = monitor.key + self.search( positive, exclude_down, props, negative ) do |nodes| + self.log.info "Running %p monitor for %d node(s)" % [ + monitor.description, + nodes.length + ] - results.each do |ident, properties| - properties['_monitor_key'] = monitor_key + unless nodes.empty? + self.runner_threads[ monitor ] = Thread.new do + Thread.current[:monitor_desc] = monitor.description + results = self.run_monitor_safely( monitor, nodes ) + + self.log.debug " updating with results: %p" % [ results ] + self.update( results, monitor.key ) do + self.log.debug "Updated %d via the '%s' monitor" % + [ results.length, monitor.description ] + end + end + self.log.debug "THREAD: Started %p for %p" % [ self.runner_threads[monitor], monitor ] + self.log.debug "THREAD: Runner threads have: %p" % [ self.runner_threads.to_a ] end + end + end - self.update( results ) do - self.log.debug "Updated %d via the '%s' monitor" % - [ results.length, monitor.description ] + + ### Exec +monitor+ against the provided +nodes+ hash, treating + ### runtime exceptions as an error condition. Returns an update + ### hash, keyed by node identifier. + ### + def run_monitor_safely( monitor, nodes ) + results = begin + monitor.run( nodes ) + rescue => err + errmsg = "Exception while running %p monitor: %s: %s" % [ + monitor.description, + err.class.name, + err.message + ] + self.log.error "%s\n%s" % [ errmsg, err.backtrace.join("\n ") ] + nodes.keys.each_with_object({}) do |id, results| + results[id] = { error: errmsg } end end + + return results end - ### Create a fetch request using the runner's client, then queue the request up + ### Create a search request using the runner's client, then queue the request up ### with the specified +block+ as the callback. - def fetch( criteria, include_down, properties, negative={}, &block ) - fetch = self.client.make_fetch_request( criteria, - include_down: include_down, + def search( criteria, exclude_down, properties, negative={}, &block ) + search = self.client.make_search_request( criteria, + exclude_down: exclude_down, properties: properties, exclude: negative ) - self.queue_request( fetch, &block ) + self.queue_request( search, &block ) end ### Create an update request using the runner's client, then queue the request up ### with the specified +block+ as the callback. - def update( nodemap, &block ) - update = self.client.make_update_request( nodemap ) + def update( nodemap, monitor_key, &block ) + return if nodemap.empty? + update = self.client.make_update_request( nodemap, monitor_key: monitor_key ) self.queue_request( update, &block ) end ### Add the specified +event+ to the queue to be published to the console event @@ -196,11 +244,13 @@ def add_interval_timer_for( monitor ) interval = monitor.interval self.log.info "Creating timer for %p" % [ monitor ] return self.reactor.add_periodic_timer( interval ) do - self.run_monitor( monitor ) + unless self.runner_threads.key?( monitor ) + self.run_monitor( monitor ) + end end end ### Create a one-shot ZMQ::Timer that will register the interval timer for the specified @@ -209,9 +259,36 @@ delay = rand( monitor.splay ) self.log.debug "Splaying registration of %p for %ds" % [ monitor, delay ] self.reactor.add_oneshot_timer( delay ) do self.add_interval_timer_for( monitor ) + end + end + + + ### Set up a timer to clean up monitor threads. + def add_thread_cleanup_timer + self.log.debug "Starting thread cleanup timer for %p." % [ self.runner_threads ] + self.reactor.add_periodic_timer( THREAD_CLEANUP_INTERVAL ) do + self.cleanup_monitor_threads + end + end + + + ### :TODO: Handle the thread-interrupt stuff? + + ### Clean up any monitor runner threads that are dead. + def cleanup_monitor_threads + self.runner_threads.values.reject( &:alive? ).each do |thr| + monitor = self.runner_threads.key( thr ) + self.runner_threads.delete( monitor ) + + begin + thr.join + rescue => err + self.log.error "%p while running %s: %s" % + [ err.class, thr[:monitor_desc], err.message ] + end end end #