lib/rocket_job/plugins/job/throttle.rb in rocketjob-3.0.5 vs lib/rocket_job/plugins/job/throttle.rb in rocketjob-3.1.0
- old
+ new
@@ -1,74 +1,99 @@
require 'active_support/concern'
module RocketJob
module Plugins
module Job
- # Throttle number of jobs of a specific class that are processed at the same time.
+ # Rocket Job Throttling Framework.
#
# Example:
+ # # Do not run this job when the MySQL slave delay exceeds 5 minutes.
# class MyJob < RocketJob
- # # Maximum number of workers to process instances of this job at the same time.
- # self.throttle_running_jobs = 25
+ # # Define a custom mysql throttle
+ # # Prevents all jobs of this class from running on the current server.
+ # define_throttle :mysql_throttle_exceeded?
#
# def perform
# # ....
# end
- # end
#
- # Notes:
- # - The actual number will be around this value, it con go over slightly and
- # can drop depending on check interval can drop slightly below this value.
- # - By avoid hard locks and counters performance can be maintained while still
- # supporting good enough throttling.
- # - If throughput is not as important as preventing brief spikes when many
- # workers are running, add a double check into the perform:
- # class MyJob < RocketJob
- # self.throttle_running_jobs = 25
+ # private
#
- # def perform
- # # (Optional) Prevent a brief spike from exceeding the wax worker throttle
- # self.class.throttle_double_check
- #
- # # ....
- # end
+ # # Returns true if the MySQL slave delay exceeds 5 minutes
+ # def mysql_throttle_exceeded?
+ # status = ActiveRecord::Base.connection.select_one('show slave status')
+ # seconds_delay = Hash(status)['Seconds_Behind_Master'].to_i
+ # seconds_delay >= 300
# end
+ # end
module Throttle
extend ActiveSupport::Concern
included do
- class_attribute :throttle_running_jobs
- self.throttle_running_jobs = nil
+ class_attribute :rocket_job_throttles
+ self.rocket_job_throttles = []
end
- # Throttle to add when the throttle is exceeded
- def throttle_filter
- {:_type.nin => [self.class.name]}
+ module ClassMethods
+ # Add a new throttle.
+ #
+ # Parameters:
+ # method_name: [Symbol]
+ # Name of method to call to evaluate whether a throttle has been exceeded.
+ # Note: Must return true or false.
+ # filter: [Symbol|Proc]
+ # Name of method to call to return the filter when the throttle has been exceeded.
+ # Or, a block that will return the filter.
+ # Default: :throttle_filter_class (Throttle all jobs of this class)
+ #
+ # Note: Throttles are executed in the order they are defined.
+ def define_throttle(method_name, filter: :throttle_filter_class)
+ raise(ArgumentError, "Filter for #{method_name} must be a Symbol or Proc") unless filter.is_a?(Symbol) || filter.is_a?(Proc)
+ raise(ArgumentError, "Cannot define #{method_name} twice, undefine previous throttle first") if has_throttle?(method_name)
+
+ self.rocket_job_throttles += [ThrottleDefinition.new(method_name, filter)]
+ end
+
+ # Undefine a previously defined throttle
+ def undefine_throttle(method_name)
+ rocket_job_throttles.delete_if { |throttle| throttle.method_name }
+ end
+
+ # Has a throttle been defined?
+ def has_throttle?(method_name)
+ rocket_job_throttles.find { |throttle| throttle.method_name == method_name }
+ end
end
- # Returns [Boolean] whether the throttle for this job has been exceeded
- def throttle_exceeded?
- throttle_running_jobs && (throttle_running_jobs != 0) ? (self.class.running.where(:id.ne => id).count >= throttle_running_jobs) : false
+ # Default throttle to use when the throttle is exceeded.
+ # When the throttle has been exceeded all jobs of this class will be ignored until the
+ # next refresh. `RocketJob::Config::re_check_seconds` which by default is 60 seconds.
+ def throttle_filter_class
+ {:_type.nin => [self.class.name]}
end
- # Prevent a brief spike from exceeding the wax worker throttle
- def throttle_double_check(check_seconds = 1)
- while !throttle_exceeded?
- sleep check_seconds
- end
+ # Filter out only this instance of the job.
+ # When the throttle has been exceeded this job will be ignored by this server until the next refresh.
+ # `RocketJob::Config::re_check_seconds` which by default is 60 seconds.
+ def throttle_filter_id
+ {:id.nin => [id]}
end
- # Merge filter(s)
- def throttle_merge_filter(target, source)
- source.each_pair do |k, v|
- target[k] =
- if previous = target[k]
- v.is_a?(Array) ? previous + v : v
- else
- v
- end
+ private
+
+ ThrottleDefinition = Struct.new(:method_name, :filter)
+
+ # Returns the matching filter, or nil if no throttles were triggered.
+ def rocket_job_evaluate_throttles
+ rocket_job_throttles.each do |throttle|
+ # Throttle exceeded?
+ if send(throttle.method_name)
+ logger.debug { "Throttle: #{throttle.method_name} has been exceeded. #{self.class.name}:#{id}" }
+ filter = throttle.filter
+ return filter.is_a?(Proc) ? filter.call(self) : send(filter)
+ end
end
- target
+ nil
end
end
end