lib/rflow/master.rb in rflow-1.0.0a1 vs lib/rflow/master.rb in rflow-1.0.0a2

- old
+ new

@@ -1,127 +1,45 @@ +require 'rflow/daemon_process' require 'rflow/pid_file' require 'rflow/shard' class RFlow - class Master + class Master < DaemonProcess + attr_reader :shards - attr_accessor :name, :pid_file, :ready_write - attr_accessor :shards - def initialize(config) - @name = config['rflow.application_name'] + super(config['rflow.application_name'], 'Master') @pid_file = PIDFile.new(config['rflow.pid_file_path']) - @shards = config.shards.map do |shard_config| - Shard.new(shard_config) - end + @shards = config.shards.map {|config| Shard.new(config) } end - def handle_signals - # Gracefully shutdown on termination signals - ['SIGTERM', 'SIGINT', 'SIGQUIT', 'SIGCHLD'].each do |signal| - Signal.trap signal do - # Log4r and traps don't mix, so we need to put it in another thread - Thread.new { shutdown(signal) }.join - end - end + def run! + write_pid_file + super + ensure + remove_pid_file + end - # Reopen logs on USR1 - ['SIGUSR1'].each do |signal| - Signal.trap signal do - Thread.new do - RFlow.logger.reopen - signal_workers(signal) - end.join - end - end + def spawn_subprocesses + shards.each(&:run!) + end - # Toggle log level on USR2 - ['SIGUSR2'].each do |signal| - Signal.trap signal do - Thread.new do - RFlow.logger.toggle_log_level - signal_workers(signal) - end.join - end - end + def subprocesses + shards.flat_map(&:workers) end - def run - Log4r::NDC.clear - Log4r::NDC.push name - $0 = name - - shards.each {|s| s.run!} - - handle_signals - - # Signal the grandparent that we are running - if ready_write - ready_write.syswrite($$.to_s) - ready_write.close rescue nil - end - - pid_file.write - - RFlow.logger.info "Master started" - + def run_process EM.run do # TODO: Monitor the workers end - - @pid_file.safe_unlink end - def daemonize! - RFlow.logger.info "#{name} daemonizing" - - ready_read, @ready_write = IO.pipe - [ready_read, @ready_write].each { |io| io.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) } - - grandparent = $$ - - if fork - # Grandparent waits for a PID on the pipe indicating that the - # master successfully started. - @ready_write.close # grandparent does not write - master_pid = (ready_read.readpartial(16) rescue nil).to_i - unless master_pid > 1 - RFlow.logger.error "Master failed to start" - exit! 1 - end - RFlow.logger.info "Master indicated successful daemonization" - exit 0 - end - - Process.daemon(true, true) - - ready_read.close # master does not read - - # Close standard IO - $stdout.sync = $stderr.sync = true - $stdin.binmode; $stdout.binmode; $stderr.binmode - begin; $stdin.reopen "/dev/null"; rescue ::Exception; end - begin; $stdout.reopen "/dev/null"; rescue ::Exception; end - begin; $stderr.reopen "/dev/null"; rescue ::Exception; end - - $$ + def shutdown!(reason) + remove_pid_file + super end - def signal_workers(signal) - shards.each do |shard| - shard.workers.each do |worker| - RFlow.logger.info "Signalling #{worker.name} with #{signal}" - Process.kill(signal, worker.pid) - end - end - end - - def shutdown(reason) - RFlow.logger.info "#{name} shutting down due to #{reason}" - signal_workers('QUIT') - pid_file.safe_unlink - RFlow.logger.info "#{name} exiting" - exit 0 - end - + private + def write_pid_file; @pid_file.write; end + def remove_pid_file; @pid_file.safe_unlink; end end end