lib/arborist/monitor_runner.rb in arborist-0.1.0 vs lib/arborist/monitor_runner.rb in arborist-0.2.0.pre20170519125456
- old
+ new
@@ -1,224 +1,261 @@
# -*- ruby -*-
#encoding: utf-8
-require 'rbczmq'
+require 'cztop'
+require 'cztop/reactor'
+require 'cztop/reactor/signal_handling'
require 'loggability'
require 'arborist' unless defined?( Arborist )
require 'arborist/client'
-# Undo the useless scoping
-class ZMQ::Loop
- public_class_method :instance
-end
-
-
# An event-driven runner for Arborist::Monitors.
class Arborist::MonitorRunner
extend Loggability
+ include CZTop::Reactor::SignalHandling
- log_to :arborist
+ # Signals the runner handles
+ QUEUE_SIGS = [
+ :INT, :TERM, :HUP, :USR1,
+ # :TODO: :QUIT, :WINCH, :USR2, :TTIN, :TTOU
+ ] & Signal.list.keys.map( &:to_sym )
- # A ZMQ::Handler object for managing IO for all running monitors.
- class Handler < ZMQ::Handler
- extend Loggability,
- Arborist::MethodUtilities
+ log_to :arborist
- log_to :arborist
- ### Create a ZMQ::Handler that acts as the agent that runs the specified
- ### +monitor+.
- def initialize( reactor )
- @reactor = reactor
- @client = Arborist::Client.new
- @pollitem = ZMQ::Pollitem.new( @client.tree_api, ZMQ::POLLOUT )
- @pollitem.handler = self
+ ### Create a new Arborist::MonitorRunner
+ def initialize
+ @monitors = []
+ @handler = nil
+ @reactor = CZTop::Reactor.new
+ @client = Arborist::Client.new
+ @request_queue = {}
+ end
- @request_queue = {}
- @registered = false
- end
+ ######
+ public
+ ######
- ######
- public
- ######
+ ##
+ # The Array of loaded Arborist::Monitors the runner should run.
+ attr_reader :monitors
- # The ZMQ::Loop that this runner is registered with
- attr_reader :reactor
+ ##
+ # The ZMQ::Handler subclass that handles all async IO
+ attr_accessor :handler
- # The Queue of pending requests, keyed by the callback that should be called with the
- # results.
- attr_reader :request_queue
+ ##
+ # The reactor (a ZMQ::Loop) the runner uses to drive everything
+ attr_accessor :reactor
- # The Arborist::Client that will provide the message packing and unpacking
- attr_reader :client
+ ##
+ # The Queue of pending requests, keyed by the callback that should be called with the
+ # results.
+ attr_reader :request_queue
- ##
- # True if the Handler is registered to write one or more requests
- attr_predicate :registered
+ ##
+ # The Arborist::Client that will provide the message packing and unpacking
+ attr_reader :client
- ### Run the specified +monitor+ and update nodes with the results.
- def run_monitor( monitor )
- positive = monitor.positive_criteria
- negative = monitor.negative_criteria
- include_down = monitor.include_down?
- props = monitor.node_properties
+ ### Load monitors from the specified +enumerator+.
+ def load_monitors( enumerator )
+ self.monitors.concat( enumerator.to_a )
+ end
- self.fetch( positive, include_down, props, negative ) do |nodes|
- results = monitor.run( nodes )
- monitor_key = monitor.key
- results.each do |ident, properties|
- properties['_monitor_key'] = monitor_key
- end
-
- self.update( results ) do
- self.log.debug "Updated %d via the '%s' monitor" %
- [ results.length, monitor.description ]
- end
- end
+ ### Run the specified +monitors+
+ def run
+ self.with_signal_handler( self.reactor, *QUEUE_SIGS ) do
+ self.reactor.register( self.client.tree_api, :write, &self.method(:handle_io_event) )
+ self.reactor.start_polling
end
+ end
- ### Create a fetch request using the runner's client, then queue the request up
- ### with the specified +block+ as the callback.
- def fetch( criteria, include_down, properties, negative={}, &block )
- fetch = self.client.make_fetch_request( criteria,
- include_down: include_down,
- properties: properties,
- exclude: negative
- )
- self.queue_request( fetch, &block )
- end
+ ### Restart the runner
+ def restart
+ # :TODO: Kill any running monitor children, cancel monitor timers, and reload
+ # monitors from the monitor enumerator
+ raise NotImplementedError
+ end
- ### Create an update request using the runner's client, then queue the request up
- ### with the specified +block+ as the callback.
- def update( nodemap, &block )
- update = self.client.make_update_request( nodemap )
- self.queue_request( update, &block )
- end
+ ### Stop the runner.
+ def stop
+ self.log.info "Stopping the runner."
+ self.reactor.stop_polling
+ end
- ### Add the specified +event+ to the queue to be published to the console event
- ### socket
- def queue_request( request, &callback )
- self.request_queue[ callback ] = request
- self.register
- end
-
-
- ### Register the handler's pollitem as being ready to write if it isn't already.
- def register
- # self.log.debug "Registering for writing."
- self.reactor.register( self.pollitem ) unless @registered
- @registered = true
- end
-
-
- ### Unregister the handler's pollitem from the reactor when there's nothing ready
- ### to write.
- def unregister
- # self.log.debug "Unregistering for writing."
- self.reactor.remove( self.pollitem ) if @registered
- @registered = false
- end
-
-
- ### Write commands from the queue
- def on_writable
+ ### Reactor callback -- handle the client's socket becoming writable.
+ def handle_io_event( event )
+ if event.writable?
if (( pair = self.request_queue.shift ))
callback, request = *pair
res = self.client.send_tree_api_request( request )
callback.call( res )
end
self.unregister if self.request_queue.empty?
- return true
+ else
+ raise "Unexpected %p on the tree API socket" % [ event ]
end
- end # class Handler
+ end
- ### Create a new Arborist::MonitorRunner
- def initialize
- @monitors = []
- @handler = nil
- @reactor = ZMQ::Loop.new
+ ### Run the specified +monitor+ and update nodes with the results.
+ def run_monitor( monitor )
+ positive = monitor.positive_criteria
+ negative = monitor.negative_criteria
+ include_down = monitor.include_down?
+ props = monitor.node_properties
+
+ self.fetch( positive, include_down, props, negative ) do |nodes|
+ results = monitor.run( nodes )
+ monitor_key = monitor.key
+
+ results.each do |ident, properties|
+ properties['_monitor_key'] = monitor_key
+ end
+
+ self.update( results ) do
+ self.log.debug "Updated %d via the '%s' monitor" %
+ [ results.length, monitor.description ]
+ end
+ end
end
- ######
- public
- ######
+ ### Create a fetch request using the runner's client, then queue the request up
+ ### with the specified +block+ as the callback.
+ def fetch( criteria, include_down, properties, negative={}, &block )
+ fetch = self.client.make_fetch_request( criteria,
+ include_down: include_down,
+ properties: properties,
+ exclude: negative
+ )
+ self.queue_request( fetch, &block )
+ end
- # The Array of loaded Arborist::Monitors the runner should run.
- attr_reader :monitors
- # The ZMQ::Handler subclass that handles all async IO
- attr_accessor :handler
+ ### Create an update request using the runner's client, then queue the request up
+ ### with the specified +block+ as the callback.
+ def update( nodemap, &block )
+ update = self.client.make_update_request( nodemap )
+ self.queue_request( update, &block )
+ end
- # The reactor (a ZMQ::Loop) the runner uses to drive everything
- attr_accessor :reactor
+ ### Add the specified +event+ to the queue to be published to the console event
+ ### socket
+ def queue_request( request, &callback )
+ self.request_queue[ callback ] = request
+ self.register
+ end
- ### Load monitors from the specified +enumerator+.
- def load_monitors( enumerator )
- @monitors += enumerator.to_a
+
+ ### Returns +true+ if the runner's client socket is currently registered for writing.
+ def registered?
+ return self.reactor.event_enabled?( self.client.tree_api, :write )
end
- ### Run the specified +monitors+
- def run
- self.handler = Arborist::MonitorRunner::Handler.new( self.reactor )
+ ### Register the handler's pollitem as being ready to write if it isn't already.
+ def register
+ # self.log.debug "Registering for writing."
+ self.reactor.enable_events( self.client.tree_api, :write ) unless self.registered?
+ end
- self.monitors.each do |mon|
- self.add_timer_for( mon )
- end
- self.reactor.start
+ ### Unregister the handler's pollitem from the reactor when there's nothing ready
+ ### to write.
+ def unregister
+ # self.log.debug "Unregistering for writing."
+ self.reactor.disable_events( self.client.tree_api, :write ) if self.registered?
end
### Register a timer for the specified +monitor+.
def add_timer_for( monitor )
interval = monitor.interval
- timer = if monitor.splay.nonzero?
- self.splay_timer_for( monitor )
- else
- self.interval_timer_for( monitor )
- end
-
- self.reactor.register_timer( timer )
+ if monitor.splay.nonzero?
+ self.add_splay_timer_for( monitor )
+ else
+ self.add_interval_timer_for( monitor )
+ end
end
### Create a repeating ZMQ::Timer that will run the specified monitor on its interval.
- def interval_timer_for( monitor )
+ def add_interval_timer_for( monitor )
interval = monitor.interval
self.log.info "Creating timer for %p" % [ monitor ]
- return ZMQ::Timer.new( interval, 0 ) do
- self.handler.run_monitor( monitor )
+ return self.reactor.add_periodic_timer( interval ) do
+ self.run_monitor( monitor )
end
end
### Create a one-shot ZMQ::Timer that will register the interval timer for the specified
### +monitor+ after a random number of seconds no greater than its splay.
- def splay_timer_for( monitor )
+ def add_splay_timer_for( monitor )
delay = rand( monitor.splay )
self.log.debug "Splaying registration of %p for %ds" % [ monitor, delay ]
- return ZMQ::Timer.new( delay, 1 ) do
- interval_timer = self.interval_timer_for( monitor )
- self.reactor.register_timer( interval_timer )
+ self.reactor.add_oneshot_timer( delay ) do
+ self.add_interval_timer_for( monitor )
end
end
+
+
+ #
+ # :section: Signal Handling
+ # These methods set up some behavior for starting, restarting, and stopping
+ # the manager when a signal is received.
+ #
+
+ ### Handle signals.
+ def handle_signal( sig )
+ self.log.debug "Handling signal %s" % [ sig ]
+ case sig
+ when :INT, :TERM
+ self.on_termination_signal( sig )
+
+ when :HUP
+ self.on_hangup_signal( sig )
+
+ when :USR1
+ self.on_user1_signal( sig )
+
+ else
+ self.log.warn "Unhandled signal %s" % [ sig ]
+ end
+
+ end
+
+
+ ### Handle a TERM signal. Shuts the handler down after handling any current request/s. Also
+ ### aliased to #on_interrupt_signal.
+ def on_termination_signal( signo )
+ self.log.warn "Terminated (%p)" % [ signo ]
+ self.stop
+ end
+ alias_method :on_interrupt_signal, :on_termination_signal
+
+
+ ### Handle a hangup by restarting the runner.
+ def on_hangup_signal( signo )
+ self.log.warn "Hangup (%p)" % [ signo ]
+ self.restart
+ end
+
end # class Arborist::MonitorRunner