lib/arborist/manager.rb in arborist-0.0.1.pre20160128152542 vs lib/arborist/manager.rb in arborist-0.0.1.pre20160606141735
- old
+ new
@@ -1,9 +1,10 @@
# -*- ruby -*-
#encoding: utf-8
require 'pathname'
+require 'tempfile'
require 'configurability'
require 'loggability'
require 'rbczmq'
require 'arborist' unless defined?( Arborist )
@@ -24,16 +25,49 @@
]
# The number of seconds to wait between checks for incoming signals
SIGNAL_INTERVAL = 0.5
+ # Configurability API -- set config defaults
+ CONFIG_DEFAULTS = {
+ state_file: nil,
+ checkpoint_frequency: 30
+ }
- ##
+
# Use the Arborist logger
log_to :arborist
+ # Configurability API -- use the 'arborist' section
+ config_key :arborist
+
+ ##
+ # The Pathname of the file the manager's node tree state is saved to
+ singleton_attr_accessor :state_file
+
+ ##
+ # The number of seconds between automatic state checkpoints
+ singleton_attr_accessor :checkpoint_frequency
+
+
+ ### Configurability API -- configure the manager
+ def self::configure( config=nil )
+ config ||= {}
+ config = self.defaults.merge( config[:manager] || {} )
+
+ self.state_file = config[:state_file] && Pathname( config[:state_file] )
+
+ interval = config[:checkpoint_frequency].to_i
+ if interval && interval.nonzero?
+ self.checkpoint_frequency = interval
+ else
+ self.checkpoint_frequency = nil
+ end
+ end
+
+
#
# Instance methods
#
### Create a new Arborist::Manager.
@@ -52,10 +86,11 @@
Thread.main[:signal_queue] = []
@zmq_loop = nil
@api_handler = nil
@event_publisher = nil
+ @checkpoint_timer = nil
end
######
public
@@ -112,12 +147,15 @@
self.log.debug "Unregistering sockets."
@zmq_loop.remove( @tree_sock )
@tree_sock.pollable.close
@zmq_loop.remove( @event_sock )
@event_sock.pollable.close
+ @zmq_loop.cancel_timer( @checkpoint_timer ) if @checkpoint_timer
end
+ self.save_node_states
+
self.log.debug "Resetting ZMQ context"
Arborist.reset_zmq_context
end
@@ -139,10 +177,13 @@
@event_publisher = Arborist::Manager::EventPublisher.new( @event_sock, self, @zmq_loop )
@event_sock.handler = @event_publisher
@zmq_loop.register( @event_sock )
+ @checkpoint_timer = self.start_state_checkpointing
+ @zmq_loop.register_timer( @checkpoint_timer ) if @checkpoint_timer
+
self.setup_signal_timer
self.start_time = Time.now
self.log.debug "Manager running."
@zmq_loop.start
@@ -193,10 +234,76 @@
@zmq_loop.stop if @zmq_loop
end
#
+ # :section: Node state saving/reloading
+ #
+
+ ### Write out the state of all the manager's nodes to the state_file if one is
+ ### configured.
+ def save_node_states
+ path = self.class.state_file or return
+ self.log.info "Saving current node state to %s" % [ path ]
+ tmpfile = Tempfile.create(
+ [path.basename.to_s.sub(path.extname, ''), path.extname],
+ path.dirname.to_s,
+ encoding: 'binary'
+ )
+ Marshal.dump( self.nodes, tmpfile )
+ tmpfile.close
+
+ File.rename( tmpfile.path, path.to_s )
+
+ rescue SystemCallError => err
+ self.log.error "%p while saving node state: %s" % [ err.class, err.message ]
+
+ ensure
+ File.unlink( tmpfile.path ) if tmpfile && File.exist?( tmpfile.path )
+ end
+
+
+ ### Attempt to restore the state of loaded node from the configured state file. Returns
+ ### true if it succeeded, or false if a state file wasn't configured, doesn't
+ ### exist, isn't readable, or couldn't be unmarshalled.
+ def restore_node_states
+ path = self.class.state_file or return false
+ return false unless path.readable?
+
+ self.log.info "Restoring node state from %s" % [ path ]
+ nodes = Marshal.load( path.open('r:binary') )
+
+ nodes.each do |identifier, saved_node|
+ self.log.debug "Loaded node: %p" % [ identifier ]
+ if (( current_node = self.nodes[ identifier ] ))
+ self.log.debug "Restoring state of the %p node." % [ identifier ]
+ current_node.restore( saved_node )
+ else
+ self.log.info "Not restoring state for the %s node: not present in the loaded tree." %
+ [ identifier ]
+ end
+ end
+
+ return true
+ end
+
+
+ ### Start a timer that will save a snapshot of the node tree's state to the state
+ ### file on a configured interval if it's configured.
+ def start_state_checkpointing
+ return nil unless self.class.state_file
+ interval = self.class.checkpoint_frequency or return nil
+
+ self.log.info "Setting up node state checkpoint every %ds" % [ interval ]
+ checkpoint_timer = ZMQ::Timer.new( interval, 0 ) do
+ self.save_node_states
+ end
+ return checkpoint_timer
+ end
+
+
+ #
# :section: Signal Handling
# These methods set up some behavior for starting, restarting, and stopping
# your application when a signal is received. If you don't want signals to
# be handled, override #set_signal_handlers with an empty method.
#
@@ -290,10 +397,11 @@
### Handle a USR1 signal. Writes a message to the log.
def on_user1_signal( signo )
self.log.info "Checkpoint: User signal."
+ self.save_node_states
end
#
# :section: Tree API
@@ -310,21 +418,31 @@
### Build the tree out of all the loaded nodes.
def build_tree
self.log.info "Building tree from %d loaded nodes." % [ self.nodes.length ]
- self.nodes.each do |identifier, node|
+
+ # Build primary tree structure
+ self.nodes.each_value do |node|
next if node.operational?
self.link_node_to_parent( node )
end
self.tree_built = true
+
+ # Set up secondary dependencies
+ self.nodes.each_value do |node|
+ node.register_secondary_dependencies( self )
+ end
+
+ self.restore_node_states
end
### Link the specified +node+ to its parent. Raises an error if the specified +node+'s
### parent is not yet loaded.
def link_node_to_parent( node )
+ self.log.debug "Linking node %p to its parent" % [ node ]
parent_id = node.parent || '_'
parent_node = self.nodes[ parent_id ] or
raise "no parent '%s' node loaded for %p" % [ parent_id, node ]
self.log.debug "adding %p as a child of %p" % [ node, parent_node ]
@@ -334,29 +452,41 @@
### Add the specified +node+ to the Manager.
def add_node( node )
identifier = node.identifier
- unless self.nodes[identifier].equal?( node )
+ unless self.nodes[ identifier ].equal?( node )
self.remove_node( self.nodes[identifier] )
self.nodes[ identifier ] = node
end
- self.log.debug "Linking node %p to its parent" % [ node ]
- self.link_node_to_parent( node ) if self.tree_built?
+ if self.tree_built?
+ self.link_node( node )
+ node.handle_event( Arborist::Event.create(:sys_node_added, node) )
+ end
end
+ ### Link the node to other nodes in the tree.
+ def link_node( node )
+ raise "Tree is not built yet" unless self.tree_built?
+
+ self.link_node_to_parent( node )
+ node.register_secondary_dependencies( self )
+ end
+
+
### Remove a +node+ from the Manager. The +node+ can either be the Arborist::Node to
### remove, or the identifier of a node.
def remove_node( node )
node = self.nodes[ node ] unless node.is_a?( Arborist::Node )
return unless node
raise "Can't remove an operational node" if node.operational?
self.log.info "Removing node %p" % [ node ]
+ node.handle_event( Arborist::Event.create(:sys_node_removed, node) )
node.children.each do |identifier, child_node|
self.remove_node( child_node )
end
if parent_node = self.nodes[ node.parent || '_' ]
@@ -382,19 +512,20 @@
### Traverse the node tree and fetch the specified +return_values+ from any nodes which
### match the given +filter+, skipping downed nodes and all their children
### unless +include_down+ is set. If +return_values+ is set to +nil+, then all
### values from the node will be returned.
- def fetch_matching_node_states( filter, return_values, include_down=false )
+ def fetch_matching_node_states( filter, return_values, include_down=false, negative_filter={} )
nodes_iter = if include_down
self.all_nodes
else
self.reachable_nodes
end
states = nodes_iter.
select {|node| node.matches?(filter) }.
+ reject {|node| !negative_filter.empty? && node.matches?(negative_filter) }.
each_with_object( {} ) do |node, hash|
hash[ node.identifier ] = node.fetch_values( return_values )
end
return states
@@ -435,18 +566,18 @@
### Yield each node that is not down to the specified +block+, or return
### an Enumerator if no block is given.
def reachable_nodes( &block )
iter = self.enumerator_for( self.root ) do |node|
- !(node.down? || node.disabled?)
+ !(node.down? || node.disabled? || node.quieted?)
end
return iter.each( &block ) if block
return iter
end
- ### Return an enumerator for the specified +node+.
+ ### Return an enumerator for the specified +start_node+.
def enumerator_for( start_node, &filter )
return Enumerator.new do |yielder|
traverse = ->( node ) do
if !filter || filter.call( node )
yielder.yield( node )
@@ -456,29 +587,69 @@
traverse.call( start_node )
end
end
+ ### Return a +depth+ limited enumerator for the specified +start_node+.
+ def depth_limited_enumerator_for( start_node, depth, &filter )
+ return Enumerator.new do |yielder|
+ traverse = ->( node, current_depth ) do
+ self.log.debug "Enumerating nodes from %s at depth: %p" %
+ [ node.identifier, current_depth ]
+ if !filter || filter.call( node )
+ yielder.yield( node )
+ node.each do |child|
+ traverse[ child, current_depth - 1 ]
+ end if current_depth > 0
+ end
+ end
+ traverse.call( start_node, depth )
+ end
+ end
+
+
+ ### Return an Array of all nodes below the specified +node+.
+ def descendants_for( node )
+ return self.enumerator_for( node ).to_a
+ end
+
+
+ ### Return the Array of all nodes above the specified +node+.
+ def ancestors_for( node )
+ parent_id = node.parent or return []
+ parent = self.nodes[ parent_id ]
+ return [ parent ] + self.ancestors_for( parent )
+ end
+
+
#
# Event API
#
- ### Create a subscription for the node with the specified +identifier+ and
- ### +event_pattern+, using the given +criteria+ when considering an event.
- def create_subscription( identifier, event_pattern, criteria )
+ ### Add the specified +subscription+ to the node corresponding with the given +identifier+.
+ def subscribe( identifier, subscription )
identifier ||= '_'
-
node = self.nodes[ identifier ] or raise ArgumentError, "no such node %p" % [ identifier ]
- sub = Arborist::Subscription.new( self.event_publisher, event_pattern, criteria )
- self.log.debug "Registering subscription %p" % [ sub ]
- node.add_subscription( sub )
- self.log.debug " adding '%s' to the subscriptions hash." % [ sub.id ]
- self.subscriptions[ sub.id ] = node
+ self.log.debug "Registering subscription %p" % [ subscription ]
+ node.add_subscription( subscription )
+ self.log.debug " adding '%s' to the subscriptions hash." % [ subscription.id ]
+ self.subscriptions[ subscription.id ] = node
self.log.debug " subscriptions hash: %#0x" % [ self.subscriptions.object_id ]
+ end
+
+ ### Create a subscription that publishes to the Manager's event publisher for
+ ### the node with the specified +identifier+ and +event_pattern+, using the
+ ### given +criteria+ when considering an event.
+ def create_subscription( identifier, event_pattern, criteria )
+ sub = Arborist::Subscription.new( event_pattern, criteria ) do |*args|
+ self.event_publisher.publish( *args )
+ end
+ self.subscribe( identifier, sub )
+
return sub
end
### Remove the subscription with the specified +subscription_identifier+ from the node
@@ -490,10 +661,9 @@
### Propagate one or more +events+ to the specified +node+ and its ancestors in the tree,
### publishing them to matching subscriptions belonging to the nodes along the way.
def propagate_events( node, *events )
- self.log.info "Propagating %d events to node %s" % [ events.length, node.identifier ]
node.publish_events( *events )
if node.parent
parent = self.nodes[ node.parent ] or raise "couldn't find parent %p of node %p!" %
[ node.parent, node.identifier ]