lib/rflow/shard.rb in rflow-1.0.0a1 vs lib/rflow/shard.rb in rflow-1.0.0a2

- old
+ new

@@ -1,148 +1,81 @@ -class RFlow +require 'rflow/child_process' +class RFlow # An object implementation shared between two processes. The parent # process will instantiate, configure, and run! a shard, at which # point the parent will have access to the shard object and be able # to monitor the underlying processes. The child implementation, - # running in a separate process, will not return from run!, but - # start an Eventmachine reactor, connect the components, and not - # return + # running in a separate process, will not return from spawn!, but + # start an EventMachine reactor. class Shard - - # An internal class that represents an instance of the running - # shard, i.e. a process - class Worker - - attr_accessor :shard, :index, :name, :pid - attr_accessor :components - attr_accessor :worker_read, :master_write - - def initialize(shard, index=1) + class Worker < ChildProcess + def initialize(shard, index = 1) + super("#{shard.name}-#{index}", 'Worker') @shard = shard - @index = index - @name = "#{shard.name}-#{index}" - # Set up the IPC pipes - @worker_read, @master_write = IO.pipe - [@worker_read, @master_write].each do |io| - io.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) - end - - @components = shard.config.components.map do |component_config| - Component.build(component_config) - end + # build at initialize time to fail fast + @components = shard.config.components.map {|config| Component.build(config) } end - def handle_signals - ['SIGTERM', 'SIGINT', 'SIGQUIT'].each do |signal| - Signal.trap signal do - Thread.new { shutdown(signal) }.join - end + def run_process + EM.run do + # TODO: Monitor the master + configure_components! + connect_components! + # TODO: need to do proper node synchronization for ZMQ to remove sleep + sleep 1 + run_components! end - ['SIGUSR1'].each do |signal| - Signal.trap signal do - Thread.new do - RFlow.logger.reopen - end.join - end - end - - # Toggle log level on USR2 - ['SIGUSR2'].each do |signal| - Signal.trap signal do - Thread.new do - RFlow.logger.toggle_log_level - end.join - end - end + RFlow.logger.info "Shutting down worker after EM stopped" end - # Launch another process to execute the shard. The parent - # process retains the original worker object (with pid and IPC - # pipe) to allow for process management - def launch - @pid = Process.fork do - @master_write.close - - handle_signals - - $0 += " #{name}" - Log4r::NDC.push name - - RFlow.logger.info "Worker started" - EM.run do - # TODO: Monitor the master - - connect_components! - # TODO: need to do proper node synchronization for ZMQ to - # remove sleep - sleep 1 - run_components! - end - - RFlow.logger.info "Shutting down worker after EM stopped" + def configure_components! + RFlow.logger.debug "Configuring components" + @components.zip(@shard.config.components.map(&:options)).each do |(component, config)| + RFlow.logger.debug "Configuring component '#{component.name}' (#{component.uuid})" + component.configure! config end - - @worker_read.close - self end - # Send a command to each component to tell them to connect their - # ports via their connections def connect_components! RFlow.logger.debug "Connecting components" - components.each do |component| + @components.each do |component| RFlow.logger.debug "Connecting component '#{component.name}' (#{component.uuid})" component.connect! end end - # Start each component running def run_components! RFlow.logger.debug "Running components" - components.each do |component| + @components.each do |component| RFlow.logger.debug "Running component '#{component.name}' (#{component.uuid})" component.run! end end + + def shutdown!(signal) + EM.stop_event_loop + super + end end + attr_reader :config, :name, :count, :workers - attr_reader :config, :uuid, :name, :count - attr_accessor :workers - - def initialize(config) @config = config @uuid = config.uuid @name = config.name @count = config.count - - @workers = count.times.map do |i| - Worker.new(self, i+1) - end + @workers = count.times.map {|i| Worker.new(self, i+1) } end - def run! RFlow.logger.debug "Running shard #{name} with #{count} workers" - workers.each do |worker| - worker.launch - end + workers.each(&:spawn!) RFlow.logger.debug "#{count} workers started for #{name}: #{workers.map { |w| "#{w.name} (#{w.pid})" }.join(", ")}" workers - end - - - # TODO: Implement - def shutdown! - end - - - # TODO: Implement - def cleanup! end end end