lib/rocket_job/worker.rb in rocketjob-3.4.3 vs lib/rocket_job/worker.rb in rocketjob-3.5.0

- old
+ new

@@ -2,11 +2,11 @@ require 'forwardable' module RocketJob # Worker # # A worker runs on a single operating system thread - # Is usually started under a RocketJob server process. + # Is usually started under a Rocket Job server process. class Worker include SemanticLogger::Loggable include ActiveSupport::Callbacks extend Forwardable @@ -29,18 +29,23 @@ def self.around_running(*filters, &blk) set_callback(:running, :around, *filters, &blk) end - def initialize(id: 0, server_name: 'inline:0', inline: false, re_check_seconds: Config.instance.re_check_seconds, filter: nil) - @id = id - @server_name = server_name - if defined?(Concurrent::JavaAtomicBoolean) || defined?(Concurrent::CAtomicBoolean) - @shutdown = Concurrent::AtomicBoolean.new(false) - else - @shutdown = false - end + def initialize(id: 0, + server_name: 'inline:0', + inline: false, + re_check_seconds: Config.instance.re_check_seconds, + filter: nil) + @id = id + @server_name = server_name + @shutdown = + if defined?(Concurrent::JavaAtomicBoolean) || defined?(Concurrent::CAtomicBoolean) + Concurrent::AtomicBoolean.new(false) + else + false + end @name = "#{server_name}:#{id}" @re_check_seconds = (re_check_seconds || 60).to_f @re_check_start = Time.now @filter = filter || {} @current_filter = @filter.dup @@ -72,13 +77,13 @@ # # Params # worker_id [Integer] # The number of this worker for logging purposes def run - Thread.current.name = 'rocketjob %03i' % id + Thread.current.name = format('rocketjob %03i', id) logger.info 'Started' - while !shutdown? + until shutdown? if process_available_jobs # Keeps workers staggered across the poll interval so that # all workers don't poll at the same time sleep rand(RocketJob::Config.instance.max_poll_seconds * 1000) / 1000 else @@ -95,32 +100,28 @@ # Process the next available job # Returns [Boolean] whether any job was actually processed def process_available_jobs processed = false - while !shutdown? + until shutdown? reset_filter_if_expired job = Job.rocket_job_next_job(name, current_filter) break unless job SemanticLogger.named_tagged(job: job.id.to_s) do - unless job.rocket_job_work(self, false, current_filter) - processed = true - end + processed = true unless job.rocket_job_work(self, false, current_filter) end end processed end # Resets the current job filter if the relevant time interval has passed def reset_filter_if_expired # Only clear out the current_filter after every `re_check_seconds` time = Time.now - if (time - @re_check_start) > re_check_seconds - @re_check_start = time - self.current_filter = filter.dup - end - end + return unless (time - @re_check_start) > re_check_seconds + @re_check_start = time + self.current_filter = filter.dup + end end end -