lib/arborist/observer_runner.rb in arborist-0.1.0 vs lib/arborist/observer_runner.rb in arborist-0.2.0.pre20170519125456

- old
+ new

@@ -1,129 +1,41 @@ # -*- ruby -*- #encoding: utf-8 -require 'rbczmq' +require 'cztop' +require 'cztop/reactor' +require 'cztop/reactor/signal_handling' require 'loggability' require 'arborist' unless defined?( Arborist ) require 'arborist/client' require 'arborist/observer' -# Undo the useless scoping -class ZMQ::Loop - public_class_method :instance -end - - # An event-driven runner for Arborist::Observers. class Arborist::ObserverRunner extend Loggability + include CZTop::Reactor::SignalHandling - log_to :arborist + # Signals the observer runner responds to + QUEUE_SIGS = [ + :INT, :TERM, :HUP, + # :TODO: :QUIT, :WINCH, :USR1, :USR2, :TTIN, :TTOU + ] & Signal.list.keys.map( &:to_sym ) - # A ZMQ::Handler object for managing IO for all running observers. - class Handler < ZMQ::Handler - extend Loggability, - Arborist::MethodUtilities - log_to :arborist + log_to :arborist - ### Create a ZMQ::Handler that acts as the agent that runs the specified - ### +observer+. - def initialize( runner, reactor ) - @runner = runner - @client = Arborist::Client.new - @pollitem = ZMQ::Pollitem.new( @client.event_api, ZMQ::POLLIN ) - @pollitem.handler = self - @subscriptions = {} - reactor.register( @pollitem ) - end - - - ###### - public - ###### - - # The Arborist::ObserverRunner that owns this handler. - attr_reader :runner - - # The Arborist::Client that will be used for creating and tearing down subscriptions - attr_reader :client - - # The map of subscription IDs to the Observer which it was created for. - attr_reader :subscriptions - - - ### Unsubscribe from and clear all current subscriptions. - def reset - self.log.warn "Resetting the observer handler." - self.subscriptions.keys.each do |subid| - self.client.event_api.unsubscribe( subid ) - end - self.subscriptions.clear - end - - - ### Add the specified +observer+ and subscribe to the events it wishes to receive. - def add_observer( observer ) - self.log.info "Adding observer: %s" % [ observer.description ] - observer.subscriptions.each do |sub| - subid = self.client.subscribe( sub ) - self.subscriptions[ subid ] = observer - self.client.event_api.subscribe( subid ) - self.log.debug " subscribed to %p with subscription %s" % [ sub, subid ] - end - end - - - ### Remove the specified +observer+ after unsubscribing from its events. - def remove_observer( observer ) - self.log.info "Removing observer: %s" % [ observer.description ] - - self.subscriptions.keys.each do |subid| - next unless self.subscriptions[ subid ] == observer - - self.client.unsubscribe( subid ) - self.subscriptions.delete( subid ) - self.client.event_api.unsubscribe( subid ) - self.log.debug " unsubscribed from %p" % [ subid ] - end - end - - - ### Read events from the event socket when it becomes readable, and dispatch them to - ### the correct observer. - def on_readable - subid = self.recv - raise "Partial write?!" unless self.pollitem.pollable.rcvmore? - raw_event = self.recv - event = MessagePack.unpack( raw_event ) - - if (( observer = self.subscriptions[subid] )) - observer.handle_event( subid, event ) - elsif subid.start_with?( 'sys.' ) - self.log.debug "System event! %p" % [ event ] - self.runner.handle_system_event( subid, event ) - else - self.log.warn "Ignoring event %p for which we have no observer." % [ subid ] - end - - return true - end - - end # class Handler - - ### Create a new Arborist::ObserverRunner def initialize - @observers = [] - @timers = [] - @handler = nil - @reactor = ZMQ::Loop.new + @observers = [] + @timers = [] + @subscriptions = {} + @reactor = CZTop::Reactor.new + @client = Arborist::Client.new @manager_last_runid = nil end ###### @@ -134,53 +46,73 @@ attr_reader :observers # The Array of registered ZMQ::Timers attr_reader :timers - # The ZMQ::Handler subclass that handles all async IO - attr_accessor :handler - - # The reactor (a ZMQ::Loop) the runner uses to drive everything + # The reactor (a CZTop::Reactor) the runner uses to drive everything attr_accessor :reactor + # The Arborist::Client that will be used for creating and tearing down subscriptions + attr_reader :client + # The map of subscription IDs to the Observer which it was created for. + attr_reader :subscriptions + + ### Load observers from the specified +enumerator+. def load_observers( enumerator ) - @observers += enumerator.to_a + self.observers.concat( enumerator.to_a ) end ### Run the specified +observers+ def run - self.handler = Arborist::ObserverRunner::Handler.new( self, self.reactor ) - + self.log.info "Starting!" self.register_observers self.register_observer_timers self.subscribe_to_system_events - self.reactor.start - rescue Interrupt - $stderr.puts "Interrupted!" - self.stop + self.reactor.register( self.client.event_api, :read, &self.method(:on_subscription_event) ) + + self.with_signal_handler( self.reactor, *QUEUE_SIGS ) do + self.reactor.start_polling( ignore_interrupts: true ) + end end ### Stop the observer def stop - self.observers.each do |observer| - self.remove_timers - self.handler.remove_observer( observer ) - end + self.log.info "Stopping!" + self.remove_timers + self.unregister_observers + self.reactor.stop_polling + end - self.reactor.stop + + ### Restart the observer, resetting all of its observers' subscriptions. + def restart + self.log.info "Restarting!" + self.reactor.timers.pause + self.unregister_observers + + self.register_observers + self.reactor.timers.resume end - ### Register each of the runner's Observers with its handler. + ### Returns true if the ObserverRunner is running. + def running? + return self.reactor && + self.client && + self.reactor.registered?( self.client.event_api ) + end + + + ### Add subscriptions for all of the observers loaded into the runner. def register_observers self.observers.each do |observer| - self.handler.add_observer( observer ) + self.add_observer( observer ) end end ### Register timers for each Observer. @@ -189,54 +121,159 @@ self.add_timers_for( observer ) end end + ### Remove the subscriptions belonging to the loaded observers. + def unregister_observers + self.observers.each do |observer| + self.remove_observer( observer ) + end + end + + ### Subscribe the runner to system events published by the Manager. def subscribe_to_system_events - self.handler.client.event_api.subscribe( 'sys.' ) + self.client.event_api.subscribe( 'sys.' ) end ### Register a timer for the specified +observer+. def add_timers_for( observer ) observer.timers.each do |interval, callback| self.log.info "Creating timer for %s observer to run %p every %ds" % [ observer.description, callback, interval ] - timer = ZMQ::Timer.new( interval, 0, &callback ) - self.reactor.register_timer( timer ) + timer = self.reactor.add_periodic_timer( interval, &callback ) self.timers << timer end end ### Remove any registered timers. def remove_timers self.timers.each do |timer| - self.reactor.cancel_timer( timer ) + self.reactor.remove_timer( timer ) end end + ### Unsubscribe from and clear all current subscriptions. + def reset + self.log.warn "Resetting observer subscriptions." + self.subscriptions.keys.each do |subid| + self.client.event_api.unsubscribe( subid ) + end + self.subscriptions.clear + end + + + ### Add the specified +observer+ and subscribe to the events it wishes to receive. + def add_observer( observer ) + self.log.info "Adding observer: %s" % [ observer.description ] + observer.subscriptions.each do |sub| + subid = self.client.subscribe( sub ) + self.subscriptions[ subid ] = observer + self.client.event_api.subscribe( subid ) + self.log.debug " subscribed to %p with subscription %s" % [ sub, subid ] + end + end + + + ### Remove the specified +observer+ after unsubscribing from its events. + def remove_observer( observer ) + self.log.info "Removing observer: %s" % [ observer.description ] + + self.subscriptions.keys.each do |subid| + next unless self.subscriptions[ subid ] == observer + + self.client.unsubscribe( subid ) + self.subscriptions.delete( subid ) + self.client.event_api.unsubscribe( subid ) + self.log.debug " unsubscribed from %p" % [ subid ] + end + end + + + ### Handle IO events from the reactor. + def on_subscription_event( event ) + if event.readable? + msg = event.socket.receive + subid, event = Arborist::EventAPI.decode( msg ) + + if (( observer = self.subscriptions[subid] )) + self.log.debug "Got %p event for %p" % [ subid, observer ] + observer.handle_event( subid, event ) + elsif subid.start_with?( 'sys.' ) + self.log.debug "System event! %p" % [ event ] + self.handle_system_event( subid, event ) + else + self.log.warn "Ignoring event %p for which we have no observer." % [ subid ] + end + else + raise "Unhandled event %p on the event socket" % [ event ] + end + end + + ### Handle a `sys.` event from the Manager being observed. def handle_system_event( event_type, event ) self.log.debug "Got a %s event from the Manager: %p" % [ event_type, event ] case event_type when 'sys.heartbeat' this_runid = event['run_id'] if @manager_last_runid && this_runid != @manager_last_runid self.log.warn "Manager run ID changed: re-subscribing" - self.handler.reset + self.reset self.register_observers end @manager_last_runid = this_runid when 'sys.node_added', 'sys.node_removed' # no-op else # no-op end end + + + # + # :section: Signal Handling + # These methods set up some behavior for starting, restarting, and stopping + # the runner when a signal is received. + # + + ### Handle signals. + def handle_signal( sig ) + self.log.debug "Handling signal %s" % [ sig ] + case sig + when :INT, :TERM + self.on_termination_signal( sig ) + + when :HUP + self.on_hangup_signal( sig ) + + else + self.log.warn "Unhandled signal %s" % [ sig ] + end + + end + + + ### Handle a TERM signal. Shuts the handler down after handling any current request/s. Also + ### aliased to #on_interrupt_signal. + def on_termination_signal( signo ) + self.log.warn "Terminated (%p)" % [ signo ] + self.stop + end + alias_method :on_interrupt_signal, :on_termination_signal + + + ### Handle a HUP signal. The default is to restart the handler. + def on_hangup_signal( signo ) + self.log.warn "Hangup (%p)" % [ signo ] + self.restart + end + end # class Arborist::ObserverRunner