lib/arborist/manager.rb in arborist-0.0.1.pre20160128152542 vs lib/arborist/manager.rb in arborist-0.0.1.pre20160606141735

- old
+ new

@@ -1,9 +1,10 @@ # -*- ruby -*- #encoding: utf-8 require 'pathname' +require 'tempfile' require 'configurability' require 'loggability' require 'rbczmq' require 'arborist' unless defined?( Arborist ) @@ -24,16 +25,49 @@ ] # The number of seconds to wait between checks for incoming signals SIGNAL_INTERVAL = 0.5 + # Configurability API -- set config defaults + CONFIG_DEFAULTS = { + state_file: nil, + checkpoint_frequency: 30 + } - ## + # Use the Arborist logger log_to :arborist + # Configurability API -- use the 'arborist' section + config_key :arborist + + ## + # The Pathname of the file the manager's node tree state is saved to + singleton_attr_accessor :state_file + + ## + # The number of seconds between automatic state checkpoints + singleton_attr_accessor :checkpoint_frequency + + + ### Configurability API -- configure the manager + def self::configure( config=nil ) + config ||= {} + config = self.defaults.merge( config[:manager] || {} ) + + self.state_file = config[:state_file] && Pathname( config[:state_file] ) + + interval = config[:checkpoint_frequency].to_i + if interval && interval.nonzero? + self.checkpoint_frequency = interval + else + self.checkpoint_frequency = nil + end + end + + # # Instance methods # ### Create a new Arborist::Manager. @@ -52,10 +86,11 @@ Thread.main[:signal_queue] = [] @zmq_loop = nil @api_handler = nil @event_publisher = nil + @checkpoint_timer = nil end ###### public @@ -112,12 +147,15 @@ self.log.debug "Unregistering sockets." @zmq_loop.remove( @tree_sock ) @tree_sock.pollable.close @zmq_loop.remove( @event_sock ) @event_sock.pollable.close + @zmq_loop.cancel_timer( @checkpoint_timer ) if @checkpoint_timer end + self.save_node_states + self.log.debug "Resetting ZMQ context" Arborist.reset_zmq_context end @@ -139,10 +177,13 @@ @event_publisher = Arborist::Manager::EventPublisher.new( @event_sock, self, @zmq_loop ) @event_sock.handler = @event_publisher @zmq_loop.register( @event_sock ) + @checkpoint_timer = self.start_state_checkpointing + @zmq_loop.register_timer( @checkpoint_timer ) if @checkpoint_timer + self.setup_signal_timer self.start_time = Time.now self.log.debug "Manager running." @zmq_loop.start @@ -193,10 +234,76 @@ @zmq_loop.stop if @zmq_loop end # + # :section: Node state saving/reloading + # + + ### Write out the state of all the manager's nodes to the state_file if one is + ### configured. + def save_node_states + path = self.class.state_file or return + self.log.info "Saving current node state to %s" % [ path ] + tmpfile = Tempfile.create( + [path.basename.to_s.sub(path.extname, ''), path.extname], + path.dirname.to_s, + encoding: 'binary' + ) + Marshal.dump( self.nodes, tmpfile ) + tmpfile.close + + File.rename( tmpfile.path, path.to_s ) + + rescue SystemCallError => err + self.log.error "%p while saving node state: %s" % [ err.class, err.message ] + + ensure + File.unlink( tmpfile.path ) if tmpfile && File.exist?( tmpfile.path ) + end + + + ### Attempt to restore the state of loaded node from the configured state file. Returns + ### true if it succeeded, or false if a state file wasn't configured, doesn't + ### exist, isn't readable, or couldn't be unmarshalled. + def restore_node_states + path = self.class.state_file or return false + return false unless path.readable? + + self.log.info "Restoring node state from %s" % [ path ] + nodes = Marshal.load( path.open('r:binary') ) + + nodes.each do |identifier, saved_node| + self.log.debug "Loaded node: %p" % [ identifier ] + if (( current_node = self.nodes[ identifier ] )) + self.log.debug "Restoring state of the %p node." % [ identifier ] + current_node.restore( saved_node ) + else + self.log.info "Not restoring state for the %s node: not present in the loaded tree." % + [ identifier ] + end + end + + return true + end + + + ### Start a timer that will save a snapshot of the node tree's state to the state + ### file on a configured interval if it's configured. + def start_state_checkpointing + return nil unless self.class.state_file + interval = self.class.checkpoint_frequency or return nil + + self.log.info "Setting up node state checkpoint every %ds" % [ interval ] + checkpoint_timer = ZMQ::Timer.new( interval, 0 ) do + self.save_node_states + end + return checkpoint_timer + end + + + # # :section: Signal Handling # These methods set up some behavior for starting, restarting, and stopping # your application when a signal is received. If you don't want signals to # be handled, override #set_signal_handlers with an empty method. # @@ -290,10 +397,11 @@ ### Handle a USR1 signal. Writes a message to the log. def on_user1_signal( signo ) self.log.info "Checkpoint: User signal." + self.save_node_states end # # :section: Tree API @@ -310,21 +418,31 @@ ### Build the tree out of all the loaded nodes. def build_tree self.log.info "Building tree from %d loaded nodes." % [ self.nodes.length ] - self.nodes.each do |identifier, node| + + # Build primary tree structure + self.nodes.each_value do |node| next if node.operational? self.link_node_to_parent( node ) end self.tree_built = true + + # Set up secondary dependencies + self.nodes.each_value do |node| + node.register_secondary_dependencies( self ) + end + + self.restore_node_states end ### Link the specified +node+ to its parent. Raises an error if the specified +node+'s ### parent is not yet loaded. def link_node_to_parent( node ) + self.log.debug "Linking node %p to its parent" % [ node ] parent_id = node.parent || '_' parent_node = self.nodes[ parent_id ] or raise "no parent '%s' node loaded for %p" % [ parent_id, node ] self.log.debug "adding %p as a child of %p" % [ node, parent_node ] @@ -334,29 +452,41 @@ ### Add the specified +node+ to the Manager. def add_node( node ) identifier = node.identifier - unless self.nodes[identifier].equal?( node ) + unless self.nodes[ identifier ].equal?( node ) self.remove_node( self.nodes[identifier] ) self.nodes[ identifier ] = node end - self.log.debug "Linking node %p to its parent" % [ node ] - self.link_node_to_parent( node ) if self.tree_built? + if self.tree_built? + self.link_node( node ) + node.handle_event( Arborist::Event.create(:sys_node_added, node) ) + end end + ### Link the node to other nodes in the tree. + def link_node( node ) + raise "Tree is not built yet" unless self.tree_built? + + self.link_node_to_parent( node ) + node.register_secondary_dependencies( self ) + end + + ### Remove a +node+ from the Manager. The +node+ can either be the Arborist::Node to ### remove, or the identifier of a node. def remove_node( node ) node = self.nodes[ node ] unless node.is_a?( Arborist::Node ) return unless node raise "Can't remove an operational node" if node.operational? self.log.info "Removing node %p" % [ node ] + node.handle_event( Arborist::Event.create(:sys_node_removed, node) ) node.children.each do |identifier, child_node| self.remove_node( child_node ) end if parent_node = self.nodes[ node.parent || '_' ] @@ -382,19 +512,20 @@ ### Traverse the node tree and fetch the specified +return_values+ from any nodes which ### match the given +filter+, skipping downed nodes and all their children ### unless +include_down+ is set. If +return_values+ is set to +nil+, then all ### values from the node will be returned. - def fetch_matching_node_states( filter, return_values, include_down=false ) + def fetch_matching_node_states( filter, return_values, include_down=false, negative_filter={} ) nodes_iter = if include_down self.all_nodes else self.reachable_nodes end states = nodes_iter. select {|node| node.matches?(filter) }. + reject {|node| !negative_filter.empty? && node.matches?(negative_filter) }. each_with_object( {} ) do |node, hash| hash[ node.identifier ] = node.fetch_values( return_values ) end return states @@ -435,18 +566,18 @@ ### Yield each node that is not down to the specified +block+, or return ### an Enumerator if no block is given. def reachable_nodes( &block ) iter = self.enumerator_for( self.root ) do |node| - !(node.down? || node.disabled?) + !(node.down? || node.disabled? || node.quieted?) end return iter.each( &block ) if block return iter end - ### Return an enumerator for the specified +node+. + ### Return an enumerator for the specified +start_node+. def enumerator_for( start_node, &filter ) return Enumerator.new do |yielder| traverse = ->( node ) do if !filter || filter.call( node ) yielder.yield( node ) @@ -456,29 +587,69 @@ traverse.call( start_node ) end end + ### Return a +depth+ limited enumerator for the specified +start_node+. + def depth_limited_enumerator_for( start_node, depth, &filter ) + return Enumerator.new do |yielder| + traverse = ->( node, current_depth ) do + self.log.debug "Enumerating nodes from %s at depth: %p" % + [ node.identifier, current_depth ] + if !filter || filter.call( node ) + yielder.yield( node ) + node.each do |child| + traverse[ child, current_depth - 1 ] + end if current_depth > 0 + end + end + traverse.call( start_node, depth ) + end + end + + + ### Return an Array of all nodes below the specified +node+. + def descendants_for( node ) + return self.enumerator_for( node ).to_a + end + + + ### Return the Array of all nodes above the specified +node+. + def ancestors_for( node ) + parent_id = node.parent or return [] + parent = self.nodes[ parent_id ] + return [ parent ] + self.ancestors_for( parent ) + end + + # # Event API # - ### Create a subscription for the node with the specified +identifier+ and - ### +event_pattern+, using the given +criteria+ when considering an event. - def create_subscription( identifier, event_pattern, criteria ) + ### Add the specified +subscription+ to the node corresponding with the given +identifier+. + def subscribe( identifier, subscription ) identifier ||= '_' - node = self.nodes[ identifier ] or raise ArgumentError, "no such node %p" % [ identifier ] - sub = Arborist::Subscription.new( self.event_publisher, event_pattern, criteria ) - self.log.debug "Registering subscription %p" % [ sub ] - node.add_subscription( sub ) - self.log.debug " adding '%s' to the subscriptions hash." % [ sub.id ] - self.subscriptions[ sub.id ] = node + self.log.debug "Registering subscription %p" % [ subscription ] + node.add_subscription( subscription ) + self.log.debug " adding '%s' to the subscriptions hash." % [ subscription.id ] + self.subscriptions[ subscription.id ] = node self.log.debug " subscriptions hash: %#0x" % [ self.subscriptions.object_id ] + end + + ### Create a subscription that publishes to the Manager's event publisher for + ### the node with the specified +identifier+ and +event_pattern+, using the + ### given +criteria+ when considering an event. + def create_subscription( identifier, event_pattern, criteria ) + sub = Arborist::Subscription.new( event_pattern, criteria ) do |*args| + self.event_publisher.publish( *args ) + end + self.subscribe( identifier, sub ) + return sub end ### Remove the subscription with the specified +subscription_identifier+ from the node @@ -490,10 +661,9 @@ ### Propagate one or more +events+ to the specified +node+ and its ancestors in the tree, ### publishing them to matching subscriptions belonging to the nodes along the way. def propagate_events( node, *events ) - self.log.info "Propagating %d events to node %s" % [ events.length, node.identifier ] node.publish_events( *events ) if node.parent parent = self.nodes[ node.parent ] or raise "couldn't find parent %p of node %p!" % [ node.parent, node.identifier ]