Sha256: 2656cccf04aa5f81a0ce47d2fb3e92c6515e11dbc944506b536edb004c9a33c0

Contents?: true

Size: 1.35 KB

Versions: 5

Compression:

Stored size: 1.35 KB

Contents

# typed: true
# frozen_string_literal: true

module JobIteration
  # ThrottleEnumerator allows you to throttle iterations
  # based on external signal (e.g. database health).
  # @example
  #   def build_enumerator(_params, cursor:)
  #     enumerator_builder.build_throttle_enumerator(
  #       enumerator_builder.active_record_on_batches(
  #         Account.inactive,
  #         cursor: cursor
  #       ),
  #       throttle_on: -> { DatabaseStatus.unhealthy? },
  #       backoff: 30.seconds
  #     )
  #   end
  # The enumerator from above will mimic +active_record_on_batches+,
  # except when +DatabaseStatus.unhealthy?+ starts to return true.
  # In that case, it will re-enqueue the job with a specified backoff.
  class ThrottleEnumerator
    def initialize(enum, job, throttle_on:, backoff:)
      @enum = enum
      @job = job
      @throttle_on = throttle_on
      @backoff = backoff
    end

    def to_enum
      Enumerator.new(-> { @enum.size }) do |yielder|
        @enum.each do |*val|
          if should_throttle?
            ActiveSupport::Notifications.instrument("throttled.iteration", job_class: @job.class.name)
            @job.retry_job(wait: @backoff)
            throw(:abort, :skip_complete_callbacks)
          end

          yielder.yield(*val)
        end
      end
    end

    def should_throttle?
      @throttle_on.call
    end
  end
end

Version data entries

5 entries across 5 versions & 1 rubygems

Version Path
job-iteration-1.3.5 lib/job-iteration/throttle_enumerator.rb
job-iteration-1.3.4 lib/job-iteration/throttle_enumerator.rb
job-iteration-1.3.3 lib/job-iteration/throttle_enumerator.rb
job-iteration-1.3.2 lib/job-iteration/throttle_enumerator.rb
job-iteration-1.3.0 lib/job-iteration/throttle_enumerator.rb