lib/arborist/observer_runner.rb in arborist-0.0.1.pre20160606141735 vs lib/arborist/observer_runner.rb in arborist-0.0.1.pre20160829140603
- old
+ new
@@ -29,11 +29,12 @@
log_to :arborist
### Create a ZMQ::Handler that acts as the agent that runs the specified
### +observer+.
- def initialize( reactor )
+ def initialize( runner, reactor )
+ @runner = runner
@client = Arborist::Client.new
@pollitem = ZMQ::Pollitem.new( @client.event_api, ZMQ::POLLIN )
@pollitem.handler = self
@subscriptions = {}
@@ -43,17 +44,30 @@
######
public
######
+ # The Arborist::ObserverRunner that owns this handler.
+ attr_reader :runner
+
# The Arborist::Client that will be used for creating and tearing down subscriptions
attr_reader :client
# The map of subscription IDs to the Observer which it was created for.
attr_reader :subscriptions
+ ### Unsubscribe from and clear all current subscriptions.
+ def reset
+ self.log.warn "Resetting the observer handler."
+ self.subscriptions.keys.each do |subid|
+ self.client.event_api.unsubscribe( subid )
+ end
+ self.subscriptions.clear
+ end
+
+
### Add the specified +observer+ and subscribe to the events it wishes to receive.
def add_observer( observer )
self.log.info "Adding observer: %s" % [ observer.description ]
observer.subscriptions.each do |sub|
subid = self.client.subscribe( sub )
@@ -83,14 +97,17 @@
### the correct observer.
def on_readable
subid = self.recv
raise "Partial write?!" unless self.pollitem.pollable.rcvmore?
raw_event = self.recv
+ event = MessagePack.unpack( raw_event )
if (( observer = self.subscriptions[subid] ))
- event = MessagePack.unpack( raw_event )
observer.handle_event( subid, event )
+ elsif subid.start_with?( 'sys.' )
+ self.log.debug "System event! %p" % [ event ]
+ self.runner.handle_system_event( subid, event )
else
self.log.warn "Ignoring event %p for which we have no observer." % [ subid ]
end
return true
@@ -103,10 +120,11 @@
def initialize
@observers = []
@timers = []
@handler = nil
@reactor = ZMQ::Loop.new
+ @manager_last_runid = nil
end
######
public
@@ -131,16 +149,15 @@
end
### Run the specified +observers+
def run
- self.handler = Arborist::ObserverRunner::Handler.new( self.reactor )
+ self.handler = Arborist::ObserverRunner::Handler.new( self, self.reactor )
- self.observers.each do |observer|
- self.handler.add_observer( observer )
- self.add_timers_for( observer )
- end
+ self.register_observers
+ self.register_observer_timers
+ self.subscribe_to_system_events
self.reactor.start
rescue Interrupt
$stderr.puts "Interrupted!"
self.stop
@@ -156,10 +173,32 @@
self.reactor.stop
end
+ ### Register each of the runner's Observers with its handler.
+ def register_observers
+ self.observers.each do |observer|
+ self.handler.add_observer( observer )
+ end
+ end
+
+
+ ### Register timers for each Observer.
+ def register_observer_timers
+ self.observers.each do |observer|
+ self.add_timers_for( observer )
+ end
+ end
+
+
+ ### Subscribe the runner to system events published by the Manager.
+ def subscribe_to_system_events
+ self.handler.client.event_api.subscribe( 'sys.' )
+ end
+
+
### Register a timer for the specified +observer+.
def add_timers_for( observer )
observer.timers.each do |interval, callback|
self.log.info "Creating timer for %s observer to run %p every %ds" %
[ observer.description, callback, interval ]
@@ -172,9 +211,31 @@
### Remove any registered timers.
def remove_timers
self.timers.each do |timer|
self.reactor.cancel_timer( timer )
+ end
+ end
+
+
+ ### Handle a `sys.` event from the Manager being observed.
+ def handle_system_event( event_type, event )
+ self.log.debug "Got a %s event from the Manager: %p" % [ event_type, event ]
+
+ case event_type
+ when 'sys.heartbeat'
+ this_runid = event['run_id']
+ if @manager_last_runid && this_runid != @manager_last_runid
+ self.log.warn "Manager run ID changed: re-subscribing"
+ self.handler.reset
+ self.register_observers
+ end
+
+ @manager_last_runid = this_runid
+ when 'sys.node_added', 'sys.node_removed'
+ # no-op
+ else
+ # no-op
end
end
end # class Arborist::ObserverRunner