lib/flapjack/coordinator.rb in flapjack-1.6.0 vs lib/flapjack/coordinator.rb in flapjack-2.0.0b1
- old
+ new
@@ -1,174 +1,127 @@
#!/usr/bin/env ruby
-require 'eventmachine'
-require 'em-synchrony'
-
+require 'monitor'
require 'syslog'
+require 'zermelo'
+
require 'flapjack/configuration'
require 'flapjack/patches'
-require 'flapjack/redis_pool'
-require 'flapjack/logger'
+require 'flapjack/redis_proxy'
+
require 'flapjack/pikelet'
+require 'flapjack/data/condition'
+
module Flapjack
class Coordinator
+ # states: :starting, :running, :reloading, :stopped
+
def initialize(config)
- @config = config
- @redis_options = config.for_redis
- @pikelets = []
+ Thread.abort_on_exception = true
- @received_signals = []
+ ActiveSupport.use_standard_json_time_format = true
+ ActiveSupport.time_precision = 0
- @logger = Flapjack::Logger.new("flapjack-coordinator", @config.all['logger'])
- end
+ @exit_value = nil
- def start(options = {})
- @boot_time = Time.now
+ @config = config
+ @pikelets = []
- EM.synchrony do
- setup_signals if options[:signals]
+ @received_signals = []
- redis = Flapjack::RedisPool.new(:config => @redis_options, :size => 1, :logger => @logger)
- ['entity', 'check'].each do |type|
- discovered = redis.keys("#{type}_tag:*")
- redis.sadd("known_tags:#{type}_tag", discovered) unless discovered.empty?
- end
+ @state = :starting
+ @monitor = Monitor.new
+ @monitor_cond = @monitor.new_cond
- begin
- add_pikelets(pikelets(@config.all))
- loop do
- while sig = @received_signals.shift do
- case sig
- when 'INT', 'TERM'
- @exit_value = Signal.list[sig] + 128
- raise Interrupt
- when 'HUP'
- reload
- end
- end
- EM::Synchrony.sleep 0.25
- end
- rescue Exception => e
- unless e.is_a?(Interrupt)
- trace = e.backtrace.join("\n")
- @logger.fatal "#{e.class.name}\n#{e.message}\n#{trace}"
- @exit_value = 1
- end
- remove_pikelets(@pikelets)
- EM.stop
- end
- end
+ # needs to be done per-thread
+ cfg = @config.all
+ Flapjack.configure_log('flapjack-coordinator', cfg.nil? ? {} : cfg['logger'])
- Syslog.close if Syslog.opened?
+ @reload = proc {
+ @monitor.synchronize {
+ @monitor_cond.wait_until { :running.eql?(@state) }
+ @state = :reloading
+ @monitor_cond.signal
+ }
+ }
- @exit_value
+ @shutdown = proc { |exit_val|
+ @monitor.synchronize {
+ @monitor_cond.wait_until { :running.eql?(@state) }
+ @state = :stopping
+ @exit_value = exit_val
+ @monitor_cond.signal
+ }
+ }
end
- private
+ def start(opts = {})
+ # we can't block on the main thread, as signals interrupt that
+ Thread.new do
+ # needs to be done per-thread
+ cfg = @config.all
+ Flapjack.configure_log('flapjack-coordinator', cfg.nil? ? {} : cfg['logger'])
- def reload
- prev_pikelet_cfg = pikelets(@config.all)
+ @boot_time = Time.now
- removed = []
- added = []
- ask_running = []
+ Flapjack::RedisProxy.config = @config.for_redis
- cfg_filename = @config.filename
- @config = Flapjack::Configuration.new
- @config.load(cfg_filename)
+ pikelet_defs = pikelet_definitions(cfg)
+ return if pikelet_defs.empty?
- enabled_pikelet_cfg = pikelets(@config.all)
+ create_pikelets(pikelet_defs).each do |pik|
+ @pikelets << pik
+ end
- (prev_pikelet_cfg.keys + enabled_pikelet_cfg.keys).each do |type|
-
- if prev_pikelet_cfg.keys.include?(type)
- if enabled_pikelet_cfg.keys.include?(type)
- ask_running << type
- else
- removed << type
- end
- elsif enabled_pikelet_cfg.keys.include?(type)
- added << type
+ @pikelets.each do |pik|
+ pik.start
end
- end
+ setup_signals if opts[:signals]
- @pikelets.select {|pik| ask_running.include?(pik.type) }.each do |pik|
- # for sections previously there and still there, ask them
- # to make the config change; they will if they can, or will signal
- # restart is needed if not
+ # block this thread until 'stop' has been called, and
+ # all pikelets have been stopped
+ @monitor.synchronize {
+ @state = :running
+ @monitor_cond.wait_until { !(:running.eql?(@state)) }
+ case @state
+ when :reloading
+ reload
+ @state = :running
+ @monitor_cond.signal
+ when :stopping
+ @pikelets.map(&:stop)
+ @pikelets.clear
+ @state = :stopped
+ @monitor_cond.signal
+ end
+ }
- # reload() returns trinary value here; true means the change was made, false
- # means the pikelet needs to be restarted, nil means no change
- # was required
- next unless pik.reload(enabled_pikelet_cfg[pik.type]).is_a?(FalseClass)
- removed << pik.type
- added << pik.type
- end
+ end.join
- removed_pikelets = @pikelets.select {|pik| removed.include?(pik.type) }
-
- remove_pikelets(removed_pikelets)
-
- # is there a nicer way to only keep the parts of the hash with matching keys?
- added_pikelets = enabled_pikelet_cfg.select {|k, v| added.include?(k) }
-
- add_pikelets(added_pikelets)
+ @exit_value
end
+ private
+
# the global nature of this seems at odds with it calling stop
# within a single coordinator instance. Coordinator is essentially
# a singleton anyway...
def setup_signals
- Kernel.trap('INT') { @received_signals << 'INT' unless @received_signals.include?('INT') }
- Kernel.trap('TERM') { @received_signals << 'TERM' unless @received_signals.include?('TERM') }
+ Kernel.trap('INT') { Thread.new { @shutdown.call(Signal.list['INT']) }.join }
+ Kernel.trap('TERM') { Thread.new { @shutdown.call(Signal.list['TERM']) }.join }
unless RbConfig::CONFIG['host_os'] =~ /mswin|windows|cygwin/i
- Kernel.trap('HUP') { @received_signals << 'HUP' unless @received_signals.include?('HUP') }
+ Kernel.trap('HUP') { Thread.new { @reload.call }.join }
end
end
- # passed a hash with {PIKELET_TYPE => PIKELET_CFG, ...}
- def add_pikelets(pikelets_data = {})
- pikelets_data.each_pair do |type, cfg|
- next unless pikelet = Flapjack::Pikelet.create(type,
- :config => cfg, :redis_config => @redis_options,
- :boot_time => @boot_time)
-
- @pikelets << pikelet
- pikelet.start
- end
- end
-
- def remove_pikelets(piks, opts = {})
- piks.map(&:stop)
-
- loop do
- # only prints state changes, otherwise pikelets not closing promptly can
- # cause everything else to be spammy
- piks.each do |pik|
- old_status = pik.status
- pik.update_status
- status = pik.status
- next if old_status.eql?(status)
- @logger.info "#{pik.type}: #{old_status} -> #{status}"
- end
-
- if piks.any? {|p| p.status == 'stopping' }
- EM::Synchrony.sleep 0.25
- else
- @pikelets -= piks
- break
- end
- end
- end
-
- def pikelets(config_env)
+ def pikelet_definitions(config_env)
config = {}
return config unless config_env
# backwards-compatible with config file for previous 'executive' pikelet
exec_cfg = nil
@@ -193,9 +146,65 @@
return config unless config_env && config_env['gateways'] &&
!config_env['gateways'].nil?
config.merge( config_env['gateways'].select {|k, v|
Flapjack::Pikelet.is_pikelet?(k) && v['enabled']
} )
+ end
+
+ # passed a hash with {PIKELET_TYPE => PIKELET_CFG, ...}
+ # returns unstarted pikelet instances.
+ def create_pikelets(pikelets_data = {})
+ pikelets_data.inject([]) do |memo, (type, cfg)|
+ pikelets = Flapjack::Pikelet.create(type, @shutdown, :config => cfg,
+ :boot_time => @boot_time)
+ memo += pikelets
+ memo
+ end
+ end
+
+ # NB: global config options (e.g. daemonize, pidfile,
+ # logfile, redis options) won't be checked on reload.
+ # should we do a full restart if some of these change?
+ def reload
+ # TODO refactor cfg load and key retrieval, consolidate with initial load
+ prev_pikelet_cfg = pikelet_definitions(@config.all)
+
+ @config.reload
+
+ current_pikelet_cfg = pikelet_definitions(@config.all)
+
+ prev_keys = prev_pikelet_cfg.keys
+ current_keys = current_pikelet_cfg.keys
+
+ removed = prev_keys - current_keys
+ added = current_keys - prev_keys
+ ask_running = current_keys - (added + removed)
+
+ # for sections previously there and still there, ask them
+ # to make the config change; they will if they can, or will signal
+ # restart is needed if not
+ # reload() returns trinary value here; true means the change was made, false
+ # means the pikelet needs to be restarted, nil means no change
+ # was required.
+ ask_running.each do |ask_key|
+ next unless pikelet = @pikelets.detect {|pik| ask_key == pik.type}
+
+ if pikelet.reload(current_pikelet_cfg[pikelet.type]).is_a?(FalseClass)
+ removed << pikelet.type
+ added << pikelet.type
+ end
+ end
+
+ pikelets_to_remove = @pikelets.select{|pik| removed.include?(pik.type) }
+ pikelets_to_remove.map(&:stop)
+ @pikelets -= pikelets_to_remove
+
+ added_defs = current_pikelet_cfg.select {|k, v| added.include?(k) }
+
+ create_pikelets(added_defs).each do |pik|
+ @pikelets << pik
+ pik.start
+ end
end
end
end