lib/rflow/master.rb in rflow-1.0.0a2 vs lib/rflow/master.rb in rflow-1.0.0a3

- old
+ new

@@ -1,31 +1,38 @@ require 'rflow/daemon_process' require 'rflow/pid_file' require 'rflow/shard' +require 'rflow/broker' class RFlow class Master < DaemonProcess attr_reader :shards + attr_reader :brokers def initialize(config) super(config['rflow.application_name'], 'Master') @pid_file = PIDFile.new(config['rflow.pid_file_path']) @shards = config.shards.map {|config| Shard.new(config) } + @brokers = config.connections.flat_map(&:brokers).map {|config| Broker.build(config) } end def run! write_pid_file super ensure remove_pid_file end def spawn_subprocesses + RFlow.logger.debug "Running #{brokers.count} brokers" if brokers.count > 0 + brokers.each(&:spawn!) + RFlow.logger.debug "#{brokers.count} brokers started: #{brokers.map { |w| "#{w.name} (#{w.pid})" }.join(", ")}" if brokers.count > 0 + shards.each(&:run!) end def subprocesses - shards.flat_map(&:workers) + brokers + shards.flat_map(&:workers) end def run_process EM.run do # TODO: Monitor the workers