lib/chaotic_job/performer.rb in chaotic_job-0.0.1 vs lib/chaotic_job/performer.rb in chaotic_job-0.1.0
- old
+ new
@@ -1,40 +1,84 @@
# frozen_string_literal: true
-# Performer.new(Job1).perform_all
-# Performer.new(Job1).perform_all_within(time)
-# Performer.new(Job1).perform_all_after(time)
+require "active_job"
module ChaoticJob
- class Performer
- include ActiveJob::TestHelper
+ module Performer
+ extend ActiveJob::TestHelper
+ extend self
- def initialize(job, retry_window: 4)
- @job = job
- @retry_window = retry_window
+ def perform_all
+ until (jobs = enqueued_jobs_where).empty?
+ perform(jobs)
+ end
end
- def perform_all
- @job.enqueue
- enqueued_jobs_with.sort_by(&:scheduled_at, nil: :first).each do |job|
- perform_job(job)
+ def perform_all_before(cutoff)
+ time = resolve_cutoff(cutoff)
+
+ until (jobs = enqueued_jobs_where(before: time)).empty?
+ perform(jobs)
end
end
+ alias_method :perform_all_within, :perform_all_before
- def perform_all_after(2.seconds.from_now)
+ def perform_all_after(cutoff)
+ time = resolve_cutoff(cutoff)
+
+ until (jobs = enqueued_jobs_where(after: time)).empty?
+ perform(jobs)
+ end
end
- def perform_all_within(time)
+ def perform(jobs)
+ jobs.each do |payload|
+ queue_adapter.enqueued_jobs.delete(payload)
+ queue_adapter.performed_jobs << payload
+ instantiate_job(payload, skip_deserialize_arguments: true).perform_now
+ end.count
end
- private
+ def enqueued_jobs_where(before: nil, after: nil)
+ enqueued_jobs
+ .sort_by { |job| job["scheduled_at"] }
+ .select do |job|
+ scheduled_at = job[:at]
- def perform_enqueued_jobs_only_with_retries
- retry_window = Time.now + @retry_window
- flush_enqueued_jobs(at: retry_window) until enqueued_jobs_with(at: retry_window).empty?
+ next true if scheduled_at.nil?
+
+ # Skip if the job is scheduled after the cutoff time
+ if before
+ next false if scheduled_at > before.to_f
+ end
+
+ # Skip if the job is scheduled before the cutoff time
+ if after
+ next false if scheduled_at < after.to_f
+ end
+
+ true
+ end
end
- def perform_any_enqueued_jobs_including_future_scheduled_ones
- flush_enqueued_jobs until enqueued_jobs_with.empty?
+ def resolve_cutoff(cutoff)
+ time = case cutoff
+ in ActiveSupport::Duration
+ cutoff.from_now
+ in Time
+ cutoff
+ end
+ delta = (Time.now - time).abs
+ changeset = case delta
+ when 0..59 # seconds
+ {usec: 0}
+ when 60..3599 # minutes
+ {sec: 0, usec: 0}
+ when 3600..86_399 # hours
+ {min: 0, sec: 0, usec: 0}
+ when 86_400..Float::INFINITY # days+
+ {hour: 0, min: 0, sec: 0, usec: 0}
+ end
+ time.change(**changeset)
end
end
end