lib/rocket_job/batch/worker.rb in rocketjob-5.2.0.beta2 vs lib/rocket_job/batch/worker.rb in rocketjob-5.2.0.beta3

- old
+ new

@@ -128,62 +128,73 @@ # Process a single slice from Mongo # Once the slice has been successfully processed it will be removed from the input collection # Returns [Integer] the number of records successfully processed def rocket_job_process_slice(slice, &block) - # TODO: Skip records already processed - @rocket_job_record_number = slice.first_record_number || 0 - @rocket_job_slice = slice + @rocket_job_slice = slice + count = 0 - processed_records = 0 run_callbacks(:slice) do # Allow before_slice callbacks to fail, complete or abort this slice. return 0 unless running? - RocketJob::Sliced::Writer::Output.collect(self, slice) do |writer| - slice.each do |record| - SemanticLogger.named_tagged(record: @rocket_job_record_number) do - writer << rocket_job_batch_perform(slice, record, &block) - processed_records += 1 - end - # JRuby thinks self.rocket_job_record_number= is private and cannot be accessed - @rocket_job_record_number += 1 - end - end - @rocket_job_slice = nil - @rocket_job_record_number = nil + count = rocket_job_perform_slice(slice, &block) end + @rocket_job_slice = nil # On successful completion remove the slice from the input queue # TODO: Add option to complete slice instead of destroying it to retain input data. slice.destroy - processed_records + count end + # Perform individual slice without callbacks + def rocket_job_perform_slice(slice, &block) + count = 0 + RocketJob::Sliced::Writer::Output.collect(self, slice) do |writer| + records = slice.records + slice.processing_record_number ||= 0 + + # Skip records already processed, if any. + # TODO: Must append to existing output slices before this can be enabled. + # if !collect_output && (slice.processing_record_number > 1) + # records = records[slice.processing_record_number - 1..-1] + # end + + records.each do |record| + slice.processing_record_number += 1 + SemanticLogger.named_tagged(record: slice.current_record_number) do + writer << rocket_job_batch_perform(slice, record, &block) + count += 1 + end + end + end + count + end + # Perform a single record within the current slice. def rocket_job_batch_perform(slice, record) - slice.processing_record_number ||= 0 - slice.processing_record_number += 1 + @rocket_job_record_number = slice.current_record_number return block_given? ? yield(record) : perform(record) if _perform_callbacks.empty? # @rocket_job_input and @rocket_job_output can be modified by before/around callbacks - @rocket_job_input = record - @rocket_job_output = nil + @rocket_job_input = record + @rocket_job_output = nil run_callbacks(:perform) do @rocket_job_output = if block_given? yield(@rocket_job_input) else perform(@rocket_job_input) end end - @rocket_job_input = nil - result = @rocket_job_output - @rocket_job_output = nil + @rocket_job_input = nil + result = @rocket_job_output + @rocket_job_output = nil result end # Checks for completion and runs after_batch if defined # Returns true if the job is now complete/aborted/failed @@ -195,28 +206,11 @@ input_count = input.count failed_count = input.failed.count if failed_count.positive? && (input_count == failed_count) # Reload to pull in any counters or other data that was modified. reload unless new_record? - if may_fail? - fail_job = true - unless new_record? - # Fail job iff no other worker has already finished it - # Must set write concern to at least 1 since we need the nModified back - result = self.class.with(write: {w: 1}) do |query| - query. - where(id: id, state: :running, sub_state: :processing). - update({"$set" => {state: :failed, worker_name: worker_name}}) - end - fail_job = false unless result.modified_count.positive? - end - if fail_job - message = "#{failed_count} slices failed to process" - self.exception = JobException.new(message: message) - fail!(worker_name, message) - end - end + rocket_job_batch_fail!(worker_name) if may_fail? return true end # Any work left? return false if input_count.positive? @@ -235,16 +229,39 @@ update("$set" => {sub_state: :after, worker_name: worker_name}) end # Reload to pull in any counters or other data that was modified. reload + if result.modified_count.positive? rocket_job_batch_run_after_callbacks(false) - else + elsif aborted? # Repeat cleanup in case this worker was still running when the job was aborted - cleanup! if aborted? + cleanup! end true + end + + # Fail the job + def rocket_job_batch_fail!(worker_name) + fail_job = true + + unless new_record? + # Fail job iff no other worker has already finished it + # Must set write concern to at least 1 since we need the nModified back + result = self.class.with(write: {w: 1}) do |query| + query. + where(id: id, state: :running, sub_state: :processing). + update({"$set" => {state: :failed, worker_name: worker_name}}) + end + fail_job = false unless result.modified_count.positive? + end + + return unless fail_job + + message = "#{input.failed.count} slices failed to process" + self.exception = JobException.new(message: message) + new_record? ? fail(worker_name, message) : fail!(worker_name, message) end # Run the before_batch callbacks # Saves the current state before and after running callbacks if callbacks present def rocket_job_batch_run_before_callbacks