lib/rocket_job/worker.rb in rocketjob-3.0.3 vs lib/rocket_job/worker.rb in rocketjob-3.0.4

- old
+ new

@@ -38,11 +38,11 @@ @shutdown = Concurrent::AtomicBoolean.new(false) else @shutdown = false end @name = "#{server_name}:#{id}" - @re_check_seconds = re_check_seconds || 60 + @re_check_seconds = (re_check_seconds || 60).to_f @re_check_start = Time.now @filter = filter || {} @current_filter = @filter.dup @thread = Thread.new { run } unless inline end @@ -94,25 +94,32 @@ end # Process the next available job # Returns [Boolean] whether any job was actually processed def process_available_jobs - # Only clear out the current_filter after every `re_check_seconds` - time = Time.now - if (time - @re_check_start) > re_check_seconds.to_f - @re_check_start = time - self.current_filter = filter.dup - end - processed = false - while (job = Job.rocket_job_next_job(name, current_filter)) && !shutdown? + while !shutdown? + reset_filter_if_expired + job = Job.rocket_job_next_job(name, current_filter) + break unless job + logger.fast_tag("job:#{job.id}") do unless job.rocket_job_work(self, false, current_filter) processed = true end end end processed + end + + # Resets the current job filter if the relevant time interval has passed + def reset_filter_if_expired + # Only clear out the current_filter after every `re_check_seconds` + time = Time.now + if (time - @re_check_start) > re_check_seconds + @re_check_start = time + self.current_filter = filter.dup + end end end end