require "active_support/concern" module RocketJob module Plugins module Job # Prevent more than one instance of this job class from running at a time module Model extend ActiveSupport::Concern included do # Fields that are end user editable. # For example are editable in Rocket Job Web Interface. class_attribute :user_editable_fields, instance_accessor: false self.user_editable_fields = [] # Attributes to include when copying across the attributes to a new instance on restart. class_attribute :rocket_job_restart_attributes self.rocket_job_restart_attributes = [] # # User definable attributes # # The following attributes are set when the job is created # Description for this job instance field :description, type: String, class_attribute: true, user_editable: true, copy_on_restart: true # Priority of this job as it relates to other jobs [1..100] # 1: Highest Priority # 50: Default Priority # 100: Lowest Priority # # Example: # A job with a priority of 40 will execute before a job with priority 50 # # In RocketJob Pro, if a SlicedJob is running and a higher priority job # arrives, then the current job will complete the current slices and process # the new higher priority job field :priority, type: Integer, default: 50, class_attribute: true, user_editable: true, copy_on_restart: true validates_inclusion_of :priority, in: 1..100 # When the job completes destroy it from both the database and the UI field :destroy_on_complete, type: Mongoid::Boolean, default: true, class_attribute: true, copy_on_restart: true # Run this job no earlier than this time field :run_at, type: Time, user_editable: true # If a job has not started by this time, destroy it field :expires_at, type: Time, copy_on_restart: true # Raise or lower the log level when calling the job # Can be used to reduce log noise, especially during high volume calls # For debugging a single job can be logged at a low level such as :trace # Levels supported: :trace, :debug, :info, :warn, :error, :fatal field :log_level, type: Mongoid::StringifiedSymbol, class_attribute: true, user_editable: true, copy_on_restart: true validates_inclusion_of :log_level, in: SemanticLogger::LEVELS + [nil] # # Read-only attributes # # Current state, as set by the state machine. Do not modify this value directly. field :state, type: Mongoid::StringifiedSymbol, default: :queued # When the job was created field :created_at, type: Time, default: -> { Time.now } # When processing started on this job field :started_at, type: Time # When the job completed processing field :completed_at, type: Time # Number of times that this job has failed to process field :failure_count, type: Integer, default: 0 # This name of the worker that this job is being processed by, or was processed by field :worker_name, type: String # # Values that jobs can update during processing # # Allow a job to updates its estimated progress # Any integer from 0 to 100 field :percent_complete, type: Integer, default: 0 # Store the last exception for this job embeds_one :exception, class_name: "RocketJob::JobException" # Used when workers fetch jobs to work on. index({state: 1, priority: 1, _id: 1}, background: true) # Used by Mission Control to display completed jobs sorted by completion. index({completed_at: 1}, background: true) validates_presence_of :state, :failure_count, :created_at end module ClassMethods # Returns [String] the singular name for this job class # # Example: # job = DataStudyJob.new # job.underscore_name # # => "data_study" def underscore_name @underscore_name ||= name.sub(/Job$/, "").underscore end # Allow the collective name for this job class to be overridden def underscore_name=(underscore_name) @underscore_name = underscore_name end # Returns [String] the human readable name for this job class # # Example: # job = DataStudyJob.new # job.human_name # # => "Data Study" def human_name @human_name ||= name.sub(/Job$/, "").titleize end # Allow the human readable job name for this job class to be overridden def human_name=(human_name) @human_name = human_name end # Returns [String] the collective name for this job class # # Example: # job = DataStudyJob.new # job.collective_name # # => "data_studies" def collective_name @collective_name ||= name.sub(/Job$/, "").pluralize.underscore end # Allow the collective name for this job class to be overridden def collective_name=(collective_name) @collective_name = collective_name end # Scope for jobs scheduled to run in the future def scheduled queued.where(:run_at.gt => Time.now) end # Scope for queued jobs that can run now # I.e. Queued jobs excluding scheduled jobs def queued_now queued.and(RocketJob::Job.where(run_at: nil).or(:run_at.lte => Time.now)) end # Defines all the fields that are accessible on the Document # For each field that is defined, a getter and setter will be # added as an instance method to the Document. # # @example Define a field. # field :score, :type => Integer, :default => 0 # # @param [ Symbol ] name The name of the field. # @param [ Hash ] options The options to pass to the field. # # @option options [ Class ] :type The type of the field. # @option options [ String ] :label The label for the field. # @option options [ Object, Proc ] :default The field's default # @option options [ Boolean ] :class_attribute Keep the fields default in a class_attribute # @option options [ Boolean ] :user_editable Field can be edited by end users in RJMC # # @return [ Field ] The generated field def field(name, options) if (options.delete(:user_editable) == true) && !user_editable_fields.include?(name.to_sym) self.user_editable_fields += [name.to_sym] end if options.delete(:class_attribute) == true class_attribute(name, instance_accessor: false) public_send("#{name}=", options[:default]) if options.key?(:default) options[:default] = -> { self.class.public_send(name) } end if (options.delete(:copy_on_restart) == true) && !rocket_job_restart_attributes.include?(name.to_sym) self.rocket_job_restart_attributes += [name.to_sym] end super(name, options) end # Builds this job instance from the supplied properties hash. # Overridden by batch to support child objects. def from_properties(properties) new(properties) end end # Returns [Float] the number of seconds the job has taken # - Elapsed seconds to process the job from when a worker first started working on it # until now if still running, or until it was completed # - Seconds in the queue if queued def seconds if completed_at completed_at - (started_at || created_at) elsif started_at Time.now - started_at else Time.now - created_at end end # Returns a human readable duration the job has taken def duration RocketJob.seconds_as_duration(seconds) end # Returns [true|false] whether the job has expired def expired? expires_at && (expires_at < Time.now) end # Returns [true|false] whether the job is scheduled to run in the future def scheduled? queued? && run_at.present? && (run_at > Time.now) end # Return [true|false] whether this job is sleeping. # I.e. No workers currently working on this job even if it is running. def sleeping? running? && worker_count.zero? end # Returns [Integer] the number of workers currently working on this job. def worker_count running? && worker_name.present? ? 1 : 0 end # Returns [Array] names of workers currently working this job. def worker_names running? && worker_name.present? ? [worker_name] : [] end # Clear `run_at` so that this job will run now. def run_now! update_attributes(run_at: nil) if run_at end # Returns [Time] at which this job was intended to run at. # # Takes into account any delays that could occur. # Recommended to use this Time instead of Time.now in the `#perform` since the job could run outside its # intended window. Especially if a failed job is only retried quite sometime later. def scheduled_at run_at || created_at end # Returns [Hash] status of this job def as_json attrs = serializable_hash(methods: %i[seconds duration]) attrs.delete("failure_count") unless failure_count.positive? if queued? attrs.delete("started_at") attrs.delete("completed_at") attrs.delete("result") attrs elsif running? attrs.delete("completed_at") attrs.delete("result") attrs elsif completed? attrs.delete("percent_complete") attrs elsif paused? attrs.delete("completed_at") attrs.delete("result") # Ensure 'paused_at' appears first in the hash {"paused_at" => completed_at}.merge(attrs) elsif aborted? attrs.delete("completed_at") attrs.delete("result") {"aborted_at" => completed_at}.merge(attrs) elsif failed? attrs.delete("completed_at") attrs.delete("result") {"failed_at" => completed_at}.merge(attrs) else attrs end end # Returns [Hash] the status of this job def status(time_zone = "Eastern Time (US & Canada)") h = as_json h.delete("seconds") h.dup.each_pair do |k, v| case v when Time h[k] = v.in_time_zone(time_zone).to_s when BSON::ObjectId h[k] = v.to_s end end h end # Returns [true|false] whether the worker runs on a particular server. def worker_on_server?(server_name) return false unless worker_name.present? && server_name.present? worker_name.start_with?(server_name) end end end end end