lib/rocket_job/worker.rb in rocketjob-5.1.1 vs lib/rocket_job/worker.rb in rocketjob-5.2.0.beta1
- old
+ new
@@ -1,7 +1,7 @@
-require 'concurrent'
-require 'forwardable'
+require "concurrent"
+require "forwardable"
module RocketJob
# Worker
#
# A worker runs on a single operating system thread
# Is usually started under a Rocket Job server process.
@@ -10,17 +10,17 @@
include ActiveSupport::Callbacks
define_callbacks :running
attr_accessor :id, :current_filter
- attr_reader :thread, :name, :inline
+ attr_reader :thread, :name, :inline, :server_name
# Raised when a worker is killed so that it shutdown immediately, yet cleanly.
#
# Note:
# - It is not recommended to catch this exception since it is to shutdown workers quickly.
- class Shutdown < Interrupt
+ class Shutdown < RuntimeError
end
def self.before_running(*filters, &blk)
set_callback(:running, :before, *filters, &blk)
end
@@ -31,11 +31,11 @@
def self.around_running(*filters, &blk)
set_callback(:running, :around, *filters, &blk)
end
- def initialize(id: 0, server_name: 'inline:0', inline: false)
+ def initialize(id: 0, server_name: "inline:0", inline: false)
@id = id
@server_name = server_name
@shutdown = Concurrent::Event.new
@name = "#{server_name}:#{id}"
@re_check_start = Time.now
@@ -74,61 +74,143 @@
# Returns [true|false] whether the shutdown indicator was set
def wait_for_shutdown?(timeout = nil)
@shutdown.wait(timeout)
end
- private
-
# Process jobs until it shuts down
#
# Params
# worker_id [Integer]
# The number of this worker for logging purposes
def run
- Thread.current.name = format('rocketjob %03i', id)
- logger.info 'Started'
- until shutdown?
- wait = Config.max_poll_seconds
- if process_available_jobs
- # Keeps workers staggered across the poll interval so that
- # all workers don't poll at the same time
- wait = rand(wait * 1000) / 1000
- end
- break if wait_for_shutdown?(wait)
- end
- logger.info 'Stopping'
- rescue Exception => exc
- logger.fatal('Unhandled exception in job processing thread', exc)
- ensure
- ActiveRecord::Base.clear_active_connections! if defined?(ActiveRecord::Base)
- end
+ Thread.current.name = format("rocketjob %03i", id)
+ logger.info "Started"
- # Process the next available job
- # Returns [Boolean] whether any job was actually processed
- def process_available_jobs
- processed = false
until shutdown?
+ sleep_seconds = Config.max_poll_seconds
reset_filter_if_expired
- job = Job.rocket_job_next_job(name, current_filter)
- break unless job
+ job = next_available_job
- SemanticLogger.named_tagged(job: job.id.to_s) do
- processed = true unless job.rocket_job_work(self, false, current_filter)
-
+ # Returns true when work was completed, but no other work is available
+ if job&.rocket_job_work(self, false)
# Return the database connections for this thread back to the connection pool
ActiveRecord::Base.clear_active_connections! if defined?(ActiveRecord::Base)
+
+ # Stagger workers so that they don't all poll at the same time.
+ sleep_seconds = random_wait_interval
end
+
+ wait_for_shutdown?(sleep_seconds)
end
- processed
+
+ logger.info "Stopping"
+ rescue Exception => e
+ logger.fatal("Unhandled exception in job processing thread", e)
+ ensure
+ ActiveRecord::Base.clear_active_connections! if defined?(ActiveRecord::Base)
end
# Resets the current job filter if the relevant time interval has passed
def reset_filter_if_expired
# Only clear out the current_filter after every `re_check_seconds`
time = Time.now
return unless (time - @re_check_start) > Config.re_check_seconds
@re_check_start = time
self.current_filter = Config.filter || {}
+ end
+
+ # Returns [RocketJob::Job] the next job available for processing.
+ # Returns [nil] if no job is available for processing.
+ #
+ # Notes:
+ # - Destroys expired jobs
+ # - Runs job throttles and skips the job if it is throttled.
+ # - Adding that filter to the current filter to exclude from subsequent polling.
+ def next_available_job
+ until shutdown?
+ job = find_and_assign_job
+ return unless job
+
+ if job.expired?
+ job.fail_on_exception! do
+ job.worker_name = name
+ job.destroy
+ logger.info("Destroyed expired job.")
+ end
+ next
+ end
+
+ # Batch Job that is already started?
+ # Batch has its own throttles for slices.
+ return job if job.running?
+
+ # Should this job be throttled?
+ next if job.fail_on_exception! { throttled_job?(job) }
+
+ # Start this job!
+ job.fail_on_exception! { job.start!(name) }
+ return job if job.running?
+ end
+ end
+
+ # Whether the supplied job has been throttled and should be ignored.
+ def throttled_job?(job)
+ # Evaluate job throttles, if any.
+ filter = job.rocket_job_throttles.matching_filter(job)
+ return false unless filter
+
+ add_to_current_filter(filter)
+ # Restore retrieved job so that other workers can process it later
+ job.set(worker_name: nil, state: :queued)
+ true
+ end
+
+ # Finds the next job to work on in priority based order
+ # and assigns it to this worker.
+ #
+ # Applies the current filter to exclude filtered jobs.
+ #
+ # Returns nil if no jobs are available for processing.
+ if Mongoid::VERSION.to_f >= 7.1
+ def find_and_assign_job
+ SemanticLogger.silence(:info) do
+ scheduled = RocketJob::Job.where(run_at: nil).or(:run_at.lte => Time.now)
+ working = RocketJob::Job.queued.or(state: :running, sub_state: :processing)
+ query = RocketJob::Job.and(working, scheduled)
+ query = query.and(current_filter) unless current_filter.blank?
+ update = {"$set" => {"worker_name" => name, "state" => "running"}}
+ query.sort(priority: 1, _id: 1).find_one_and_update(update, bypass_document_validation: true)
+ end
+ end
+ else
+ def find_and_assign_job
+ SemanticLogger.silence(:info) do
+ scheduled = {"$or" => [{run_at: nil}, {:run_at.lte => Time.now}]}
+ working = {"$or" => [{state: :queued}, {state: :running, sub_state: :processing}]}
+ query = RocketJob::Job.and(working, scheduled)
+ query = query.where(current_filter) unless current_filter.blank?
+ update = {"$set" => {"worker_name" => name, "state" => "running"}}
+ query.sort(priority: 1, _id: 1).find_one_and_update(update, bypass_document_validation: true)
+ end
+ end
+ end
+
+ # Add the supplied filter to the current filter.
+ def add_to_current_filter(filter)
+ filter.each_pair do |k, v|
+ current_filter[k] =
+ if (previous = current_filter[k])
+ v.is_a?(Array) ? previous + v : v
+ else
+ v
+ end
+ end
+ current_filter
+ end
+
+ # Returns [Float] a randomized poll interval in seconds up to the maximum configured poll interval.
+ def random_wait_interval
+ rand(Config.max_poll_seconds * 1000) / 1000
end
end
end