lib/rocket_job/worker.rb in rocketjob-5.4.1 vs lib/rocket_job/worker.rb in rocketjob-6.0.0.rc1
- old
+ new
@@ -1,81 +1,58 @@
-require "concurrent"
-require "forwardable"
module RocketJob
# Worker
#
# A worker runs on a single operating system thread
# Is usually started under a Rocket Job server process.
class Worker
include SemanticLogger::Loggable
- include ActiveSupport::Callbacks
- define_callbacks :running
-
attr_accessor :id, :current_filter
- attr_reader :thread, :name, :inline, :server_name
+ attr_reader :name, :server_name
# Raised when a worker is killed so that it shutdown immediately, yet cleanly.
#
# Note:
# - It is not recommended to catch this exception since it is to shutdown workers quickly.
class Shutdown < RuntimeError
end
- def self.before_running(*filters, &blk)
- set_callback(:running, :before, *filters, &blk)
- end
-
- def self.after_running(*filters, &blk)
- set_callback(:running, :after, *filters, &blk)
- end
-
- def self.around_running(*filters, &blk)
- set_callback(:running, :around, *filters, &blk)
- end
-
- def initialize(id: 0, server_name: "inline:0", inline: false)
+ def initialize(id: 0, server_name: "inline:0")
@id = id
@server_name = server_name
- @shutdown = Concurrent::Event.new
@name = "#{server_name}:#{id}"
@re_check_start = Time.now
@current_filter = Config.filter || {}
- @thread = Thread.new { run } unless inline
- @inline = inline
end
def alive?
- inline ? true : @thread.alive?
+ true
end
def backtrace
- inline ? Thread.current.backtrace : @thread.backtrace
+ Thread.current.backtrace
end
- def join(*args)
- @thread.join(*args) unless inline
+ def join(*_args)
+ true
end
- # Send each active worker the RocketJob::ShutdownException so that stops processing immediately.
def kill
- return true if inline
-
- @thread.raise(Shutdown, "Shutdown due to kill request for worker: #{name}") if @thread.alive?
+ true
end
def shutdown?
- @shutdown.set?
+ false
end
def shutdown!
- @shutdown.set
+ true
end
# Returns [true|false] whether the shutdown indicator was set
- def wait_for_shutdown?(timeout = nil)
- @shutdown.wait(timeout)
+ def wait_for_shutdown?(_timeout = nil)
+ false
end
# Process jobs until it shuts down
#
# Params
@@ -144,10 +121,12 @@
# Batch has its own throttles for slices.
return job if job.running?
# Should this job be throttled?
next if job.fail_on_exception! { throttled_job?(job) }
+ # Job failed during throttle execution?
+ next if job.failed?
# Start this job!
job.fail_on_exception! { job.start!(name) }
return job if job.running?
end
@@ -169,30 +148,17 @@
# and assigns it to this worker.
#
# Applies the current filter to exclude filtered jobs.
#
# Returns nil if no jobs are available for processing.
- if Mongoid::VERSION.to_f >= 7.1
- def find_and_assign_job
- SemanticLogger.silence(:info) do
- scheduled = RocketJob::Job.where(run_at: nil).or(:run_at.lte => Time.now)
- working = RocketJob::Job.queued.or(state: :running, sub_state: :processing)
- query = RocketJob::Job.and(working, scheduled)
- query = query.and(current_filter) unless current_filter.blank?
- update = {"$set" => {"worker_name" => name, "state" => "running"}}
- query.sort(priority: 1, _id: 1).find_one_and_update(update, bypass_document_validation: true)
- end
- end
- else
- def find_and_assign_job
- SemanticLogger.silence(:info) do
- scheduled = {"$or" => [{run_at: nil}, {:run_at.lte => Time.now}]}
- working = {"$or" => [{state: :queued}, {state: :running, sub_state: :processing}]}
- query = RocketJob::Job.and(working, scheduled)
- query = query.where(current_filter) unless current_filter.blank?
- update = {"$set" => {"worker_name" => name, "state" => "running"}}
- query.sort(priority: 1, _id: 1).find_one_and_update(update, bypass_document_validation: true)
- end
+ def find_and_assign_job
+ SemanticLogger.silence(:info) do
+ scheduled = RocketJob::Job.where(run_at: nil).or(:run_at.lte => Time.now)
+ working = RocketJob::Job.queued.or(state: "running", sub_state: "processing")
+ query = RocketJob::Job.and(working, scheduled)
+ query = query.and(current_filter) unless current_filter.blank?
+ update = {"$set" => {"worker_name" => name, "state" => "running"}}
+ query.sort(priority: 1, _id: 1).find_one_and_update(update, bypass_document_validation: true)
end
end
# Add the supplied filter to the current filter.
def add_to_current_filter(filter)