lib/bluepill/process.rb in evented_bluepill-0.0.46 vs lib/bluepill/process.rb in evented_bluepill-0.0.47
- old
+ new
@@ -1,39 +1,63 @@
-require "state_machine"
-require "daemons"
+# -*- encoding: utf-8 -*-
+require 'state_machine'
+require 'daemons'
+
module Bluepill
+ class ProcessTimer < Coolio::TimerWatcher
+ attr_accessor :process
+
+ def initialize(process)
+ self.process = process
+ super(1, true)
+ end
+
+ def on_timer
+ return if self.process.skipping_ticks?
+ self.process.skip_ticks_until = nil
+
+ # clear the memoization per tick
+ self.process.process_running = nil
+
+ # run state machine transitions
+ self.process.tick
+
+ self.process.refresh_children! if self.process.up? and self.process.monitor_children?
+ end
+ end
+
class Process
CONFIGURABLE_ATTRIBUTES = [
- :start_command,
- :stop_command,
- :restart_command,
-
+ :start_command,
+ :stop_command,
+ :restart_command,
+
:stdout,
:stderr,
:stdin,
-
- :daemonize,
- :pid_file,
+
+ :daemonize,
+ :pid_file,
:working_dir,
:environment,
-
- :start_grace_time,
- :stop_grace_time,
+
+ :start_grace_time,
+ :stop_grace_time,
:restart_grace_time,
-
+
:uid,
:gid,
-
+
:monitor_children,
- :child_process_template
+ :child_process_factory
]
-
- attr_accessor :name, :watches, :triggers, :logger, :skip_ticks_until
+
+ attr_accessor :name, :watches, :triggers, :logger, :skip_ticks_until, :process_running
attr_accessor *CONFIGURABLE_ATTRIBUTES
- attr_reader :children, :statistics
-
+ attr_reader :children_timer, :statistics, :timer
+
state_machine :initial => :unmonitored do
# These are the idle states, i.e. only an event (either external or internal) will trigger a transition.
# The distinction between down and unmonitored is that down
# means we know it is not running and unmonitored is that we don't care if it's running.
state :unmonitored, :up, :down
@@ -45,19 +69,19 @@
transition :starting => :up, :if => :process_running?
transition :starting => :down, :unless => :process_running?
transition :up => :up, :if => :process_running?
transition :up => :down, :unless => :process_running?
-
+
# The process failed to die after entering the stopping state. Change the state to reflect
# reality.
- transition :stopping => :up, :if => :process_running?
+ transition :stopping => :up, :if => :process_running?
transition :stopping => :down, :unless => :process_running?
-
+
transition :down => :up, :if => :process_running?
transition :down => :starting, :unless => :process_running?
-
+
transition :restarting => :up, :if => :process_running?
transition :restarting => :down, :unless => :process_running?
end
event :start do
@@ -82,129 +106,102 @@
after_transition any => :stopping, :do => :stop_process
after_transition any => :restarting, :do => :restart_process
after_transition any => any, :do => :record_transition
end
-
- def initialize(process_name, options = {})
+
+ def initialize(process_name, checks, options = {})
@name = process_name
- @event_mutex = Monitor.new
@transition_history = Util::RotationalArray.new(10)
@watches = []
@triggers = []
- @children = []
+ @children_timer = []
@statistics = ProcessStatistics.new
-
+ @actual_pid = options[:actual_pid]
+ self.logger = options[:logger]
+
+ checks.each do |name, opts|
+ if Bluepill::Trigger[name]
+ self.add_trigger(name, opts)
+ else
+ self.add_watch(name, opts)
+ end
+ end
+
# These defaults are overriden below if it's configured to be something else.
@monitor_children = false
@start_grace_time = @stop_grace_time = @restart_grace_time = 3
@environment = {}
-
+
CONFIGURABLE_ATTRIBUTES.each do |attribute_name|
self.send("#{attribute_name}=", options[attribute_name]) if options.has_key?(attribute_name)
end
-
+
# Let state_machine do its initialization stuff
super() # no arguments intentional
end
- def tick
- return if self.skipping_ticks?
- self.skip_ticks_until = nil
-
- # clear the memoization per tick
- @process_running = nil
-
- # run state machine transitions
- super
-
- if self.up?
- self.run_watches
-
- if self.monitor_children?
- refresh_children!
- children.each {|child| child.tick}
- end
- end
- end
-
def logger=(logger)
@logger = logger
self.watches.each {|w| w.logger = logger }
self.triggers.each {|t| t.logger = logger }
end
-
+
# State machine methods
def dispatch!(event, reason = nil)
- @event_mutex.synchronize do
- @statistics.record_event(event, reason)
- self.send("#{event}")
- end
+ @statistics.record_event(event, reason)
+ self.send("#{event}")
end
-
+
def record_transition(transition)
unless transition.loopback?
- @transitioned = true
-
# When a process changes state, we should clear the memory of all the watches
self.watches.each { |w| w.clear_history! }
-
+
# Also, when a process changes state, we should re-populate its child list
if self.monitor_children?
self.logger.warning "Clearing child list"
- self.children.clear
+ self.children_timer.each {|timer| timer.detach }
+ self.children_timer.each {|timer| timer.process.watches.each {|w| w.detach }}
+ self.children_timer.clear
end
logger.info "Going from #{transition.from_name} => #{transition.to_name}"
end
end
-
+
def notify_triggers(transition)
self.triggers.each {|trigger| trigger.notify(transition)}
end
-
+
# Watch related methods
def add_watch(name, options = {})
- self.watches << ConditionWatch.new(name, options.merge(:logger => self.logger))
+ self.watches << ConditionWatch.new(name, self, options.merge(:logger => self.logger))
end
-
+
def add_trigger(name, options = {})
self.triggers << Trigger[name].new(self, options.merge(:logger => self.logger))
end
- def run_watches
- now = Time.now.to_i
-
- threads = self.watches.collect do |watch|
- [watch, Thread.new { Thread.current[:events] = watch.run(self.actual_pid, now) }]
- end
-
- @transitioned = false
-
- threads.inject([]) do |events, (watch, thread)|
- thread.join
- if thread[:events].size > 0
- logger.info "#{watch.name} dispatched: #{thread[:events].join(',')}"
- thread[:events].each do |event|
- events << [event, watch.to_s]
- end
- end
- events
- end.each do |(event, reason)|
- break if @transitioned
- self.dispatch!(event, reason)
- end
- end
-
def determine_initial_state
if self.process_running?(true)
self.state = 'up'
else
# TODO: or "unmonitored" if bluepill was started in no auto-start mode.
self.state = 'down'
end
+
+ # TODO move into right position
+ self.set_timer
end
-
+
+ def set_timer
+ @timer = Bluepill::ProcessTimer.new(self)
+ Bluepill::Event.attach(self.timer)
+
+ self.watches.each {|w| Bluepill::Event.attach(w) }
+ end
+
def handle_user_command(cmd)
case cmd
when "start"
if self.process_running?(true)
logger.warning("Refusing to re-run start command on an already running process.")
@@ -221,103 +218,103 @@
# scheduled events gets cleared
triggers.each {|t| t.reset! }
dispatch!(:unmonitor, "user initiated")
end
end
-
+
# System Process Methods
def process_running?(force = false)
@process_running = nil if force # clear existing state if forced
-
+
@process_running ||= signal_process(0)
# the process isn't running, so we should clear the PID
self.clear_pid unless @process_running
@process_running
end
-
+
def start_process
logger.warning "Executing start command: #{start_command}"
-
+
if self.daemonize?
System.daemonize(start_command, self.system_command_options)
-
+
else
# This is a self-daemonizing process
with_timeout(start_grace_time) do
result = System.execute_blocking(start_command, self.system_command_options)
-
+
unless result[:exit_code].zero?
logger.warning "Start command execution returned non-zero exit code:"
logger.warning result.inspect
- end
+ end
end
end
-
+
self.skip_ticks_for(start_grace_time)
end
-
- def stop_process
+
+ def stop_process
if stop_command
cmd = self.prepare_command(stop_command)
logger.warning "Executing stop command: #{cmd}"
-
+
with_timeout(stop_grace_time) do
result = System.execute_blocking(cmd, self.system_command_options)
-
+
unless result[:exit_code].zero?
logger.warning "Stop command execution returned non-zero exit code:"
logger.warning result.inspect
end
end
-
+
else
logger.warning "Executing default stop command. Sending TERM signal to #{actual_pid}"
signal_process("TERM")
end
self.unlink_pid # TODO: we only write the pid file if we daemonize, should we only unlink it if we daemonize?
-
+
self.skip_ticks_for(stop_grace_time)
end
-
+
def restart_process
if restart_command
cmd = self.prepare_command(restart_command)
-
+
logger.warning "Executing restart command: #{cmd}"
-
+
with_timeout(restart_grace_time) do
result = System.execute_blocking(cmd, self.system_command_options)
unless result[:exit_code].zero?
logger.warning "Restart command execution returned non-zero exit code:"
logger.warning result.inspect
end
end
-
+
self.skip_ticks_for(restart_grace_time)
else
logger.warning "No restart_command specified. Must stop and start to restart"
self.stop_process
# the tick will bring it back.
end
end
-
+
def daemonize?
!!self.daemonize
end
-
+
def monitor_children?
!!self.monitor_children
end
-
+
def signal_process(code)
::Process.kill(code, actual_pid)
true
rescue
false
end
-
+
def actual_pid
@actual_pid ||= begin
if pid_file
if File.exists?(pid_file)
str = File.read(pid_file)
@@ -327,96 +324,89 @@
nil
end
end
end
end
-
+
def actual_pid=(pid)
@actual_pid = pid
end
-
+
def clear_pid
@actual_pid = nil
end
-
+
def unlink_pid
File.unlink(pid_file) if pid_file && File.exists?(pid_file)
end
-
+
# Internal State Methods
def skip_ticks_for(seconds)
# TODO: should this be addative or longest wins?
# i.e. if two calls for skip_ticks_for come in for 5 and 10, should it skip for 10 or 15?
self.skip_ticks_until = (self.skip_ticks_until || Time.now.to_i) + seconds.to_i
end
-
+
def skipping_ticks?
self.skip_ticks_until && self.skip_ticks_until > Time.now.to_i
end
-
+
def refresh_children!
# First prune the list of dead children
- @children.delete_if {|child| !child.process_running?(true) }
-
+ dead_children = self.children_timer.select {|timer| !timer.process.process_running?(true) }
+ dead_children.each {|timer| timer.detach }
+ dead_children.each {|timer| timer.process.watches.each {|w| w.detach }}
+ @children_timer -= dead_children
+
# Add new found children to the list
- new_children_pids = System.get_children(self.actual_pid) - @children.map {|child| child.actual_pid}
-
+ new_children_pids = System.get_children(self.actual_pid) - self.children_timer.map {|timer| timer.process.actual_pid}
+
unless new_children_pids.empty?
- logger.info "Existing children: #{@children.collect{|c| c.actual_pid}.join(",")}. Got new children: #{new_children_pids.inspect} for #{actual_pid}"
+ logger.info "Existing children: #{self.children_timer.collect{|c| c.process.actual_pid}.join(",")}. Got new children: #{new_children_pids.inspect} for #{actual_pid}"
end
-
+
# Construct a new process wrapper for each new found children
new_children_pids.each do |child_pid|
- child = self.child_process_template.deep_copy
-
- child.name = "<child(pid:#{child_pid})>"
- child.actual_pid = child_pid
- child.logger = self.logger.prefix_with(child.name)
-
+ name = "<child(pid:#{child_pid})>"
+ logger = self.logger.prefix_with(name)
+
+ child = self.child_process_factory.create_child_process(name, child_pid, logger)
+
child.initialize_state_machines
child.state = "up"
-
- @children << child
+
+ child.set_timer
+ child.watches.each {|w| w.process = child }
+
+ @children_timer << child.timer
end
end
- def deep_copy
- # TODO: This is a kludge. Ideally, process templates
- # would be facotries, and not a template object.
- mutex, triggers, @event_mutex, @triggers = @event_mutex, @triggers, nil, nil
- clone = Marshal.load(Marshal.dump(self))
- clone.instance_variable_set("@event_mutex", Monitor.new)
- clone.instance_variable_set("@triggers", triggers.collect{ |t| t.deep_copy })
- @event_mutex = mutex
- @triggers = triggers
- clone
- end
-
def prepare_command(command)
command.to_s.gsub("{{PID}}", actual_pid.to_s)
end
-
+
def system_command_options
{
- :uid => self.uid,
- :gid => self.gid,
+ :uid => self.uid,
+ :gid => self.gid,
:working_dir => self.working_dir,
:environment => self.environment,
:pid_file => self.pid_file,
:logger => self.logger,
:stdin => self.stdin,
:stdout => self.stdout,
:stderr => self.stderr
}
end
-
+
def with_timeout(secs, &blk)
Timeout.timeout(secs.to_f, &blk)
-
+
rescue Timeout::Error
logger.err "Execution is taking longer than expected. Unmonitoring."
logger.err "Did you forget to tell bluepill to daemonize this process?"
self.dispatch!("unmonitor")
end
end
end
-
+