lib/bluepill/process.rb in evented_bluepill-0.0.46 vs lib/bluepill/process.rb in evented_bluepill-0.0.47

- old
+ new

@@ -1,39 +1,63 @@ -require "state_machine" -require "daemons" +# -*- encoding: utf-8 -*- +require 'state_machine' +require 'daemons' + module Bluepill + class ProcessTimer < Coolio::TimerWatcher + attr_accessor :process + + def initialize(process) + self.process = process + super(1, true) + end + + def on_timer + return if self.process.skipping_ticks? + self.process.skip_ticks_until = nil + + # clear the memoization per tick + self.process.process_running = nil + + # run state machine transitions + self.process.tick + + self.process.refresh_children! if self.process.up? and self.process.monitor_children? + end + end + class Process CONFIGURABLE_ATTRIBUTES = [ - :start_command, - :stop_command, - :restart_command, - + :start_command, + :stop_command, + :restart_command, + :stdout, :stderr, :stdin, - - :daemonize, - :pid_file, + + :daemonize, + :pid_file, :working_dir, :environment, - - :start_grace_time, - :stop_grace_time, + + :start_grace_time, + :stop_grace_time, :restart_grace_time, - + :uid, :gid, - + :monitor_children, - :child_process_template + :child_process_factory ] - - attr_accessor :name, :watches, :triggers, :logger, :skip_ticks_until + + attr_accessor :name, :watches, :triggers, :logger, :skip_ticks_until, :process_running attr_accessor *CONFIGURABLE_ATTRIBUTES - attr_reader :children, :statistics - + attr_reader :children_timer, :statistics, :timer + state_machine :initial => :unmonitored do # These are the idle states, i.e. only an event (either external or internal) will trigger a transition. # The distinction between down and unmonitored is that down # means we know it is not running and unmonitored is that we don't care if it's running. state :unmonitored, :up, :down @@ -45,19 +69,19 @@ transition :starting => :up, :if => :process_running? transition :starting => :down, :unless => :process_running? transition :up => :up, :if => :process_running? transition :up => :down, :unless => :process_running? - + # The process failed to die after entering the stopping state. Change the state to reflect # reality. - transition :stopping => :up, :if => :process_running? + transition :stopping => :up, :if => :process_running? transition :stopping => :down, :unless => :process_running? - + transition :down => :up, :if => :process_running? transition :down => :starting, :unless => :process_running? - + transition :restarting => :up, :if => :process_running? transition :restarting => :down, :unless => :process_running? end event :start do @@ -82,129 +106,102 @@ after_transition any => :stopping, :do => :stop_process after_transition any => :restarting, :do => :restart_process after_transition any => any, :do => :record_transition end - - def initialize(process_name, options = {}) + + def initialize(process_name, checks, options = {}) @name = process_name - @event_mutex = Monitor.new @transition_history = Util::RotationalArray.new(10) @watches = [] @triggers = [] - @children = [] + @children_timer = [] @statistics = ProcessStatistics.new - + @actual_pid = options[:actual_pid] + self.logger = options[:logger] + + checks.each do |name, opts| + if Bluepill::Trigger[name] + self.add_trigger(name, opts) + else + self.add_watch(name, opts) + end + end + # These defaults are overriden below if it's configured to be something else. @monitor_children = false @start_grace_time = @stop_grace_time = @restart_grace_time = 3 @environment = {} - + CONFIGURABLE_ATTRIBUTES.each do |attribute_name| self.send("#{attribute_name}=", options[attribute_name]) if options.has_key?(attribute_name) end - + # Let state_machine do its initialization stuff super() # no arguments intentional end - def tick - return if self.skipping_ticks? - self.skip_ticks_until = nil - - # clear the memoization per tick - @process_running = nil - - # run state machine transitions - super - - if self.up? - self.run_watches - - if self.monitor_children? - refresh_children! - children.each {|child| child.tick} - end - end - end - def logger=(logger) @logger = logger self.watches.each {|w| w.logger = logger } self.triggers.each {|t| t.logger = logger } end - + # State machine methods def dispatch!(event, reason = nil) - @event_mutex.synchronize do - @statistics.record_event(event, reason) - self.send("#{event}") - end + @statistics.record_event(event, reason) + self.send("#{event}") end - + def record_transition(transition) unless transition.loopback? - @transitioned = true - # When a process changes state, we should clear the memory of all the watches self.watches.each { |w| w.clear_history! } - + # Also, when a process changes state, we should re-populate its child list if self.monitor_children? self.logger.warning "Clearing child list" - self.children.clear + self.children_timer.each {|timer| timer.detach } + self.children_timer.each {|timer| timer.process.watches.each {|w| w.detach }} + self.children_timer.clear end logger.info "Going from #{transition.from_name} => #{transition.to_name}" end end - + def notify_triggers(transition) self.triggers.each {|trigger| trigger.notify(transition)} end - + # Watch related methods def add_watch(name, options = {}) - self.watches << ConditionWatch.new(name, options.merge(:logger => self.logger)) + self.watches << ConditionWatch.new(name, self, options.merge(:logger => self.logger)) end - + def add_trigger(name, options = {}) self.triggers << Trigger[name].new(self, options.merge(:logger => self.logger)) end - def run_watches - now = Time.now.to_i - - threads = self.watches.collect do |watch| - [watch, Thread.new { Thread.current[:events] = watch.run(self.actual_pid, now) }] - end - - @transitioned = false - - threads.inject([]) do |events, (watch, thread)| - thread.join - if thread[:events].size > 0 - logger.info "#{watch.name} dispatched: #{thread[:events].join(',')}" - thread[:events].each do |event| - events << [event, watch.to_s] - end - end - events - end.each do |(event, reason)| - break if @transitioned - self.dispatch!(event, reason) - end - end - def determine_initial_state if self.process_running?(true) self.state = 'up' else # TODO: or "unmonitored" if bluepill was started in no auto-start mode. self.state = 'down' end + + # TODO move into right position + self.set_timer end - + + def set_timer + @timer = Bluepill::ProcessTimer.new(self) + Bluepill::Event.attach(self.timer) + + self.watches.each {|w| Bluepill::Event.attach(w) } + end + def handle_user_command(cmd) case cmd when "start" if self.process_running?(true) logger.warning("Refusing to re-run start command on an already running process.") @@ -221,103 +218,103 @@ # scheduled events gets cleared triggers.each {|t| t.reset! } dispatch!(:unmonitor, "user initiated") end end - + # System Process Methods def process_running?(force = false) @process_running = nil if force # clear existing state if forced - + @process_running ||= signal_process(0) # the process isn't running, so we should clear the PID self.clear_pid unless @process_running @process_running end - + def start_process logger.warning "Executing start command: #{start_command}" - + if self.daemonize? System.daemonize(start_command, self.system_command_options) - + else # This is a self-daemonizing process with_timeout(start_grace_time) do result = System.execute_blocking(start_command, self.system_command_options) - + unless result[:exit_code].zero? logger.warning "Start command execution returned non-zero exit code:" logger.warning result.inspect - end + end end end - + self.skip_ticks_for(start_grace_time) end - - def stop_process + + def stop_process if stop_command cmd = self.prepare_command(stop_command) logger.warning "Executing stop command: #{cmd}" - + with_timeout(stop_grace_time) do result = System.execute_blocking(cmd, self.system_command_options) - + unless result[:exit_code].zero? logger.warning "Stop command execution returned non-zero exit code:" logger.warning result.inspect end end - + else logger.warning "Executing default stop command. Sending TERM signal to #{actual_pid}" signal_process("TERM") end self.unlink_pid # TODO: we only write the pid file if we daemonize, should we only unlink it if we daemonize? - + self.skip_ticks_for(stop_grace_time) end - + def restart_process if restart_command cmd = self.prepare_command(restart_command) - + logger.warning "Executing restart command: #{cmd}" - + with_timeout(restart_grace_time) do result = System.execute_blocking(cmd, self.system_command_options) unless result[:exit_code].zero? logger.warning "Restart command execution returned non-zero exit code:" logger.warning result.inspect end end - + self.skip_ticks_for(restart_grace_time) else logger.warning "No restart_command specified. Must stop and start to restart" self.stop_process # the tick will bring it back. end end - + def daemonize? !!self.daemonize end - + def monitor_children? !!self.monitor_children end - + def signal_process(code) ::Process.kill(code, actual_pid) true rescue false end - + def actual_pid @actual_pid ||= begin if pid_file if File.exists?(pid_file) str = File.read(pid_file) @@ -327,96 +324,89 @@ nil end end end end - + def actual_pid=(pid) @actual_pid = pid end - + def clear_pid @actual_pid = nil end - + def unlink_pid File.unlink(pid_file) if pid_file && File.exists?(pid_file) end - + # Internal State Methods def skip_ticks_for(seconds) # TODO: should this be addative or longest wins? # i.e. if two calls for skip_ticks_for come in for 5 and 10, should it skip for 10 or 15? self.skip_ticks_until = (self.skip_ticks_until || Time.now.to_i) + seconds.to_i end - + def skipping_ticks? self.skip_ticks_until && self.skip_ticks_until > Time.now.to_i end - + def refresh_children! # First prune the list of dead children - @children.delete_if {|child| !child.process_running?(true) } - + dead_children = self.children_timer.select {|timer| !timer.process.process_running?(true) } + dead_children.each {|timer| timer.detach } + dead_children.each {|timer| timer.process.watches.each {|w| w.detach }} + @children_timer -= dead_children + # Add new found children to the list - new_children_pids = System.get_children(self.actual_pid) - @children.map {|child| child.actual_pid} - + new_children_pids = System.get_children(self.actual_pid) - self.children_timer.map {|timer| timer.process.actual_pid} + unless new_children_pids.empty? - logger.info "Existing children: #{@children.collect{|c| c.actual_pid}.join(",")}. Got new children: #{new_children_pids.inspect} for #{actual_pid}" + logger.info "Existing children: #{self.children_timer.collect{|c| c.process.actual_pid}.join(",")}. Got new children: #{new_children_pids.inspect} for #{actual_pid}" end - + # Construct a new process wrapper for each new found children new_children_pids.each do |child_pid| - child = self.child_process_template.deep_copy - - child.name = "<child(pid:#{child_pid})>" - child.actual_pid = child_pid - child.logger = self.logger.prefix_with(child.name) - + name = "<child(pid:#{child_pid})>" + logger = self.logger.prefix_with(name) + + child = self.child_process_factory.create_child_process(name, child_pid, logger) + child.initialize_state_machines child.state = "up" - - @children << child + + child.set_timer + child.watches.each {|w| w.process = child } + + @children_timer << child.timer end end - def deep_copy - # TODO: This is a kludge. Ideally, process templates - # would be facotries, and not a template object. - mutex, triggers, @event_mutex, @triggers = @event_mutex, @triggers, nil, nil - clone = Marshal.load(Marshal.dump(self)) - clone.instance_variable_set("@event_mutex", Monitor.new) - clone.instance_variable_set("@triggers", triggers.collect{ |t| t.deep_copy }) - @event_mutex = mutex - @triggers = triggers - clone - end - def prepare_command(command) command.to_s.gsub("{{PID}}", actual_pid.to_s) end - + def system_command_options { - :uid => self.uid, - :gid => self.gid, + :uid => self.uid, + :gid => self.gid, :working_dir => self.working_dir, :environment => self.environment, :pid_file => self.pid_file, :logger => self.logger, :stdin => self.stdin, :stdout => self.stdout, :stderr => self.stderr } end - + def with_timeout(secs, &blk) Timeout.timeout(secs.to_f, &blk) - + rescue Timeout::Error logger.err "Execution is taking longer than expected. Unmonitoring." logger.err "Did you forget to tell bluepill to daemonize this process?" self.dispatch!("unmonitor") end end end - +