lib/rflow/shard.rb in rflow-1.0.0a3 vs lib/rflow/shard.rb in rflow-1.0.0a4

- old
+ new

@@ -7,33 +7,40 @@ # to monitor the underlying processes. The child implementation, # running in a separate process, will not return from spawn!, but # start an EventMachine reactor. class Shard class Worker < ChildProcess + attr_reader :shard, :index + def initialize(shard, index = 1) super("#{shard.name}-#{index}", 'Worker') @shard = shard + @index = index # build at initialize time to fail fast - @components = shard.config.components.map {|config| Component.build(config) } + @components = shard.config.components.map {|config| Component.build(self, config) } end def run_process EM.run do - # TODO: Monitor the master - configure_components! - connect_components! - # TODO: need to do proper node synchronization for ZMQ to remove sleep - sleep 1 - run_components! + begin + # TODO: Monitor the master + configure_components! + connect_components! + # TODO: need to do proper node synchronization for ZMQ to remove sleep + sleep 1 + run_components! + rescue Exception => e + RFlow.logger.error "Error in worker, shutting down: #{e.class.name}: #{e.message}, because: #{e.backtrace.inspect}" + end end RFlow.logger.info "Shutting down worker after EM stopped" end def configure_components! RFlow.logger.debug "Configuring components" - @components.zip(@shard.config.components.map(&:options)).each do |(component, config)| + @components.zip(shard.config.components.map(&:options)).each do |(component, config)| RFlow.logger.debug "Configuring component '#{component.name}' (#{component.uuid})" component.configure! config end end