lib/rflow/shard.rb in rflow-1.3.0 vs lib/rflow/shard.rb in rflow-1.3.1

- old
+ new

@@ -1,27 +1,38 @@ require 'rflow/child_process' class RFlow # An object implementation shared between two processes. The parent - # process will instantiate, configure, and run! a shard, at which - # point the parent will have access to the shard object and be able - # to monitor the underlying processes. The child implementation, - # running in a separate process, will not return from spawn!, but + # process will instantiate, configure, and run! a {Shard}, at which + # point the parent will have access to the {Shard} object and be able + # to monitor the underlying {Shard::Worker} processes. The child implementation, + # running in a separate process, will not return from +spawn!+, but # start an EventMachine reactor. class Shard + # An actual child process under the {Shard}, which coordinates a set of + # identical {Worker}s. class Worker < ChildProcess - attr_reader :shard, :index + # A reference to the {Shard} governing this {Worker}. + # @return [Shard] + attr_reader :shard + # Which worker index this is (for example, in a set of 3 {Worker}s, + # one would have index 0, one would have index 1, one would have index 2). + # @return [Integer] + attr_reader :index + def initialize(shard, index = 1) super("#{shard.name}-#{index}", 'Worker') @shard = shard @index = index # build at initialize time to fail fast @components = shard.config.components.map {|config| Component.build(self, config) } end + # Configure, connect, and actually start running RFlow components. + # @return [void] def run_process EM.run do begin # TODO: Monitor the master configure_components! @@ -36,10 +47,11 @@ end RFlow.logger.info 'Shutting down worker after EM stopped' end + protected def configure_components! RFlow.logger.debug 'Configuring components' @components.zip(shard.config.components.map(&:options)).each do |(component, config)| RFlow.logger.debug "Configuring component '#{component.name}' (#{component.uuid})" component.configure! config @@ -66,10 +78,13 @@ RFlow.logger.debug "Running component '#{component.name}' (#{component.uuid})" component.run! end end + public + # Shut down the {Worker}. Shuts down each component and kills EventMachine. + # @return [void] def shutdown!(signal) RFlow.logger.debug 'Shutting down components' @components.each do |component| RFlow.logger.debug "Shutting down component '#{component.name}' (#{component.uuid})" component.shutdown! @@ -77,19 +92,35 @@ EM.stop_event_loop super end end - attr_reader :config, :name, :count, :workers + # Reference to the {Shard}'s configuration. + # @return [Configuration::Shard] + attr_reader :config + # The {Shard}'s name. + # @return [String] + attr_reader :name + + # The count of workers that should be started. + # @return [Integer] + attr_reader :count + + # Reference to the actual {Worker}s. + # @return [Array<Worker>] + attr_reader :workers + def initialize(config) @config = config @uuid = config.uuid @name = config.name @count = config.count @workers = count.times.map {|i| Worker.new(self, i+1) } end + # Start the shard by spawning and starting all the workers. + # @return [void] def run! RFlow.logger.debug "Running shard #{name} with #{count} workers" workers.each(&:spawn!) RFlow.logger.debug "#{count} workers started for #{name}: #{workers.map { |w| "#{w.name} (#{w.pid})" }.join(', ')}"