lib/arborist/manager.rb in arborist-0.2.0.pre20170519125456 vs lib/arborist/manager.rb in arborist-0.2.0

- old
+ new

@@ -20,20 +20,37 @@ # The main Arborist process -- responsible for coordinating all other activity. class Arborist::Manager extend Configurability, Loggability, Arborist::MethodUtilities - include CZTop::Reactor::SignalHandling + include CZTop::Reactor::SignalHandling, + Arborist::HashUtilities # Signals the manager responds to QUEUE_SIGS = [ :INT, :TERM, :HUP, :USR1, # :TODO: :QUIT, :WINCH, :USR2, :TTIN, :TTOU ] & Signal.list.keys.map( &:to_sym ) + # Array of actions supported by the Tree API + VALID_TREEAPI_ACTIONS = %w[ + ack + deps + fetch + graft + modify + prune + search + status + subscribe + unack + unsubscribe + update + ] + # Use the Arborist logger log_to :arborist # Configurability API -- use the 'arborist' section configurability( 'arborist.manager' ) do @@ -167,11 +184,10 @@ ### Setup sockets and start the event loop. def run self.log.info "Getting ready to start the manager." self.setup_sockets - self.publish_system_event( 'startup', start_time: Time.now.to_s, version: Arborist::VERSION ) self.register_timers self.with_signal_handler( reactor, *QUEUE_SIGS ) do self.start_accepting_requests end ensure @@ -242,17 +258,29 @@ self.log.info "Stopping the manager." self.reactor.stop_polling end + ### Return a human-readable representation of the Manager suitable for debugging. + def inspect + return "#<%p:%#x {runid: %s} %d nodes>" % [ + self.class, + self.object_id * 2, + self.run_id, + self.nodes.length, + ] + end + + # # :section: Node state saving/reloading # ### Write out the state of all the manager's nodes to the state_file if one is ### configured. 
def save_node_states + start_time = Time.now path = self.class.state_file or return self.log.info "Saving current node state to %s" % [ path ] tmpfile = Tempfile.create( [path.basename.to_s.sub(path.extname, ''), path.extname], path.dirname.to_s, @@ -260,10 +288,11 @@ ) Marshal.dump( self.nodes, tmpfile ) tmpfile.close File.rename( tmpfile.path, path.to_s ) + self.log.debug "Saved state file in %0.1f seconds." % [ Time.now - start_time ] rescue SystemCallError => err self.log.error "%p while saving node state: %s" % [ err.class, err.message ] ensure @@ -321,12 +350,19 @@ ### Register a periodic timer that will save a snapshot of the node tree's state to the state ### file on a configured interval if one is configured. def register_checkpoint_timer - return nil unless self.class.state_file - interval = self.class.checkpoint_frequency or return nil + unless self.class.state_file + self.log.info "No state file configured; skipping checkpoint timer setup." + return nil + end + interval = self.class.checkpoint_frequency + unless interval && interval.nonzero? + self.log.info "Checkpoint frequency is %p; skipping checkpoint timer setup." % [ interval ] + return nil + end self.log.info "Setting up node state checkpoint every %0.3fs" % [ interval ] @checkpoint_timer = self.reactor.add_periodic_timer( interval ) do self.save_node_states end @@ -444,14 +480,12 @@ ### Add the specified +node+ to the Manager. def add_node( node ) identifier = node.identifier - unless self.nodes[ identifier ].equal?( node ) - self.remove_node( self.nodes[identifier] ) - self.nodes[ identifier ] = node - end + raise Arborist::NodeError, "Node %p already present." % [ identifier ] if self.nodes[ identifier ] + self.nodes[ identifier ] = node if self.tree_built? 
self.link_node( node ) self.publish_system_event( 'node_added', node: identifier ) end @@ -489,30 +523,30 @@ end ### Update the node with the specified +identifier+ with the given +new_properties+ ### and propagate any events generated by the update to the node and its ancestors. - def update_node( identifier, new_properties ) + def update_node( identifier, new_properties, monitor_key='_' ) unless (( node = self.nodes[identifier] )) self.log.warn "Update for non-existent node %p ignored." % [ identifier ] return [] end - events = node.update( new_properties ) + events = node.update( new_properties, monitor_key ) self.propagate_events( node, events ) end - ### Traverse the node tree and fetch the specified +return_values+ from any nodes which + ### Traverse the node tree and return the specified +return_values+ from any nodes which ### match the given +filter+, skipping downed nodes and all their children - ### unless +include_down+ is set. If +return_values+ is set to +nil+, then all + ### if +exclude_down+ is set. If +return_values+ is set to +nil+, then all ### values from the node will be returned. - def fetch_matching_node_states( filter, return_values, include_down=false, negative_filter={} ) - nodes_iter = if include_down - self.all_nodes - else + def find_matching_node_states( filter, return_values, exclude_down=false, negative_filter={} ) + nodes_iter = if exclude_down self.reachable_nodes + else + self.all_nodes end states = nodes_iter. select {|node| node.matches?(filter) }. reject {|node| !negative_filter.empty? && node.matches?(negative_filter) }. @@ -549,11 +583,11 @@ ### Set up the ZeroMQ REP socket for the Tree API. 
def setup_tree_socket @tree_socket = CZTop::Socket::REP.new - self.log.debug " binding the tree API socket (%#0x) to %p" % + self.log.info " binding the tree API socket (%#0x) to %p" % [ @tree_socket.object_id * 2, Arborist.tree_api_url ] @tree_socket.options.linger = 0 @tree_socket.bind( Arborist.tree_api_url ) end @@ -567,27 +601,24 @@ ### ZMQ::Handler API -- Read and handle an incoming request. def on_tree_socket_event( event ) if event.readable? request = event.socket.receive - msg = self.handle_tree_request( request ) + msg = self.dispatch_request( request ) event.socket << msg else raise "Unsupported event %p on tree API socket!" % [ event ] end end ### Handle the specified +raw_request+ and return a response. - def handle_tree_request( raw_request ) + def dispatch_request( raw_request ) raise "Manager is shutting down" unless self.running? header, body = Arborist::TreeAPI.decode( raw_request ) - raise Arborist::MessageError, "missing required header 'action'" unless - header.key?( 'action' ) - handler = self.lookup_tree_request_action( header ) or - raise Arborist::MessageError, "No such action '%s'" % [ header['action'] ] + handler = self.lookup_tree_request_action( header ) return handler.call( header, body ) rescue => err self.log.error "%p: %s" % [ err.class, err.message ] @@ -609,20 +640,23 @@ ### Given a request +header+, return a #call-able object that can handle the response. 
def lookup_tree_request_action( header ) raise Arborist::MessageError, "unsupported version %d" % [ header['version'] ] unless header['version'] == 1 - handler_name = "handle_%s_request" % [ header['action'] ] - return nil unless self.respond_to?( handler_name ) + action = header['action'] or + raise Arborist::MessageError, "missing required header 'action'" + raise Arborist::MessageError, "No such action '%s'" % [ action ] unless + VALID_TREEAPI_ACTIONS.include?( action ) + handler_name = "handle_%s_request" % [ action ] return self.method( handler_name ) end ### Return a response to the `status` action. def handle_status_request( header, body ) - self.log.debug "STATUS: %p" % [ header ] + self.log.info "STATUS: %p" % [ header ] return Arborist::TreeAPI.successful_response( server_version: Arborist::VERSION, state: self.running? ? 'running' : 'not running', uptime: self.uptime, nodecount: self.nodecount @@ -630,29 +664,29 @@ end ### Return a response to the `subscribe` action. def handle_subscribe_request( header, body ) - self.log.debug "SUBSCRIBE: %p" % [ header ] + self.log.info "SUBSCRIBE: %p" % [ header ] event_type = header[ 'event_type' ] node_identifier = header[ 'identifier' ] body = [ body ] unless body.is_a?( Array ) positive = body.shift negative = body.shift || {} subscription = self.create_subscription( node_identifier, event_type, positive, negative ) self.log.info "Subscription to %s events at or under %s: %p" % - [ event_type, node_identifier || 'the root node', subscription ] + [ event_type || 'all', node_identifier || 'the root node', subscription ] return Arborist::TreeAPI.successful_response( id: subscription.id ) end ### Return a response to the `unsubscribe` action. 
def handle_unsubscribe_request( header, body ) - self.log.debug "UNSUBSCRIBE: %p" % [ header ] + self.log.info "UNSUBSCRIBE: %p" % [ header ] subscription_id = header[ 'subscription_id' ] or return Arborist::TreeAPI.error_response( 'client', 'No identifier specified for UNSUBSCRIBE.' ) subscription = self.remove_subscription( subscription_id ) or return Arborist::TreeAPI.successful_response( nil ) @@ -662,71 +696,111 @@ criteria: subscription.criteria ) end - ### Return a response to the `list` action. - def handle_list_request( header, body ) - self.log.debug "LIST: %p" % [ header ] + ### Return a response to the `fetch` action. + def handle_fetch_request( header, body ) + self.log.info "FETCH: %p" % [ header ] from = header['from'] || '_' depth = header['depth'] + tree = header['tree'] - start_node = self.nodes[ from ] + start_node = self.nodes[ from ] or + return Arborist::TreeAPI.error_response( 'client', "No such node %s." % [from] ) self.log.debug " Listing nodes under %p" % [ start_node ] - iter = if depth + + if tree + iter = [ start_node.to_h(depth: (depth || -1)) ] + elsif depth self.log.debug " depth limited to %d" % [ depth ] - self.depth_limited_enumerator_for( start_node, depth ) + iter = self.depth_limited_enumerator_for( start_node, depth ) else self.log.debug " no depth limit" - self.enumerator_for( start_node ) + iter = self.enumerator_for( start_node ) end data = iter.map( &:to_h ) self.log.debug " got data for %d nodes" % [ data.length ] return Arborist::TreeAPI.successful_response( data ) end - ### Return a response to the 'fetch' action. - def handle_fetch_request( header, body ) - self.log.debug "FETCH: %p" % [ header ] + ### Return a response to the `deps` action. 
+ def handle_deps_request( header, body ) + self.log.info "DEPS: %p" % [ header ] + from = header['from'] || '_' - include_down = header['include_down'] + deps = self.merge_dependencies_from( from ) + deps.delete( from ) + + return Arborist::TreeAPI.successful_response({ deps: deps.to_a }) + + rescue Arborist::ClientError => err + return Arborist::TreeAPI.error_response( 'client', err.message ) + end + + + ### Recurse into the children and secondary dependencies of the +from+ node and + ### merge the identifiers of the traversed nodes into the +deps_set+. + def merge_dependencies_from( from, deps_set=Set.new ) + return deps_set unless deps_set.add?( from ) + + start_node = self.nodes[ from ] or + raise Arborist::ClientError "No such node %s." % [ from ] + + self.enumerator_for( start_node ).each do |subnode| + deps_set.add( subnode.identifier ) + subnode.node_subscribers.each do |subdep| + self.merge_dependencies_from( subdep, deps_set ) + end + end + + return deps_set + end + + + ### Return a response to the 'search' action. + def handle_search_request( header, body ) + self.log.info "SEARCH: %p" % [ header ] + + exclude_down = header['exclude_down'] values = if header.key?( 'return' ) header['return'] || [] else nil end body = [ body ] unless body.is_a?( Array ) positive = body.shift negative = body.shift || {} - states = self.fetch_matching_node_states( positive, values, include_down, negative ) + states = self.find_matching_node_states( positive, values, exclude_down, negative ) return Arborist::TreeAPI.successful_response( states ) end ### Update nodes using the data from the update request's +body+. 
def handle_update_request( header, body ) - self.log.debug "UPDATE: %p" % [ header ] + self.log.info "UPDATE: %p" % [ header ] unless body.respond_to?( :each ) return Arborist::TreeAPI.error_response( 'client', 'Malformed update: body does not respond to #each' ) end + monitor_key = header['monitor_key'] body.each do |identifier, properties| - self.update_node( identifier, properties ) + self.update_node( identifier, properties, monitor_key ) end return Arborist::TreeAPI.successful_response( nil ) end ### Remove a node and its children. def handle_prune_request( header, body ) - self.log.debug "PRUNE: %p" % [ header ] + self.log.info "PRUNE: %p" % [ header ] identifier = header[ 'identifier' ] or return Arborist::TreeAPI.error_response( 'client', 'No identifier specified for PRUNE.' ) node = self.remove_node( identifier ) @@ -734,14 +808,19 @@ end ### Add a node def handle_graft_request( header, body ) - self.log.debug "GRAFT: %p" % [ header ] + self.log.info "GRAFT: %p" % [ header ] identifier = header[ 'identifier' ] or return Arborist::TreeAPI.error_response( 'client', 'No identifier specified for GRAFT.' ) + + if self.nodes[ identifier ] + return Arborist::TreeAPI.error_response( 'client', "Node %p already exists." % [identifier] ) + end + type = header[ 'type' ] or return Arborist::TreeAPI.error_response( 'client', 'No type specified for GRAFT.' ) parent = header[ 'parent' ] || '_' parent_node = self.nodes[ parent ] or return Arborist::TreeAPI.error_response( 'client', 'No parent node found for %s.' % [parent] ) @@ -763,26 +842,70 @@ end ### Modify a node's operational attributes def handle_modify_request( header, body ) - self.log.debug "MODIFY: %p" % [ header ] + self.log.info "MODIFY: %p" % [ header ] identifier = header[ 'identifier' ] or return Arborist::TreeAPI.error_response( 'client', 'No identifier specified for MODIFY.' ) return Arborist::TreeAPI.error_response( 'client', "Unable to MODIFY root node." 
) if identifier == '_' node = self.nodes[ identifier ] or return Arborist::TreeAPI.error_response( 'client', "No such node %p" % [identifier] ) self.log.debug "Modifying operational attributes of the %s node: %p" % [ identifier, body ] + if new_parent_identifier = body.delete( 'parent' ) + old_parent = self.nodes[ node.parent ] + new_parent = self.nodes[ new_parent_identifier ] or + return Arborist::TreeAPI.error_response( 'client', "No such parent node: %p" % [new_parent_identifier] ) + node.reparent( old_parent, new_parent ) + end + node.modify( body ) return Arborist::TreeAPI.successful_response( nil ) end + ### Acknowledge a node + def handle_ack_request( header, body ) + self.log.info "ACK: %p" % [ header ] + + identifier = header[ 'identifier' ] or + return Arborist::TreeAPI.error_response( 'client', 'No identifier specified for ACK.' ) + node = self.nodes[ identifier ] or + return Arborist::TreeAPI.error_response( 'client', "No such node %p" % [identifier] ) + + self.log.debug "Acking the %s node: %p" % [ identifier, body ] + + body = symbolify_keys( body ) + events = node.acknowledge( **body ) + self.propagate_events( node, events ) + + return Arborist::TreeAPI.successful_response( nil ) + end + + + ### Un-acknowledge a node + def handle_unack_request( header, body ) + self.log.info "UNACK: %p" % [ header ] + + identifier = header[ 'identifier' ] or + return Arborist::TreeAPI.error_response( 'client', 'No identifier specified for UNACK.' ) + node = self.nodes[ identifier ] or + return Arborist::TreeAPI.error_response( 'client', "No such node %p" % [identifier] ) + + self.log.debug "Unacking the %s node: %p" % [ identifier, body ] + + events = node.unacknowledge + self.propagate_events( node, events ) + + return Arborist::TreeAPI.successful_response( nil ) + end + + ### Return the current root node. def root_node return self.nodes[ '_' ] end @@ -857,11 +980,11 @@ # ### Set up the ZMQ PUB socket for published events. 
def setup_event_socket @event_socket = CZTop::Socket::PUB.new - self.log.debug " binding the event socket (%#0x) to %p" % + self.log.info " binding the event socket (%#0x) to %p" % [ @event_socket.object_id * 2, Arborist.event_api_url ] @event_socket.options.linger = ( self.linger * 1000 ).ceil @event_socket.bind( Arborist.event_api_url ) end @@ -891,26 +1014,29 @@ end ### Register the publisher with the reactor if it's not already. def register_event_socket + self.log.debug "Registering event socket for write events." self.reactor.enable_events( self.event_socket, :write ) unless self.reactor.event_enabled?( self.event_socket, :write ) end ### Unregister the event publisher socket from the reactor if it's registered. def unregister_event_socket + self.log.debug "Unregistering event socket for write events." self.reactor.disable_events( self.event_socket, :write ) if self.reactor.event_enabled?( self.event_socket, :write ) end ### IO event handler for the event socket. def on_event_socket_event( event ) if event.writable? if (( msg = self.event_queue.shift )) + # self.log.debug "Publishing event %p" % [ msg ] event.socket << msg end else raise "Unhandled event %p on the event socket" % [ event ] end @@ -919,10 +1045,15 @@ end ### Publish a system event that observers can watch for to detect restarts. def publish_heartbeat_event - self.publish_system_event( 'heartbeat', run_id: self.run_id ) + return unless self.start_time + self.publish_system_event( 'heartbeat', + run_id: self.run_id, + start_time: self.start_time.iso8601, + version: Arborist::VERSION + ) end ### Publish an event with the specified +eventname+ and +data+. def publish_system_event( eventname, **data )