lib/arborist/manager.rb in arborist-0.0.1.pre20160606141735 vs lib/arborist/manager.rb in arborist-0.0.1.pre20160829140603
- old
+ new
@@ -1,8 +1,9 @@
# -*- ruby -*-
#encoding: utf-8
+require 'securerandom'
require 'pathname'
require 'tempfile'
require 'configurability'
require 'loggability'
require 'rbczmq'
@@ -28,11 +29,13 @@
SIGNAL_INTERVAL = 0.5
# Configurability API -- set config defaults
CONFIG_DEFAULTS = {
state_file: nil,
- checkpoint_frequency: 30
+ checkpoint_frequency: 30000,
+ heartbeat_frequency: 1000,
+ linger: 5000
}
# Use the Arborist logger
log_to :arborist
@@ -44,21 +47,39 @@
##
# The Pathname of the file the manager's node tree state is saved to
singleton_attr_accessor :state_file
##
- # The number of seconds between automatic state checkpoints
+ # The number of milliseconds between automatic state checkpoints
singleton_attr_accessor :checkpoint_frequency
+ ##
+ # The number of milliseconds between heartbeat events
+ singleton_attr_accessor :heartbeat_frequency
+ ##
+ # The maximum amount of time to wait for pending events to be delivered during
+ # shutdown, in milliseconds.
+ singleton_attr_accessor :linger
+
+
### Configurability API -- configure the manager
def self::configure( config=nil )
config ||= {}
config = self.defaults.merge( config[:manager] || {} )
+ self.log.debug "Config is: %p" % [ config ]
+
self.state_file = config[:state_file] && Pathname( config[:state_file] )
+ self.linger = config[:linger].to_i
+ self.log.info "Linger configured to %p" % [ self.linger ]
+ self.heartbeat_frequency = config[:heartbeat_frequency].to_i ||
+ CONFIG_DEFAULTS[:heartbeat_frequency]
+ raise Arborist::ConfigError, "heartbeat frequency must be a positive non-zero integer" if
+ self.heartbeat_frequency <= 0
+
interval = config[:checkpoint_frequency].to_i
if interval && interval.nonzero?
self.checkpoint_frequency = interval
else
self.checkpoint_frequency = nil
@@ -70,35 +91,54 @@
# Instance methods
#
### Create a new Arborist::Manager.
def initialize
+ @run_id = SecureRandom.hex( 16 )
@root = Arborist::Node.create( :root )
- @nodes = {
- '_' => @root,
- }
+ @nodes = { '_' => @root }
+
@subscriptions = {}
@tree_built = false
@tree_sock = @event_sock = nil
@signal_timer = nil
@start_time = nil
- Thread.main[:signal_queue] = []
- @zmq_loop = nil
-
- @api_handler = nil
- @event_publisher = nil
@checkpoint_timer = nil
+ @linger = self.class.linger || Arborist::Manager::CONFIG_DEFAULTS[ :linger ]
+ self.log.info "Linger set to %p" % [ @linger ]
+
+ @zmq_loop = ZMQ::Loop.new
+ # @zmq_loop.verbose = true
+ @tree_sock = self.setup_tree_socket
+ @event_sock = self.setup_event_socket
+
+ @api_handler = Arborist::Manager::TreeAPI.new( @tree_sock, self )
+ @tree_sock.handler = @api_handler
+ @zmq_loop.register( @tree_sock )
+
+ @event_publisher = Arborist::Manager::EventPublisher.new( @event_sock, self, @zmq_loop )
+ @event_sock.handler = @event_publisher
+ @zmq_loop.register( @event_sock )
+
+ @heartbeat_timer = self.make_heartbeat_timer
+ @checkpoint_timer = self.make_checkpoint_timer
+
+ Thread.main[:signal_queue] = []
end
######
public
######
##
+ # A unique string used to identify different runs of the Manager
+ attr_reader :run_id
+
+ ##
# The root node of the tree.
attr_accessor :root
##
# The Hash of all loaded Nodes, keyed by their identifier
@@ -126,32 +166,50 @@
##
# Flag for marking when the tree is built successfully the first time
attr_predicate_accessor :tree_built
+ ##
+ # The maximum amount of time to wait for pending events to be delivered during
+ # shutdown, in milliseconds.
+ attr_reader :linger
+ ##
+ # The ZMQ::Timer that processes signals
+ attr_reader :signal_timer
+
+ ##
+ # The ZMQ::Timer that periodically checkpoints the manager's state (if it's configured to do so)
+ attr_reader :checkpoint_timer
+
+ ##
+ # The ZMQ::Timer that periodically publishes a heartbeat event
+ attr_reader :heartbeat_timer
+
+
#
# :section: Startup/Shutdown
#
### Setup sockets and start the event loop.
def run
self.log.info "Getting ready to start the manager."
- self.setup_sockets
+ self.publish_system_event( 'startup', start_time: Time.now.to_s, version: Arborist::VERSION )
+ self.register_timers
self.set_signal_handlers
self.start_accepting_requests
return self # For chaining
ensure
self.restore_signal_handlers
- if @zmq_loop
+ if self.zmq_loop
self.log.debug "Unregistering sockets."
- @zmq_loop.remove( @tree_sock )
+ self.zmq_loop.remove( @tree_sock )
@tree_sock.pollable.close
- @zmq_loop.remove( @event_sock )
+ self.zmq_loop.remove( @event_sock )
@event_sock.pollable.close
- @zmq_loop.cancel_timer( @checkpoint_timer ) if @checkpoint_timer
+ self.zmq_loop.cancel_timer( @checkpoint_timer ) if @checkpoint_timer
end
self.save_node_states
self.log.debug "Resetting ZMQ context"
@@ -159,47 +217,33 @@
end
### Returns true if the Manager is running.
def running?
- return @zmq_loop && @zmq_loop.running?
+ return self.zmq_loop && self.zmq_loop.running?
end
+ ### Register the Manager's timers.
+ def register_timers
+ self.zmq_loop.register_timer( self.heartbeat_timer )
+ self.zmq_loop.register_timer( self.checkpoint_timer ) if self.checkpoint_timer
+ end
+
+
### Start a loop, accepting a request and handling it.
def start_accepting_requests
self.log.debug "Starting the main loop"
- @zmq_loop = ZMQ::Loop.new
-
- @api_handler = Arborist::Manager::TreeAPI.new( @tree_sock, self )
- @tree_sock.handler = @api_handler
- @zmq_loop.register( @tree_sock )
-
- @event_publisher = Arborist::Manager::EventPublisher.new( @event_sock, self, @zmq_loop )
- @event_sock.handler = @event_publisher
- @zmq_loop.register( @event_sock )
-
- @checkpoint_timer = self.start_state_checkpointing
- @zmq_loop.register_timer( @checkpoint_timer ) if @checkpoint_timer
-
self.setup_signal_timer
self.start_time = Time.now
self.log.debug "Manager running."
- @zmq_loop.start
+ return self.zmq_loop.start
end
- ### Create the ZMQ API socket if necessary.
- def setup_sockets
- self.log.debug "Setting up sockets"
- @tree_sock = self.setup_tree_socket
- @event_sock = self.setup_event_socket
- end
-
-
### Set up the ZMQ REP socket for the Tree API.
def setup_tree_socket
sock = Arborist.zmq_context.socket( :REP )
self.log.debug " binding the tree API socket (%#0x) to %p" %
[ sock.object_id * 2, Arborist.tree_api_url ]
@@ -212,11 +256,11 @@
### Set up the ZMQ PUB socket for published events.
def setup_event_socket
sock = Arborist.zmq_context.socket( :PUB )
self.log.debug " binding the event socket (%#0x) to %p" %
[ sock.object_id * 2, Arborist.event_api_url ]
- sock.linger = 0
+ sock.linger = self.linger
sock.bind( Arborist.event_api_url )
return ZMQ::Pollitem.new( sock, ZMQ::POLLOUT )
end
@@ -229,11 +273,15 @@
### Stop the manager.
def stop
self.log.info "Stopping the manager."
self.ignore_signals
self.cancel_signal_timer
- @zmq_loop.stop if @zmq_loop
+
+ @api_handler.shutdown
+ @event_publisher.shutdown
+
+ self.zmq_loop.stop
end
#
# :section: Node state saving/reloading
@@ -285,18 +333,30 @@
return true
end
- ### Start a timer that will save a snapshot of the node tree's state to the state
+ ### Make a ZMQ::Timer that will publish a heartbeat event at a configurable interval.
+ def make_heartbeat_timer
+ interval = self.class.heartbeat_frequency || CONFIG_DEFAULTS[ :heartbeat_frequency ]
+
+ self.log.info "Setting up to heartbeat every %dms" % [ interval ]
+ heartbeat_timer = ZMQ::Timer.new( (interval/1000.0), 0 ) do
+ self.publish_heartbeat_event
+ end
+ return heartbeat_timer
+ end
+
+
+ ### Make a ZMQ::Timer that will save a snapshot of the node tree's state to the state
### file on a configured interval if it's configured.
- def start_state_checkpointing
+ def make_checkpoint_timer
return nil unless self.class.state_file
interval = self.class.checkpoint_frequency or return nil
- self.log.info "Setting up node state checkpoint every %ds" % [ interval ]
- checkpoint_timer = ZMQ::Timer.new( interval, 0 ) do
+ self.log.info "Setting up node state checkpoint every %dms" % [ interval ]
+ checkpoint_timer = ZMQ::Timer.new( (interval/1000.0), 0 ) do
self.save_node_states
end
return checkpoint_timer
end
@@ -309,19 +369,19 @@
#
### Set up a periodic ZMQ timer to check for queued signals and handle them.
def setup_signal_timer
@signal_timer = ZMQ::Timer.new( SIGNAL_INTERVAL, 0, self.method(:process_signal_queue) )
- @zmq_loop.register_timer( @signal_timer )
+ self.zmq_loop.register_timer( @signal_timer )
end
### Disable the timer that checks for incoming signals
def cancel_signal_timer
- if @signal_timer
- @signal_timer.cancel
- @zmq_loop.cancel_timer( @signal_timer )
+ if self.signal_timer
+ self.signal_timer.cancel
+ self.zmq_loop.cancel_timer( self.signal_timer )
end
end
### Set up signal handlers for common signals that will shut down, restart, etc.
@@ -401,10 +461,17 @@
self.log.info "Checkpoint: User signal."
self.save_node_states
end
+ ### Simulate the receipt of the specified +signal+ (probably only useful
+ ### in testing).
+ def simulate_signal( signal )
+ Thread.main[:signal_queue] << signal.to_sym
+ end
+
+
#
# :section: Tree API
#
### Add nodes yielded from the specified +enumerator+ into the manager's
@@ -459,11 +526,11 @@
self.nodes[ identifier ] = node
end
if self.tree_built?
self.link_node( node )
- node.handle_event( Arborist::Event.create(:sys_node_added, node) )
+ self.publish_system_event( 'node_added', node: identifier )
end
end
### Link the node to other nodes in the tree.
@@ -482,11 +549,11 @@
return unless node
raise "Can't remove an operational node" if node.operational?
self.log.info "Removing node %p" % [ node ]
- node.handle_event( Arborist::Event.create(:sys_node_removed, node) )
+ self.publish_system_event( 'node_removed', node: node.identifier )
node.children.each do |identifier, child_node|
self.remove_node( child_node )
end
if parent_node = self.nodes[ node.parent || '_' ]
@@ -553,10 +620,17 @@
#
# Tree-traversal API
#
+
+ ### Return the current root node.
+ def root_node
+ return self.nodes[ '_' ]
+ end
+
+
### Yield each node in a depth-first traversal of the manager's tree
### to the specified +block+, or return an Enumerator if no block is given.
def all_nodes( &block )
iter = self.enumerator_for( self.root )
return iter.each( &block ) if block
@@ -634,9 +708,24 @@
self.log.debug "Registering subscription %p" % [ subscription ]
node.add_subscription( subscription )
self.log.debug " adding '%s' to the subscriptions hash." % [ subscription.id ]
self.subscriptions[ subscription.id ] = node
self.log.debug " subscriptions hash: %#0x" % [ self.subscriptions.object_id ]
+ end
+
+
+ ### Publish a system event that observers can watch for to detect restarts.
+ def publish_heartbeat_event
+ self.publish_system_event( 'heartbeat', run_id: self.run_id )
+ end
+
+
+ ### Publish an event with the specified +eventname+ and +data+.
+ def publish_system_event( eventname, **data )
+ eventname = eventname.to_s
+ eventname = 'sys.' + eventname unless eventname.start_with?( 'sys.' )
+ self.log.debug "Publishing %s event: %p." % [ eventname, data ]
+ self.event_publisher.publish( eventname, data )
end
### Create a subscription that publishes to the Manager's event publisher for
### the node with the specified +identifier+ and +event_pattern+, using the