lib/rocket_job/worker.rb in rocketjob-3.4.3 vs lib/rocket_job/worker.rb in rocketjob-3.5.0
- old
+ new
@@ -2,11 +2,11 @@
require 'forwardable'
module RocketJob
# Worker
#
# A worker runs on a single operating system thread
- # Is usually started under a RocketJob server process.
+ # Is usually started under a Rocket Job server process.
class Worker
include SemanticLogger::Loggable
include ActiveSupport::Callbacks
extend Forwardable
@@ -29,18 +29,23 @@
def self.around_running(*filters, &blk)
set_callback(:running, :around, *filters, &blk)
end
- def initialize(id: 0, server_name: 'inline:0', inline: false, re_check_seconds: Config.instance.re_check_seconds, filter: nil)
- @id = id
- @server_name = server_name
- if defined?(Concurrent::JavaAtomicBoolean) || defined?(Concurrent::CAtomicBoolean)
- @shutdown = Concurrent::AtomicBoolean.new(false)
- else
- @shutdown = false
- end
+ def initialize(id: 0,
+ server_name: 'inline:0',
+ inline: false,
+ re_check_seconds: Config.instance.re_check_seconds,
+ filter: nil)
+ @id = id
+ @server_name = server_name
+ @shutdown =
+ if defined?(Concurrent::JavaAtomicBoolean) || defined?(Concurrent::CAtomicBoolean)
+ Concurrent::AtomicBoolean.new(false)
+ else
+ false
+ end
@name = "#{server_name}:#{id}"
@re_check_seconds = (re_check_seconds || 60).to_f
@re_check_start = Time.now
@filter = filter || {}
@current_filter = @filter.dup
@@ -72,13 +77,13 @@
#
# Params
# worker_id [Integer]
# The number of this worker for logging purposes
def run
- Thread.current.name = 'rocketjob %03i' % id
+ Thread.current.name = format('rocketjob %03i', id)
logger.info 'Started'
- while !shutdown?
+ until shutdown?
if process_available_jobs
# Keeps workers staggered across the poll interval so that
# all workers don't poll at the same time
sleep rand(RocketJob::Config.instance.max_poll_seconds * 1000) / 1000
else
@@ -95,32 +100,28 @@
# Process the next available job
# Returns [Boolean] whether any job was actually processed
def process_available_jobs
processed = false
- while !shutdown?
+ until shutdown?
reset_filter_if_expired
job = Job.rocket_job_next_job(name, current_filter)
break unless job
SemanticLogger.named_tagged(job: job.id.to_s) do
- unless job.rocket_job_work(self, false, current_filter)
- processed = true
- end
+ processed = true unless job.rocket_job_work(self, false, current_filter)
end
end
processed
end
# Resets the current job filter if the relevant time interval has passed
def reset_filter_if_expired
# Only clear out the current_filter after every `re_check_seconds`
time = Time.now
- if (time - @re_check_start) > re_check_seconds
- @re_check_start = time
- self.current_filter = filter.dup
- end
- end
+ return unless (time - @re_check_start) > re_check_seconds
+ @re_check_start = time
+ self.current_filter = filter.dup
+ end
end
end
-