lib/rocket_job/sliced/slice.rb in rocketjob-5.1.1 vs lib/rocket_job/sliced/slice.rb in rocketjob-5.2.0.beta1

- old
+ new

@@ -1,6 +1,6 @@ -require 'forwardable' +require "forwardable" module RocketJob module Sliced # A slice is an Array of Records, along with meta-data that is used # or set during processing of the individual records # @@ -19,16 +19,15 @@ class Slice include RocketJob::Plugins::Document include RocketJob::Plugins::StateMachine extend Forwardable - store_in client: 'rocketjob_slices' + store_in client: "rocketjob_slices" # The record number of the first record in this slice. - # - # Optional: If present the record_number is set while the job - # is being processed. + # Useful in knowing the line number of each record in this slice + # relative to the original file that was uploaded. field :first_record_number, type: Integer # # Read-only attributes # @@ -40,15 +39,18 @@ field :started_at, type: Time # Number of times that this job has failed to process field :failure_count, type: Integer + # Number of the record within this slice (not the entire file/job) currently being processed. (One based index) + field :processing_record_number, type: Integer + # This name of the worker that this job is being processed by, or was processed by field :worker_name, type: String # The last exception for this slice if any - embeds_one :exception, class_name: 'RocketJob::JobException' + embeds_one :exception, class_name: "RocketJob::JobException" after_find :parse_records # State Machine events and transitions # @@ -106,67 +108,88 @@ end def_instance_delegators :records, :each, :<<, :size, :concat, :at def_instance_delegators :records, *(Enumerable.instance_methods - Module.methods) - # Fail this slice, along with the exception that caused the failure - def set_exception(exc = nil, record_number = nil) + # Returns [Integer] the record number of the record currently being processed relative to the entire file. + def current_record_number + first_record_number.to_i + processing_record_number.to_i + end + + # Before Fail save the exception to this slice. + def set_exception(exc = nil) if exc - self.exception = JobException.from_exception(exc) - exception.worker_name = worker_name - exception.record_number = record_number + self.exception = JobException.from_exception(exc) + exception.worker_name = worker_name end self.failure_count = failure_count.to_i + 1 self.worker_name = nil end # Returns the failed record. # Returns [nil] if there is no failed record def failed_record - if exception && (record_number = exception.record_number) - at(record_number - 1) + if exception && processing_record_number + at(processing_record_number - 1) end end # Returns [Hash] the slice as a Hash for storage purposes # Compresses / Encrypts the slice according to the job setting if ::Mongoid::VERSION.to_i >= 6 def as_attributes attrs = super - attrs['records'] = serialize_records if @records + attrs["records"] = serialize_records if @records attrs end else def as_document attrs = super - attrs['records'] = serialize_records if @records + attrs["records"] = serialize_records if @records attrs end end def inspect "#{super[0...-1]}, records: #{@records.inspect}, collection_name: #{collection_name.inspect}>" end + # Fail this slice if an exception occurs during processing. + def fail_on_exception!(re_raise_exceptions = false, &block) + SemanticLogger.named_tagged(slice: id.to_s, &block) + rescue Exception => e + SemanticLogger.named_tagged(slice: id.to_s) do + if failed? || !may_fail? + exception = JobException.from_exception(e) + exception.worker_name = worker_name + save! unless new_record? || destroyed? + elsif new_record? || destroyed? + fail(e) + else + fail!(e) + end + raise e if re_raise_exceptions + end + end + private # Always add records to any updates. def atomic_updates(*args) - r = super(*args) - if @records - (r['$set'] ||= {})['records'] = serialize_records - end + r = super(*args) + (r["$set"] ||= {})["records"] = serialize_records if @records r end def parse_records - @records = attributes.delete('records') + @records = attributes.delete("records") end def serialize_records records.mongoize end + # Before Start def set_started_at self.started_at = Time.now end end end