lib/rocket_job/batch/model.rb in rocketjob-5.4.1 vs lib/rocket_job/batch/model.rb in rocketjob-6.0.0.rc1
- old
+ new
@@ -9,51 +9,11 @@
#
# User definable attributes
#
# The following attributes are set when the job is created
- # Number of records to include in each slice that is processed
- # Note:
- # slice_size is only used by SlicedJob#upload & Sliced::Input#upload
- # When slices are supplied directly, their size is not modified to match this number
- field :slice_size, type: Integer, default: 100, class_attribute: true, user_editable: true, copy_on_restart: true
-
- # Whether to retain nil results.
#
- # Only applicable if `collect_output` is `true`
- # Set to `false` to prevent collecting output from the perform
- # method when it returns `nil`.
- field :collect_nil_output, type: Boolean, default: true, class_attribute: true
-
- # Optional Array<Symbol> list of categories that this job can output to
- #
- # By using categories the output from #perform can be placed in different
- # output collections, and therefore different output files
- #
- # Categories must be declared in advance to avoid a #perform method
- # accidentally writing its results to an unknown category
- field :output_categories, type: Array, default: [:main], class_attribute: true
-
- # Optional Array<Symbol> list of categories that this job can load input data into
- field :input_categories, type: Array, default: [:main], class_attribute: true
-
- # The file name of the uploaded file, if any.
- # Set by #upload if a file name was supplied, but can also be set explicitly.
- # May or may not include the fully qualified path name.
- field :upload_file_name, type: String
-
- # Compress uploaded records.
- # The fields are not affected in any way, only the data stored in the
- # records and results collections will compressed
- field :compress, type: Object, default: false, class_attribute: true
-
- # Encrypt uploaded records.
- # The fields are not affected in any way, only the data stored in the
- # records and results collections will be encrypted
- field :encrypt, type: Object, default: false, class_attribute: true
-
- #
# Values that jobs can also update during processing
#
# Number of records in this job
# Note:
@@ -67,47 +27,24 @@
# Read-only attributes
#
# Breaks the :running state up into multiple sub-states:
# :running -> :before -> :processing -> :after -> :complete
- field :sub_state, type: Symbol
-
- validates_presence_of :slice_size
-
- validates_each :output_categories, :input_categories do |record, attr, value|
- # Under some circumstances ActiveModel is passing in a nil value even though the
- # attributes have default values
- Array(value).each do |category|
- record.errors.add(attr, "must only contain Symbol values") unless category.is_a?(Symbol)
- unless category.to_s =~ /\A[a-z_0-9]+\Z/
- record.errors.add(attr, "must only consist of lowercase characters, digits, and _")
- end
- end
- end
+ field :sub_state, type: Mongoid::StringifiedSymbol
end
- # Returns [true|false] whether the slices for this job are encrypted
- def encrypted?
- encrypt == true
- end
-
- # Returns [true|false] whether the slices for this job are compressed
- def compressed?
- compress == true
- end
-
# Returns [Integer] percent of records completed so far
# Returns 0 if the total record count has not yet been set
def percent_complete
return 100 if completed?
return 0 unless record_count.to_i.positive?
# Approximate number of input records
- input_records = input.count.to_f * slice_size
+ input_records = input.count.to_f * input_category.slice_size
if input_records > record_count
# Sanity check in case slice_size is not being adhered to
- 99
+ 0
else
((1.0 - (input_records.to_f / record_count)) * 100).to_i
end
end
@@ -118,23 +55,26 @@
h["queued_slices"] = input.queued.count
elsif running? || paused? || failed?
h["active_slices"] = worker_count
h["failed_slices"] = input.failed.count
h["queued_slices"] = input.queued.count
+ output_categories.each do |category|
+ name_str = category.name == :main ? "" : "_#{category.name}"
+ h["output_slices#{name_str}"] = output(category).count
+ end
# Very high level estimated time left
if record_count && running? && record_count.positive?
percent = percent_complete
if percent >= 5
secs = seconds.to_f
h["est_remaining_duration"] = RocketJob.seconds_as_duration((((secs / percent) * 100) - secs))
end
end
elsif completed?
- secs = seconds.to_f
+ secs = seconds.to_f
h["records_per_hour"] = ((record_count.to_f / secs) * 60 * 60).round if record_count&.positive? && (secs > 0.0)
end
- h["output_slices"] = output.count if collect_output? && !completed?
h.merge!(super(time_zone))
h.delete("result")
# Worker name should be retrieved from the slices when processing
h.delete("worker_name") if sub_state == :processing
h
@@ -169,9 +109,21 @@
else
0
end
@worker_count_last = Time.now.to_i
@worker_count
+ end
+
+ # @deprecated
+ # For backward compatibility
+ def upload_file_name
+ input_category.file_name
+ end
+
+ # @deprecated
+ # For backward compatibility
+ def upload_file_name=(upload_file_name)
+ input_category.file_name = upload_file_name
end
end
end
end