Sha256: a926f32dcf67213eb04a02dda81d0a59036c60ae0e66f1102269daf93476aa52
Contents?: true
Size: 2 KB
Versions: 1
Compression:
Stored size: 2 KB
Contents
# frozen_string_literal: true require "active_job" module ChaoticJob module Performer extend ActiveJob::TestHelper extend self def perform_all until (jobs = enqueued_jobs_where).empty? perform(jobs) end end def perform_all_before(cutoff) time = resolve_cutoff(cutoff) until (jobs = enqueued_jobs_where(before: time)).empty? perform(jobs) end end alias_method :perform_all_within, :perform_all_before def perform_all_after(cutoff) time = resolve_cutoff(cutoff) until (jobs = enqueued_jobs_where(after: time)).empty? perform(jobs) end end def perform(jobs) jobs.each do |payload| queue_adapter.enqueued_jobs.delete(payload) queue_adapter.performed_jobs << payload instantiate_job(payload, skip_deserialize_arguments: true).perform_now end.count end def enqueued_jobs_where(before: nil, after: nil) enqueued_jobs .sort_by { |job| job["scheduled_at"] } .select do |job| scheduled_at = job[:at] next true if scheduled_at.nil? # Skip if the job is scheduled after the cutoff time if before next false if scheduled_at > before.to_f end # Skip if the job is scheduled before the cutoff time if after next false if scheduled_at < after.to_f end true end end def resolve_cutoff(cutoff) time = case cutoff in ActiveSupport::Duration cutoff.from_now in Time cutoff end delta = (Time.now - time).abs changeset = case delta when 0..59 # seconds {usec: 0} when 60..3599 # minutes {sec: 0, usec: 0} when 3600..86_399 # hours {min: 0, sec: 0, usec: 0} when 86_400..Float::INFINITY # days+ {hour: 0, min: 0, sec: 0, usec: 0} end time.change(**changeset) end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
chaotic_job-0.1.0 | lib/chaotic_job/performer.rb |