lib/rocket_job/plugins/job/model.rb in rocketjob-5.4.1 vs lib/rocket_job/plugins/job/model.rb in rocketjob-6.0.0.rc1
- old
+ new
@@ -35,35 +35,34 @@
#
# In RocketJob Pro, if a SlicedJob is running and a higher priority job
# arrives, then the current job will complete the current slices and process
# the new higher priority job
field :priority, type: Integer, default: 50, class_attribute: true, user_editable: true, copy_on_restart: true
+ validates_inclusion_of :priority, in: 1..100
# When the job completes destroy it from both the database and the UI
- field :destroy_on_complete, type: Boolean, default: true, class_attribute: true, copy_on_restart: true
+ field :destroy_on_complete, type: Mongoid::Boolean, default: true, class_attribute: true, copy_on_restart: true
- # Whether to store the results from this job
- field :collect_output, type: Boolean, default: false, class_attribute: true
-
# Run this job no earlier than this time
field :run_at, type: Time, user_editable: true
# If a job has not started by this time, destroy it
field :expires_at, type: Time, copy_on_restart: true
# Raise or lower the log level when calling the job
# Can be used to reduce log noise, especially during high volume calls
# For debugging a single job can be logged at a low level such as :trace
# Levels supported: :trace, :debug, :info, :warn, :error, :fatal
- field :log_level, type: Symbol, class_attribute: true, user_editable: true, copy_on_restart: true
+ field :log_level, type: Mongoid::StringifiedSymbol, class_attribute: true, user_editable: true, copy_on_restart: true
+ validates_inclusion_of :log_level, in: SemanticLogger::LEVELS + [nil]
#
# Read-only attributes
#
# Current state, as set by the state machine. Do not modify this value directly.
- field :state, type: Symbol, default: :queued
+ field :state, type: Mongoid::StringifiedSymbol, default: :queued
# When the job was created
field :created_at, type: Time, default: -> { Time.now }
# When processing started on this job
@@ -87,21 +86,16 @@
field :percent_complete, type: Integer, default: 0
# Store the last exception for this job
embeds_one :exception, class_name: "RocketJob::JobException"
- # Store the Hash result from this job if collect_output is true,
- # and the job returned actually returned a Hash, otherwise nil
- # Not applicable to SlicedJob jobs, since its output is stored in a
- # separate collection
- field :result, type: Hash
-
+ # Used when workers fetch jobs to work on.
index({state: 1, priority: 1, _id: 1}, background: true)
+ # Used by Mission Control to display completed jobs sorted by completion.
+ index({completed_at: 1}, background: true)
validates_presence_of :state, :failure_count, :created_at
- validates :priority, inclusion: 1..100
- validates :log_level, inclusion: SemanticLogger::LEVELS + [nil]
end
module ClassMethods
# Returns [String] the singular name for this job class
#
@@ -153,18 +147,12 @@
queued.where(:run_at.gt => Time.now)
end
# Scope for queued jobs that can run now
# I.e. Queued jobs excluding scheduled jobs
- if Mongoid::VERSION.to_f >= 7.1
- def queued_now
- queued.and(RocketJob::Job.where(run_at: nil).or(:run_at.lte => Time.now))
- end
- else
- def queued_now
- queued.or({run_at: nil}, :run_at.lte => Time.now)
- end
+ def queued_now
+ queued.and(RocketJob::Job.where(run_at: nil).or(:run_at.lte => Time.now))
end
# Defines all the fields that are accessible on the Document
# For each field that is defined, a getter and setter will be
# added as an instance method to the Document.
@@ -181,47 +169,34 @@
# @option options [ Boolean ] :class_attribute Keep the fields default in a class_attribute
# @option options [ Boolean ] :user_editable Field can be edited by end users in RJMC
#
# @return [ Field ] The generated field
def field(name, options)
- if options.delete(:user_editable) == true
- self.user_editable_fields += [name.to_sym] unless user_editable_fields.include?(name.to_sym)
+ if (options.delete(:user_editable) == true) && !user_editable_fields.include?(name.to_sym)
+ self.user_editable_fields += [name.to_sym]
end
+
if options.delete(:class_attribute) == true
class_attribute(name, instance_accessor: false)
public_send("#{name}=", options[:default]) if options.key?(:default)
options[:default] = -> { self.class.public_send(name) }
end
- if options.delete(:copy_on_restart) == true
- self.rocket_job_restart_attributes += [name.to_sym] unless rocket_job_restart_attributes.include?(name.to_sym)
+
+ if (options.delete(:copy_on_restart) == true) && !rocket_job_restart_attributes.include?(name.to_sym)
+ self.rocket_job_restart_attributes += [name.to_sym]
end
+
super(name, options)
end
- # DEPRECATED
- def rocket_job
- warn "Replace calls to .rocket_job with calls to set class instance variables. For example: self.priority = 50"
- yield(self)
+ # Builds this job instance from the supplied properties hash.
+ # Overridden by batch to support child objects.
+ def from_properties(properties)
+ new(properties)
end
-
- # DEPRECATED
- def public_rocket_job_properties(*args)
- warn "Replace calls to .public_rocket_job_properties by adding `user_editable: true` option to the field declaration in #{name} for: #{args.inspect}"
- self.user_editable_fields += args.collect(&:to_sym)
- end
end
- # Returns [true|false] whether to collect nil results from running this batch
- def collect_nil_output?
- collect_output? ? (collect_nil_output == true) : false
- end
-
- # Returns [true|false] whether to collect the results from running this batch
- def collect_output?
- collect_output == true
- end
-
# Returns [Float] the number of seconds the job has taken
# - Elapsed seconds to process the job from when a worker first started working on it
# until now if still running, or until it was completed
# - Seconds in the queue if queued
def seconds
@@ -280,11 +255,10 @@
end
# Returns [Hash] status of this job
def as_json
attrs = serializable_hash(methods: %i[seconds duration])
- attrs.delete("result") unless collect_output?
attrs.delete("failure_count") unless failure_count.positive?
if queued?
attrs.delete("started_at")
attrs.delete("completed_at")
attrs.delete("result")
@@ -317,19 +291,20 @@
# Returns [Hash] the status of this job
def status(time_zone = "Eastern Time (US & Canada)")
h = as_json
h.delete("seconds")
h.dup.each_pair do |k, v|
- if v.is_a?(Time)
+ case v
+ when Time
h[k] = v.in_time_zone(time_zone).to_s
- elsif v.is_a?(BSON::ObjectId)
+ when BSON::ObjectId
h[k] = v.to_s
end
end
h
end
- # Returns [Boolean] whether the worker runs on a particular server.
+ # Returns [true|false] whether the worker runs on a particular server.
def worker_on_server?(server_name)
return false unless worker_name.present? && server_name.present?
worker_name.start_with?(server_name)
end