lib/rocket_job/worker.rb in rocketjob-3.0.3 vs lib/rocket_job/worker.rb in rocketjob-3.0.4
- old
+ new
@@ -38,11 +38,11 @@
@shutdown = Concurrent::AtomicBoolean.new(false)
else
@shutdown = false
end
@name = "#{server_name}:#{id}"
- @re_check_seconds = re_check_seconds || 60
+ @re_check_seconds = (re_check_seconds || 60).to_f
@re_check_start = Time.now
@filter = filter || {}
@current_filter = @filter.dup
@thread = Thread.new { run } unless inline
end
@@ -94,25 +94,32 @@
end
# Process the next available job
# Returns [Boolean] whether any job was actually processed
def process_available_jobs
- # Only clear out the current_filter after every `re_check_seconds`
- time = Time.now
- if (time - @re_check_start) > re_check_seconds.to_f
- @re_check_start = time
- self.current_filter = filter.dup
- end
-
processed = false
- while (job = Job.rocket_job_next_job(name, current_filter)) && !shutdown?
+ while !shutdown?
+ reset_filter_if_expired
+ job = Job.rocket_job_next_job(name, current_filter)
+ break unless job
+
logger.fast_tag("job:#{job.id}") do
unless job.rocket_job_work(self, false, current_filter)
processed = true
end
end
end
processed
+ end
+
+ # Resets the current job filter if the relevant time interval has passed
+ def reset_filter_if_expired
+ # Only clear out the current_filter after every `re_check_seconds`
+ time = Time.now
+ if (time - @re_check_start) > re_check_seconds
+ @re_check_start = time
+ self.current_filter = filter.dup
+ end
end
end
end