lib/flapjack/coordinator.rb in flapjack-1.6.0 vs lib/flapjack/coordinator.rb in flapjack-2.0.0b1

- old
+ new

@@ -1,174 +1,127 @@ #!/usr/bin/env ruby -require 'eventmachine' -require 'em-synchrony' - +require 'monitor' require 'syslog' +require 'zermelo' + require 'flapjack/configuration' require 'flapjack/patches' -require 'flapjack/redis_pool' -require 'flapjack/logger' +require 'flapjack/redis_proxy' + require 'flapjack/pikelet' +require 'flapjack/data/condition' + module Flapjack class Coordinator + # states: :starting, :running, :reloading, :stopped + def initialize(config) - @config = config - @redis_options = config.for_redis - @pikelets = [] + Thread.abort_on_exception = true - @received_signals = [] + ActiveSupport.use_standard_json_time_format = true + ActiveSupport.time_precision = 0 - @logger = Flapjack::Logger.new("flapjack-coordinator", @config.all['logger']) - end + @exit_value = nil - def start(options = {}) - @boot_time = Time.now + @config = config + @pikelets = [] - EM.synchrony do - setup_signals if options[:signals] + @received_signals = [] - redis = Flapjack::RedisPool.new(:config => @redis_options, :size => 1, :logger => @logger) - ['entity', 'check'].each do |type| - discovered = redis.keys("#{type}_tag:*") - redis.sadd("known_tags:#{type}_tag", discovered) unless discovered.empty? - end + @state = :starting + @monitor = Monitor.new + @monitor_cond = @monitor.new_cond - begin - add_pikelets(pikelets(@config.all)) - loop do - while sig = @received_signals.shift do - case sig - when 'INT', 'TERM' - @exit_value = Signal.list[sig] + 128 - raise Interrupt - when 'HUP' - reload - end - end - EM::Synchrony.sleep 0.25 - end - rescue Exception => e - unless e.is_a?(Interrupt) - trace = e.backtrace.join("\n") - @logger.fatal "#{e.class.name}\n#{e.message}\n#{trace}" - @exit_value = 1 - end - remove_pikelets(@pikelets) - EM.stop - end - end + # needs to be done per-thread + cfg = @config.all + Flapjack.configure_log('flapjack-coordinator', cfg.nil? ? {} : cfg['logger']) - Syslog.close if Syslog.opened? + @reload = proc { + @monitor.synchronize { + @monitor_cond.wait_until { :running.eql?(@state) } + @state = :reloading + @monitor_cond.signal + } + } - @exit_value + @shutdown = proc { |exit_val| + @monitor.synchronize { + @monitor_cond.wait_until { :running.eql?(@state) } + @state = :stopping + @exit_value = exit_val + @monitor_cond.signal + } + } end - private + def start(opts = {}) + # we can't block on the main thread, as signals interrupt that + Thread.new do + # needs to be done per-thread + cfg = @config.all + Flapjack.configure_log('flapjack-coordinator', cfg.nil? ? {} : cfg['logger']) - def reload - prev_pikelet_cfg = pikelets(@config.all) + @boot_time = Time.now - removed = [] - added = [] - ask_running = [] + Flapjack::RedisProxy.config = @config.for_redis - cfg_filename = @config.filename - @config = Flapjack::Configuration.new - @config.load(cfg_filename) + pikelet_defs = pikelet_definitions(cfg) + return if pikelet_defs.empty? - enabled_pikelet_cfg = pikelets(@config.all) + create_pikelets(pikelet_defs).each do |pik| + @pikelets << pik + end - (prev_pikelet_cfg.keys + enabled_pikelet_cfg.keys).each do |type| - - if prev_pikelet_cfg.keys.include?(type) - if enabled_pikelet_cfg.keys.include?(type) - ask_running << type - else - removed << type - end - elsif enabled_pikelet_cfg.keys.include?(type) - added << type + @pikelets.each do |pik| + pik.start end - end + setup_signals if opts[:signals] - @pikelets.select {|pik| ask_running.include?(pik.type) }.each do |pik| - # for sections previously there and still there, ask them - # to make the config change; they will if they can, or will signal - # restart is needed if not + # block this thread until 'stop' has been called, and + # all pikelets have been stopped + @monitor.synchronize { + @state = :running + @monitor_cond.wait_until { !(:running.eql?(@state)) } + case @state + when :reloading + reload + @state = :running + @monitor_cond.signal + when :stopping + @pikelets.map(&:stop) + @pikelets.clear + @state = :stopped + @monitor_cond.signal + end + } - # reload() returns trinary value here; true means the change was made, false - # means the pikelet needs to be restarted, nil means no change - # was required - next unless pik.reload(enabled_pikelet_cfg[pik.type]).is_a?(FalseClass) - removed << pik.type - added << pik.type - end + end.join - removed_pikelets = @pikelets.select {|pik| removed.include?(pik.type) } - - remove_pikelets(removed_pikelets) - - # is there a nicer way to only keep the parts of the hash with matching keys? - added_pikelets = enabled_pikelet_cfg.select {|k, v| added.include?(k) } - - add_pikelets(added_pikelets) + @exit_value end + private + # the global nature of this seems at odds with it calling stop # within a single coordinator instance. Coordinator is essentially # a singleton anyway... def setup_signals - Kernel.trap('INT') { @received_signals << 'INT' unless @received_signals.include?('INT') } - Kernel.trap('TERM') { @received_signals << 'TERM' unless @received_signals.include?('TERM') } + Kernel.trap('INT') { Thread.new { @shutdown.call(Signal.list['INT']) }.join } + Kernel.trap('TERM') { Thread.new { @shutdown.call(Signal.list['TERM']) }.join } unless RbConfig::CONFIG['host_os'] =~ /mswin|windows|cygwin/i - Kernel.trap('HUP') { @received_signals << 'HUP' unless @received_signals.include?('HUP') } + Kernel.trap('HUP') { Thread.new { @reload.call }.join } end end - # passed a hash with {PIKELET_TYPE => PIKELET_CFG, ...} - def add_pikelets(pikelets_data = {}) - pikelets_data.each_pair do |type, cfg| - next unless pikelet = Flapjack::Pikelet.create(type, - :config => cfg, :redis_config => @redis_options, - :boot_time => @boot_time) - - @pikelets << pikelet - pikelet.start - end - end - - def remove_pikelets(piks, opts = {}) - piks.map(&:stop) - - loop do - # only prints state changes, otherwise pikelets not closing promptly can - # cause everything else to be spammy - piks.each do |pik| - old_status = pik.status - pik.update_status - status = pik.status - next if old_status.eql?(status) - @logger.info "#{pik.type}: #{old_status} -> #{status}" - end - - if piks.any? {|p| p.status == 'stopping' } - EM::Synchrony.sleep 0.25 - else - @pikelets -= piks - break - end - end - end - - def pikelets(config_env) + def pikelet_definitions(config_env) config = {} return config unless config_env # backwards-compatible with config file for previous 'executive' pikelet exec_cfg = nil @@ -193,9 +146,65 @@ return config unless config_env && config_env['gateways'] && !config_env['gateways'].nil? config.merge( config_env['gateways'].select {|k, v| Flapjack::Pikelet.is_pikelet?(k) && v['enabled'] } ) + end + + # passed a hash with {PIKELET_TYPE => PIKELET_CFG, ...} + # returns unstarted pikelet instances. + def create_pikelets(pikelets_data = {}) + pikelets_data.inject([]) do |memo, (type, cfg)| + pikelets = Flapjack::Pikelet.create(type, @shutdown, :config => cfg, + :boot_time => @boot_time) + memo += pikelets + memo + end + end + + # NB: global config options (e.g. daemonize, pidfile, + # logfile, redis options) won't be checked on reload. + # should we do a full restart if some of these change? + def reload + # TODO refactor cfg load and key retrieval, consolidate with initial load + prev_pikelet_cfg = pikelet_definitions(@config.all) + + @config.reload + + current_pikelet_cfg = pikelet_definitions(@config.all) + + prev_keys = prev_pikelet_cfg.keys + current_keys = current_pikelet_cfg.keys + + removed = prev_keys - current_keys + added = current_keys - prev_keys + ask_running = current_keys - (added + removed) + + # for sections previously there and still there, ask them + # to make the config change; they will if they can, or will signal + # restart is needed if not + # reload() returns trinary value here; true means the change was made, false + # means the pikelet needs to be restarted, nil means no change + # was required. + ask_running.each do |ask_key| + next unless pikelet = @pikelets.detect {|pik| ask_key == pik.type} + + if pikelet.reload(current_pikelet_cfg[pikelet.type]).is_a?(FalseClass) + removed << pikelet.type + added << pikelet.type + end + end + + pikelets_to_remove = @pikelets.select{|pik| removed.include?(pik.type) } + pikelets_to_remove.map(&:stop) + @pikelets -= pikelets_to_remove + + added_defs = current_pikelet_cfg.select {|k, v| added.include?(k) } + + create_pikelets(added_defs).each do |pik| + @pikelets << pik + pik.start + end end end end