#
# Handle the coordination of which server should be running the cron jobs.
#
# Each server process owns one Cron::Server record (keyed by host name and pid) and is expected to
# call #execute once a minute. Only the record in the primary state runs the CronTab jobs and
# adjusts the AWS AutoScalingGroup; the others just check in so a dead primary can be replaced.
#
module Cron
  class Server
    include StandardModel
    #
    # Constants
    #
    STATE_PRIMARY = 'primary'.freeze unless defined? STATE_PRIMARY
    STATE_SECONDARY = 'secondary'.freeze unless defined? STATE_SECONDARY
    ALL_STATES = [STATE_PRIMARY, STATE_SECONDARY].freeze unless defined? ALL_STATES
    #
    # Fields
    #
    field :host_name, type: String
    field :pid, type: Integer
    # Last worker count requested through #auto_scale
    field :desired_server_count, type: Integer, default: 0
    # Last desired capacity read back from the AutoScalingGroup
    field :current_server_count, type: Integer, default: 0
    # A lambda so every new record gets its own timestamp; a bare `Time.now.utc` would be evaluated
    # once when the class is loaded and shared by every record created afterwards.
    field :last_check_in_at, type: Time, default: -> { Time.now.utc }
    field :state, type: String, default: STATE_SECONDARY
    #
    # Validations
    #
    validates :host_name, presence: true
    validates :pid, presence: true
    validates :last_check_in_at, presence: true
    validates :state, inclusion: { in: ALL_STATES }
    validate :high_lander

    #
    # Go through the logic once a minute: run the jobs when this server is the live primary,
    # otherwise try to take over if there is no live primary. Errors are logged, never raised,
    # and the server always checks in afterwards.
    #
    # @return [Integer] seconds until the start of the next minute
    #
    def execute
      if primary?
        run_cron_jobs
      else
        primary = self.class.primary_server
        if primary.blank? || primary.dead?
          become_primary
          # Another node may have claimed the primary role while we waited; only run if we won.
          run_cron_jobs if primary?
        end
      end
      time_to_next_run
    rescue StandardError => error
      App47Logger.log_error 'Unable to run cron jobs', error
      time_to_next_run
    ensure
      check_in
    end

    #
    # Run the due cron tab jobs, then re-evaluate auto scaling
    #
    def run_cron_jobs
      run_jobs
      check_auto_scale
    end

    #
    # Run every cron tab whose schedule matches the current minute
    #
    def run_jobs
      now = Time.now.utc
      CronTab.all.each { |tab| tab.run if tab.time_to_run?(now) }
    end

    #
    # Determine the next minute to run
    #
    # @return [Integer] seconds (1..60) until the next whole minute
    #
    def time_to_next_run
      60 - Time.now.utc.to_i % 60
    end

    #
    # Find or create the record for this host/process pair
    #
    def self.find_or_create_server
      Cron::Server.find_or_create_by!(host_name: Socket.gethostname, pid: Process.pid)
    end

    #
    # Find the current primary, or nil when there is none
    #
    def self.primary_server
      Cron::Server.where(state: STATE_PRIMARY).first
    end

    #
    # Warm up a server on the next evaluation by asking the primary to scale up by one worker,
    # capped at 10. Does nothing when auto scaling is not configured or no primary exists.
    #
    def self.warm_up_server
      return unless SystemConfiguration.auto_scaling_configured?

      primary = primary_server
      return if primary.blank?

      primary.auto_scale([primary.desired_server_count + 1, 10].min)
    end

    #
    # Become primary, making others secondary
    #
    def become_primary
      # Demote every current primary (this record included, if it was one)
      Cron::Server.where(state: STATE_PRIMARY).each(&:become_secondary)
      # The demotion above updated freshly loaded copies; refresh this instance so its in-memory
      # state matches the database and the update below is seen as a change to `state`.
      reload
      # sleep a small amount of time to randomize a new primary
      sleep rand(1..15)
      # Check to see if another node already became primary
      primary = self.class.primary_server
      return if primary.present? && primary.alive?

      # no one else is in, so become primary
      update_attributes! state: STATE_PRIMARY, last_check_in_at: Time.now.utc
    end

    #
    # Become a secondary node, logging the change against the given user when one is supplied
    #
    def become_secondary(user = nil)
      if user.present?
        update_attributes_and_log! user, state: STATE_SECONDARY
      else
        update_attributes! state: STATE_SECONDARY
      end
    end

    #
    # Am I the primary server (and still alive)?
    #
    def primary?
      alive? && STATE_PRIMARY.eql?(state)
    end

    #
    # Am I a secondary server?
    #
    def secondary?
      STATE_SECONDARY.eql?(state)
    end

    #
    # Return true if I've checked in within the last 90 seconds
    #
    def alive?
      last_check_in_at >= 90.seconds.ago.utc
    end

    #
    # Is the server dead, meaning it has not checked in within the last 90 seconds?
    #
    def dead?
      !alive?
    end

    #
    # Perform a check in for the server
    #
    def check_in
      set last_check_in_at: Time.now.utc
    end

    #
    # Auto scale the environment based on the number of queued delayed jobs
    #
    def check_auto_scale
      return unless SystemConfiguration.auto_scaling_configured?

      # These values are memoized only for the duration of one evaluation; clear them in case this
      # instance is reused across runs so the queue depth and group state are re-read each time.
      @delayed_jobs_count = nil
      @auto_scaling_group = nil
      if delayed_jobs_count.zero?
        handle_zero_job_count
      else
        handle_auto_scale_jobs
      end
    end

    #
    # Returns the AWS AutoScaling Client, built once from the system configuration
    #
    def client
      @client ||= begin
        credentials = { access_key_id: sys_config.access_key_id,
                        secret_access_key: sys_config.secret_access_key,
                        region: sys_config.region }
        Aws::AutoScaling::Client.new(credentials)
      end
    end

    def sys_config
      @sys_config ||= SystemConfiguration.configuration
    end

    #
    # Returns the AutoScalingGroup associated with the account
    #
    def auto_scaling_group
      @auto_scaling_group ||= begin
        filter = { auto_scaling_group_names: [sys_config.auto_scaling_group_name] }
        client.describe_auto_scaling_groups(filter).auto_scaling_groups.first
      end
    end

    #
    # Returns a count of the Delayed Jobs in queue that have not failed, read from the primary node
    #
    def delayed_jobs_count
      @delayed_jobs_count ||= Delayed::Backend::Mongoid::Job.where(failed_at: nil).read(mode: :primary).count
    end

    #
    # Returns the current value of 'desired capacity' for the AutoScalingGroup and records it in
    # current_server_count. Any error while reading it is treated as a capacity of 0.
    #
    def current_desired_capacity
      current = auto_scaling_group.desired_capacity
      set current_server_count: current
      current
    rescue StandardError
      0
    end

    #
    # Calls the 'auto_scale' method with a 'desired_count' of 0 unless the capacity is already at 0
    #
    def handle_zero_job_count
      return if current_desired_capacity.eql?(0)

      auto_scale
    end

    #
    # Calls the 'auto_scale' method with a 'desired_count' based on how many jobs are queued.
    # No additional workers are requested while fewer than 50 jobs are queued.
    #
    def handle_auto_scale_jobs
      count = delayed_jobs_count
      return if count < 50

      desired = case count
                when 50..250 then 1
                when 251..500 then 2
                when 501..1_000 then 3
                when 1_001..3_999 then 4
                when 4_000..10_999 then 5
                when 11_000..17_999 then 6
                else 7
                end
      auto_scale(desired)
    end

    #
    # Sets the desired and minimum number of EC2 instances to run
    #
    def auto_scale(desired_count = 0)
      set desired_server_count: desired_count
      # Never scale down to a positive count (only up, or to zero when the queue is empty), so we
      # don't remove any workers with assigned jobs by accident.
      return if desired_count.positive? && desired_count <= current_desired_capacity

      client.update_auto_scaling_group(auto_scaling_group_name: sys_config.auto_scaling_group_name,
                                       min_size: desired_count,
                                       desired_capacity: desired_count)
    end

    #
    # Validate that there is only one primary: a primary record is invalid if any other record is
    # also primary.
    #
    def high_lander
      return if secondary? # Don't need to check if not primary

      other_primary = Cron::Server.where(state: STATE_PRIMARY).ne(_id: id).exists?
      errors.add(:state, 'there can only be one primary') if other_primary
    end

    #
    # Returns the count of active servers (the last capacity read from the AutoScalingGroup)
    #
    def active_count
      current_server_count
    end

    #
    # Returns the count of inactive servers
    # NOTE(review): this returns desired_server_count, the last requested worker count — confirm intent.
    #
    def inactive_count
      desired_server_count
    end
  end
end