lib/rflow/master.rb in rflow-1.0.0a2 vs lib/rflow/master.rb in rflow-1.0.0a3
- old
+ new
@@ -1,31 +1,38 @@
require 'rflow/daemon_process'
require 'rflow/pid_file'
require 'rflow/shard'
+require 'rflow/broker'
class RFlow
class Master < DaemonProcess
attr_reader :shards
+ attr_reader :brokers
def initialize(config)
super(config['rflow.application_name'], 'Master')
@pid_file = PIDFile.new(config['rflow.pid_file_path'])
@shards = config.shards.map {|config| Shard.new(config) }
+ @brokers = config.connections.flat_map(&:brokers).map {|config| Broker.build(config) }
end
def run!
write_pid_file
super
ensure
remove_pid_file
end
def spawn_subprocesses
+ RFlow.logger.debug "Running #{brokers.count} brokers" if brokers.count > 0
+ brokers.each(&:spawn!)
+ RFlow.logger.debug "#{brokers.count} brokers started: #{brokers.map { |w| "#{w.name} (#{w.pid})" }.join(", ")}" if brokers.count > 0
+
shards.each(&:run!)
end
def subprocesses
- shards.flat_map(&:workers)
+ brokers + shards.flat_map(&:workers)
end
def run_process
EM.run do
# TODO: Monitor the workers