lib/arborist/monitor_runner.rb in arborist-0.1.0 vs lib/arborist/monitor_runner.rb in arborist-0.2.0.pre20170519125456

- old
+ new

@@ -1,224 +1,261 @@ # -*- ruby -*- #encoding: utf-8 -require 'rbczmq' +require 'cztop' +require 'cztop/reactor' +require 'cztop/reactor/signal_handling' require 'loggability' require 'arborist' unless defined?( Arborist ) require 'arborist/client' -# Undo the useless scoping -class ZMQ::Loop - public_class_method :instance -end - - # An event-driven runner for Arborist::Monitors. class Arborist::MonitorRunner extend Loggability + include CZTop::Reactor::SignalHandling - log_to :arborist + # Signals the runner handles + QUEUE_SIGS = [ + :INT, :TERM, :HUP, :USR1, + # :TODO: :QUIT, :WINCH, :USR2, :TTIN, :TTOU + ] & Signal.list.keys.map( &:to_sym ) - # A ZMQ::Handler object for managing IO for all running monitors. - class Handler < ZMQ::Handler - extend Loggability, - Arborist::MethodUtilities + log_to :arborist - log_to :arborist - ### Create a ZMQ::Handler that acts as the agent that runs the specified - ### +monitor+. - def initialize( reactor ) - @reactor = reactor - @client = Arborist::Client.new - @pollitem = ZMQ::Pollitem.new( @client.tree_api, ZMQ::POLLOUT ) - @pollitem.handler = self + ### Create a new Arborist::MonitorRunner + def initialize + @monitors = [] + @handler = nil + @reactor = CZTop::Reactor.new + @client = Arborist::Client.new + @request_queue = {} + end - @request_queue = {} - @registered = false - end + ###### + public + ###### - ###### - public - ###### + ## + # The Array of loaded Arborist::Monitors the runner should run. + attr_reader :monitors - # The ZMQ::Loop that this runner is registered with - attr_reader :reactor + ## + # The ZMQ::Handler subclass that handles all async IO + attr_accessor :handler - # The Queue of pending requests, keyed by the callback that should be called with the - # results. - attr_reader :request_queue + ## + # The reactor (a ZMQ::Loop) the runner uses to drive everything + attr_accessor :reactor - # The Arborist::Client that will provide the message packing and unpacking - attr_reader :client + ## + # The Queue of pending requests, keyed by the callback that should be called with the + # results. + attr_reader :request_queue - ## - # True if the Handler is registered to write one or more requests - attr_predicate :registered + ## + # The Arborist::Client that will provide the message packing and unpacking + attr_reader :client - ### Run the specified +monitor+ and update nodes with the results. - def run_monitor( monitor ) - positive = monitor.positive_criteria - negative = monitor.negative_criteria - include_down = monitor.include_down? - props = monitor.node_properties + ### Load monitors from the specified +enumerator+. + def load_monitors( enumerator ) + self.monitors.concat( enumerator.to_a ) + end - self.fetch( positive, include_down, props, negative ) do |nodes| - results = monitor.run( nodes ) - monitor_key = monitor.key - results.each do |ident, properties| - properties['_monitor_key'] = monitor_key - end - - self.update( results ) do - self.log.debug "Updated %d via the '%s' monitor" % - [ results.length, monitor.description ] - end - end + ### Run the specified +monitors+ + def run + self.with_signal_handler( self.reactor, *QUEUE_SIGS ) do + self.reactor.register( self.client.tree_api, :write, &self.method(:handle_io_event) ) + self.reactor.start_polling end + end - ### Create a fetch request using the runner's client, then queue the request up - ### with the specified +block+ as the callback. - def fetch( criteria, include_down, properties, negative={}, &block ) - fetch = self.client.make_fetch_request( criteria, - include_down: include_down, - properties: properties, - exclude: negative - ) - self.queue_request( fetch, &block ) - end + ### Restart the runner + def restart + # :TODO: Kill any running monitor children, cancel monitor timers, and reload + # monitors from the monitor enumerator + raise NotImplementedError + end - ### Create an update request using the runner's client, then queue the request up - ### with the specified +block+ as the callback. - def update( nodemap, &block ) - update = self.client.make_update_request( nodemap ) - self.queue_request( update, &block ) - end + ### Stop the runner. + def stop + self.log.info "Stopping the runner." + self.reactor.stop_polling + end - ### Add the specified +event+ to the queue to be published to the console event - ### socket - def queue_request( request, &callback ) - self.request_queue[ callback ] = request - self.register - end - - - ### Register the handler's pollitem as being ready to write if it isn't already. - def register - # self.log.debug "Registering for writing." - self.reactor.register( self.pollitem ) unless @registered - @registered = true - end - - - ### Unregister the handler's pollitem from the reactor when there's nothing ready - ### to write. - def unregister - # self.log.debug "Unregistering for writing." - self.reactor.remove( self.pollitem ) if @registered - @registered = false - end - - - ### Write commands from the queue - def on_writable + ### Reactor callback -- handle the client's socket becoming writable. + def handle_io_event( event ) + if event.writable? if (( pair = self.request_queue.shift )) callback, request = *pair res = self.client.send_tree_api_request( request ) callback.call( res ) end self.unregister if self.request_queue.empty? - return true + else + raise "Unexpected %p on the tree API socket" % [ event ] end - end # class Handler + end - ### Create a new Arborist::MonitorRunner - def initialize - @monitors = [] - @handler = nil - @reactor = ZMQ::Loop.new + ### Run the specified +monitor+ and update nodes with the results. + def run_monitor( monitor ) + positive = monitor.positive_criteria + negative = monitor.negative_criteria + include_down = monitor.include_down? + props = monitor.node_properties + + self.fetch( positive, include_down, props, negative ) do |nodes| + results = monitor.run( nodes ) + monitor_key = monitor.key + + results.each do |ident, properties| + properties['_monitor_key'] = monitor_key + end + + self.update( results ) do + self.log.debug "Updated %d via the '%s' monitor" % + [ results.length, monitor.description ] + end + end end - ###### - public - ###### + ### Create a fetch request using the runner's client, then queue the request up + ### with the specified +block+ as the callback. + def fetch( criteria, include_down, properties, negative={}, &block ) + fetch = self.client.make_fetch_request( criteria, + include_down: include_down, + properties: properties, + exclude: negative + ) + self.queue_request( fetch, &block ) + end - # The Array of loaded Arborist::Monitors the runner should run. - attr_reader :monitors - # The ZMQ::Handler subclass that handles all async IO - attr_accessor :handler + ### Create an update request using the runner's client, then queue the request up + ### with the specified +block+ as the callback. + def update( nodemap, &block ) + update = self.client.make_update_request( nodemap ) + self.queue_request( update, &block ) + end - # The reactor (a ZMQ::Loop) the runner uses to drive everything - attr_accessor :reactor + ### Add the specified +event+ to the queue to be published to the console event + ### socket + def queue_request( request, &callback ) + self.request_queue[ callback ] = request + self.register + end - ### Load monitors from the specified +enumerator+. - def load_monitors( enumerator ) - @monitors += enumerator.to_a + + ### Returns +true+ if the runner's client socket is currently registered for writing. + def registered? + return self.reactor.event_enabled?( self.client.tree_api, :write ) end - ### Run the specified +monitors+ - def run - self.handler = Arborist::MonitorRunner::Handler.new( self.reactor ) + ### Register the handler's pollitem as being ready to write if it isn't already. + def register + # self.log.debug "Registering for writing." + self.reactor.enable_events( self.client.tree_api, :write ) unless self.registered? + end - self.monitors.each do |mon| - self.add_timer_for( mon ) - end - self.reactor.start + ### Unregister the handler's pollitem from the reactor when there's nothing ready + ### to write. + def unregister + # self.log.debug "Unregistering for writing." + self.reactor.disable_events( self.client.tree_api, :write ) if self.registered? end ### Register a timer for the specified +monitor+. def add_timer_for( monitor ) interval = monitor.interval - timer = if monitor.splay.nonzero? - self.splay_timer_for( monitor ) - else - self.interval_timer_for( monitor ) - end - - self.reactor.register_timer( timer ) + if monitor.splay.nonzero? + self.add_splay_timer_for( monitor ) + else + self.add_interval_timer_for( monitor ) + end end ### Create a repeating ZMQ::Timer that will run the specified monitor on its interval. - def interval_timer_for( monitor ) + def add_interval_timer_for( monitor ) interval = monitor.interval self.log.info "Creating timer for %p" % [ monitor ] - return ZMQ::Timer.new( interval, 0 ) do - self.handler.run_monitor( monitor ) + return self.reactor.add_periodic_timer( interval ) do + self.run_monitor( monitor ) end end ### Create a one-shot ZMQ::Timer that will register the interval timer for the specified ### +monitor+ after a random number of seconds no greater than its splay. - def splay_timer_for( monitor ) + def add_splay_timer_for( monitor ) delay = rand( monitor.splay ) self.log.debug "Splaying registration of %p for %ds" % [ monitor, delay ] - return ZMQ::Timer.new( delay, 1 ) do - interval_timer = self.interval_timer_for( monitor ) - self.reactor.register_timer( interval_timer ) + self.reactor.add_oneshot_timer( delay ) do + self.add_interval_timer_for( monitor ) end end + + + # + # :section: Signal Handling + # These methods set up some behavior for starting, restarting, and stopping + # the manager when a signal is received. + # + + ### Handle signals. + def handle_signal( sig ) + self.log.debug "Handling signal %s" % [ sig ] + case sig + when :INT, :TERM + self.on_termination_signal( sig ) + + when :HUP + self.on_hangup_signal( sig ) + + when :USR1 + self.on_user1_signal( sig ) + + else + self.log.warn "Unhandled signal %s" % [ sig ] + end + + end + + + ### Handle a TERM signal. Shuts the handler down after handling any current request/s. Also + ### aliased to #on_interrupt_signal. + def on_termination_signal( signo ) + self.log.warn "Terminated (%p)" % [ signo ] + self.stop + end + alias_method :on_interrupt_signal, :on_termination_signal + + + ### Handle a hangup by restarting the runner. + def on_hangup_signal( signo ) + self.log.warn "Hangup (%p)" % [ signo ] + self.restart + end + end # class Arborist::MonitorRunner