lib/chaotic_job/performer.rb in chaotic_job-0.0.1 vs lib/chaotic_job/performer.rb in chaotic_job-0.1.0

- old
+ new

@@ -1,40 +1,84 @@ # frozen_string_literal: true -# Performer.new(Job1).perform_all -# Performer.new(Job1).perform_all_within(time) -# Performer.new(Job1).perform_all_after(time) +require "active_job" module ChaoticJob - class Performer - include ActiveJob::TestHelper + module Performer + extend ActiveJob::TestHelper + extend self - def initialize(job, retry_window: 4) - @job = job - @retry_window = retry_window + def perform_all + until (jobs = enqueued_jobs_where).empty? + perform(jobs) + end end - def perform_all - @job.enqueue - enqueued_jobs_with.sort_by(&:scheduled_at, nil: :first).each do |job| - perform_job(job) + def perform_all_before(cutoff) + time = resolve_cutoff(cutoff) + + until (jobs = enqueued_jobs_where(before: time)).empty? + perform(jobs) end end + alias_method :perform_all_within, :perform_all_before - def perform_all_after(2.seconds.from_now) + def perform_all_after(cutoff) + time = resolve_cutoff(cutoff) + + until (jobs = enqueued_jobs_where(after: time)).empty? + perform(jobs) + end end - def perform_all_within(time) + def perform(jobs) + jobs.each do |payload| + queue_adapter.enqueued_jobs.delete(payload) + queue_adapter.performed_jobs << payload + instantiate_job(payload, skip_deserialize_arguments: true).perform_now + end.count end - private + def enqueued_jobs_where(before: nil, after: nil) + enqueued_jobs + .sort_by { |job| job["scheduled_at"] } + .select do |job| + scheduled_at = job[:at] - def perform_enqueued_jobs_only_with_retries - retry_window = Time.now + @retry_window - flush_enqueued_jobs(at: retry_window) until enqueued_jobs_with(at: retry_window).empty? + next true if scheduled_at.nil? + + # Skip if the job is scheduled after the cutoff time + if before + next false if scheduled_at > before.to_f + end + + # Skip if the job is scheduled before the cutoff time + if after + next false if scheduled_at < after.to_f + end + + true + end end - def perform_any_enqueued_jobs_including_future_scheduled_ones - flush_enqueued_jobs until enqueued_jobs_with.empty? + def resolve_cutoff(cutoff) + time = case cutoff + in ActiveSupport::Duration + cutoff.from_now + in Time + cutoff + end + delta = (Time.now - time).abs + changeset = case delta + when 0..59 # seconds + {usec: 0} + when 60..3599 # minutes + {sec: 0, usec: 0} + when 3600..86_399 # hours + {min: 0, sec: 0, usec: 0} + when 86_400..Float::INFINITY # days+ + {hour: 0, min: 0, sec: 0, usec: 0} + end + time.change(**changeset) end end end