lib/arborist/node.rb in arborist-0.2.0.pre20170519125456 vs lib/arborist/node.rb in arborist-0.2.0

- old
+ new

@@ -40,10 +40,11 @@ tags parent description dependencies status_changed + status_last_changed last_contacted ack errors quieted_reasons config @@ -114,32 +115,50 @@ state_machine( :status, initial: :unknown ) do state :unknown, :up, :down, + :warn, :acked, :disabled, :quieted event :update do - transition [:up, :unknown] => :disabled, if: :ack_set? - transition [:down, :unknown, :acked] => :up, unless: :has_errors? - transition [:up, :unknown, :acked] => :down, if: :has_errors? - transition :down => :acked, if: :ack_set? - transition :disabled => :unknown, unless: :ack_set? + transition [:down, :warn, :unknown, :acked] => :up, unless: :has_errors_or_warnings? + transition [:up, :warn, :unknown] => :down, if: :has_errors? + transition [:up, :down, :unknown] => :warn, if: :has_only_warnings? end + event :acknowledge do + transition any - [:down, :acked] => :disabled + transition [:down, :acked] => :acked + end + + event :unacknowledge do + transition [:acked, :disabled] => :warn, if: :has_warnings? + transition [:acked, :disabled] => :down, if: :has_errors? + transition [:acked, :disabled] => :unknown + end + event :handle_event do transition any - [:disabled, :quieted, :acked] => :quieted, if: :has_quieted_reason? transition :quieted => :unknown, unless: :has_quieted_reason? end + event :reparent do + transition any - [:disabled, :quieted, :acked] => :unknown + transition :quieted => :unknown, unless: :has_quieted_reason? + end + + before_transition [:acked, :disabled] => any, do: :save_previous_ack + after_transition any => :acked, do: :on_ack after_transition :acked => :up, do: :on_ack_cleared after_transition :down => :up, do: :on_node_up - after_transition [:unknown, :up] => :down, do: :on_node_down - after_transition [:unknown, :up] => :disabled, do: :on_node_disabled + after_transition :up => :warn, do: :on_node_warn + after_transition [:unknown, :warn, :up] => :down, do: :on_node_down + after_transition [:unknown, :warn, :up] => :disabled, do: :on_node_disabled after_transition any => :quieted, do: :on_node_quieted after_transition :disabled => :unknown, do: :on_node_enabled after_transition :quieted => :unknown, do: :on_node_unquieted after_transition any => any, do: :log_transition @@ -178,11 +197,11 @@ ### Record a new loaded instance if the Thread-local variable is set up to track ### them. def self::add_loaded_instance( new_instance ) instances = Thread.current[ LOADED_INSTANCE_KEY ] or return - self.log.debug "Adding new instance %p to node tree" % [ new_instance ] + # self.log.debug "Adding new instance %p to node tree" % [ new_instance ] instances << new_instance end ### Inheritance hook -- add a DSL declarative function for the given +subclass+. @@ -193,18 +212,20 @@ Arborist.add_dsl_constructor( subclass, &body ) end ### Get/set the node type instances of the class live under. If no parent_type is set, it - ### is a top-level node type. - def self::parent_types( *types ) + ### is a top-level node type. If a +block+ is given, it can be used to pre-process the + ### arguments into the (identifier, attributes, block) arguments used to create + ### the node instances. + def self::parent_types( *types, &block ) @parent_types ||= [] types.each do |new_type| subclass = Arborist::Node.get_subclass( new_type ) @parent_types << subclass - subclass.add_subnode_factory_method( self ) + subclass.add_subnode_factory_method( self, &block ) end return @parent_types end singleton_method_alias :parent_type, :parent_types @@ -216,15 +237,27 @@ end ### Add a factory method that can be used to create subnodes of the specified +subnode_type+ ### on instances of the receiving class. - def self::add_subnode_factory_method( subnode_type ) + def self::add_subnode_factory_method( subnode_type, &dsl_block ) if subnode_type.name name = subnode_type.plugin_name - body = lambda do |identifier, attributes={}, &block| - return Arborist::Node.create( name, identifier, self, attributes, &block ) + # self.log.debug "Adding factory constructor for %s nodes to %p" % [ name, self ] + body = lambda do |*args, &constructor_block| + if dsl_block + # self.log.debug "Using DSL block to split args: %p" % [ dsl_block ] + identifier, attributes = dsl_block.call( *args ) + else + # self.log.debug "Splitting args the default way: %p" % [ args ] + identifier, attributes = *args + end + attributes ||= {} + # self.log.debug "Identifier: %p, attributes: %p, self: %p" % + # [ identifier, attributes, self ] + + return Arborist::Node.create( name, identifier, self, attributes, &constructor_block ) end define_method( name, &body ) else self.log.info "Skipping factory constructor for anonymous subnode class." @@ -277,25 +310,27 @@ @dependencies = Arborist::Dependency.new( :all ) # Primary state @status = 'unknown' @status_changed = Time.at( 0 ) + @status_last_changed = Time.at( 0 ) # Attributes that govern state @errors = {} + @warnings = {} @ack = nil + @previous_ack = nil @last_contacted = Time.at( 0 ) @quieted_reasons = {} # Event-handling @update_delta = Hash.new do |h,k| h[ k ] = Hash.new( &h.default_proc ) end - @pending_update_events = [] + @pending_change_events = [] @subscriptions = {} - self.log.debug "Setting node attributes to: %p" % [ attributes ] self.modify( attributes ) self.instance_eval( &block ) if block end @@ -326,36 +361,49 @@ ## # The Time the node's status last changed. attr_accessor :status_changed ## + # The previous Time the node's status changed, for duration + # calculations between states. + attr_accessor :status_last_changed + + ## # The Hash of last errors encountered by a monitor attempting to update this # node, keyed by the monitor's `key`. attr_accessor :errors ## + # The Hash of last warnings encountered by a monitor attempting to update this + # node, keyed by the monitor's `key`. + attr_accessor :warnings + + ## # The acknowledgement currently in effect. Should be an instance of Arborist::Node::ACK attr_accessor :ack ## + # The acknowledgement previously in effect (if any). + attr_accessor :previous_ack + + ## # The Hash of changes tracked during an #update. attr_reader :update_delta ## # The Array of events generated by the current update event - attr_reader :pending_update_events + attr_reader :pending_change_events ## # The Hash of Subscription objects observing this node and its children, keyed by # subscription ID. attr_reader :subscriptions ## # The node's secondary dependencies, expressed as an Arborist::Node::Sexp attr_accessor :dependencies - ## # The reasons this node was quieted. This is a Hash of text descriptions keyed by the # type of dependency it came from (either :primary or :secondary). attr_reader :quieted_reasons @@ -366,17 +414,18 @@ end ### Set one or more node +attributes+. This should be overridden by subclasses which ### wish to allow their operational attributes to be set/updated via the Tree API - ### (+modify+ and +graft+). Supported attributes are: +parent+, +description+, and - ### +tags+. + ### (+modify+ and +graft+). Supported attributes are: +parent+, +description+, + ### +tags+, and +config+. def modify( attributes ) attributes = stringify_keys( attributes ) self.parent( attributes['parent'] ) self.description( attributes['description'] ) + self.config( attributes['config'] ) if attributes['tags'] @tags.clear self.tags( attributes['tags'] ) end @@ -445,11 +494,11 @@ ### Get or set the node's configuration hash. This can be used to pass per-node ### information to systems using the tree (e.g., monitors, subscribers). def config( new_config=nil ) - @config = stringify_keys( new_config ) if new_config + @config.merge!( stringify_keys( new_config ) ) if new_config return @config end # @@ -481,49 +530,112 @@ def find_matching_subscriptions( event ) return self.subscriptions.values.find_all {|sub| event =~ sub } end + ### Return the Set of identifier of nodes that are secondary dependencies of this node. + def node_subscribers + self.log.debug "Finding node subscribers among %d subscriptions" % [ self.subscriptions.length ] + return self.subscriptions.each_with_object( Set.new ) do |(identifier, sub), set| + if sub.respond_to?( :node_identifier ) + set.add( sub.node_identifier ) + else + self.log.debug "Skipping %p: not a node subscription" % [ sub ] + end + end + end + + ### Update specified +properties+ for the node. - def update( new_properties ) + def update( new_properties, monitor_key='_' ) + self.last_contacted = Time.now + self.update_properties( new_properties, monitor_key ) + + # Super to the state machine event method + super + + events = self.pending_change_events.clone + events << self.make_update_event + events << self.make_delta_event unless self.update_delta.empty? + + results = self.broadcast_events( *events ) + self.log.debug ">>> Results from broadcast: %p" % [ results ] + events.concat( results ) + + return events + ensure + self.clear_transition_temp_vars + end + + + ### Update the node's properties with those in +new_properties+ (a String-keyed Hash) + def update_properties( new_properties, monitor_key ) + monitor_key ||= '_' new_properties = stringify_keys( new_properties ) - monitor_key = new_properties[ '_monitor_key' ] || '_' self.log.debug "Updated via a %s monitor: %p" % [ monitor_key, new_properties ] - self.last_contacted = Time.now + self.update_errors( monitor_key, new_properties.delete('error') ) + self.update_warnings( monitor_key, new_properties.delete('warning') ) - if new_properties.key?( 'ack' ) - self.ack = new_properties.delete( 'ack' ) - elsif new_properties['error'] - self.errors[ monitor_key ] = new_properties.delete( 'error' ) + self.properties.merge!( new_properties, &self.method(:merge_and_record_delta) ) + compact_hash( self.properties ) + end + + + ### Update the errors hash for the specified +monitor_key+ to +value+. + def update_errors( monitor_key, value=nil ) + if value + self.errors[ monitor_key ] = value else self.errors.delete( monitor_key ) end + end - self.properties.merge!( new_properties, &self.method(:merge_and_record_delta) ) - compact_hash( self.properties ) - # Super to the state machine event method - super - - return self.collect_events + ### Update the warnings hash for the specified +monitor_key+ to +value+. + def update_warnings( monitor_key, value=nil ) + if value + self.warnings[ monitor_key ] = value + else + self.warnings.delete( monitor_key ) + end end - ### Collect the events generated by updates and return them after broadcasting - ### them to any child nodes. - def collect_events - events = self.pending_update_events.clone - events << self.make_update_event + ### Acknowledge any current or future abnormal status for this node. + def acknowledge( **args ) + super() + + self.ack = args + + events = self.pending_change_events.clone events << self.make_delta_event unless self.update_delta.empty? + results = self.broadcast_events( *events ) + self.log.debug ">>> Results from broadcast: %p" % [ results ] + events.concat( results ) - self.broadcast_events( *events ) + return events + ensure + self.clear_transition_temp_vars + end + + ### Clear any current acknowledgement. + def unacknowledge + super() + + self.ack = nil + + events = self.pending_change_events.clone + events << self.make_delta_event unless self.update_delta.empty? + results = self.broadcast_events( *events ) + self.log.debug ">>> Results from broadcast: %p" % [ results ] + events.concat( results ) + return events ensure - self.update_delta.clear - self.pending_update_events.clear + self.clear_transition_temp_vars end ### Merge the specified +new_properties+ into the node's properties, recording ### each change in the node's #update_delta. @@ -552,10 +664,18 @@ return newval end end + ### Clear out the state used during a transition to track changes. + def clear_transition_temp_vars + self.previous_ack = nil + self.update_delta.clear + self.pending_change_events.clear + end + + ### Return the node's state in an Arborist::Event of type 'node.update'. def make_update_event return Arborist::Event.create( 'node_update', self ) end @@ -591,21 +711,22 @@ end ### Returns +true+ if the node matches the specified +key+ and +val+ criteria. def match_criteria?( key, val ) + array_val = Array( val ) return case key when 'status' - self.status == val + array_val.include?( self.status ) when 'type' - self.log.debug "Checking node type %p against %p" % [ self.type, val ] - self.type == val + array_val.include?( self.type ) when 'parent' - self.parent == val + array_val.include?( self.parent ) when 'tag' then @tags.include?( val.to_s ) - when 'tags' then Array(val).all? {|tag| @tags.include?(tag) } - when 'identifier' then @identifier == val + when 'tags' then array_val.all? {|tag| @tags.include?(tag) } + when 'identifier' + array_val.include?( self.identifier ) when 'config' val.all? {|ikey, ival| hash_matches(@config, ikey, ival) } else hash_matches( @properties, key, val ) end @@ -648,13 +769,11 @@ raise Arborist::ConfigError, "Can't depend on ancestor node %p." % [ identifier ] elsif manager.descendants_for( self ).any? {|node| node.identifier == identifier } raise Arborist::ConfigError, "Can't depend on descendant node %p." % [ identifier ] end - sub = Arborist::Subscription.new do |_, event| - self.handle_event( event ) - end + sub = Arborist::NodeSubscription.new( self ) manager.subscribe( identifier, sub ) end end @@ -668,16 +787,18 @@ ### Send an event to this node's immediate children. def broadcast_events( *events ) events.flatten! - self.children.each do |identifier, child| + results = self.children.flat_map do |identifier, child| self.log.debug "Broadcasting %d events to %p" % [ events.length, identifier ] - events.each do |event| + events.flat_map do |event| child.handle_event( event ) end end + + return results end ### Handle the specified +event+, delivered either via broadcast or secondary ### dependency subscription. @@ -690,17 +811,40 @@ self.method( handler_name ).call( event ) else self.log.debug "No handler for a %s event!" % [ event.type ] end + self.log.debug ">>> Pending change events before: %p" % [ self.pending_change_events ] + super # to state-machine - self.publish_events( event, *self.pending_update_events ) - self.collect_events + results = self.pending_change_events.clone + self.log.debug ">>> Pending change events after: %p" % [ results ] + results << self.make_delta_event unless self.update_delta.empty? + + child_results = self.broadcast_events( *results ) + results.concat( child_results ) + + self.publish_events( *results ) + + return results + ensure + self.clear_transition_temp_vars end + ### Move a node from +old_parent+ to +new_parent+. + def reparent( old_parent, new_parent ) + old_parent.remove_child( self ) + self.parent( new_parent.identifier ) + new_parent.add_child( self ) + + self.quieted_reasons.delete( :primary ) + super + end + + ### Returns +true+ if this node's dependencies are not met. def dependencies_down? return self.dependencies.down? end alias_method :has_downed_dependencies?, :dependencies_down? @@ -767,11 +911,11 @@ end ### Handle a 'node.up' event received via broadcast. def handle_node_up_event( event ) - self.log.debug "Got a node.up event: %p" % [ event ] + self.log.debug "Got a node.%s event: %p" % [ event.type, event ] self.dependencies.mark_up( event.node.identifier ) self.quieted_reasons.delete( :secondary ) if self.dependencies_up? if event.node.identifier == self.parent @@ -780,10 +924,11 @@ self.parent ] self.quieted_reasons.delete( :primary ) end end + alias_method :handle_node_warn_event, :handle_node_up_event # # :section: Hierarchy API @@ -808,10 +953,11 @@ ### Returns +true+ if the node's status indicates it shouldn't be ### included by default when traversing nodes. def unreachable? + self.log.debug "Testing for reachability; status is: %p" % [ self.status ] return UNREACHABLE_STATES.include?( self.status ) end ### Returns +true+ if the node's status indicates it is included by @@ -858,21 +1004,23 @@ ### Return a string describing the node's status. def status_description case self.status - when 'up', 'down' + when 'up', 'down', 'warn' return "%s as of %s" % [ self.status.upcase, self.last_contacted ] when 'acked' return "ACKed %s" % [ self.acked_description ] when 'disabled' return "disabled %s" % [ self.acked_description ] when 'quieted' reasons = self.quieted_reasons.values.join( ',' ) return "quieted: %s" % [ reasons ] + when 'unknown' + return "in an 'unknown' state" else - return "in an unknown state" + return "in an unhandled state" end end ### Return a string describing node details; returns +nil+ for the base class. Subclasses @@ -900,29 +1048,35 @@ # # :section: Serialization API # - ### Restore any saved state from the +old_node+ loaded from the state file. + ### Restore any saved state from the +old_node+ loaded from the state file. This is + ### used to overlay selective bits of the saved node tree to the equivalent nodes loaded + ### from node definitions. def restore( old_node ) @status = old_node.status @properties = old_node.properties.dup @ack = old_node.ack.dup if old_node.ack @last_contacted = old_node.last_contacted @status_changed = old_node.status_changed @errors = old_node.errors + @warnings = old_node.warnings @quieted_reasons = old_node.quieted_reasons + @status_last_changed = old_node.status_last_changed # Only merge in downed dependencies. old_node.dependencies.each_downed do |identifier, time| @dependencies.mark_down( identifier, time ) end end - ### Return a Hash of the node's state. - def to_h( deep: false ) + ### Return a Hash of the node's state. If +depth+ is greater than 0, that many + ### levels of child nodes are included in the node's `:children` value. Setting + ### +depth+ to a negative number will return all of the node's children. + def to_h( depth: 0 ) hash = { identifier: self.identifier, type: self.class.name.to_s.sub( /.+::/, '' ).downcase, parent: self.parent, description: self.description, @@ -931,19 +1085,24 @@ status: self.status, properties: self.properties.dup, ack: self.ack ? self.ack.to_h : nil, last_contacted: self.last_contacted ? self.last_contacted.iso8601 : nil, status_changed: self.status_changed ? self.status_changed.iso8601 : nil, + status_last_changed: self.status_last_changed ? self.status_last_changed.iso8601 : nil, errors: self.errors, + warnings: self.warnings, dependencies: self.dependencies.to_h, quieted_reasons: self.quieted_reasons, } - if deep + if depth.nonzero? + # self.log.debug "including children for depth %p" % [ depth ] hash[ :children ] = self.children.each_with_object( {} ) do |(ident, node), h| - h[ ident ] = node.to_h( deep ) + h[ ident ] = node.to_h( depth: depth - 1 ) end + else + hash[ :children ] = {} end return hash end @@ -967,24 +1126,26 @@ @config = hash[:config] @children = {} @status = hash[:status] @status_changed = Time.parse( hash[:status_changed] ) + @status_last_changed = Time.parse( hash[:status_last_changed] ) @ack = Arborist::Node::Ack.from_hash( hash[:ack] ) if hash[:ack] @errors = hash[:errors] + @warnings = hash[:warnings] @properties = hash[:properties] || {} @last_contacted = Time.parse( hash[:last_contacted] ) @quieted_reasons = hash[:quieted_reasons] || {} self.log.debug "Deps are: %p" % [ hash[:dependencies] ] @dependencies = hash[:dependencies] @update_delta = Hash.new do |h,k| h[ k ] = Hash.new( &h.default_proc ) end - @pending_update_events = [] + @pending_change_events = [] @subscriptions = {} end @@ -1010,40 +1171,99 @@ @ack = Arborist::Node::Ack.from_hash( ack_data ) else self.log.info "Node %s ACK cleared explicitly" % [ self.identifier ] @ack = nil end + + self.add_previous_ack_to_update_delta end + ### Save off the current acknowledgement so it can be used after transitions + ### which unset it. + def save_previous_ack + self.log.debug "Saving previous ack: %p" % [ self.ack ] + self.previous_ack = self.ack + end + + + ### Add the previous and current acknowledgement to the delta if either of them + ### are set. + def add_previous_ack_to_update_delta + unless self.ack == self.previous_ack + self.log.debug "Adding previous ack to the update delta: %p" % [ self.previous_ack ] + self.update_delta[ 'ack' ] = [ self.previous_ack&.to_h, self.ack&.to_h ] + end + end + + ### State machine guard predicate -- Returns +true+ if the node has an ACK status set. def ack_set? self.log.debug "Checking to see if this node has been ACKed (it %s)" % [ @ack ? "has" : "has not" ] return @ack ? true : false end - ### State machine guard predicate -- Returns +true+ if the last time the node - ### was monitored resulted in an update. + ### State machine guard predicate -- returns +true+ if the node has errors. def has_errors? has_errors = ! self.errors.empty? self.log.debug "Checking to see if last contact cleared remaining errors (it %s)" % [ has_errors ? "did not" : "did" ] self.log.debug "Errors are: %p" % [ self.errors ] return has_errors end + ### State machine guard predicate -- Returns +true+ if the node has errors + ### and does not have an ACK status set. + def has_unacked_errors? + return self.has_errors? && !self.ack_set? + end + + + ### State machine guard predicate -- returns +true+ if the node has warnings. + def has_warnings? + has_warnings = ! self.warnings.empty? + self.log.debug "Checking to see if last contact cleared remaining warnings (it %s)" % + [ has_warnings ? "did not" : "did" ] + self.log.debug "Warnings are: %p" % [ self.warnings ] + return has_warnings + end + + + ### State machine guard predicate -- returns +true+ if the node has warnings or errors. + def has_errors_or_warnings? + return self.has_errors? || self.has_warnings? + end + + + ### State machine guard predicate -- returns +true+ if the node has warnings but + ### no errors. + def has_only_warnings? + return self.has_warnings? && ! self.has_errors? + end + + ### Return a string describing the errors that are set on the node. def errors_description return "No errors" if self.errors.empty? return self.errors.map do |key, msg| "%s: %s" % [ key, msg ] end.join( '; ' ) end + + ### Return a string describing the warnings that are set on the node. + def warnings_description + return "No warnings" if self.warnings.empty? + return self.warnings.map do |key, msg| + "%s: %s" % [ key, msg ] + end.join( '; ' ) + end + + # # :section: State Callbacks # ### Log every status transition @@ -1053,19 +1273,20 @@ end ### Update the last status change time. def update_status_changed( transition ) + self.status_last_changed = self.status_changed self.status_changed = Time.now end ### Queue up a transition event whenever one happens def make_transition_event( transition ) node_type = "node_%s" % [ transition.to ] self.log.debug "Making a %s event for %p" % [ node_type, transition ] - self.pending_update_events << Arborist::Event.create( node_type, self ) + self.pending_change_events << Arborist::Event.create( node_type, self ) end ### Callback for when an acknowledgement is set. def on_ack( transition ) @@ -1073,12 +1294,12 @@ end ### Callback for when an acknowledgement is cleared. def on_ack_cleared( transition ) - self.log.warn "ACK cleared for %s" % [ self.identifier ] self.ack = nil + self.log.warn "ACK cleared for %s" % [ self.identifier ] end ### Callback for when a node goes from down to up def on_node_up( transition ) @@ -1092,10 +1313,17 @@ self.log.error "%s is %s" % [ self.identifier, self.status_description ] self.update_delta[ 'errors' ] = [ nil, self.errors_description ] end + ### Callback for when a node goes from up to warn + def on_node_warn( transition ) + self.log.error "%s is %s" % [ self.identifier, self.status_description ] + self.update_delta[ 'warnings' ] = [ nil, self.warnings_description ] + end + + ### Callback for when a node goes from up to disabled def on_node_disabled( transition ) self.log.warn "%s is %s" % [ self.identifier, self.status_description ] end @@ -1113,9 +1341,10 @@ ### Callback for when a node goes from disabled to unknown def on_node_enabled( transition ) self.log.warn "%s is %s" % [ self.identifier, self.status_description ] + self.ack = nil end ### Add the transition from one state to another to the data used to build ### deltas for the #update event.