lib/rocket_job/worker.rb in rocketjob-5.1.1 vs lib/rocket_job/worker.rb in rocketjob-5.2.0.beta1

- old
+ new

@@ -1,7 +1,7 @@ -require 'concurrent' -require 'forwardable' +require "concurrent" +require "forwardable" module RocketJob # Worker # # A worker runs on a single operating system thread # Is usually started under a Rocket Job server process. @@ -10,17 +10,17 @@ include ActiveSupport::Callbacks define_callbacks :running attr_accessor :id, :current_filter - attr_reader :thread, :name, :inline + attr_reader :thread, :name, :inline, :server_name # Raised when a worker is killed so that it shutdown immediately, yet cleanly. # # Note: # - It is not recommended to catch this exception since it is to shutdown workers quickly. - class Shutdown < Interrupt + class Shutdown < RuntimeError end def self.before_running(*filters, &blk) set_callback(:running, :before, *filters, &blk) end @@ -31,11 +31,11 @@ def self.around_running(*filters, &blk) set_callback(:running, :around, *filters, &blk) end - def initialize(id: 0, server_name: 'inline:0', inline: false) + def initialize(id: 0, server_name: "inline:0", inline: false) @id = id @server_name = server_name @shutdown = Concurrent::Event.new @name = "#{server_name}:#{id}" @re_check_start = Time.now @@ -74,61 +74,143 @@ # Returns [true|false] whether the shutdown indicator was set def wait_for_shutdown?(timeout = nil) @shutdown.wait(timeout) end - private - # Process jobs until it shuts down # # Params # worker_id [Integer] # The number of this worker for logging purposes def run - Thread.current.name = format('rocketjob %03i', id) - logger.info 'Started' - until shutdown? - wait = Config.max_poll_seconds - if process_available_jobs - # Keeps workers staggered across the poll interval so that - # all workers don't poll at the same time - wait = rand(wait * 1000) / 1000 - end - break if wait_for_shutdown?(wait) - end - logger.info 'Stopping' - rescue Exception => exc - logger.fatal('Unhandled exception in job processing thread', exc) - ensure - ActiveRecord::Base.clear_active_connections! if defined?(ActiveRecord::Base) - end + Thread.current.name = format("rocketjob %03i", id) + logger.info "Started" - # Process the next available job - # Returns [Boolean] whether any job was actually processed - def process_available_jobs - processed = false until shutdown? + sleep_seconds = Config.max_poll_seconds reset_filter_if_expired - job = Job.rocket_job_next_job(name, current_filter) - break unless job + job = next_available_job - SemanticLogger.named_tagged(job: job.id.to_s) do - processed = true unless job.rocket_job_work(self, false, current_filter) - + # Returns true when work was completed, but no other work is available + if job&.rocket_job_work(self, false) # Return the database connections for this thread back to the connection pool ActiveRecord::Base.clear_active_connections! if defined?(ActiveRecord::Base) + + # Stagger workers so that they don't all poll at the same time. + sleep_seconds = random_wait_interval end + + wait_for_shutdown?(sleep_seconds) end - processed + + logger.info "Stopping" + rescue Exception => e + logger.fatal("Unhandled exception in job processing thread", e) + ensure + ActiveRecord::Base.clear_active_connections! if defined?(ActiveRecord::Base) end # Resets the current job filter if the relevant time interval has passed def reset_filter_if_expired # Only clear out the current_filter after every `re_check_seconds` time = Time.now return unless (time - @re_check_start) > Config.re_check_seconds @re_check_start = time self.current_filter = Config.filter || {} + end + + # Returns [RocketJob::Job] the next job available for processing. + # Returns [nil] if no job is available for processing. + # + # Notes: + # - Destroys expired jobs + # - Runs job throttles and skips the job if it is throttled. + # - Adding that filter to the current filter to exclude from subsequent polling. + def next_available_job + until shutdown? + job = find_and_assign_job + return unless job + + if job.expired? + job.fail_on_exception! do + job.worker_name = name + job.destroy + logger.info("Destroyed expired job.") + end + next + end + + # Batch Job that is already started? + # Batch has its own throttles for slices. + return job if job.running? + + # Should this job be throttled? + next if job.fail_on_exception! { throttled_job?(job) } + + # Start this job! + job.fail_on_exception! { job.start!(name) } + return job if job.running? + end + end + + # Whether the supplied job has been throttled and should be ignored. + def throttled_job?(job) + # Evaluate job throttles, if any. + filter = job.rocket_job_throttles.matching_filter(job) + return false unless filter + + add_to_current_filter(filter) + # Restore retrieved job so that other workers can process it later + job.set(worker_name: nil, state: :queued) + true + end + + # Finds the next job to work on in priority based order + # and assigns it to this worker. + # + # Applies the current filter to exclude filtered jobs. + # + # Returns nil if no jobs are available for processing. + if Mongoid::VERSION.to_f >= 7.1 + def find_and_assign_job + SemanticLogger.silence(:info) do + scheduled = RocketJob::Job.where(run_at: nil).or(:run_at.lte => Time.now) + working = RocketJob::Job.queued.or(state: :running, sub_state: :processing) + query = RocketJob::Job.and(working, scheduled) + query = query.and(current_filter) unless current_filter.blank? + update = {"$set" => {"worker_name" => name, "state" => "running"}} + query.sort(priority: 1, _id: 1).find_one_and_update(update, bypass_document_validation: true) + end + end + else + def find_and_assign_job + SemanticLogger.silence(:info) do + scheduled = {"$or" => [{run_at: nil}, {:run_at.lte => Time.now}]} + working = {"$or" => [{state: :queued}, {state: :running, sub_state: :processing}]} + query = RocketJob::Job.and(working, scheduled) + query = query.where(current_filter) unless current_filter.blank? + update = {"$set" => {"worker_name" => name, "state" => "running"}} + query.sort(priority: 1, _id: 1).find_one_and_update(update, bypass_document_validation: true) + end + end + end + + # Add the supplied filter to the current filter. + def add_to_current_filter(filter) + filter.each_pair do |k, v| + current_filter[k] = + if (previous = current_filter[k]) + v.is_a?(Array) ? previous + v : v + else + v + end + end + current_filter + end + + # Returns [Float] a randomized poll interval in seconds up to the maximum configured poll interval. + def random_wait_interval + rand(Config.max_poll_seconds * 1000) / 1000 end end end