lib/rocket_job/plugins/job/throttle.rb in rocketjob-3.0.5 vs lib/rocket_job/plugins/job/throttle.rb in rocketjob-3.1.0

- old
+ new

@@ -1,74 +1,99 @@ require 'active_support/concern' module RocketJob module Plugins module Job - # Throttle number of jobs of a specific class that are processed at the same time. + # Rocket Job Throttling Framework. # # Example: + # # Do not run this job when the MySQL slave delay exceeds 5 minutes. # class MyJob < RocketJob - # # Maximum number of workers to process instances of this job at the same time. - # self.throttle_running_jobs = 25 + # # Define a custom mysql throttle + # # Prevents all jobs of this class from running on the current server. + # define_throttle :mysql_throttle_exceeded? # # def perform # # .... # end - # end # - # Notes: - # - The actual number will be around this value, it con go over slightly and - # can drop depending on check interval can drop slightly below this value. - # - By avoid hard locks and counters performance can be maintained while still - # supporting good enough throttling. - # - If throughput is not as important as preventing brief spikes when many - # workers are running, add a double check into the perform: - # class MyJob < RocketJob - # self.throttle_running_jobs = 25 + # private # - # def perform - # # (Optional) Prevent a brief spike from exceeding the wax worker throttle - # self.class.throttle_double_check - # - # # .... - # end + # # Returns true if the MySQL slave delay exceeds 5 minutes + # def mysql_throttle_exceeded? + # status = ActiveRecord::Base.connection.select_one('show slave status') + # seconds_delay = Hash(status)['Seconds_Behind_Master'].to_i + # seconds_delay >= 300 # end + # end module Throttle extend ActiveSupport::Concern included do - class_attribute :throttle_running_jobs - self.throttle_running_jobs = nil + class_attribute :rocket_job_throttles + self.rocket_job_throttles = [] end - # Throttle to add when the throttle is exceeded - def throttle_filter - {:_type.nin => [self.class.name]} + module ClassMethods + # Add a new throttle. + # + # Parameters: + # method_name: [Symbol] + # Name of method to call to evaluate whether a throttle has been exceeded. + # Note: Must return true or false. + # filter: [Symbol|Proc] + # Name of method to call to return the filter when the throttle has been exceeded. + # Or, a block that will return the filter. + # Default: :throttle_filter_class (Throttle all jobs of this class) + # + # Note: Throttles are executed in the order they are defined. + def define_throttle(method_name, filter: :throttle_filter_class) + raise(ArgumentError, "Filter for #{method_name} must be a Symbol or Proc") unless filter.is_a?(Symbol) || filter.is_a?(Proc) + raise(ArgumentError, "Cannot define #{method_name} twice, undefine previous throttle first") if has_throttle?(method_name) + + self.rocket_job_throttles += [ThrottleDefinition.new(method_name, filter)] + end + + # Undefine a previously defined throttle + def undefine_throttle(method_name) + rocket_job_throttles.delete_if { |throttle| throttle.method_name } + end + + # Has a throttle been defined? + def has_throttle?(method_name) + rocket_job_throttles.find { |throttle| throttle.method_name == method_name } + end end - # Returns [Boolean] whether the throttle for this job has been exceeded - def throttle_exceeded? - throttle_running_jobs && (throttle_running_jobs != 0) ? (self.class.running.where(:id.ne => id).count >= throttle_running_jobs) : false + # Default throttle to use when the throttle is exceeded. + # When the throttle has been exceeded all jobs of this class will be ignored until the + # next refresh. `RocketJob::Config::re_check_seconds` which by default is 60 seconds. + def throttle_filter_class + {:_type.nin => [self.class.name]} end - # Prevent a brief spike from exceeding the wax worker throttle - def throttle_double_check(check_seconds = 1) - while !throttle_exceeded? - sleep check_seconds - end + # Filter out only this instance of the job. + # When the throttle has been exceeded this job will be ignored by this server until the next refresh. + # `RocketJob::Config::re_check_seconds` which by default is 60 seconds. + def throttle_filter_id + {:id.nin => [id]} end - # Merge filter(s) - def throttle_merge_filter(target, source) - source.each_pair do |k, v| - target[k] = - if previous = target[k] - v.is_a?(Array) ? previous + v : v - else - v - end + private + + ThrottleDefinition = Struct.new(:method_name, :filter) + + # Returns the matching filter, or nil if no throttles were triggered. + def rocket_job_evaluate_throttles + rocket_job_throttles.each do |throttle| + # Throttle exceeded? + if send(throttle.method_name) + logger.debug { "Throttle: #{throttle.method_name} has been exceeded. #{self.class.name}:#{id}" } + filter = throttle.filter + return filter.is_a?(Proc) ? filter.call(self) : send(filter) + end end - target + nil end end end