lib/arborist/observer_runner.rb in arborist-0.0.1.pre20160606141735 vs lib/arborist/observer_runner.rb in arborist-0.0.1.pre20160829140603

- old
+ new

@@ -29,11 +29,12 @@ log_to :arborist ### Create a ZMQ::Handler that acts as the agent that runs the specified ### +observer+. - def initialize( reactor ) + def initialize( runner, reactor ) + @runner = runner @client = Arborist::Client.new @pollitem = ZMQ::Pollitem.new( @client.event_api, ZMQ::POLLIN ) @pollitem.handler = self @subscriptions = {} @@ -43,17 +44,30 @@ ###### public ###### + # The Arborist::ObserverRunner that owns this handler. + attr_reader :runner + # The Arborist::Client that will be used for creating and tearing down subscriptions attr_reader :client # The map of subscription IDs to the Observer which it was created for. attr_reader :subscriptions + ### Unsubscribe from and clear all current subscriptions. + def reset + self.log.warn "Resetting the observer handler." + self.subscriptions.keys.each do |subid| + self.client.event_api.unsubscribe( subid ) + end + self.subscriptions.clear + end + + ### Add the specified +observer+ and subscribe to the events it wishes to receive. def add_observer( observer ) self.log.info "Adding observer: %s" % [ observer.description ] observer.subscriptions.each do |sub| subid = self.client.subscribe( sub ) @@ -83,14 +97,17 @@ ### the correct observer. def on_readable subid = self.recv raise "Partial write?!" unless self.pollitem.pollable.rcvmore? raw_event = self.recv + event = MessagePack.unpack( raw_event ) if (( observer = self.subscriptions[subid] )) - event = MessagePack.unpack( raw_event ) observer.handle_event( subid, event ) + elsif subid.start_with?( 'sys.' ) + self.log.debug "System event! %p" % [ event ] + self.runner.handle_system_event( subid, event ) else self.log.warn "Ignoring event %p for which we have no observer." % [ subid ] end return true @@ -103,10 +120,11 @@ def initialize @observers = [] @timers = [] @handler = nil @reactor = ZMQ::Loop.new + @manager_last_runid = nil end ###### public @@ -131,16 +149,15 @@ end ### Run the specified +observers+ def run - self.handler = Arborist::ObserverRunner::Handler.new( self.reactor ) + self.handler = Arborist::ObserverRunner::Handler.new( self, self.reactor ) - self.observers.each do |observer| - self.handler.add_observer( observer ) - self.add_timers_for( observer ) - end + self.register_observers + self.register_observer_timers + self.subscribe_to_system_events self.reactor.start rescue Interrupt $stderr.puts "Interrupted!" self.stop @@ -156,10 +173,32 @@ self.reactor.stop end + ### Register each of the runner's Observers with its handler. + def register_observers + self.observers.each do |observer| + self.handler.add_observer( observer ) + end + end + + + ### Register timers for each Observer. + def register_observer_timers + self.observers.each do |observer| + self.add_timers_for( observer ) + end + end + + + ### Subscribe the runner to system events published by the Manager. + def subscribe_to_system_events + self.handler.client.event_api.subscribe( 'sys.' ) + end + + ### Register a timer for the specified +observer+. def add_timers_for( observer ) observer.timers.each do |interval, callback| self.log.info "Creating timer for %s observer to run %p every %ds" % [ observer.description, callback, interval ] @@ -172,9 +211,31 @@ ### Remove any registered timers. def remove_timers self.timers.each do |timer| self.reactor.cancel_timer( timer ) + end + end + + + ### Handle a `sys.` event from the Manager being observed. + def handle_system_event( event_type, event ) + self.log.debug "Got a %s event from the Manager: %p" % [ event_type, event ] + + case event_type + when 'sys.heartbeat' + this_runid = event['run_id'] + if @manager_last_runid && this_runid != @manager_last_runid + self.log.warn "Manager run ID changed: re-subscribing" + self.handler.reset + self.register_observers + end + + @manager_last_runid = this_runid + when 'sys.node_added', 'sys.node_removed' + # no-op + else + # no-op end end end # class Arborist::ObserverRunner