lib/arborist/observer_runner.rb in arborist-0.1.0 vs lib/arborist/observer_runner.rb in arborist-0.2.0.pre20170519125456
- old
+ new
@@ -1,129 +1,41 @@
# -*- ruby -*-
#encoding: utf-8
-require 'rbczmq'
+require 'cztop'
+require 'cztop/reactor'
+require 'cztop/reactor/signal_handling'
require 'loggability'
require 'arborist' unless defined?( Arborist )
require 'arborist/client'
require 'arborist/observer'
-# Undo the useless scoping
-class ZMQ::Loop
- public_class_method :instance
-end
-
-
# An event-driven runner for Arborist::Observers.
class Arborist::ObserverRunner
extend Loggability
+ include CZTop::Reactor::SignalHandling
- log_to :arborist
+ # Signals the observer runner responds to
+ QUEUE_SIGS = [
+ :INT, :TERM, :HUP,
+ # :TODO: :QUIT, :WINCH, :USR1, :USR2, :TTIN, :TTOU
+ ] & Signal.list.keys.map( &:to_sym )
- # A ZMQ::Handler object for managing IO for all running observers.
- class Handler < ZMQ::Handler
- extend Loggability,
- Arborist::MethodUtilities
- log_to :arborist
+ log_to :arborist
- ### Create a ZMQ::Handler that acts as the agent that runs the specified
- ### +observer+.
- def initialize( runner, reactor )
- @runner = runner
- @client = Arborist::Client.new
- @pollitem = ZMQ::Pollitem.new( @client.event_api, ZMQ::POLLIN )
- @pollitem.handler = self
- @subscriptions = {}
- reactor.register( @pollitem )
- end
-
-
- ######
- public
- ######
-
- # The Arborist::ObserverRunner that owns this handler.
- attr_reader :runner
-
- # The Arborist::Client that will be used for creating and tearing down subscriptions
- attr_reader :client
-
- # The map of subscription IDs to the Observer which it was created for.
- attr_reader :subscriptions
-
-
- ### Unsubscribe from and clear all current subscriptions.
- def reset
- self.log.warn "Resetting the observer handler."
- self.subscriptions.keys.each do |subid|
- self.client.event_api.unsubscribe( subid )
- end
- self.subscriptions.clear
- end
-
-
- ### Add the specified +observer+ and subscribe to the events it wishes to receive.
- def add_observer( observer )
- self.log.info "Adding observer: %s" % [ observer.description ]
- observer.subscriptions.each do |sub|
- subid = self.client.subscribe( sub )
- self.subscriptions[ subid ] = observer
- self.client.event_api.subscribe( subid )
- self.log.debug " subscribed to %p with subscription %s" % [ sub, subid ]
- end
- end
-
-
- ### Remove the specified +observer+ after unsubscribing from its events.
- def remove_observer( observer )
- self.log.info "Removing observer: %s" % [ observer.description ]
-
- self.subscriptions.keys.each do |subid|
- next unless self.subscriptions[ subid ] == observer
-
- self.client.unsubscribe( subid )
- self.subscriptions.delete( subid )
- self.client.event_api.unsubscribe( subid )
- self.log.debug " unsubscribed from %p" % [ subid ]
- end
- end
-
-
- ### Read events from the event socket when it becomes readable, and dispatch them to
- ### the correct observer.
- def on_readable
- subid = self.recv
- raise "Partial write?!" unless self.pollitem.pollable.rcvmore?
- raw_event = self.recv
- event = MessagePack.unpack( raw_event )
-
- if (( observer = self.subscriptions[subid] ))
- observer.handle_event( subid, event )
- elsif subid.start_with?( 'sys.' )
- self.log.debug "System event! %p" % [ event ]
- self.runner.handle_system_event( subid, event )
- else
- self.log.warn "Ignoring event %p for which we have no observer." % [ subid ]
- end
-
- return true
- end
-
- end # class Handler
-
-
### Create a new Arborist::ObserverRunner
def initialize
- @observers = []
- @timers = []
- @handler = nil
- @reactor = ZMQ::Loop.new
+ @observers = []
+ @timers = []
+ @subscriptions = {}
+ @reactor = CZTop::Reactor.new
+ @client = Arborist::Client.new
@manager_last_runid = nil
end
######
@@ -134,53 +46,73 @@
attr_reader :observers
# The Array of registered ZMQ::Timers
attr_reader :timers
- # The ZMQ::Handler subclass that handles all async IO
- attr_accessor :handler
-
- # The reactor (a ZMQ::Loop) the runner uses to drive everything
+ # The reactor (a CZTop::Reactor) the runner uses to drive everything
attr_accessor :reactor
+ # The Arborist::Client that will be used for creating and tearing down subscriptions
+ attr_reader :client
+ # The map of subscription IDs to the Observer which it was created for.
+ attr_reader :subscriptions
+
+
### Load observers from the specified +enumerator+.
def load_observers( enumerator )
- @observers += enumerator.to_a
+ self.observers.concat( enumerator.to_a )
end
### Run the specified +observers+
def run
- self.handler = Arborist::ObserverRunner::Handler.new( self, self.reactor )
-
+ self.log.info "Starting!"
self.register_observers
self.register_observer_timers
self.subscribe_to_system_events
- self.reactor.start
- rescue Interrupt
- $stderr.puts "Interrupted!"
- self.stop
+ self.reactor.register( self.client.event_api, :read, &self.method(:on_subscription_event) )
+
+ self.with_signal_handler( self.reactor, *QUEUE_SIGS ) do
+ self.reactor.start_polling( ignore_interrupts: true )
+ end
end
### Stop the observer
def stop
- self.observers.each do |observer|
- self.remove_timers
- self.handler.remove_observer( observer )
- end
+ self.log.info "Stopping!"
+ self.remove_timers
+ self.unregister_observers
+ self.reactor.stop_polling
+ end
- self.reactor.stop
+
+ ### Restart the observer, resetting all of its observers' subscriptions.
+ def restart
+ self.log.info "Restarting!"
+ self.reactor.timers.pause
+ self.unregister_observers
+
+ self.register_observers
+ self.reactor.timers.resume
end
- ### Register each of the runner's Observers with its handler.
+ ### Returns true if the ObserverRunner is running.
+ def running?
+ return self.reactor &&
+ self.client &&
+ self.reactor.registered?( self.client.event_api )
+ end
+
+
+ ### Add subscriptions for all of the observers loaded into the runner.
def register_observers
self.observers.each do |observer|
- self.handler.add_observer( observer )
+ self.add_observer( observer )
end
end
### Register timers for each Observer.
@@ -189,54 +121,159 @@
self.add_timers_for( observer )
end
end
+ ### Remove the subscriptions belonging to the loaded observers.
+ def unregister_observers
+ self.observers.each do |observer|
+ self.remove_observer( observer )
+ end
+ end
+
+
### Subscribe the runner to system events published by the Manager.
def subscribe_to_system_events
- self.handler.client.event_api.subscribe( 'sys.' )
+ self.client.event_api.subscribe( 'sys.' )
end
### Register a timer for the specified +observer+.
def add_timers_for( observer )
observer.timers.each do |interval, callback|
self.log.info "Creating timer for %s observer to run %p every %ds" %
[ observer.description, callback, interval ]
- timer = ZMQ::Timer.new( interval, 0, &callback )
- self.reactor.register_timer( timer )
+ timer = self.reactor.add_periodic_timer( interval, &callback )
self.timers << timer
end
end
### Remove any registered timers.
def remove_timers
self.timers.each do |timer|
- self.reactor.cancel_timer( timer )
+ self.reactor.remove_timer( timer )
end
end
+ ### Unsubscribe from and clear all current subscriptions.
+ def reset
+ self.log.warn "Resetting observer subscriptions."
+ self.subscriptions.keys.each do |subid|
+ self.client.event_api.unsubscribe( subid )
+ end
+ self.subscriptions.clear
+ end
+
+
+ ### Add the specified +observer+ and subscribe to the events it wishes to receive.
+ def add_observer( observer )
+ self.log.info "Adding observer: %s" % [ observer.description ]
+ observer.subscriptions.each do |sub|
+ subid = self.client.subscribe( sub )
+ self.subscriptions[ subid ] = observer
+ self.client.event_api.subscribe( subid )
+ self.log.debug " subscribed to %p with subscription %s" % [ sub, subid ]
+ end
+ end
+
+
+ ### Remove the specified +observer+ after unsubscribing from its events.
+ def remove_observer( observer )
+ self.log.info "Removing observer: %s" % [ observer.description ]
+
+ self.subscriptions.keys.each do |subid|
+ next unless self.subscriptions[ subid ] == observer
+
+ self.client.unsubscribe( subid )
+ self.subscriptions.delete( subid )
+ self.client.event_api.unsubscribe( subid )
+ self.log.debug " unsubscribed from %p" % [ subid ]
+ end
+ end
+
+
+ ### Handle IO events from the reactor.
+ def on_subscription_event( event )
+ if event.readable?
+ msg = event.socket.receive
+ subid, event = Arborist::EventAPI.decode( msg )
+
+ if (( observer = self.subscriptions[subid] ))
+ self.log.debug "Got %p event for %p" % [ subid, observer ]
+ observer.handle_event( subid, event )
+ elsif subid.start_with?( 'sys.' )
+ self.log.debug "System event! %p" % [ event ]
+ self.handle_system_event( subid, event )
+ else
+ self.log.warn "Ignoring event %p for which we have no observer." % [ subid ]
+ end
+ else
+ raise "Unhandled event %p on the event socket" % [ event ]
+ end
+ end
+
+
### Handle a `sys.` event from the Manager being observed.
def handle_system_event( event_type, event )
self.log.debug "Got a %s event from the Manager: %p" % [ event_type, event ]
case event_type
when 'sys.heartbeat'
this_runid = event['run_id']
if @manager_last_runid && this_runid != @manager_last_runid
self.log.warn "Manager run ID changed: re-subscribing"
- self.handler.reset
+ self.reset
self.register_observers
end
@manager_last_runid = this_runid
when 'sys.node_added', 'sys.node_removed'
# no-op
else
# no-op
end
end
+
+
+ #
+ # :section: Signal Handling
+ # These methods set up some behavior for starting, restarting, and stopping
+ # the runner when a signal is received.
+ #
+
+ ### Handle signals.
+ def handle_signal( sig )
+ self.log.debug "Handling signal %s" % [ sig ]
+ case sig
+ when :INT, :TERM
+ self.on_termination_signal( sig )
+
+ when :HUP
+ self.on_hangup_signal( sig )
+
+ else
+ self.log.warn "Unhandled signal %s" % [ sig ]
+ end
+
+ end
+
+
+ ### Handle a TERM signal. Shuts the handler down after handling any current request/s. Also
+ ### aliased to #on_interrupt_signal.
+ def on_termination_signal( signo )
+ self.log.warn "Terminated (%p)" % [ signo ]
+ self.stop
+ end
+ alias_method :on_interrupt_signal, :on_termination_signal
+
+
+ ### Handle a HUP signal. The default is to restart the handler.
+ def on_hangup_signal( signo )
+ self.log.warn "Hangup (%p)" % [ signo ]
+ self.restart
+ end
+
end # class Arborist::ObserverRunner