lib/rocket_job/batch/model.rb in rocketjob-5.4.1 vs lib/rocket_job/batch/model.rb in rocketjob-6.0.0.rc1

- old
+ new

@@ -9,51 +9,11 @@ # # User definable attributes # # The following attributes are set when the job is created - # Number of records to include in each slice that is processed - # Note: - # slice_size is only used by SlicedJob#upload & Sliced::Input#upload - # When slices are supplied directly, their size is not modified to match this number - field :slice_size, type: Integer, default: 100, class_attribute: true, user_editable: true, copy_on_restart: true - - # Whether to retain nil results. # - # Only applicable if `collect_output` is `true` - # Set to `false` to prevent collecting output from the perform - # method when it returns `nil`. - field :collect_nil_output, type: Boolean, default: true, class_attribute: true - - # Optional Array<Symbol> list of categories that this job can output to - # - # By using categories the output from #perform can be placed in different - # output collections, and therefore different output files - # - # Categories must be declared in advance to avoid a #perform method - # accidentally writing its results to an unknown category - field :output_categories, type: Array, default: [:main], class_attribute: true - - # Optional Array<Symbol> list of categories that this job can load input data into - field :input_categories, type: Array, default: [:main], class_attribute: true - - # The file name of the uploaded file, if any. - # Set by #upload if a file name was supplied, but can also be set explicitly. - # May or may not include the fully qualified path name. - field :upload_file_name, type: String - - # Compress uploaded records. - # The fields are not affected in any way, only the data stored in the - # records and results collections will compressed - field :compress, type: Object, default: false, class_attribute: true - - # Encrypt uploaded records. - # The fields are not affected in any way, only the data stored in the - # records and results collections will be encrypted - field :encrypt, type: Object, default: false, class_attribute: true - - # # Values that jobs can also update during processing # # Number of records in this job # Note: @@ -67,47 +27,24 @@ # Read-only attributes # # Breaks the :running state up into multiple sub-states: # :running -> :before -> :processing -> :after -> :complete - field :sub_state, type: Symbol - - validates_presence_of :slice_size - - validates_each :output_categories, :input_categories do |record, attr, value| - # Under some circumstances ActiveModel is passing in a nil value even though the - # attributes have default values - Array(value).each do |category| - record.errors.add(attr, "must only contain Symbol values") unless category.is_a?(Symbol) - unless category.to_s =~ /\A[a-z_0-9]+\Z/ - record.errors.add(attr, "must only consist of lowercase characters, digits, and _") - end - end - end + field :sub_state, type: Mongoid::StringifiedSymbol end - # Returns [true|false] whether the slices for this job are encrypted - def encrypted? - encrypt == true - end - - # Returns [true|false] whether the slices for this job are compressed - def compressed? - compress == true - end - # Returns [Integer] percent of records completed so far # Returns 0 if the total record count has not yet been set def percent_complete return 100 if completed? return 0 unless record_count.to_i.positive? # Approximate number of input records - input_records = input.count.to_f * slice_size + input_records = input.count.to_f * input_category.slice_size if input_records > record_count # Sanity check in case slice_size is not being adhered to - 99 + 0 else ((1.0 - (input_records.to_f / record_count)) * 100).to_i end end @@ -118,23 +55,26 @@ h["queued_slices"] = input.queued.count elsif running? || paused? || failed? h["active_slices"] = worker_count h["failed_slices"] = input.failed.count h["queued_slices"] = input.queued.count + output_categories.each do |category| + name_str = category.name == :main ? "" : "_#{category.name}" + h["output_slices#{name_str}"] = output(category).count + end # Very high level estimated time left if record_count && running? && record_count.positive? percent = percent_complete if percent >= 5 secs = seconds.to_f h["est_remaining_duration"] = RocketJob.seconds_as_duration((((secs / percent) * 100) - secs)) end end elsif completed? - secs = seconds.to_f + secs = seconds.to_f h["records_per_hour"] = ((record_count.to_f / secs) * 60 * 60).round if record_count&.positive? && (secs > 0.0) end - h["output_slices"] = output.count if collect_output? && !completed? h.merge!(super(time_zone)) h.delete("result") # Worker name should be retrieved from the slices when processing h.delete("worker_name") if sub_state == :processing h @@ -169,9 +109,21 @@ else 0 end @worker_count_last = Time.now.to_i @worker_count + end + + # @deprecated + # For backward compatibility + def upload_file_name + input_category.file_name + end + + # @deprecated + # For backward compatibility + def upload_file_name=(upload_file_name) + input_category.file_name = upload_file_name end end end end