lib/rocket_job/sliced/slice.rb in rocketjob-5.1.1 vs lib/rocket_job/sliced/slice.rb in rocketjob-5.2.0.beta1
- old
+ new
@@ -1,6 +1,6 @@
-require 'forwardable'
+require "forwardable"
module RocketJob
module Sliced
# A slice is an Array of Records, along with meta-data that is used
# or set during processing of the individual records
#
@@ -19,16 +19,15 @@
class Slice
include RocketJob::Plugins::Document
include RocketJob::Plugins::StateMachine
extend Forwardable
- store_in client: 'rocketjob_slices'
+ store_in client: "rocketjob_slices"
# The record number of the first record in this slice.
- #
- # Optional: If present the record_number is set while the job
- # is being processed.
+ # Useful in knowing the line number of each record in this slice
+ # relative to the original file that was uploaded.
field :first_record_number, type: Integer
#
# Read-only attributes
#
@@ -40,15 +39,18 @@
field :started_at, type: Time
# Number of times that this job has failed to process
field :failure_count, type: Integer
+ # Number of the record within this slice (not the entire file/job) currently being processed. (One based index)
+ field :processing_record_number, type: Integer
+
# This name of the worker that this job is being processed by, or was processed by
field :worker_name, type: String
# The last exception for this slice if any
- embeds_one :exception, class_name: 'RocketJob::JobException'
+ embeds_one :exception, class_name: "RocketJob::JobException"
after_find :parse_records
# State Machine events and transitions
#
@@ -106,67 +108,88 @@
end
def_instance_delegators :records, :each, :<<, :size, :concat, :at
def_instance_delegators :records, *(Enumerable.instance_methods - Module.methods)
- # Fail this slice, along with the exception that caused the failure
- def set_exception(exc = nil, record_number = nil)
+ # Returns [Integer] the record number of the record currently being processed relative to the entire file.
+ def current_record_number
+ first_record_number.to_i + processing_record_number.to_i
+ end
+
+ # Before Fail save the exception to this slice.
+ def set_exception(exc = nil)
if exc
- self.exception = JobException.from_exception(exc)
- exception.worker_name = worker_name
- exception.record_number = record_number
+ self.exception = JobException.from_exception(exc)
+ exception.worker_name = worker_name
end
self.failure_count = failure_count.to_i + 1
self.worker_name = nil
end
# Returns the failed record.
# Returns [nil] if there is no failed record
def failed_record
- if exception && (record_number = exception.record_number)
- at(record_number - 1)
+ if exception && processing_record_number
+ at(processing_record_number - 1)
end
end
# Returns [Hash] the slice as a Hash for storage purposes
# Compresses / Encrypts the slice according to the job setting
if ::Mongoid::VERSION.to_i >= 6
def as_attributes
attrs = super
- attrs['records'] = serialize_records if @records
+ attrs["records"] = serialize_records if @records
attrs
end
else
def as_document
attrs = super
- attrs['records'] = serialize_records if @records
+ attrs["records"] = serialize_records if @records
attrs
end
end
def inspect
"#{super[0...-1]}, records: #{@records.inspect}, collection_name: #{collection_name.inspect}>"
end
+ # Fail this slice if an exception occurs during processing.
+ def fail_on_exception!(re_raise_exceptions = false, &block)
+ SemanticLogger.named_tagged(slice: id.to_s, &block)
+ rescue Exception => e
+ SemanticLogger.named_tagged(slice: id.to_s) do
+ if failed? || !may_fail?
+ exception = JobException.from_exception(e)
+ exception.worker_name = worker_name
+ save! unless new_record? || destroyed?
+ elsif new_record? || destroyed?
+ fail(e)
+ else
+ fail!(e)
+ end
+ raise e if re_raise_exceptions
+ end
+ end
+
private
# Always add records to any updates.
def atomic_updates(*args)
- r = super(*args)
- if @records
- (r['$set'] ||= {})['records'] = serialize_records
- end
+ r = super(*args)
+ (r["$set"] ||= {})["records"] = serialize_records if @records
r
end
def parse_records
- @records = attributes.delete('records')
+ @records = attributes.delete("records")
end
def serialize_records
records.mongoize
end
+ # Before Start
def set_started_at
self.started_at = Time.now
end
end
end