lib/rocket_job/batch/throttle.rb in rocketjob-5.1.1 vs lib/rocket_job/batch/throttle.rb in rocketjob-5.2.0.beta1
- old
+ new
@@ -1,6 +1,6 @@
-require 'active_support/concern'
+require "active_support/concern"
module RocketJob
module Batch
# Rocket Job Batch Throttling Framework.
#
@@ -29,11 +29,10 @@
module Throttle
extend ActiveSupport::Concern
included do
class_attribute :rocket_job_batch_throttles
- self.rocket_job_batch_throttles = []
end
module ClassMethods
# Add a new throttle.
#
@@ -46,46 +45,28 @@
# Or, a block that will return the filter.
# Default: :throttle_filter_class (Throttle all jobs of this class)
#
# Note: Throttles are executed in the order they are defined.
def define_batch_throttle(method_name, filter: :throttle_filter_class)
- unless filter.is_a?(Symbol) || filter.is_a?(Proc)
- raise(ArgumentError, "Filter for #{method_name} must be a Symbol or Proc")
- end
- if batch_throttle?(method_name)
- raise(ArgumentError, "Cannot define #{method_name} twice, undefine previous throttle first")
- end
-
- self.rocket_job_batch_throttles += [ThrottleDefinition.new(method_name, filter)]
+ # Duplicate to prevent modifying parent class throttles
+ definitions = rocket_job_batch_throttles ? rocket_job_batch_throttles.dup : ThrottleDefinitions.new
+ definitions.add(method_name, filter)
+ self.rocket_job_batch_throttles = definitions
end
# Undefine a previously defined throttle
def undefine_batch_throttle(method_name)
- rocket_job_batch_throttles.delete_if { |throttle| throttle.method_name == method_name }
+ return unless rocket_job_batch_throttles
+
+ definitions = rocket_job_batch_throttles.dup
+ definitions.remove(method_name)
+ self.rocket_job_batch_throttles = definitions
end
# Has a throttle been defined?
def batch_throttle?(method_name)
- rocket_job_batch_throttles.any? { |throttle| throttle.method_name == method_name }
+ rocket_job_batch_throttles.exist?(method_name)
end
end
-
- private
-
- ThrottleDefinition = Struct.new(:method_name, :filter)
-
- # Returns the matching filter, or nil if no throttles were triggered.
- def rocket_job_batch_evaluate_throttles(slice)
- rocket_job_batch_throttles.each do |throttle|
- throttle_exceeded = method(throttle.method_name).arity == 0 ? send(throttle.method_name) : send(throttle.method_name, slice)
- next unless throttle_exceeded
-
- logger.debug { "Batch Throttle: #{throttle.method_name} has been exceeded. #{self.class.name}:#{id}" }
- filter = throttle.filter
- return filter.is_a?(Proc) ? filter.call(self) : send(filter)
- end
- nil
- end
-
end
end
end