lib/rflow/shard.rb in rflow-1.0.0a3 vs lib/rflow/shard.rb in rflow-1.0.0a4
- old
+ new
@@ -7,33 +7,40 @@
# to monitor the underlying processes. The child implementation,
# running in a separate process, will not return from spawn!, but
# start an EventMachine reactor.
class Shard
class Worker < ChildProcess
+ attr_reader :shard, :index
+
def initialize(shard, index = 1)
super("#{shard.name}-#{index}", 'Worker')
@shard = shard
+ @index = index
# build at initialize time to fail fast
- @components = shard.config.components.map {|config| Component.build(config) }
+ @components = shard.config.components.map {|config| Component.build(self, config) }
end
def run_process
EM.run do
- # TODO: Monitor the master
- configure_components!
- connect_components!
- # TODO: need to do proper node synchronization for ZMQ to remove sleep
- sleep 1
- run_components!
+ begin
+ # TODO: Monitor the master
+ configure_components!
+ connect_components!
+ # TODO: need to do proper node synchronization for ZMQ to remove sleep
+ sleep 1
+ run_components!
+ rescue Exception => e
+ RFlow.logger.error "Error in worker, shutting down: #{e.class.name}: #{e.message}, because: #{e.backtrace.inspect}"
+ end
end
RFlow.logger.info "Shutting down worker after EM stopped"
end
def configure_components!
RFlow.logger.debug "Configuring components"
- @components.zip(@shard.config.components.map(&:options)).each do |(component, config)|
+ @components.zip(shard.config.components.map(&:options)).each do |(component, config)|
RFlow.logger.debug "Configuring component '#{component.name}' (#{component.uuid})"
component.configure! config
end
end