lib/rocket_job/batch/worker.rb in rocketjob-5.2.0.beta2 vs lib/rocket_job/batch/worker.rb in rocketjob-5.2.0.beta3
- old
+ new
@@ -128,62 +128,73 @@
# Process a single slice from Mongo, running the :slice callbacks around the work.
# Once the slice has been successfully processed it will be removed from the input collection
# Returns [Integer] the number of records successfully processed, or 0 if the job is no longer running after the before_slice callbacks.
def rocket_job_process_slice(slice, &block)
- # TODO: Skip records already processed
- @rocket_job_record_number = slice.first_record_number || 0
- @rocket_job_slice = slice
+ @rocket_job_slice = slice # track the slice being processed; reset to nil once the callbacks complete
+ count = 0 # assigned outside the block so the value is still visible after run_callbacks
- processed_records = 0
run_callbacks(:slice) do
# Allow before_slice callbacks to fail, complete or abort this slice.
return 0 unless running? # returns from the whole method, so the slice is not destroyed
- RocketJob::Sliced::Writer::Output.collect(self, slice) do |writer|
- slice.each do |record|
- SemanticLogger.named_tagged(record: @rocket_job_record_number) do
- writer << rocket_job_batch_perform(slice, record, &block)
- processed_records += 1
- end
- # JRuby thinks self.rocket_job_record_number= is private and cannot be accessed
- @rocket_job_record_number += 1
- end
- end
- @rocket_job_slice = nil
- @rocket_job_record_number = nil
+ count = rocket_job_perform_slice(slice, &block)
end
+ @rocket_job_slice = nil # NOTE(review): skipped by the early return above and by exceptions (no ensure)
# On successful completion remove the slice from the input queue
# TODO: Add option to complete slice instead of destroying it to retain input data.
slice.destroy
- processed_records
+ count
end
+ # Perform every record in an individual slice without running the :slice callbacks. Returns the number of records performed.
+ def rocket_job_perform_slice(slice, &block)
+ count = 0
+ RocketJob::Sliced::Writer::Output.collect(self, slice) do |writer| # the result of each record is appended to writer
+ records = slice.records
+ slice.processing_record_number ||= 0 # start from 0 when no record number has been set on the slice yet
+
+ # Skip records already processed, if any.
+ # TODO: Must append to existing output slices before this can be enabled.
+ # if !collect_output && (slice.processing_record_number > 1)
+ # records = records[slice.processing_record_number - 1..-1]
+ # end
+
+ records.each do |record|
+ slice.processing_record_number += 1 # incremented before performing, so the first record is number 1 when starting from 0
+ SemanticLogger.named_tagged(record: slice.current_record_number) do # presumably tags log entries with the record number; confirm against Semantic Logger
+ writer << rocket_job_batch_perform(slice, record, &block)
+ count += 1 # only reached when the record above was performed without raising
+ end
+ end
+ end
+ count
+ end
+
# Perform a single record within the current slice. Returns the result of the supplied block when given, otherwise of perform.
def rocket_job_batch_perform(slice, record)
- slice.processing_record_number ||= 0
- slice.processing_record_number += 1
+ @rocket_job_record_number = slice.current_record_number # remember which record is being performed
return block_given? ? yield(record) : perform(record) if _perform_callbacks.empty? # fast path: no perform callbacks are defined
# @rocket_job_input and @rocket_job_output can be modified by before/around callbacks
- @rocket_job_input = record
- @rocket_job_output = nil
+ @rocket_job_input = record
+ @rocket_job_output = nil
run_callbacks(:perform) do
@rocket_job_output =
if block_given?
yield(@rocket_job_input)
else
perform(@rocket_job_input)
end
end
- @rocket_job_input = nil
- result = @rocket_job_output
- @rocket_job_output = nil
+ @rocket_job_input = nil # clear the per-record state before handing back the result
+ result = @rocket_job_output
+ @rocket_job_output = nil
result
end
# Checks for completion and runs after_batch if defined
# Returns true if the job is now complete/aborted/failed
@@ -195,28 +206,11 @@
input_count = input.count
failed_count = input.failed.count
if failed_count.positive? && (input_count == failed_count)
# Reload to pull in any counters or other data that was modified.
reload unless new_record?
- if may_fail?
- fail_job = true
- unless new_record?
- # Fail job iff no other worker has already finished it
- # Must set write concern to at least 1 since we need the nModified back
- result = self.class.with(write: {w: 1}) do |query|
- query.
- where(id: id, state: :running, sub_state: :processing).
- update({"$set" => {state: :failed, worker_name: worker_name}})
- end
- fail_job = false unless result.modified_count.positive?
- end
- if fail_job
- message = "#{failed_count} slices failed to process"
- self.exception = JobException.new(message: message)
- fail!(worker_name, message)
- end
- end
+ rocket_job_batch_fail!(worker_name) if may_fail?
return true
end
# Any work left?
return false if input_count.positive?
@@ -235,16 +229,39 @@
update("$set" => {sub_state: :after, worker_name: worker_name})
end
# Reload to pull in any counters or other data that was modified.
reload
+
if result.modified_count.positive?
rocket_job_batch_run_after_callbacks(false)
- else
+ elsif aborted?
# Repeat cleanup in case this worker was still running when the job was aborted
- cleanup! if aborted?
+ cleanup!
end
true
+ end
+
+ # Fail the job because slices failed to process, unless a persisted job is no longer in the running/processing state.
+ def rocket_job_batch_fail!(worker_name)
+ fail_job = true
+
+ unless new_record? # the database check below is skipped when new_record? is true
+ # Fail job iff no other worker has already finished it
+ # Must set write concern to at least 1 since we need the nModified back
+ result = self.class.with(write: {w: 1}) do |query|
+ query.
+ where(id: id, state: :running, sub_state: :processing).
+ update({"$set" => {state: :failed, worker_name: worker_name}})
+ end
+ fail_job = false unless result.modified_count.positive? # nothing modified: the job was no longer running/processing
+ end
+
+ return unless fail_job
+
+ message = "#{input.failed.count} slices failed to process"
+ self.exception = JobException.new(message: message)
+ new_record? ? fail(worker_name, message) : fail!(worker_name, message) # presumably fail! also persists the change; confirm
end
# Run the before_batch callbacks
# Saves the current state before and after running callbacks if callbacks present
def rocket_job_batch_run_before_callbacks