lib/arborist/manager.rb in arborist-0.2.0.pre20170519125456 vs lib/arborist/manager.rb in arborist-0.2.0
- old
+ new
@@ -20,20 +20,37 @@
# The main Arborist process -- responsible for coordinating all other activity.
class Arborist::Manager
extend Configurability,
Loggability,
Arborist::MethodUtilities
- include CZTop::Reactor::SignalHandling
+ include CZTop::Reactor::SignalHandling,
+ Arborist::HashUtilities
# Signals the manager responds to
QUEUE_SIGS = [
:INT, :TERM, :HUP, :USR1,
# :TODO: :QUIT, :WINCH, :USR2, :TTIN, :TTOU
] & Signal.list.keys.map( &:to_sym )
+ # Array of actions supported by the Tree API
+ VALID_TREEAPI_ACTIONS = %w[
+ ack
+ deps
+ fetch
+ graft
+ modify
+ prune
+ search
+ status
+ subscribe
+ unack
+ unsubscribe
+ update
+ ]
+
# Use the Arborist logger
log_to :arborist
# Configurability API -- use the 'arborist' section
configurability( 'arborist.manager' ) do
@@ -167,11 +184,10 @@
### Setup sockets and start the event loop.
def run
self.log.info "Getting ready to start the manager."
self.setup_sockets
- self.publish_system_event( 'startup', start_time: Time.now.to_s, version: Arborist::VERSION )
self.register_timers
self.with_signal_handler( reactor, *QUEUE_SIGS ) do
self.start_accepting_requests
end
ensure
@@ -242,17 +258,29 @@
self.log.info "Stopping the manager."
self.reactor.stop_polling
end
+ ### Return a human-readable representation of the Manager suitable for debugging.
+ def inspect
+ return "#<%p:%#x {runid: %s} %d nodes>" % [
+ self.class,
+ self.object_id * 2,
+ self.run_id,
+ self.nodes.length,
+ ]
+ end
+
+
#
# :section: Node state saving/reloading
#
### Write out the state of all the manager's nodes to the state_file if one is
### configured.
def save_node_states
+ start_time = Time.now
path = self.class.state_file or return
self.log.info "Saving current node state to %s" % [ path ]
tmpfile = Tempfile.create(
[path.basename.to_s.sub(path.extname, ''), path.extname],
path.dirname.to_s,
@@ -260,10 +288,11 @@
)
Marshal.dump( self.nodes, tmpfile )
tmpfile.close
File.rename( tmpfile.path, path.to_s )
+ self.log.debug "Saved state file in %0.1f seconds." % [ Time.now - start_time ]
rescue SystemCallError => err
self.log.error "%p while saving node state: %s" % [ err.class, err.message ]
ensure
@@ -321,12 +350,19 @@
### Register a periodic timer that will save a snapshot of the node tree's state to the state
### file on a configured interval if one is configured.
def register_checkpoint_timer
- return nil unless self.class.state_file
- interval = self.class.checkpoint_frequency or return nil
+ unless self.class.state_file
+ self.log.info "No state file configured; skipping checkpoint timer setup."
+ return nil
+ end
+ interval = self.class.checkpoint_frequency
+ unless interval && interval.nonzero?
+ self.log.info "Checkpoint frequency is %p; skipping checkpoint timer setup." % [ interval ]
+ return nil
+ end
self.log.info "Setting up node state checkpoint every %0.3fs" % [ interval ]
@checkpoint_timer = self.reactor.add_periodic_timer( interval ) do
self.save_node_states
end
@@ -444,14 +480,12 @@
### Add the specified +node+ to the Manager.
def add_node( node )
identifier = node.identifier
- unless self.nodes[ identifier ].equal?( node )
- self.remove_node( self.nodes[identifier] )
- self.nodes[ identifier ] = node
- end
+ raise Arborist::NodeError, "Node %p already present." % [ identifier ] if self.nodes[ identifier ]
+ self.nodes[ identifier ] = node
if self.tree_built?
self.link_node( node )
self.publish_system_event( 'node_added', node: identifier )
end
@@ -489,30 +523,30 @@
end
### Update the node with the specified +identifier+ with the given +new_properties+
### and propagate any events generated by the update to the node and its ancestors.
- def update_node( identifier, new_properties )
+ def update_node( identifier, new_properties, monitor_key='_' )
unless (( node = self.nodes[identifier] ))
self.log.warn "Update for non-existent node %p ignored." % [ identifier ]
return []
end
- events = node.update( new_properties )
+ events = node.update( new_properties, monitor_key )
self.propagate_events( node, events )
end
- ### Traverse the node tree and fetch the specified +return_values+ from any nodes which
+ ### Traverse the node tree and return the specified +return_values+ from any nodes which
### match the given +filter+, skipping downed nodes and all their children
- ### unless +include_down+ is set. If +return_values+ is set to +nil+, then all
+ ### if +exclude_down+ is set. If +return_values+ is set to +nil+, then all
### values from the node will be returned.
- def fetch_matching_node_states( filter, return_values, include_down=false, negative_filter={} )
- nodes_iter = if include_down
- self.all_nodes
- else
+ def find_matching_node_states( filter, return_values, exclude_down=false, negative_filter={} )
+ nodes_iter = if exclude_down
self.reachable_nodes
+ else
+ self.all_nodes
end
states = nodes_iter.
select {|node| node.matches?(filter) }.
reject {|node| !negative_filter.empty? && node.matches?(negative_filter) }.
@@ -549,11 +583,11 @@
### Set up the ZeroMQ REP socket for the Tree API.
def setup_tree_socket
@tree_socket = CZTop::Socket::REP.new
- self.log.debug " binding the tree API socket (%#0x) to %p" %
+ self.log.info " binding the tree API socket (%#0x) to %p" %
[ @tree_socket.object_id * 2, Arborist.tree_api_url ]
@tree_socket.options.linger = 0
@tree_socket.bind( Arborist.tree_api_url )
end
@@ -567,27 +601,24 @@
### ZMQ::Handler API -- Read and handle an incoming request.
def on_tree_socket_event( event )
if event.readable?
request = event.socket.receive
- msg = self.handle_tree_request( request )
+ msg = self.dispatch_request( request )
event.socket << msg
else
raise "Unsupported event %p on tree API socket!" % [ event ]
end
end
### Handle the specified +raw_request+ and return a response.
- def handle_tree_request( raw_request )
+ def dispatch_request( raw_request )
raise "Manager is shutting down" unless self.running?
header, body = Arborist::TreeAPI.decode( raw_request )
- raise Arborist::MessageError, "missing required header 'action'" unless
- header.key?( 'action' )
- handler = self.lookup_tree_request_action( header ) or
- raise Arborist::MessageError, "No such action '%s'" % [ header['action'] ]
+ handler = self.lookup_tree_request_action( header )
return handler.call( header, body )
rescue => err
self.log.error "%p: %s" % [ err.class, err.message ]
@@ -609,20 +640,23 @@
### Given a request +header+, return a #call-able object that can handle the response.
def lookup_tree_request_action( header )
raise Arborist::MessageError, "unsupported version %d" % [ header['version'] ] unless
header['version'] == 1
- handler_name = "handle_%s_request" % [ header['action'] ]
- return nil unless self.respond_to?( handler_name )
+ action = header['action'] or
+ raise Arborist::MessageError, "missing required header 'action'"
+ raise Arborist::MessageError, "No such action '%s'" % [ action ] unless
+ VALID_TREEAPI_ACTIONS.include?( action )
+ handler_name = "handle_%s_request" % [ action ]
return self.method( handler_name )
end
### Return a response to the `status` action.
def handle_status_request( header, body )
- self.log.debug "STATUS: %p" % [ header ]
+ self.log.info "STATUS: %p" % [ header ]
return Arborist::TreeAPI.successful_response(
server_version: Arborist::VERSION,
state: self.running? ? 'running' : 'not running',
uptime: self.uptime,
nodecount: self.nodecount
@@ -630,29 +664,29 @@
end
### Return a response to the `subscribe` action.
def handle_subscribe_request( header, body )
- self.log.debug "SUBSCRIBE: %p" % [ header ]
+ self.log.info "SUBSCRIBE: %p" % [ header ]
event_type = header[ 'event_type' ]
node_identifier = header[ 'identifier' ]
body = [ body ] unless body.is_a?( Array )
positive = body.shift
negative = body.shift || {}
subscription = self.create_subscription( node_identifier, event_type, positive, negative )
self.log.info "Subscription to %s events at or under %s: %p" %
- [ event_type, node_identifier || 'the root node', subscription ]
+ [ event_type || 'all', node_identifier || 'the root node', subscription ]
return Arborist::TreeAPI.successful_response( id: subscription.id )
end
### Return a response to the `unsubscribe` action.
def handle_unsubscribe_request( header, body )
- self.log.debug "UNSUBSCRIBE: %p" % [ header ]
+ self.log.info "UNSUBSCRIBE: %p" % [ header ]
subscription_id = header[ 'subscription_id' ] or
return Arborist::TreeAPI.error_response( 'client', 'No identifier specified for UNSUBSCRIBE.' )
subscription = self.remove_subscription( subscription_id ) or
return Arborist::TreeAPI.successful_response( nil )
@@ -662,71 +696,111 @@
criteria: subscription.criteria
)
end
- ### Return a repsonse to the `list` action.
- def handle_list_request( header, body )
- self.log.debug "LIST: %p" % [ header ]
+ ### Return a repsonse to the `fetch` action.
+ def handle_fetch_request( header, body )
+ self.log.info "FETCH: %p" % [ header ]
from = header['from'] || '_'
depth = header['depth']
+ tree = header['tree']
- start_node = self.nodes[ from ]
+ start_node = self.nodes[ from ] or
+ return Arborist::TreeAPI.error_response( 'client', "No such node %s." % [from] )
self.log.debug " Listing nodes under %p" % [ start_node ]
- iter = if depth
+
+ if tree
+ iter = [ start_node.to_h(depth: (depth || -1)) ]
+ elsif depth
self.log.debug " depth limited to %d" % [ depth ]
- self.depth_limited_enumerator_for( start_node, depth )
+ iter = self.depth_limited_enumerator_for( start_node, depth )
else
self.log.debug " no depth limit"
- self.enumerator_for( start_node )
+ iter = self.enumerator_for( start_node )
end
data = iter.map( &:to_h )
self.log.debug " got data for %d nodes" % [ data.length ]
return Arborist::TreeAPI.successful_response( data )
end
- ### Return a response to the 'fetch' action.
- def handle_fetch_request( header, body )
- self.log.debug "FETCH: %p" % [ header ]
+ ### Return a response to the `deps` action.
+ def handle_deps_request( header, body )
+ self.log.info "DEPS: %p" % [ header ]
+ from = header['from'] || '_'
- include_down = header['include_down']
+ deps = self.merge_dependencies_from( from )
+ deps.delete( from )
+
+ return Arborist::TreeAPI.successful_response({ deps: deps.to_a })
+
+ rescue Arborist::ClientError => err
+ return Arborist::TreeAPI.error_response( 'client', err.message )
+ end
+
+
+ ### Recurse into the children and secondary dependencies of the +from+ node and
+ ### merge the identifiers of the traversed nodes into the +deps_set+.
+ def merge_dependencies_from( from, deps_set=Set.new )
+ return deps_set unless deps_set.add?( from )
+
+ start_node = self.nodes[ from ] or
+ raise Arborist::ClientError "No such node %s." % [ from ]
+
+ self.enumerator_for( start_node ).each do |subnode|
+ deps_set.add( subnode.identifier )
+ subnode.node_subscribers.each do |subdep|
+ self.merge_dependencies_from( subdep, deps_set )
+ end
+ end
+
+ return deps_set
+ end
+
+
+ ### Return a response to the 'search' action.
+ def handle_search_request( header, body )
+ self.log.info "SEARCH: %p" % [ header ]
+
+ exclude_down = header['exclude_down']
values = if header.key?( 'return' )
header['return'] || []
else
nil
end
body = [ body ] unless body.is_a?( Array )
positive = body.shift
negative = body.shift || {}
- states = self.fetch_matching_node_states( positive, values, include_down, negative )
+ states = self.find_matching_node_states( positive, values, exclude_down, negative )
return Arborist::TreeAPI.successful_response( states )
end
### Update nodes using the data from the update request's +body+.
def handle_update_request( header, body )
- self.log.debug "UPDATE: %p" % [ header ]
+ self.log.info "UPDATE: %p" % [ header ]
unless body.respond_to?( :each )
return Arborist::TreeAPI.error_response( 'client', 'Malformed update: body does not respond to #each' )
end
+ monitor_key = header['monitor_key']
body.each do |identifier, properties|
- self.update_node( identifier, properties )
+ self.update_node( identifier, properties, monitor_key )
end
return Arborist::TreeAPI.successful_response( nil )
end
### Remove a node and its children.
def handle_prune_request( header, body )
- self.log.debug "PRUNE: %p" % [ header ]
+ self.log.info "PRUNE: %p" % [ header ]
identifier = header[ 'identifier' ] or
return Arborist::TreeAPI.error_response( 'client', 'No identifier specified for PRUNE.' )
node = self.remove_node( identifier )
@@ -734,14 +808,19 @@
end
### Add a node
def handle_graft_request( header, body )
- self.log.debug "GRAFT: %p" % [ header ]
+ self.log.info "GRAFT: %p" % [ header ]
identifier = header[ 'identifier' ] or
return Arborist::TreeAPI.error_response( 'client', 'No identifier specified for GRAFT.' )
+
+ if self.nodes[ identifier ]
+ return Arborist::TreeAPI.error_response( 'client', "Node %p already exists." % [identifier] )
+ end
+
type = header[ 'type' ] or
return Arborist::TreeAPI.error_response( 'client', 'No type specified for GRAFT.' )
parent = header[ 'parent' ] || '_'
parent_node = self.nodes[ parent ] or
return Arborist::TreeAPI.error_response( 'client', 'No parent node found for %s.' % [parent] )
@@ -763,26 +842,70 @@
end
### Modify a node's operational attributes
def handle_modify_request( header, body )
- self.log.debug "MODIFY: %p" % [ header ]
+ self.log.info "MODIFY: %p" % [ header ]
identifier = header[ 'identifier' ] or
return Arborist::TreeAPI.error_response( 'client', 'No identifier specified for MODIFY.' )
return Arborist::TreeAPI.error_response( 'client', "Unable to MODIFY root node." ) if identifier == '_'
node = self.nodes[ identifier ] or
return Arborist::TreeAPI.error_response( 'client', "No such node %p" % [identifier] )
self.log.debug "Modifying operational attributes of the %s node: %p" % [ identifier, body ]
+ if new_parent_identifier = body.delete( 'parent' )
+ old_parent = self.nodes[ node.parent ]
+ new_parent = self.nodes[ new_parent_identifier ] or
+ return Arborist::TreeAPI.error_response( 'client', "No such parent node: %p" % [new_parent_identifier] )
+ node.reparent( old_parent, new_parent )
+ end
+
node.modify( body )
return Arborist::TreeAPI.successful_response( nil )
end
+ ### Acknowledge a node
+ def handle_ack_request( header, body )
+ self.log.info "ACK: %p" % [ header ]
+
+ identifier = header[ 'identifier' ] or
+ return Arborist::TreeAPI.error_response( 'client', 'No identifier specified for ACK.' )
+ node = self.nodes[ identifier ] or
+ return Arborist::TreeAPI.error_response( 'client', "No such node %p" % [identifier] )
+
+ self.log.debug "Acking the %s node: %p" % [ identifier, body ]
+
+ body = symbolify_keys( body )
+ events = node.acknowledge( **body )
+ self.propagate_events( node, events )
+
+ return Arborist::TreeAPI.successful_response( nil )
+ end
+
+
+ ### Un-acknowledge a node
+ def handle_unack_request( header, body )
+ self.log.info "UNACK: %p" % [ header ]
+
+ identifier = header[ 'identifier' ] or
+ return Arborist::TreeAPI.error_response( 'client', 'No identifier specified for UNACK.' )
+ node = self.nodes[ identifier ] or
+ return Arborist::TreeAPI.error_response( 'client', "No such node %p" % [identifier] )
+
+ self.log.debug "Unacking the %s node: %p" % [ identifier, body ]
+
+ events = node.unacknowledge
+ self.propagate_events( node, events )
+
+ return Arborist::TreeAPI.successful_response( nil )
+ end
+
+
### Return the current root node.
def root_node
return self.nodes[ '_' ]
end
@@ -857,11 +980,11 @@
#
### Set up the ZMQ PUB socket for published events.
def setup_event_socket
@event_socket = CZTop::Socket::PUB.new
- self.log.debug " binding the event socket (%#0x) to %p" %
+ self.log.info " binding the event socket (%#0x) to %p" %
[ @event_socket.object_id * 2, Arborist.event_api_url ]
@event_socket.options.linger = ( self.linger * 1000 ).ceil
@event_socket.bind( Arborist.event_api_url )
end
@@ -891,26 +1014,29 @@
end
### Register the publisher with the reactor if it's not already.
def register_event_socket
+ self.log.debug "Registering event socket for write events."
self.reactor.enable_events( self.event_socket, :write ) unless
self.reactor.event_enabled?( self.event_socket, :write )
end
### Unregister the event publisher socket from the reactor if it's registered.
def unregister_event_socket
+ self.log.debug "Unregistering event socket for write events."
self.reactor.disable_events( self.event_socket, :write ) if
self.reactor.event_enabled?( self.event_socket, :write )
end
### IO event handler for the event socket.
def on_event_socket_event( event )
if event.writable?
if (( msg = self.event_queue.shift ))
+ # self.log.debug "Publishing event %p" % [ msg ]
event.socket << msg
end
else
raise "Unhandled event %p on the event socket" % [ event ]
end
@@ -919,10 +1045,15 @@
end
### Publish a system event that observers can watch for to detect restarts.
def publish_heartbeat_event
- self.publish_system_event( 'heartbeat', run_id: self.run_id )
+ return unless self.start_time
+ self.publish_system_event( 'heartbeat',
+ run_id: self.run_id,
+ start_time: self.start_time.iso8601,
+ version: Arborist::VERSION
+ )
end
### Publish an event with the specified +eventname+ and +data+.
def publish_system_event( eventname, **data )