lib/arborist/manager.rb in arborist-0.0.1.pre20160606141735 vs lib/arborist/manager.rb in arborist-0.0.1.pre20160829140603

- old
+ new

@@ -1,8 +1,9 @@ # -*- ruby -*- #encoding: utf-8 +require 'securerandom' require 'pathname' require 'tempfile' require 'configurability' require 'loggability' require 'rbczmq' @@ -28,11 +29,13 @@ SIGNAL_INTERVAL = 0.5 # Configurability API -- set config defaults CONFIG_DEFAULTS = { state_file: nil, - checkpoint_frequency: 30 + checkpoint_frequency: 30000, + heartbeat_frequency: 1000, + linger: 5000 } # Use the Arborist logger log_to :arborist @@ -44,21 +47,39 @@ ## # The Pathname of the file the manager's node tree state is saved to singleton_attr_accessor :state_file ## - # The number of seconds between automatic state checkpoints + # The number of milliseconds between automatic state checkpoints singleton_attr_accessor :checkpoint_frequency + ## + # The number of milliseconds between heartbeat events + singleton_attr_accessor :heartbeat_frequency + ## + # The maximum amount of time to wait for pending events to be delivered during + # shutdown, in milliseconds. + singleton_attr_accessor :linger + + ### Configurability API -- configure the manager def self::configure( config=nil ) config ||= {} config = self.defaults.merge( config[:manager] || {} ) + self.log.debug "Config is: %p" % [ config ] + self.state_file = config[:state_file] && Pathname( config[:state_file] ) + self.linger = config[:linger].to_i + self.log.info "Linger configured to %p" % [ self.linger ] + self.heartbeat_frequency = config[:heartbeat_frequency].to_i || + CONFIG_DEFAULTS[:heartbeat_frequency] + raise Arborist::ConfigError, "heartbeat frequency must be a positive non-zero integer" if + self.heartbeat_frequency <= 0 + interval = config[:checkpoint_frequency].to_i if interval && interval.nonzero? self.checkpoint_frequency = interval else self.checkpoint_frequency = nil @@ -70,35 +91,54 @@ # Instance methods # ### Create a new Arborist::Manager. def initialize + @run_id = SecureRandom.hex( 16 ) @root = Arborist::Node.create( :root ) - @nodes = { - '_' => @root, - } + @nodes = { '_' => @root } + @subscriptions = {} @tree_built = false @tree_sock = @event_sock = nil @signal_timer = nil @start_time = nil - Thread.main[:signal_queue] = [] - @zmq_loop = nil - - @api_handler = nil - @event_publisher = nil @checkpoint_timer = nil + @linger = self.class.linger || Arborist::Manager::CONFIG_DEFAULTS[ :linger ] + self.log.info "Linger set to %p" % [ @linger ] + + @zmq_loop = ZMQ::Loop.new + # @zmq_loop.verbose = true + @tree_sock = self.setup_tree_socket + @event_sock = self.setup_event_socket + + @api_handler = Arborist::Manager::TreeAPI.new( @tree_sock, self ) + @tree_sock.handler = @api_handler + @zmq_loop.register( @tree_sock ) + + @event_publisher = Arborist::Manager::EventPublisher.new( @event_sock, self, @zmq_loop ) + @event_sock.handler = @event_publisher + @zmq_loop.register( @event_sock ) + + @heartbeat_timer = self.make_heartbeat_timer + @checkpoint_timer = self.make_checkpoint_timer + + Thread.main[:signal_queue] = [] end ###### public ###### ## + # A unique string used to identify different runs of the Manager + attr_reader :run_id + + ## # The root node of the tree. attr_accessor :root ## # The Hash of all loaded Nodes, keyed by their identifier @@ -126,32 +166,50 @@ ## # Flag for marking when the tree is built successfully the first time attr_predicate_accessor :tree_built + ## + # The maximum amount of time to wait for pending events to be delivered during + # shutdown, in milliseconds. + attr_reader :linger + ## + # The ZMQ::Timer that processes signals + attr_reader :signal_timer + + ## + # The ZMQ::Timer that periodically checkpoints the manager's state (if it's configured to do so) + attr_reader :checkpoint_timer + + ## + # The ZMQ::Timer that periodically publishes a heartbeat event + attr_reader :heartbeat_timer + + # # :section: Startup/Shutdown # ### Setup sockets and start the event loop. def run self.log.info "Getting ready to start the manager." - self.setup_sockets + self.publish_system_event( 'startup', start_time: Time.now.to_s, version: Arborist::VERSION ) + self.register_timers self.set_signal_handlers self.start_accepting_requests return self # For chaining ensure self.restore_signal_handlers - if @zmq_loop + if self.zmq_loop self.log.debug "Unregistering sockets." - @zmq_loop.remove( @tree_sock ) + self.zmq_loop.remove( @tree_sock ) @tree_sock.pollable.close - @zmq_loop.remove( @event_sock ) + self.zmq_loop.remove( @event_sock ) @event_sock.pollable.close - @zmq_loop.cancel_timer( @checkpoint_timer ) if @checkpoint_timer + self.zmq_loop.cancel_timer( @checkpoint_timer ) if @checkpoint_timer end self.save_node_states self.log.debug "Resetting ZMQ context" @@ -159,47 +217,33 @@ end ### Returns true if the Manager is running. def running? - return @zmq_loop && @zmq_loop.running? + return self.zmq_loop && self.zmq_loop.running? end + ### Register the Manager's timers. + def register_timers + self.zmq_loop.register_timer( self.heartbeat_timer ) + self.zmq_loop.register_timer( self.checkpoint_timer ) if self.checkpoint_timer + end + + ### Start a loop, accepting a request and handling it. def start_accepting_requests self.log.debug "Starting the main loop" - @zmq_loop = ZMQ::Loop.new - - @api_handler = Arborist::Manager::TreeAPI.new( @tree_sock, self ) - @tree_sock.handler = @api_handler - @zmq_loop.register( @tree_sock ) - - @event_publisher = Arborist::Manager::EventPublisher.new( @event_sock, self, @zmq_loop ) - @event_sock.handler = @event_publisher - @zmq_loop.register( @event_sock ) - - @checkpoint_timer = self.start_state_checkpointing - @zmq_loop.register_timer( @checkpoint_timer ) if @checkpoint_timer - self.setup_signal_timer self.start_time = Time.now self.log.debug "Manager running." - @zmq_loop.start + return self.zmq_loop.start end - ### Create the ZMQ API socket if necessary. - def setup_sockets - self.log.debug "Setting up sockets" - @tree_sock = self.setup_tree_socket - @event_sock = self.setup_event_socket - end - - ### Set up the ZMQ REP socket for the Tree API. def setup_tree_socket sock = Arborist.zmq_context.socket( :REP ) self.log.debug " binding the tree API socket (%#0x) to %p" % [ sock.object_id * 2, Arborist.tree_api_url ] @@ -212,11 +256,11 @@ ### Set up the ZMQ PUB socket for published events. def setup_event_socket sock = Arborist.zmq_context.socket( :PUB ) self.log.debug " binding the event socket (%#0x) to %p" % [ sock.object_id * 2, Arborist.event_api_url ] - sock.linger = 0 + sock.linger = self.linger sock.bind( Arborist.event_api_url ) return ZMQ::Pollitem.new( sock, ZMQ::POLLOUT ) end @@ -229,11 +273,15 @@ ### Stop the manager. def stop self.log.info "Stopping the manager." self.ignore_signals self.cancel_signal_timer - @zmq_loop.stop if @zmq_loop + + @api_handler.shutdown + @event_publisher.shutdown + + self.zmq_loop.stop end # # :section: Node state saving/reloading @@ -285,18 +333,30 @@ return true end - ### Start a timer that will save a snapshot of the node tree's state to the state + ### Make a ZMQ::Timer that will publish a heartbeat event at a configurable interval. + def make_heartbeat_timer + interval = self.class.heartbeat_frequency || CONFIG_DEFAULTS[ :heartbeat_frequency ] + + self.log.info "Setting up to heartbeat every %dms" % [ interval ] + heartbeat_timer = ZMQ::Timer.new( (interval/1000.0), 0 ) do + self.publish_heartbeat_event + end + return heartbeat_timer + end + + + ### Make a ZMQ::Timer that will save a snapshot of the node tree's state to the state ### file on a configured interval if it's configured. - def start_state_checkpointing + def make_checkpoint_timer return nil unless self.class.state_file interval = self.class.checkpoint_frequency or return nil - self.log.info "Setting up node state checkpoint every %ds" % [ interval ] - checkpoint_timer = ZMQ::Timer.new( interval, 0 ) do + self.log.info "Setting up node state checkpoint every %dms" % [ interval ] + checkpoint_timer = ZMQ::Timer.new( (interval/1000.0), 0 ) do self.save_node_states end return checkpoint_timer end @@ -309,19 +369,19 @@ # ### Set up a periodic ZMQ timer to check for queued signals and handle them. def setup_signal_timer @signal_timer = ZMQ::Timer.new( SIGNAL_INTERVAL, 0, self.method(:process_signal_queue) ) - @zmq_loop.register_timer( @signal_timer ) + self.zmq_loop.register_timer( @signal_timer ) end ### Disable the timer that checks for incoming signals def cancel_signal_timer - if @signal_timer - @signal_timer.cancel - @zmq_loop.cancel_timer( @signal_timer ) + if self.signal_timer + self.signal_timer.cancel + self.zmq_loop.cancel_timer( self.signal_timer ) end end ### Set up signal handlers for common signals that will shut down, restart, etc. @@ -401,10 +461,17 @@ self.log.info "Checkpoint: User signal." self.save_node_states end + ### Simulate the receipt of the specified +signal+ (probably only useful + ### in testing). + def simulate_signal( signal ) + Thread.main[:signal_queue] << signal.to_sym + end + + # # :section: Tree API # ### Add nodes yielded from the specified +enumerator+ into the manager's @@ -459,11 +526,11 @@ self.nodes[ identifier ] = node end if self.tree_built? self.link_node( node ) - node.handle_event( Arborist::Event.create(:sys_node_added, node) ) + self.publish_system_event( 'node_added', node: identifier ) end end ### Link the node to other nodes in the tree. @@ -482,11 +549,11 @@ return unless node raise "Can't remove an operational node" if node.operational? self.log.info "Removing node %p" % [ node ] - node.handle_event( Arborist::Event.create(:sys_node_removed, node) ) + self.publish_system_event( 'node_removed', node: node.identifier ) node.children.each do |identifier, child_node| self.remove_node( child_node ) end if parent_node = self.nodes[ node.parent || '_' ] @@ -553,10 +620,17 @@ # # Tree-traversal API # + + ### Return the current root node. + def root_node + return self.nodes[ '_' ] + end + + ### Yield each node in a depth-first traversal of the manager's tree ### to the specified +block+, or return an Enumerator if no block is given. def all_nodes( &block ) iter = self.enumerator_for( self.root ) return iter.each( &block ) if block @@ -634,9 +708,24 @@ self.log.debug "Registering subscription %p" % [ subscription ] node.add_subscription( subscription ) self.log.debug " adding '%s' to the subscriptions hash." % [ subscription.id ] self.subscriptions[ subscription.id ] = node self.log.debug " subscriptions hash: %#0x" % [ self.subscriptions.object_id ] + end + + + ### Publish a system event that observers can watch for to detect restarts. + def publish_heartbeat_event + self.publish_system_event( 'heartbeat', run_id: self.run_id ) + end + + + ### Publish an event with the specified +eventname+ and +data+. + def publish_system_event( eventname, **data ) + eventname = eventname.to_s + eventname = 'sys.' + eventname unless eventname.start_with?( 'sys.' ) + self.log.debug "Publishing %s event: %p." % [ eventname, data ] + self.event_publisher.publish( eventname, data ) end ### Create a subscription that publishes to the Manager's event publisher for ### the node with the specified +identifier+ and +event_pattern+, using the