lib/flapjack/processor.rb in flapjack-1.6.0 vs lib/flapjack/processor.rb in flapjack-2.0.0b1
- old
+ new
@@ -1,285 +1,417 @@
#!/usr/bin/env ruby
require 'chronic_duration'
-require 'em-hiredis'
+require 'flapjack/redis_proxy'
require 'flapjack/filters/acknowledgement'
require 'flapjack/filters/ok'
require 'flapjack/filters/scheduled_maintenance'
require 'flapjack/filters/unscheduled_maintenance'
require 'flapjack/filters/delays'
-require 'flapjack/data/entity_check'
+require 'flapjack/data/check'
require 'flapjack/data/event'
-require 'flapjack/redis_pool'
+require 'flapjack/data/notification'
+require 'flapjack/data/statistic'
+
+require 'flapjack/exceptions'
require 'flapjack/utility'
module Flapjack
class Processor
include Flapjack::Utility
def initialize(opts = {})
+ @lock = opts[:lock]
+
@config = opts[:config]
- @redis_config = opts[:redis_config] || {}
- @logger = opts[:logger]
- @redis = Flapjack::RedisPool.new(:config => @redis_config, :size => 2, :logger => @logger)
+ @boot_time = opts[:boot_time]
@queue = @config['queue'] || 'events'
- @notifier_queue = @config['notifier_queue'] || 'notifications'
+ @initial_failure_delay = @config['initial_failure_delay']
+ if !@initial_failure_delay.is_a?(Integer) || (@initial_failure_delay < 0)
+ @initial_failure_delay = nil
+ end
+ @repeat_failure_delay = @config['repeat_failure_delay']
+ if !@repeat_failure_delay.is_a?(Integer) || (@repeat_failure_delay < 0)
+ @repeat_failure_delay = nil
+ end
+
+ @notifier_queue = Flapjack::RecordQueue.new(@config['notifier_queue'] || 'notifications',
+ Flapjack::Data::Notification)
+
@archive_events = @config['archive_events'] || false
@events_archive_maxage = @config['events_archive_maxage']
ncsm_duration_conf = @config['new_check_scheduled_maintenance_duration'] || '100 years'
@ncsm_duration = ChronicDuration.parse(ncsm_duration_conf, :keep_zero => true)
- @ncsm_ignore_tags = @config['new_check_scheduled_maintenance_ignore_tags'] || []
+ ncsm_ignore = @config['new_check_scheduled_maintenance_ignore_regex']
+ @ncsm_ignore_regex = if ncsm_ignore.nil? || ncsm_ignore.strip.empty?
+ nil
+ else
+ Regexp.new(ncsm_ignore)
+ end
- @exit_on_queue_empty = !! @config['exit_on_queue_empty']
+ @exit_on_queue_empty = !!@config['exit_on_queue_empty']
- options = { :logger => opts[:logger], :redis => @redis }
- @filters = []
- @filters << Flapjack::Filters::Ok.new(options)
- @filters << Flapjack::Filters::ScheduledMaintenance.new(options)
- @filters << Flapjack::Filters::UnscheduledMaintenance.new(options)
- @filters << Flapjack::Filters::Delays.new(options)
- @filters << Flapjack::Filters::Acknowledgement.new(options)
+ @filters = [Flapjack::Filters::Ok.new,
+ Flapjack::Filters::ScheduledMaintenance.new,
+ Flapjack::Filters::UnscheduledMaintenance.new,
+ Flapjack::Filters::Delays.new,
+ Flapjack::Filters::Acknowledgement.new]
- boot_time = opts[:boot_time]
fqdn = `/bin/hostname -f`.chomp
pid = Process.pid
@instance_id = "#{fqdn}:#{pid}"
+ end
- # FIXME: all of the below keys assume there is only ever one executive running;
- # we could generate a fuid and save it to disk, and prepend it from that
- # point on...
+ def start_stats
+ empty_stats = {:created_at => @boot_time, :all_events => 0,
+ :ok_events => 0, :failure_events => 0, :action_events => 0,
+ :invalid_events => 0}
- # FIXME: add an administrative function to reset all event counters
+ @global_stats = Flapjack::Data::Statistic.
+ intersect(:instance_name => 'global').all.first
- @redis.hset('event_counters', 'all', 0) if @redis.hget('event_counters', 'all').nil?
- @redis.hset('event_counters', 'ok', 0) if @redis.hget('event_counters', 'ok').nil?
- @redis.hset('event_counters', 'failure', 0) if @redis.hget('event_counters', 'failure').nil?
- @redis.hset('event_counters', 'action', 0) if @redis.hget('event_counters', 'action').nil?
- @redis.hset('event_counters', 'invalid', 0) if @redis.hget('event_counters', 'invalid').nil?
+ if @global_stats.nil?
+ @global_stats = Flapjack::Data::Statistic.new(empty_stats.merge(
+ :instance_name => 'global'))
+ @global_stats.save!
+ end
- @redis.hset("executive_instance:#{@instance_id}", 'boot_time', boot_time.to_i)
- @redis.hset("event_counters:#{@instance_id}", 'all', 0)
- @redis.hset("event_counters:#{@instance_id}", 'ok', 0)
- @redis.hset("event_counters:#{@instance_id}", 'failure', 0)
- @redis.hset("event_counters:#{@instance_id}", 'action', 0)
- @redis.hset("event_counters:#{@instance_id}", 'invalid', 0)
- touch_keys
+ @instance_stats = Flapjack::Data::Statistic.new(empty_stats.merge(
+ :instance_name => @instance_id))
+ @instance_stats.save!
end
- # expire instance keys after one week
- # TODO: set up a separate EM timer to reset key expiry every minute
- # and reduce the expiry to, say, five minutes
- # TODO: remove these keys on process exit
- def touch_keys
- [ "executive_instance:#{@instance_id}",
- "event_counters:#{@instance_id}",
- "event_counters:#{@instance_id}",
- "event_counters:#{@instance_id}",
- "event_counters:#{@instance_id}" ].each {|key|
- @redis.expire(key, 1036800)
- }
- end
-
def start
- @logger.info("Booting main loop.")
+ Flapjack.logger.info("Booting main loop.")
- until @should_quit
- @logger.debug("Waiting for event...")
- event = Flapjack::Data::Event.next(@queue,
- :redis => @redis,
- :archive_events => @archive_events,
- :events_archive_maxage => @events_archive_maxage,
- :logger => @logger,
- :block => ! @exit_on_queue_empty )
- if @exit_on_queue_empty && event.nil? && Flapjack::Data::Event.pending_count(@queue, :redis => @redis)
- # SHUT IT ALL DOWN!!!
- @logger.warn "Shutting down as exit_on_queue_empty is true, and the queue is empty"
- Process.kill('INT', Process.pid)
- break
- end
+ begin
+ Zermelo.redis = Flapjack.redis
- if event.nil?
- @redis.hincrby('event_counters', 'all', 1)
- @redis.hincrby("event_counters:#{@instance_id}", 'all', 1)
- @redis.hincrby('event_counters', 'invalid', 1)
- @redis.hincrby("event_counters:#{@instance_id}", 'invalid', 1)
- else
- process_event(event)
+ start_stats
+
+ queue = (@config['queue'] || 'events')
+
+ loop do
+ @lock.synchronize do
+ foreach_on_queue(queue) {|event| process_event(event)}
+ end
+
+ raise Flapjack::GlobalStop if @exit_on_queue_empty
+
+ wait_for_queue(queue)
end
+
+ ensure
+ @instance_stats.destroy unless @instance_stats.nil? || !@instance_stats.persisted?
+ Flapjack.redis.quit
end
+ end
- @logger.info("Exiting main loop.")
+ def stop_type
+ :exception
end
- # this must use a separate connection to the main Executive one, as it's running
- # from a different fiber while the main one is blocking.
- def stop
- unless @should_quit
- @should_quit = true
- redis_uri = @redis_config[:path] ||
- "redis://#{@redis_config[:host] || '127.0.0.1'}:#{@redis_config[:port] || '6379'}/#{@redis_config[:db] || '0'}"
- shutdown_redis = EM::Hiredis.connect(redis_uri)
- shutdown_redis.rpush('events', Flapjack.dump_json('type' => 'noop'))
+ private
+
+ def foreach_on_queue(queue, opts = {})
+ base_time_str = Time.now.utc.strftime "%Y%m%d%H"
+ rejects = "events_rejected:#{base_time_str}"
+ archive = @archive_events ? "events_archive:#{base_time_str}" : nil
+ max_age = archive ? @events_archive_maxage : nil
+
+ while event_json = (archive ? Flapjack.redis.rpoplpush(queue, archive) :
+ Flapjack.redis.rpop(queue))
+ parsed, errors = Flapjack::Data::Event.parse_and_validate(event_json)
+ if !errors.nil? && !errors.empty?
+ Flapjack.redis.multi do |multi|
+ if archive
+ multi.lrem(archive, 1, event_json)
+ end
+ multi.lpush(rejects, event_json)
+ @global_stats.all_events += 1
+ @global_stats.invalid_events += 1
+ @instance_stats.all_events += 1
+ @instance_stats.invalid_events += 1
+ if archive
+ multi.expire(archive, max_age)
+ end
+ end
+ Flapjack::Data::Statistic.lock do
+ @global_stats.save!
+ @instance_stats.save!
+ end
+ Flapjack.logger.error {
+ error_str = errors.nil? ? '' : errors.join(', ')
+ "Invalid event data received, #{error_str} #{parsed.inspect}"
+ }
+ else
+ Flapjack.redis.expire(archive, max_age) if archive
+ yield Flapjack::Data::Event.new(parsed) if block_given?
+ end
end
end
- private
+ def wait_for_queue(queue)
+ Flapjack.redis.brpop("#{queue}_actions")
+ end
def process_event(event)
- pending = Flapjack::Data::Event.pending_count(@queue, :redis => @redis)
- @logger.debug("#{pending} events waiting on the queue")
- @logger.debug("Raw event received: #{event.inspect}")
+ Flapjack.logger.debug {
+ pending = Flapjack::Data::Event.pending_count(@queue)
+ "#{pending} events waiting on the queue"
+ }
+ Flapjack.logger.debug { "Event received: #{event.inspect}" }
+ Flapjack.logger.debug { "Processing Event: #{event.dump}" }
- if ('noop' == event.type)
- return
+ timestamp = Time.now
+
+ event_condition = case event.state
+ when 'acknowledgement', /\Atest_notifications(?:\s+#{Flapjack::Data::Condition.unhealthy.keys.join('|')})?\z/
+ nil
+ else
+ cond = Flapjack::Data::Condition.for_name(event.state)
+ if cond.nil?
+ Flapjack.logger.error { "Invalid event received: #{event.inspect}" }
+ Flapjack::Data::Statistic.lock do
+ @global_stats.all_events += 1
+ @global_stats.invalid_events += 1
+ @instance_stats.all_events += 1
+ @instance_stats.invalid_events += 1
+ @global_stats.save!
+ @instance_stats.save!
+ end
+ return
+ end
+ cond
end
- event_str = "#{event.id}, #{event.type}, #{event.state}, #{event.summary}"
- event_str << ", #{Time.at(event.time).to_s}" if event.time
- @logger.debug("Processing Event: #{event_str}")
+ Flapjack::Data::Check.lock(Flapjack::Data::State,
+ Flapjack::Data::ScheduledMaintenance, Flapjack::Data::UnscheduledMaintenance,
+ Flapjack::Data::Tag,
+ # Flapjack::Data::Route,
+ Flapjack::Data::Medium,
+ Flapjack::Data::Notification, Flapjack::Data::Statistic) do
- entity_check = Flapjack::Data::EntityCheck.for_event_id(event.id, :create_entity => true, :redis => @redis)
- timestamp = Time.now.to_i
+ check = Flapjack::Data::Check.intersect(:name => event.id).all.first ||
+ Flapjack::Data::Check.new(:name => event.id)
- event.tags = (event.tags || Set.new) + entity_check.tags
+ # result will be nil if check has been created via API but has no events
+ old_state = check.id.nil? ? nil : check.states.first
- should_notify, previous_state = update_keys(event, entity_check, timestamp)
+ # NB new_state won't be saved unless;
+ # * the condition differs from old_state (goes into history); or
+ # * it's being used for a notification (attach to medium, notification)
+ new_state = Flapjack::Data::State.new(:created_at => timestamp,
+ :updated_at => timestamp)
- if !should_notify
- @logger.debug("Not generating notification for event #{event.id} because filtering was skipped")
- return
- elsif blocker = @filters.find {|filter| filter.block?(event, entity_check, previous_state) }
- @logger.debug("Not generating notification for event #{event.id} because this filter blocked: #{blocker.name}")
- return
- end
+ update_check(check, old_state, new_state, event, event_condition,
+ timestamp)
- @logger.info("Generating notification for event #{event_str}")
- generate_notification(event, entity_check, timestamp, previous_state)
- end
+ check.enabled = true unless event_condition.nil?
- def update_keys(event, entity_check, timestamp)
- # TODO: run touch_keys from a separate EM timer for efficiency
- touch_keys
+ ncsm_sched_maint = nil
+ if check.id.nil? && (@ncsm_duration > 0) && (@ncsm_ignore_regex.nil? ||
+ @ncsm_ignore_regex.match(check.name).nil?)
- result = true
- previous_state = nil
+ Flapjack.logger.info { "Setting scheduled maintenance for #{time_period_in_words(@ncsm_duration)}" }
+ ncsm_sched_maint = Flapjack::Data::ScheduledMaintenance.new(:start_time => timestamp,
+ :end_time => timestamp + @ncsm_duration,
+ :summary => 'Automatically created for new check')
+ ncsm_sched_maint.save!
+ end
- event.counter = @redis.hincrby('event_counters', 'all', 1)
- @redis.hincrby("event_counters:#{@instance_id}", 'all', 1)
+ check.save! # no-op if not new and not changed
+ check.scheduled_maintenances << ncsm_sched_maint unless ncsm_sched_maint.nil?
- # FIXME skip if entity_check.nil?
+ @global_stats.save!
+ @instance_stats.save!
- # FIXME: validate that the event is sane before we ever get here
- # FIXME: create an event if there is dodgy data
+ if (old_state.nil? || old_state.condition.nil?) && !event_condition.nil? &&
+ Flapjack::Data::Condition.healthy?(event_condition.name)
- case event.type
- # Service events represent current state of checks on monitored systems
- when 'service'
- if event.failure?
- # ensure that the check's hash is stored for later lookup
- # can't happen inside the multi as it must get a value
- event.id_hash = entity_check.ack_hash
- end
+ new_state.save!
+ check.states << new_state
+ check.current_state = new_state
+ old_state.destroy unless old_state.nil? # will fail if still linked
- @redis.multi do |multi|
- if event.ok?
- multi.hincrby('event_counters', 'ok', 1)
- multi.hincrby("event_counters:#{@instance_id}", 'ok', 1)
- elsif event.failure?
- multi.hincrby('event_counters', 'failure', 1)
- multi.hincrby("event_counters:#{@instance_id}", 'failure', 1)
- else
- multi.hincrby('event_counters', 'invalid', 1)
- multi.hincrby("event_counters:#{@instance_id}", 'invalid', 1)
- @logger.error("Invalid event received: #{event.inspect}")
- end
- end
+ # If the service event's condition is ok and there was no previous condition, don't alert.
+ # This stops new checks from alerting as "recovery" after they have been added.
+ Flapjack.logger.debug {
+ "Not generating notification for event #{event.id} because " \
+ "filtering was skipped"
+ }
- previous_state = entity_check.state
+ else
+ # only change notification delays on service (non-action) events;
+ # fall back to check-local, config-global or default values unless
+ # sustained by the event flow
+ init_fail_delay = (event_condition.nil? ? nil : event.initial_failure_delay) ||
+ check.initial_failure_delay ||
+ @initial_failure_delay ||
+ Flapjack::DEFAULT_INITIAL_FAILURE_DELAY
- if previous_state.nil?
- @logger.info("No previous state for event #{event.id}")
+ repeat_fail_delay = (event_condition.nil? ? nil : event.repeat_failure_delay) ||
+ check.repeat_failure_delay ||
+ @repeat_failure_delay ||
+ Flapjack::DEFAULT_REPEAT_FAILURE_DELAY
- if @ncsm_duration > 0 && (event.tags & @ncsm_ignore_tags).empty?
- @logger.info("Setting scheduled maintenance for #{time_period_in_words(@ncsm_duration)}")
- entity_check.create_scheduled_maintenance(timestamp,
- @ncsm_duration, :summary => 'Automatically created for new check')
+ filter_opts = {
+ :initial_failure_delay => init_fail_delay,
+ :repeat_failure_delay => repeat_fail_delay,
+ :old_state => old_state, :new_state => new_state,
+ :timestamp => timestamp, :duration => event.duration
+ }
+
+ # acks only go into latest_notifications
+ save_to_history = new_state.action.nil? && !event_condition.nil? &&
+ (old_state.nil? || (old_state.condition != event_condition.name))
+
+ if save_to_history
+ new_state.save!
+ check.states << new_state
+ check.current_state = new_state
+ elsif new_state.action.nil?
+ old_state.updated_at = timestamp
+ old_state.summary = new_state.summary
+ old_state.details = new_state.details
+ old_state.save!
end
- # If the service event's state is ok and there was no previous state, don't alert.
- # This stops new checks from alerting as "recovery" after they have been added.
- if event.ok?
- @logger.debug("setting skip_filters to true because there was no previous state and event is ok")
- result = false
+ blocker = @filters.find {|f| f.block?(check, filter_opts) }
+
+ if blocker.nil?
+ Flapjack.logger.info { "Generating notification for event #{event.dump}" }
+ new_state.save! unless new_state.persisted?
+ generate_notification(check, old_state, new_state, event,
+ event_condition)
+ else
+ Flapjack.logger.debug {
+ "Not generating notification for event #{event.id} " \
+ "because this filter blocked: #{blocker.name}"
+ }
end
+
end
+ end
+ end
- entity_check.update_state(event.state, :timestamp => timestamp,
- :summary => event.summary, :count => event.counter,
- :details => event.details, :perfdata => event.perfdata,
- :initial_failure_delay => event.initial_failure_delay,
- :repeat_failure_delay => event.repeat_failure_delay)
+ def update_check(check, old_state, new_state, event, event_condition, timestamp)
+ @global_stats.all_events += 1
+ @instance_stats.all_events += 1
- entity_check.update_current_scheduled_maintenance
+ event.counter = @global_stats.all_events
- # Action events represent human or automated interaction with Flapjack
- when 'action'
- # When an action event is processed, store the event.
- @redis.multi do |multi|
- multi.hset(event.id + ':actions', timestamp, event.state)
- multi.hincrby('event_counters', 'action', 1)
- multi.hincrby("event_counters:#{@instance_id}", 'action', 1)
+ # ncsm_sched_maint = nil
+
+ if event_condition.nil?
+ # Action events represent human or automated interaction with Flapjack
+ new_state.action = event.state
+ new_state.condition = old_state.condition unless old_state.nil?
+
+ unless new_state.action =~ /\Atest_notifications(?:\s+#{Flapjack::Data::Condition.unhealthy.keys.join('|')})?\z/
+ @global_stats.action_events += 1
+ @instance_stats.action_events += 1
end
else
- @redis.multi do |multi|
- multi.hincrby('event_counters', 'invalid', 1)
- multi.hincrby("event_counters:#{@instance_id}", 'invalid', 1)
+ # Service events represent current state of checks on monitored systems
+ check.failing = !Flapjack::Data::Condition.healthy?(event_condition.name)
+ check.condition = event_condition.name
+
+ if check.failing
+ @global_stats.failure_events += 1
+ @instance_stats.failure_events += 1
+ else
+ @global_stats.ok_events += 1
+ @instance_stats.ok_events += 1
end
- @logger.error("Invalid event received: #{event.inspect}")
+
+ new_state.condition = event_condition.name
+ new_state.perfdata = event.perfdata
end
- [result, previous_state]
+ new_state.summary = event.summary
+ new_state.details = event.details
end
- def generate_notification(event, entity_check, timestamp, previous_state)
- notification_type = Flapjack::Data::Notification.type_for_event(event)
- max_notified_severity = entity_check.max_notified_severity_of_current_failure
+ def generate_notification(check, old_state, new_state, event, event_condition)
+ severity = nil
- @redis.multi do |multi|
- multi.set("#{event.id}:last_#{notification_type}_notification", timestamp)
- multi.set("#{event.id}:last_#{event.state}_notification", timestamp) if event.failure?
- multi.rpush("#{event.id}:#{notification_type}_notifications", timestamp)
- multi.rpush("#{event.id}:#{event.state}_notifications", timestamp) if event.failure?
+ # accepts test_notifications without condition, for backwards compatibility
+ if new_state.action =~ /\Atest_notifications(\s+#{Flapjack::Data::Condition.unhealthy.keys.join('|')})?\z/
+ # the state won't be preserved for any time after the notification is
+ # sent via association to a state or check
+ severity = Regexp.last_match(1) || Flapjack::Data::Condition.most_unhealthy
+ else
+ latest_notif = check.latest_notifications
+
+ notification_ids_to_remove = if new_state.action.nil?
+ latest_notif.intersect(:condition => new_state.condition).ids
+ else
+ latest_notif.intersect(:action => new_state.action).ids
+ end
+ latest_notif.add(new_state)
+ latest_notif.remove_ids(*notification_ids_to_remove) unless notification_ids_to_remove.empty?
+
+ most_severe = check.most_severe
+
+ most_severe_cond = most_severe.nil? ? nil :
+ Flapjack::Data::Condition.for_name(most_severe.condition)
+
+ if !event_condition.nil? &&
+ Flapjack::Data::Condition.unhealthy.has_key?(event_condition.name) &&
+ (most_severe_cond.nil? || (event_condition < most_severe_cond))
+
+ check.most_severe = new_state
+ most_severe_cond = event_condition
+ elsif 'acknowledgement'.eql?(new_state.action)
+ check.most_severe = nil
+ end
+
+ severity = most_severe_cond.nil? ? 'ok' : most_severe_cond.name
end
- @logger.debug("Notification of type #{notification_type} is being generated for #{event.id}: " + event.inspect)
+ Flapjack.logger.info { "severity #{severity}"}
- severity = Flapjack::Data::Notification.severity_for_event(event, max_notified_severity)
+ Flapjack.logger.debug("Notification is being generated for #{event.id}: " + event.inspect)
- lc = entity_check.last_change
- state_duration = lc ? (timestamp - lc) : nil
+ event_hash = (event_condition.nil? || Flapjack::Data::Condition.healthy?(event_condition.name)) ?
+ nil : check.ack_hash
- Flapjack::Data::Notification.add(@notifier_queue, event,
- :type => notification_type, :severity => severity,
- :last_state => previous_unique_state(entity_check), :state_duration => state_duration,
- :redis => @redis)
- end
+ condition_duration = old_state.nil? ? nil :
+ (new_state.created_at - old_state.created_at)
- def previous_unique_state(entity_check)
- hs = entity_check.historical_states(nil, nil, :order => 'desc', :limit => 2)
- return { :last_state => nil, :last_summary => nil } unless hs.length == 2
- return hs.last
- end
+ notification = Flapjack::Data::Notification.new(:duration => event.duration,
+ :severity => severity, :condition_duration => condition_duration,
+ :event_hash => event_hash)
+ notification.save!
+ notification.state = new_state
+ check.notifications << notification
+
+ @notifier_queue.push(notification)
+
+ return if new_state.action =~ /\Atest_notifications(?:\s+#{Flapjack::Data::Condition.unhealthy.keys.join('|')})?\z/
+
+ Flapjack.logger.info "notification count: #{check.notification_count}"
+
+ if check.notification_count.nil?
+ check.notification_count = 1
+ else
+ check.notification_count += 1
+ end
+ check.save!
+
+ Flapjack.logger.info "#{check.name} #{check.errors.full_messages} notification count: #{check.notification_count}"
+ end
end
end