require "active_support/concern" require "fugit" module RocketJob module Plugins # Ensure that a job will only run between certain hours of the day, regardless of when it was # created/enqueued. Useful for creating a job now that should only be processed later during a # specific time window. If the time window is already active the job is able to be processed # immediately. # # Example: Process this job on Monday's between 8am and 10am. # # Example: Run this job on the 1st of every month from midnight for the entire day. # # Since the cron schedule supports time zones it is easy to setup jobs to run at UTC or any other time zone. # # Example: # # Only run the job between the hours of 8:30am and 8:30pm. If it is after 8:30pm schedule # # it to run at 8:30am the next day. # class BusinessHoursJob < RocketJob::Job # include RocketJob::Plugins::ProcessingWindow # # # The start of the processing window # self.processing_schedule = "30 8 * * * America/New_York" # # # How long the processing window is: # self.processing_duration = 12.hours # # def perform # # Job will only run between 8:30am and 8:30pm Eastern # end # end # # Note: # If a job is created/enqueued during the processing window, but due to busy/unavailable workers # is not processed during the window, the current job will be re-queued for the beginning # of the next processing window. module ProcessingWindow extend ActiveSupport::Concern included do field :processing_schedule, type: String, class_attribute: true, user_editable: true, copy_on_restart: true field :processing_duration, type: Integer, class_attribute: true, user_editable: true, copy_on_restart: true before_create :rocket_job_processing_window_set_run_at before_retry :rocket_job_processing_window_set_run_at after_start :rocket_job_processing_window_check validates_presence_of :processing_schedule, :processing_duration validates_each :processing_schedule do |record, attr, value| record.errors.add(attr, "Invalid schedule: #{value.inspect}") unless Fugit::Cron.new(value) end end # Returns [true|false] whether this job is currently inside its processing window def rocket_job_processing_window_active? time = Time.now.utc previous_time = Fugit::Cron.new(processing_schedule).previous_time(time).to_utc_time # Inside previous processing window? previous_time + processing_duration > time end private # Only process this job if it is still in its processing window def rocket_job_processing_window_check return if rocket_job_processing_window_active? next_time = Fugit::Cron.new(processing_schedule).next_time.to_utc_time logger.warn("Processing window closed before job was processed. Job is re-scheduled to run at: #{next_time}") self.worker_name ||= "inline" requeue!(worker_name) end def rocket_job_processing_window_set_run_at self.run_at = Fugit::Cron.new(processing_schedule).next_time.to_utc_time unless rocket_job_processing_window_active? end end end end