lib/flapjack/coordinator.rb in flapjack-0.6.39 vs lib/flapjack/coordinator.rb in flapjack-0.6.40
- old
+ new
@@ -28,268 +28,244 @@
class Coordinator
include Flapjack::Daemonizable
- def initialize(config = {})
+ def initialize(config, redis_options)
@config = config
+ @redis_options = redis_options
@pikelets = []
@logger = Log4r::Logger.new("flapjack-coordinator")
@logger.add(Log4r::StdoutOutputter.new("flapjack-coordinator"))
@logger.add(Log4r::SyslogOutputter.new("flapjack-coordinator"))
end
def start(options = {})
@signals = options[:signals]
if options[:daemonize]
+ @signals = options[:signals]
daemonize
else
- setup
+ run(:signals => options[:signals])
end
end
def after_daemonize
- setup
+ run(:signals => @signals)
end
def stop
+ return if @stopping
+ @stopping = true
shutdown
end
private
- def setup
+ # map from config key to pikelet class
+ PIKELET_TYPES = {'executive' => Flapjack::Executive,
+ 'jabber_gateway' => Flapjack::Jabber,
+ 'pagerduty_gateway' => Flapjack::Pagerduty,
+ 'oobetet' => Flapjack::Oobetet,
- # FIXME: the following is currently repeated in flapjack-populator and
- # flapjack-nagios-receiver - move to a method in a module and include it
- redis_host = @config['redis']['host'] || '127.0.0.1'
- redis_port = @config['redis']['port'] || 6379
- redis_path = @config['redis']['path'] || nil
- redis_db = @config['redis']['db'] || 0
+ 'web' => Flapjack::Web,
+ 'api' => Flapjack::API,
- if redis_path
- @redis_options = { :db => redis_db, :path => redis_path }
- else
- @redis_options = { :db => redis_db, :host => redis_host, :port => redis_port }
- end
+ 'email_notifier' => Flapjack::Notification::Email,
+ 'sms_notifier' => Flapjack::Notification::Sms}
+ def run(options = {})
+
EM.synchrony do
@logger.debug "config keys: #{@config.keys}"
- pikelet_keys = ['executive', 'jabber_gateway', 'pagerduty_gateway',
- 'email_notifier', 'sms_notifier', 'web', 'api',
- 'oobetet']
-
@config.keys.each do |pikelet_type|
- next unless pikelet_keys.include?(pikelet_type) &&
+ next unless PIKELET_TYPES.keys.include?(pikelet_type) &&
@config[pikelet_type].is_a?(Hash) &&
@config[pikelet_type]['enabled']
@logger.debug "coordinator is now initialising the #{pikelet_type} pikelet"
pikelet_cfg = @config[pikelet_type]
- case pikelet_type
- when 'executive', 'jabber_gateway', 'pagerduty_gateway', 'oobetet'
- build_pikelet(pikelet_type, pikelet_cfg)
- when 'web', 'api'
- build_thin_pikelet(pikelet_type, pikelet_cfg)
- when 'email_notifier', 'sms_notifier'
- build_resque_pikelet(pikelet_type, pikelet_cfg)
- end
+ build_pikelet(pikelet_type, pikelet_cfg)
end
setup_signals if @signals
end
end
+ # the global nature of this seems at odds with it calling stop
+ # within a single coordinator instance. Coordinator is essentially
+ # a singleton anyway...
def setup_signals
- trap('INT') { stop }
- trap('TERM') { stop }
- unless RUBY_PLATFORM =~ /mswin/
- trap('QUIT') { stop }
- # trap('HUP') { }
+ Kernel.trap('INT') { stop }
+ Kernel.trap('TERM') { stop }
+ unless RbConfig::CONFIG['host_os'] =~ /mswin|windows|cygwin/i
+ Kernel.trap('QUIT') { stop }
+ # Kernel.trap('HUP') { }
end
end
def build_pikelet(pikelet_type, pikelet_cfg)
- pikelet_class = case pikelet_type
- when 'executive'
- Flapjack::Executive
- when 'jabber_gateway'
- Flapjack::Jabber
- when 'pagerduty_gateway'
- Flapjack::Pagerduty
- when 'oobetet'
- Flapjack::Oobetet
- end
- return unless pikelet_class
+ return unless pikelet_class = PIKELET_TYPES[pikelet_type]
- pikelet = pikelet_class.new
- f = Fiber.new {
- begin
- pikelet.bootstrap(:redis => @redis_options, :config => pikelet_cfg)
- pikelet.main
- rescue Exception => e
- trace = e.backtrace.join("\n")
- @logger.fatal "#{e.message}\n#{trace}"
- stop
- end
- }
- @pikelets << {:fiber => f, :type => pikelet_type, :instance => pikelet}
- f.resume
- @logger.debug "new fiber created for #{pikelet_type}"
- end
+ inc_mod = pikelet_class.included_modules
+ ext_mod = extended_modules(pikelet_class)
- def build_thin_pikelet(pikelet_type, pikelet_cfg)
- pikelet_class = case pikelet_type
- when 'web'
- Flapjack::Web
- when 'api'
- Flapjack::API
- end
- return unless pikelet_class
+ pikelet = nil
+ fiber = nil
- port = nil
- if pikelet_cfg['port']
- port = pikelet_cfg['port'].to_i
- end
+ if inc_mod.include?(Flapjack::GenericPikelet)
+ pikelet = pikelet_class.new
+ pikelet.bootstrap(:config => pikelet_cfg, :redis_config => @redis_options)
- port = 3001 if (port.nil? || port <= 0 || port > 65535)
+ else
+ pikelet_class.bootstrap(:config => pikelet_cfg, :redis_config => @redis_options)
- pikelet_class.class_variable_set('@@redis',
- Flapjack::RedisPool.new(:config => @redis_options))
+ if ext_mod.include?(Flapjack::ThinPikelet)
- Thin::Logging.silent = true
+ unless @thin_silenced
+ Thin::Logging.silent = true
+ @thin_silenced = true
+ end
- pikelet = Thin::Server.new('0.0.0.0', port, pikelet_class, :signals => false)
- @pikelets << {:instance => pikelet, :type => pikelet_type}
- pikelet.start
- @logger.debug "new thin server instance started for #{pikelet_type}"
- end
+ pikelet = Thin::Server.new('0.0.0.0',
+ pikelet_class.instance_variable_get('@port'),
+ pikelet_class, :signals => false)
- def build_resque_pikelet(pikelet_type, pikelet_cfg)
- pikelet_class = case pikelet_type
- when 'email_notifier'
- Flapjack::Notification::Email
- when 'sms_notifier'
- Flapjack::Notification::Sms
- end
- return unless pikelet_class
+ elsif ext_mod.include?(Flapjack::ResquePikelet)
- # set up connection pooling, stop resque errors (ensure that it's only
- # done once)
- @resque_pool = nil
- if (['email_notifier', 'sms_notifier'] & @pikelets.collect {|p| p[:type]}).empty?
- pool = Flapjack::RedisPool.new(:config => @redis_options)
- ::Resque.redis = pool
- @resque_pool = pool
- ## NB: can override the default 'resque' namespace like this
- #::Resque.redis.namespace = 'flapjack'
+ # set up connection pooling, stop resque errors
+ unless @resque_pool
+ @resque_pool = Flapjack::RedisPool.new(:config => @redis_options)
+ ::Resque.redis = @resque_pool
+ ## NB: can override the default 'resque' namespace like this
+ #::Resque.redis.namespace = 'flapjack'
+ end
+
+ # TODO error if pikelet_cfg['queue'].nil?
+ pikelet = EM::Resque::Worker.new(pikelet_cfg['queue'])
+ # # Use these to debug the resque workers
+ # pikelet.verbose = true
+ # pikelet.very_verbose = true
+ end
+
end
- # See https://github.com/mikel/mail/blob/master/lib/mail/mail.rb#L53
- # & https://github.com/mikel/mail/blob/master/spec/mail/configuration_spec.rb
- # for details of configuring mail gem. defaults to SMTP, localhost, port 25
+ pikelet_info = {:class => pikelet_class, :instance => pikelet}
- if pikelet_type.eql?('email_notifier')
- smtp_config = {}
+ if inc_mod.include?(Flapjack::GenericPikelet) ||
+ ext_mod.include?(Flapjack::ResquePikelet)
- if pikelet_cfg['smtp_config']
- smtp_config = pikelet_cfg['smtp_config'].keys.inject({}) do |ret,obj|
- ret[obj.to_sym] = pikelet_cfg['smtp_config'][obj]
- ret
+ fiber = Fiber.new {
+ begin
+ # Can't use local inc_mod/ext_mod variables in the new fiber
+ if pikelet.is_a?(Flapjack::GenericPikelet)
+ pikelet.main
+ elsif extended_modules(pikelet_class).include?(Flapjack::ResquePikelet)
+ pikelet.work(0.1)
+ end
+ rescue Exception => e
+ trace = e.backtrace.join("\n")
+ @logger.fatal "#{e.message}\n#{trace}"
+ stop
end
- end
-
- Mail.defaults {
- delivery_method :smtp, {:enable_starttls_auto => false}.merge(smtp_config)
}
+
+ pikelet_info[:fiber] = fiber
+ fiber.resume
+ @logger.debug "new fiber created for #{pikelet_type}"
+ elsif ext_mod.include?(Flapjack::ThinPikelet)
+ pikelet.start
+ @logger.debug "new thin server instance started for #{pikelet_type}"
end
- pikelet_class.class_variable_set('@@config', pikelet_cfg)
+ @pikelets << pikelet_info
+ end
- # TODO error if pikelet_cfg['queue'].nil?
- pikelet = EM::Resque::Worker.new(pikelet_cfg['queue'])
- # # Use these to debug the resque workers
- # pikelet.verbose = true
- # pikelet.very_verbose = true
-
- f = Fiber.new {
- begin
- pikelet.work(0.1)
- rescue Exception => e
- trace = e.backtrace.join("\n")
- @logger.fatal "#{e.message}\n#{trace}"
- stop
+ # only prints state changes, otherwise pikelets not closing promptly can
+ # cause everything else to be spammy
+ def health_check
+ @pikelets.each do |pik|
+ status = if extended_modules(pik[:class]).include?(Flapjack::ThinPikelet)
+ pik[:instance].backend.size > 0 ? 'running' : 'stopped'
+ elsif pik[:fiber]
+ pik[:fiber].alive? ? 'running' : 'stopped'
end
- }
- @pikelets << {:fiber => f, :type => pikelet_type, :instance => pikelet}
- f.resume
- @logger.debug "new fiber created for #{pikelet_type}"
+ next if pik.has_key?(:status) && pik[:status].eql?(status)
+ @logger.info "#{pik[:class].name}: #{status}"
+ pik[:status] = status
+ end
end
- # # TODO rewrite to be less spammy -- print only initial state and changes
- # def health_check
- # @pikelets.each do |pik|
- # if pik[:instance].is_a?(Thin::Server)
- # s = pik[:instance].backend.size
- # @logger.debug "thin on port #{pik[:instance].port} - #{s} connections"
- # elsif pik[:fiber]
- # @logger.debug "#{pik[:type]}: #{pik[:fiber].alive? ? 'alive' : 'dead'}"
- # end
- # end
- # end
-
- # TODO whem merged with other changes, have this check pik[:class] instead,
- # makes tests neater
def shutdown
@pikelets.each do |pik|
- case pik[:instance]
- when Flapjack::Executive, Flapjack::Jabber, Flapjack::Pagerduty
+
+ pik_inst = pik[:instance]
+ ext_mod = extended_modules(pik[:class])
+
+ # would be neater if we could use something similar for the class << self
+ # included pikelets as well
+ if pik_inst.is_a?(Flapjack::GenericPikelet)
if pik[:fiber] && pik[:fiber].alive?
- pik[:instance].stop
+ pik_inst.stop
Fiber.new {
# this needs to use a separate Redis connection from the pikelet's
# one, as that's in the middle of its blpop
r = Redis.new(@redis_options.merge(:driver => 'synchrony'))
- pik[:instance].add_shutdown_event(:redis => r)
+ pik_inst.add_shutdown_event(:redis => r)
r.quit
}.resume
end
- when EM::Resque::Worker
+ elsif ext_mod.include?(Flapjack::ResquePikelet)
# resque is polling, so we don't need a shutdown object
- pik[:instance].shutdown if pik[:fiber] && pik[:fiber].alive?
- when Thin::Server # web, api
+ pik_inst.shutdown if pik[:fiber] && pik[:fiber].alive?
+ elsif ext_mod.include?(Flapjack::ThinPikelet)
# drop from this side, as HTTP keepalive etc. means browsers
# keep connections alive for ages, and we'd be hanging around
# waiting for them to drop
- pik[:instance].stop!
+ pik_inst.stop!
end
end
- fibers = @pikelets.collect {|p| p[:fiber] }.compact
- thin_pikelets = @pikelets.collect {|p| p[:instance]}.select {|i| i.is_a?(Thin::Server) }
-
Fiber.new {
+
loop do
- # health_check
- if fibers.any?(&:alive?) || thin_pikelets.any?{|tp| !tp.backend.empty? }
+ health_check
+
+ if @pikelets.any? {|p| p[:status] == 'running'}
EM::Synchrony.sleep 0.25
else
@resque_pool.empty! if @resque_pool
- [Flapjack::Web, Flapjack::API].each do |klass|
- next unless klass.class_variable_defined?('@@redis') &&
- redis = klass.class_variable_get('@@redis')
- redis.empty!
+ @pikelets.each do |pik|
+
+ pik_inst = pik[:instance]
+ ext_mod = extended_modules(pik[:class])
+
+ if pik_inst.is_a?(Flapjack::GenericPikelet)
+
+ pik_inst.cleanup
+
+ elsif [Flapjack::ResquePikelet, Flapjack::ThinPikelet].any?{|fp|
+ ext_mod.include?(fp)
+ }
+
+ pik[:class].cleanup
+
+ end
end
EM.stop
break
end
end
}.resume
+ end
+
+ def extended_modules(klass)
+ (class << klass; self; end).included_modules
end
end
end