lib/flapjack/coordinator.rb in flapjack-0.6.39 vs lib/flapjack/coordinator.rb in flapjack-0.6.40

- old
+ new

@@ -28,268 +28,244 @@ class Coordinator include Flapjack::Daemonizable - def initialize(config = {}) + def initialize(config, redis_options) @config = config + @redis_options = redis_options @pikelets = [] @logger = Log4r::Logger.new("flapjack-coordinator") @logger.add(Log4r::StdoutOutputter.new("flapjack-coordinator")) @logger.add(Log4r::SyslogOutputter.new("flapjack-coordinator")) end def start(options = {}) @signals = options[:signals] if options[:daemonize] + @signals = options[:signals] daemonize else - setup + run(:signals => options[:signals]) end end def after_daemonize - setup + run(:signals => @signals) end def stop + return if @stopping + @stopping = true shutdown end private - def setup + # map from config key to pikelet class + PIKELET_TYPES = {'executive' => Flapjack::Executive, + 'jabber_gateway' => Flapjack::Jabber, + 'pagerduty_gateway' => Flapjack::Pagerduty, + 'oobetet' => Flapjack::Oobetet, - # FIXME: the following is currently repeated in flapjack-populator and - # flapjack-nagios-receiver - move to a method in a module and include it - redis_host = @config['redis']['host'] || '127.0.0.1' - redis_port = @config['redis']['port'] || 6379 - redis_path = @config['redis']['path'] || nil - redis_db = @config['redis']['db'] || 0 + 'web' => Flapjack::Web, + 'api' => Flapjack::API, - if redis_path - @redis_options = { :db => redis_db, :path => redis_path } - else - @redis_options = { :db => redis_db, :host => redis_host, :port => redis_port } - end + 'email_notifier' => Flapjack::Notification::Email, + 'sms_notifier' => Flapjack::Notification::Sms} + def run(options = {}) + EM.synchrony do @logger.debug "config keys: #{@config.keys}" - pikelet_keys = ['executive', 'jabber_gateway', 'pagerduty_gateway', - 'email_notifier', 'sms_notifier', 'web', 'api', - 'oobetet'] - @config.keys.each do |pikelet_type| - next unless pikelet_keys.include?(pikelet_type) && + next unless PIKELET_TYPES.keys.include?(pikelet_type) && @config[pikelet_type].is_a?(Hash) && @config[pikelet_type]['enabled'] @logger.debug "coordinator is now initialising the #{pikelet_type} pikelet" pikelet_cfg = @config[pikelet_type] - case pikelet_type - when 'executive', 'jabber_gateway', 'pagerduty_gateway', 'oobetet' - build_pikelet(pikelet_type, pikelet_cfg) - when 'web', 'api' - build_thin_pikelet(pikelet_type, pikelet_cfg) - when 'email_notifier', 'sms_notifier' - build_resque_pikelet(pikelet_type, pikelet_cfg) - end + build_pikelet(pikelet_type, pikelet_cfg) end setup_signals if @signals end end + # the global nature of this seems at odds with it calling stop + # within a single coordinator instance. Coordinator is essentially + # a singleton anyway... def setup_signals - trap('INT') { stop } - trap('TERM') { stop } - unless RUBY_PLATFORM =~ /mswin/ - trap('QUIT') { stop } - # trap('HUP') { } + Kernel.trap('INT') { stop } + Kernel.trap('TERM') { stop } + unless RbConfig::CONFIG['host_os'] =~ /mswin|windows|cygwin/i + Kernel.trap('QUIT') { stop } + # Kernel.trap('HUP') { } end end def build_pikelet(pikelet_type, pikelet_cfg) - pikelet_class = case pikelet_type - when 'executive' - Flapjack::Executive - when 'jabber_gateway' - Flapjack::Jabber - when 'pagerduty_gateway' - Flapjack::Pagerduty - when 'oobetet' - Flapjack::Oobetet - end - return unless pikelet_class + return unless pikelet_class = PIKELET_TYPES[pikelet_type] - pikelet = pikelet_class.new - f = Fiber.new { - begin - pikelet.bootstrap(:redis => @redis_options, :config => pikelet_cfg) - pikelet.main - rescue Exception => e - trace = e.backtrace.join("\n") - @logger.fatal "#{e.message}\n#{trace}" - stop - end - } - @pikelets << {:fiber => f, :type => pikelet_type, :instance => pikelet} - f.resume - @logger.debug "new fiber created for #{pikelet_type}" - end + inc_mod = pikelet_class.included_modules + ext_mod = extended_modules(pikelet_class) - def build_thin_pikelet(pikelet_type, pikelet_cfg) - pikelet_class = case pikelet_type - when 'web' - Flapjack::Web - when 'api' - Flapjack::API - end - return unless pikelet_class + pikelet = nil + fiber = nil - port = nil - if pikelet_cfg['port'] - port = pikelet_cfg['port'].to_i - end + if inc_mod.include?(Flapjack::GenericPikelet) + pikelet = pikelet_class.new + pikelet.bootstrap(:config => pikelet_cfg, :redis_config => @redis_options) - port = 3001 if (port.nil? || port <= 0 || port > 65535) + else + pikelet_class.bootstrap(:config => pikelet_cfg, :redis_config => @redis_options) - pikelet_class.class_variable_set('@@redis', - Flapjack::RedisPool.new(:config => @redis_options)) + if ext_mod.include?(Flapjack::ThinPikelet) - Thin::Logging.silent = true + unless @thin_silenced + Thin::Logging.silent = true + @thin_silenced = true + end - pikelet = Thin::Server.new('0.0.0.0', port, pikelet_class, :signals => false) - @pikelets << {:instance => pikelet, :type => pikelet_type} - pikelet.start - @logger.debug "new thin server instance started for #{pikelet_type}" - end + pikelet = Thin::Server.new('0.0.0.0', + pikelet_class.instance_variable_get('@port'), + pikelet_class, :signals => false) - def build_resque_pikelet(pikelet_type, pikelet_cfg) - pikelet_class = case pikelet_type - when 'email_notifier' - Flapjack::Notification::Email - when 'sms_notifier' - Flapjack::Notification::Sms - end - return unless pikelet_class + elsif ext_mod.include?(Flapjack::ResquePikelet) - # set up connection pooling, stop resque errors (ensure that it's only - # done once) - @resque_pool = nil - if (['email_notifier', 'sms_notifier'] & @pikelets.collect {|p| p[:type]}).empty? - pool = Flapjack::RedisPool.new(:config => @redis_options) - ::Resque.redis = pool - @resque_pool = pool - ## NB: can override the default 'resque' namespace like this - #::Resque.redis.namespace = 'flapjack' + # set up connection pooling, stop resque errors + unless @resque_pool + @resque_pool = Flapjack::RedisPool.new(:config => @redis_options) + ::Resque.redis = @resque_pool + ## NB: can override the default 'resque' namespace like this + #::Resque.redis.namespace = 'flapjack' + end + + # TODO error if pikelet_cfg['queue'].nil? + pikelet = EM::Resque::Worker.new(pikelet_cfg['queue']) + # # Use these to debug the resque workers + # pikelet.verbose = true + # pikelet.very_verbose = true + end + end - # See https://github.com/mikel/mail/blob/master/lib/mail/mail.rb#L53 - # & https://github.com/mikel/mail/blob/master/spec/mail/configuration_spec.rb - # for details of configuring mail gem. defaults to SMTP, localhost, port 25 + pikelet_info = {:class => pikelet_class, :instance => pikelet} - if pikelet_type.eql?('email_notifier') - smtp_config = {} + if inc_mod.include?(Flapjack::GenericPikelet) || + ext_mod.include?(Flapjack::ResquePikelet) - if pikelet_cfg['smtp_config'] - smtp_config = pikelet_cfg['smtp_config'].keys.inject({}) do |ret,obj| - ret[obj.to_sym] = pikelet_cfg['smtp_config'][obj] - ret + fiber = Fiber.new { + begin + # Can't use local inc_mod/ext_mod variables in the new fiber + if pikelet.is_a?(Flapjack::GenericPikelet) + pikelet.main + elsif extended_modules(pikelet_class).include?(Flapjack::ResquePikelet) + pikelet.work(0.1) + end + rescue Exception => e + trace = e.backtrace.join("\n") + @logger.fatal "#{e.message}\n#{trace}" + stop end - end - - Mail.defaults { - delivery_method :smtp, {:enable_starttls_auto => false}.merge(smtp_config) } + + pikelet_info[:fiber] = fiber + fiber.resume + @logger.debug "new fiber created for #{pikelet_type}" + elsif ext_mod.include?(Flapjack::ThinPikelet) + pikelet.start + @logger.debug "new thin server instance started for #{pikelet_type}" end - pikelet_class.class_variable_set('@@config', pikelet_cfg) + @pikelets << pikelet_info + end - # TODO error if pikelet_cfg['queue'].nil? - pikelet = EM::Resque::Worker.new(pikelet_cfg['queue']) - # # Use these to debug the resque workers - # pikelet.verbose = true - # pikelet.very_verbose = true - - f = Fiber.new { - begin - pikelet.work(0.1) - rescue Exception => e - trace = e.backtrace.join("\n") - @logger.fatal "#{e.message}\n#{trace}" - stop + # only prints state changes, otherwise pikelets not closing promptly can + # cause everything else to be spammy + def health_check + @pikelets.each do |pik| + status = if extended_modules(pik[:class]).include?(Flapjack::ThinPikelet) + pik[:instance].backend.size > 0 ? 'running' : 'stopped' + elsif pik[:fiber] + pik[:fiber].alive? ? 'running' : 'stopped' end - } - @pikelets << {:fiber => f, :type => pikelet_type, :instance => pikelet} - f.resume - @logger.debug "new fiber created for #{pikelet_type}" + next if pik.has_key?(:status) && pik[:status].eql?(status) + @logger.info "#{pik[:class].name}: #{status}" + pik[:status] = status + end end - # # TODO rewrite to be less spammy -- print only initial state and changes - # def health_check - # @pikelets.each do |pik| - # if pik[:instance].is_a?(Thin::Server) - # s = pik[:instance].backend.size - # @logger.debug "thin on port #{pik[:instance].port} - #{s} connections" - # elsif pik[:fiber] - # @logger.debug "#{pik[:type]}: #{pik[:fiber].alive? ? 'alive' : 'dead'}" - # end - # end - # end - - # TODO whem merged with other changes, have this check pik[:class] instead, - # makes tests neater def shutdown @pikelets.each do |pik| - case pik[:instance] - when Flapjack::Executive, Flapjack::Jabber, Flapjack::Pagerduty + + pik_inst = pik[:instance] + ext_mod = extended_modules(pik[:class]) + + # would be neater if we could use something similar for the class << self + # included pikelets as well + if pik_inst.is_a?(Flapjack::GenericPikelet) if pik[:fiber] && pik[:fiber].alive? - pik[:instance].stop + pik_inst.stop Fiber.new { # this needs to use a separate Redis connection from the pikelet's # one, as that's in the middle of its blpop r = Redis.new(@redis_options.merge(:driver => 'synchrony')) - pik[:instance].add_shutdown_event(:redis => r) + pik_inst.add_shutdown_event(:redis => r) r.quit }.resume end - when EM::Resque::Worker + elsif ext_mod.include?(Flapjack::ResquePikelet) # resque is polling, so we don't need a shutdown object - pik[:instance].shutdown if pik[:fiber] && pik[:fiber].alive? - when Thin::Server # web, api + pik_inst.shutdown if pik[:fiber] && pik[:fiber].alive? + elsif ext_mod.include?(Flapjack::ThinPikelet) # drop from this side, as HTTP keepalive etc. means browsers # keep connections alive for ages, and we'd be hanging around # waiting for them to drop - pik[:instance].stop! + pik_inst.stop! end end - fibers = @pikelets.collect {|p| p[:fiber] }.compact - thin_pikelets = @pikelets.collect {|p| p[:instance]}.select {|i| i.is_a?(Thin::Server) } - Fiber.new { + loop do - # health_check - if fibers.any?(&:alive?) || thin_pikelets.any?{|tp| !tp.backend.empty? } + health_check + + if @pikelets.any? {|p| p[:status] == 'running'} EM::Synchrony.sleep 0.25 else @resque_pool.empty! if @resque_pool - [Flapjack::Web, Flapjack::API].each do |klass| - next unless klass.class_variable_defined?('@@redis') && - redis = klass.class_variable_get('@@redis') - redis.empty! + @pikelets.each do |pik| + + pik_inst = pik[:instance] + ext_mod = extended_modules(pik[:class]) + + if pik_inst.is_a?(Flapjack::GenericPikelet) + + pik_inst.cleanup + + elsif [Flapjack::ResquePikelet, Flapjack::ThinPikelet].any?{|fp| + ext_mod.include?(fp) + } + + pik[:class].cleanup + + end end EM.stop break end end }.resume + end + + def extended_modules(klass) + (class << klass; self; end).included_modules end end end