lib/flapjack/processor.rb in flapjack-1.6.0 vs lib/flapjack/processor.rb in flapjack-2.0.0b1

- old
+ new

@@ -1,285 +1,417 @@ #!/usr/bin/env ruby require 'chronic_duration' -require 'em-hiredis' +require 'flapjack/redis_proxy' require 'flapjack/filters/acknowledgement' require 'flapjack/filters/ok' require 'flapjack/filters/scheduled_maintenance' require 'flapjack/filters/unscheduled_maintenance' require 'flapjack/filters/delays' -require 'flapjack/data/entity_check' +require 'flapjack/data/check' require 'flapjack/data/event' -require 'flapjack/redis_pool' +require 'flapjack/data/notification' +require 'flapjack/data/statistic' + +require 'flapjack/exceptions' require 'flapjack/utility' module Flapjack class Processor include Flapjack::Utility def initialize(opts = {}) + @lock = opts[:lock] + @config = opts[:config] - @redis_config = opts[:redis_config] || {} - @logger = opts[:logger] - @redis = Flapjack::RedisPool.new(:config => @redis_config, :size => 2, :logger => @logger) + @boot_time = opts[:boot_time] @queue = @config['queue'] || 'events' - @notifier_queue = @config['notifier_queue'] || 'notifications' + @initial_failure_delay = @config['initial_failure_delay'] + if !@initial_failure_delay.is_a?(Integer) || (@initial_failure_delay < 0) + @initial_failure_delay = nil + end + @repeat_failure_delay = @config['repeat_failure_delay'] + if !@repeat_failure_delay.is_a?(Integer) || (@repeat_failure_delay < 0) + @repeat_failure_delay = nil + end + + @notifier_queue = Flapjack::RecordQueue.new(@config['notifier_queue'] || 'notifications', + Flapjack::Data::Notification) + @archive_events = @config['archive_events'] || false @events_archive_maxage = @config['events_archive_maxage'] ncsm_duration_conf = @config['new_check_scheduled_maintenance_duration'] || '100 years' @ncsm_duration = ChronicDuration.parse(ncsm_duration_conf, :keep_zero => true) - @ncsm_ignore_tags = @config['new_check_scheduled_maintenance_ignore_tags'] || [] + ncsm_ignore = @config['new_check_scheduled_maintenance_ignore_regex'] + @ncsm_ignore_regex = if ncsm_ignore.nil? || ncsm_ignore.strip.empty? + nil + else + Regexp.new(ncsm_ignore) + end - @exit_on_queue_empty = !! @config['exit_on_queue_empty'] + @exit_on_queue_empty = !!@config['exit_on_queue_empty'] - options = { :logger => opts[:logger], :redis => @redis } - @filters = [] - @filters << Flapjack::Filters::Ok.new(options) - @filters << Flapjack::Filters::ScheduledMaintenance.new(options) - @filters << Flapjack::Filters::UnscheduledMaintenance.new(options) - @filters << Flapjack::Filters::Delays.new(options) - @filters << Flapjack::Filters::Acknowledgement.new(options) + @filters = [Flapjack::Filters::Ok.new, + Flapjack::Filters::ScheduledMaintenance.new, + Flapjack::Filters::UnscheduledMaintenance.new, + Flapjack::Filters::Delays.new, + Flapjack::Filters::Acknowledgement.new] - boot_time = opts[:boot_time] fqdn = `/bin/hostname -f`.chomp pid = Process.pid @instance_id = "#{fqdn}:#{pid}" + end - # FIXME: all of the below keys assume there is only ever one executive running; - # we could generate a fuid and save it to disk, and prepend it from that - # point on... + def start_stats + empty_stats = {:created_at => @boot_time, :all_events => 0, + :ok_events => 0, :failure_events => 0, :action_events => 0, + :invalid_events => 0} - # FIXME: add an administrative function to reset all event counters + @global_stats = Flapjack::Data::Statistic. + intersect(:instance_name => 'global').all.first - @redis.hset('event_counters', 'all', 0) if @redis.hget('event_counters', 'all').nil? - @redis.hset('event_counters', 'ok', 0) if @redis.hget('event_counters', 'ok').nil? - @redis.hset('event_counters', 'failure', 0) if @redis.hget('event_counters', 'failure').nil? - @redis.hset('event_counters', 'action', 0) if @redis.hget('event_counters', 'action').nil? - @redis.hset('event_counters', 'invalid', 0) if @redis.hget('event_counters', 'invalid').nil? + if @global_stats.nil? + @global_stats = Flapjack::Data::Statistic.new(empty_stats.merge( + :instance_name => 'global')) + @global_stats.save! + end - @redis.hset("executive_instance:#{@instance_id}", 'boot_time', boot_time.to_i) - @redis.hset("event_counters:#{@instance_id}", 'all', 0) - @redis.hset("event_counters:#{@instance_id}", 'ok', 0) - @redis.hset("event_counters:#{@instance_id}", 'failure', 0) - @redis.hset("event_counters:#{@instance_id}", 'action', 0) - @redis.hset("event_counters:#{@instance_id}", 'invalid', 0) - touch_keys + @instance_stats = Flapjack::Data::Statistic.new(empty_stats.merge( + :instance_name => @instance_id)) + @instance_stats.save! end - # expire instance keys after one week - # TODO: set up a separate EM timer to reset key expiry every minute - # and reduce the expiry to, say, five minutes - # TODO: remove these keys on process exit - def touch_keys - [ "executive_instance:#{@instance_id}", - "event_counters:#{@instance_id}", - "event_counters:#{@instance_id}", - "event_counters:#{@instance_id}", - "event_counters:#{@instance_id}" ].each {|key| - @redis.expire(key, 1036800) - } - end - def start - @logger.info("Booting main loop.") + Flapjack.logger.info("Booting main loop.") - until @should_quit - @logger.debug("Waiting for event...") - event = Flapjack::Data::Event.next(@queue, - :redis => @redis, - :archive_events => @archive_events, - :events_archive_maxage => @events_archive_maxage, - :logger => @logger, - :block => ! @exit_on_queue_empty ) - if @exit_on_queue_empty && event.nil? && Flapjack::Data::Event.pending_count(@queue, :redis => @redis) - # SHUT IT ALL DOWN!!! - @logger.warn "Shutting down as exit_on_queue_empty is true, and the queue is empty" - Process.kill('INT', Process.pid) - break - end + begin + Zermelo.redis = Flapjack.redis - if event.nil? - @redis.hincrby('event_counters', 'all', 1) - @redis.hincrby("event_counters:#{@instance_id}", 'all', 1) - @redis.hincrby('event_counters', 'invalid', 1) - @redis.hincrby("event_counters:#{@instance_id}", 'invalid', 1) - else - process_event(event) + start_stats + + queue = (@config['queue'] || 'events') + + loop do + @lock.synchronize do + foreach_on_queue(queue) {|event| process_event(event)} + end + + raise Flapjack::GlobalStop if @exit_on_queue_empty + + wait_for_queue(queue) end + + ensure + @instance_stats.destroy unless @instance_stats.nil? || !@instance_stats.persisted? + Flapjack.redis.quit end + end - @logger.info("Exiting main loop.") + def stop_type + :exception end - # this must use a separate connection to the main Executive one, as it's running - # from a different fiber while the main one is blocking. - def stop - unless @should_quit - @should_quit = true - redis_uri = @redis_config[:path] || - "redis://#{@redis_config[:host] || '127.0.0.1'}:#{@redis_config[:port] || '6379'}/#{@redis_config[:db] || '0'}" - shutdown_redis = EM::Hiredis.connect(redis_uri) - shutdown_redis.rpush('events', Flapjack.dump_json('type' => 'noop')) + private + + def foreach_on_queue(queue, opts = {}) + base_time_str = Time.now.utc.strftime "%Y%m%d%H" + rejects = "events_rejected:#{base_time_str}" + archive = @archive_events ? "events_archive:#{base_time_str}" : nil + max_age = archive ? @events_archive_maxage : nil + + while event_json = (archive ? Flapjack.redis.rpoplpush(queue, archive) : + Flapjack.redis.rpop(queue)) + parsed, errors = Flapjack::Data::Event.parse_and_validate(event_json) + if !errors.nil? && !errors.empty? + Flapjack.redis.multi do |multi| + if archive + multi.lrem(archive, 1, event_json) + end + multi.lpush(rejects, event_json) + @global_stats.all_events += 1 + @global_stats.invalid_events += 1 + @instance_stats.all_events += 1 + @instance_stats.invalid_events += 1 + if archive + multi.expire(archive, max_age) + end + end + Flapjack::Data::Statistic.lock do + @global_stats.save! + @instance_stats.save! + end + Flapjack.logger.error { + error_str = errors.nil? ? '' : errors.join(', ') + "Invalid event data received, #{error_str} #{parsed.inspect}" + } + else + Flapjack.redis.expire(archive, max_age) if archive + yield Flapjack::Data::Event.new(parsed) if block_given? + end end end - private + def wait_for_queue(queue) + Flapjack.redis.brpop("#{queue}_actions") + end def process_event(event) - pending = Flapjack::Data::Event.pending_count(@queue, :redis => @redis) - @logger.debug("#{pending} events waiting on the queue") - @logger.debug("Raw event received: #{event.inspect}") + Flapjack.logger.debug { + pending = Flapjack::Data::Event.pending_count(@queue) + "#{pending} events waiting on the queue" + } + Flapjack.logger.debug { "Event received: #{event.inspect}" } + Flapjack.logger.debug { "Processing Event: #{event.dump}" } - if ('noop' == event.type) - return + timestamp = Time.now + + event_condition = case event.state + when 'acknowledgement', /\Atest_notifications(?:\s+#{Flapjack::Data::Condition.unhealthy.keys.join('|')})?\z/ + nil + else + cond = Flapjack::Data::Condition.for_name(event.state) + if cond.nil? + Flapjack.logger.error { "Invalid event received: #{event.inspect}" } + Flapjack::Data::Statistic.lock do + @global_stats.all_events += 1 + @global_stats.invalid_events += 1 + @instance_stats.all_events += 1 + @instance_stats.invalid_events += 1 + @global_stats.save! + @instance_stats.save! + end + return + end + cond end - event_str = "#{event.id}, #{event.type}, #{event.state}, #{event.summary}" - event_str << ", #{Time.at(event.time).to_s}" if event.time - @logger.debug("Processing Event: #{event_str}") + Flapjack::Data::Check.lock(Flapjack::Data::State, + Flapjack::Data::ScheduledMaintenance, Flapjack::Data::UnscheduledMaintenance, + Flapjack::Data::Tag, + # Flapjack::Data::Route, + Flapjack::Data::Medium, + Flapjack::Data::Notification, Flapjack::Data::Statistic) do - entity_check = Flapjack::Data::EntityCheck.for_event_id(event.id, :create_entity => true, :redis => @redis) - timestamp = Time.now.to_i + check = Flapjack::Data::Check.intersect(:name => event.id).all.first || + Flapjack::Data::Check.new(:name => event.id) - event.tags = (event.tags || Set.new) + entity_check.tags + # result will be nil if check has been created via API but has no events + old_state = check.id.nil? ? nil : check.states.first - should_notify, previous_state = update_keys(event, entity_check, timestamp) + # NB new_state won't be saved unless; + # * the condition differs from old_state (goes into history); or + # * it's being used for a notification (attach to medium, notification) + new_state = Flapjack::Data::State.new(:created_at => timestamp, + :updated_at => timestamp) - if !should_notify - @logger.debug("Not generating notification for event #{event.id} because filtering was skipped") - return - elsif blocker = @filters.find {|filter| filter.block?(event, entity_check, previous_state) } - @logger.debug("Not generating notification for event #{event.id} because this filter blocked: #{blocker.name}") - return - end + update_check(check, old_state, new_state, event, event_condition, + timestamp) - @logger.info("Generating notification for event #{event_str}") - generate_notification(event, entity_check, timestamp, previous_state) - end + check.enabled = true unless event_condition.nil? - def update_keys(event, entity_check, timestamp) - # TODO: run touch_keys from a separate EM timer for efficiency - touch_keys + ncsm_sched_maint = nil + if check.id.nil? && (@ncsm_duration > 0) && (@ncsm_ignore_regex.nil? || + @ncsm_ignore_regex.match(check.name).nil?) - result = true - previous_state = nil + Flapjack.logger.info { "Setting scheduled maintenance for #{time_period_in_words(@ncsm_duration)}" } + ncsm_sched_maint = Flapjack::Data::ScheduledMaintenance.new(:start_time => timestamp, + :end_time => timestamp + @ncsm_duration, + :summary => 'Automatically created for new check') + ncsm_sched_maint.save! + end - event.counter = @redis.hincrby('event_counters', 'all', 1) - @redis.hincrby("event_counters:#{@instance_id}", 'all', 1) + check.save! # no-op if not new and not changed + check.scheduled_maintenances << ncsm_sched_maint unless ncsm_sched_maint.nil? - # FIXME skip if entity_check.nil? + @global_stats.save! + @instance_stats.save! - # FIXME: validate that the event is sane before we ever get here - # FIXME: create an event if there is dodgy data + if (old_state.nil? || old_state.condition.nil?) && !event_condition.nil? && + Flapjack::Data::Condition.healthy?(event_condition.name) - case event.type - # Service events represent current state of checks on monitored systems - when 'service' - if event.failure? - # ensure that the check's hash is stored for later lookup - # can't happen inside the multi as it must get a value - event.id_hash = entity_check.ack_hash - end + new_state.save! + check.states << new_state + check.current_state = new_state + old_state.destroy unless old_state.nil? # will fail if still linked - @redis.multi do |multi| - if event.ok? - multi.hincrby('event_counters', 'ok', 1) - multi.hincrby("event_counters:#{@instance_id}", 'ok', 1) - elsif event.failure? - multi.hincrby('event_counters', 'failure', 1) - multi.hincrby("event_counters:#{@instance_id}", 'failure', 1) - else - multi.hincrby('event_counters', 'invalid', 1) - multi.hincrby("event_counters:#{@instance_id}", 'invalid', 1) - @logger.error("Invalid event received: #{event.inspect}") - end - end + # If the service event's condition is ok and there was no previous condition, don't alert. + # This stops new checks from alerting as "recovery" after they have been added. + Flapjack.logger.debug { + "Not generating notification for event #{event.id} because " \ + "filtering was skipped" + } - previous_state = entity_check.state + else + # only change notification delays on service (non-action) events; + # fall back to check-local, config-global or default values unless + # sustained by the event flow + init_fail_delay = (event_condition.nil? ? nil : event.initial_failure_delay) || + check.initial_failure_delay || + @initial_failure_delay || + Flapjack::DEFAULT_INITIAL_FAILURE_DELAY - if previous_state.nil? - @logger.info("No previous state for event #{event.id}") + repeat_fail_delay = (event_condition.nil? ? nil : event.repeat_failure_delay) || + check.repeat_failure_delay || + @repeat_failure_delay || + Flapjack::DEFAULT_REPEAT_FAILURE_DELAY - if @ncsm_duration > 0 && (event.tags & @ncsm_ignore_tags).empty? - @logger.info("Setting scheduled maintenance for #{time_period_in_words(@ncsm_duration)}") - entity_check.create_scheduled_maintenance(timestamp, - @ncsm_duration, :summary => 'Automatically created for new check') + filter_opts = { + :initial_failure_delay => init_fail_delay, + :repeat_failure_delay => repeat_fail_delay, + :old_state => old_state, :new_state => new_state, + :timestamp => timestamp, :duration => event.duration + } + + # acks only go into latest_notifications + save_to_history = new_state.action.nil? && !event_condition.nil? && + (old_state.nil? || (old_state.condition != event_condition.name)) + + if save_to_history + new_state.save! + check.states << new_state + check.current_state = new_state + elsif new_state.action.nil? + old_state.updated_at = timestamp + old_state.summary = new_state.summary + old_state.details = new_state.details + old_state.save! end - # If the service event's state is ok and there was no previous state, don't alert. - # This stops new checks from alerting as "recovery" after they have been added. - if event.ok? - @logger.debug("setting skip_filters to true because there was no previous state and event is ok") - result = false + blocker = @filters.find {|f| f.block?(check, filter_opts) } + + if blocker.nil? + Flapjack.logger.info { "Generating notification for event #{event.dump}" } + new_state.save! unless new_state.persisted? + generate_notification(check, old_state, new_state, event, + event_condition) + else + Flapjack.logger.debug { + "Not generating notification for event #{event.id} " \ + "because this filter blocked: #{blocker.name}" + } end + end + end + end - entity_check.update_state(event.state, :timestamp => timestamp, - :summary => event.summary, :count => event.counter, - :details => event.details, :perfdata => event.perfdata, - :initial_failure_delay => event.initial_failure_delay, - :repeat_failure_delay => event.repeat_failure_delay) + def update_check(check, old_state, new_state, event, event_condition, timestamp) + @global_stats.all_events += 1 + @instance_stats.all_events += 1 - entity_check.update_current_scheduled_maintenance + event.counter = @global_stats.all_events - # Action events represent human or automated interaction with Flapjack - when 'action' - # When an action event is processed, store the event. - @redis.multi do |multi| - multi.hset(event.id + ':actions', timestamp, event.state) - multi.hincrby('event_counters', 'action', 1) - multi.hincrby("event_counters:#{@instance_id}", 'action', 1) + # ncsm_sched_maint = nil + + if event_condition.nil? + # Action events represent human or automated interaction with Flapjack + new_state.action = event.state + new_state.condition = old_state.condition unless old_state.nil? + + unless new_state.action =~ /\Atest_notifications(?:\s+#{Flapjack::Data::Condition.unhealthy.keys.join('|')})?\z/ + @global_stats.action_events += 1 + @instance_stats.action_events += 1 end else - @redis.multi do |multi| - multi.hincrby('event_counters', 'invalid', 1) - multi.hincrby("event_counters:#{@instance_id}", 'invalid', 1) + # Service events represent current state of checks on monitored systems + check.failing = !Flapjack::Data::Condition.healthy?(event_condition.name) + check.condition = event_condition.name + + if check.failing + @global_stats.failure_events += 1 + @instance_stats.failure_events += 1 + else + @global_stats.ok_events += 1 + @instance_stats.ok_events += 1 end - @logger.error("Invalid event received: #{event.inspect}") + + new_state.condition = event_condition.name + new_state.perfdata = event.perfdata end - [result, previous_state] + new_state.summary = event.summary + new_state.details = event.details end - def generate_notification(event, entity_check, timestamp, previous_state) - notification_type = Flapjack::Data::Notification.type_for_event(event) - max_notified_severity = entity_check.max_notified_severity_of_current_failure + def generate_notification(check, old_state, new_state, event, event_condition) + severity = nil - @redis.multi do |multi| - multi.set("#{event.id}:last_#{notification_type}_notification", timestamp) - multi.set("#{event.id}:last_#{event.state}_notification", timestamp) if event.failure? - multi.rpush("#{event.id}:#{notification_type}_notifications", timestamp) - multi.rpush("#{event.id}:#{event.state}_notifications", timestamp) if event.failure? + # accepts test_notifications without condition, for backwards compatibility + if new_state.action =~ /\Atest_notifications(\s+#{Flapjack::Data::Condition.unhealthy.keys.join('|')})?\z/ + # the state won't be preserved for any time after the notification is + # sent via association to a state or check + severity = Regexp.last_match(1) || Flapjack::Data::Condition.most_unhealthy + else + latest_notif = check.latest_notifications + + notification_ids_to_remove = if new_state.action.nil? + latest_notif.intersect(:condition => new_state.condition).ids + else + latest_notif.intersect(:action => new_state.action).ids + end + latest_notif.add(new_state) + latest_notif.remove_ids(*notification_ids_to_remove) unless notification_ids_to_remove.empty? + + most_severe = check.most_severe + + most_severe_cond = most_severe.nil? ? nil : + Flapjack::Data::Condition.for_name(most_severe.condition) + + if !event_condition.nil? && + Flapjack::Data::Condition.unhealthy.has_key?(event_condition.name) && + (most_severe_cond.nil? || (event_condition < most_severe_cond)) + + check.most_severe = new_state + most_severe_cond = event_condition + elsif 'acknowledgement'.eql?(new_state.action) + check.most_severe = nil + end + + severity = most_severe_cond.nil? ? 'ok' : most_severe_cond.name end - @logger.debug("Notification of type #{notification_type} is being generated for #{event.id}: " + event.inspect) + Flapjack.logger.info { "severity #{severity}"} - severity = Flapjack::Data::Notification.severity_for_event(event, max_notified_severity) + Flapjack.logger.debug("Notification is being generated for #{event.id}: " + event.inspect) - lc = entity_check.last_change - state_duration = lc ? (timestamp - lc) : nil + event_hash = (event_condition.nil? || Flapjack::Data::Condition.healthy?(event_condition.name)) ? + nil : check.ack_hash - Flapjack::Data::Notification.add(@notifier_queue, event, - :type => notification_type, :severity => severity, - :last_state => previous_unique_state(entity_check), :state_duration => state_duration, - :redis => @redis) - end + condition_duration = old_state.nil? ? nil : + (new_state.created_at - old_state.created_at) - def previous_unique_state(entity_check) - hs = entity_check.historical_states(nil, nil, :order => 'desc', :limit => 2) - return { :last_state => nil, :last_summary => nil } unless hs.length == 2 - return hs.last - end + notification = Flapjack::Data::Notification.new(:duration => event.duration, + :severity => severity, :condition_duration => condition_duration, + :event_hash => event_hash) + notification.save! + notification.state = new_state + check.notifications << notification + + @notifier_queue.push(notification) + + return if new_state.action =~ /\Atest_notifications(?:\s+#{Flapjack::Data::Condition.unhealthy.keys.join('|')})?\z/ + + Flapjack.logger.info "notification count: #{check.notification_count}" + + if check.notification_count.nil? + check.notification_count = 1 + else + check.notification_count += 1 + end + check.save! + + Flapjack.logger.info "#{check.name} #{check.errors.full_messages} notification count: #{check.notification_count}" + end end end