lib/rocket_job/batch/worker.rb in rocketjob-6.0.0.rc3 vs lib/rocket_job/batch/worker.rb in rocketjob-6.0.0

- old
+ new

@@ -65,10 +65,12 @@ # For example, to extract the header row which would be in the first slice. # # Returns [Integer] the number of records processed in the slice # # Note: The slice will be removed from processing when this method completes + # + # @deprecated Please open a ticket if you need this behavior. def work_first_slice(&block) raise "#work_first_slice can only be called from within before_batch callbacks" unless sub_state == :before # TODO: Make these settings configurable count = 0 @@ -140,23 +142,23 @@ end # Perform individual slice without callbacks def rocket_job_perform_slice(slice, &block) slice.processing_record_number ||= 0 - records = [] append = false - # Skip processed records in this slice if it has no output categpries. - if slice.processing_record_number > 1 - records = slice.records[slice.processing_record_number - 1..-1] - append = true - logger.info("Resuming previously incomplete slice from record number #{slice.processing_record_number}") - else - # Reprocess all records in this slice. - slice.processing_record_number = 0 - records = slice.records - end + # Skip processed records in this slice if it has no output categories. + records = + if slice.processing_record_number.to_i > 1 + append = true + logger.info("Resuming previously incomplete slice from record number #{slice.processing_record_number}") + slice.records[slice.processing_record_number - 1..-1] + else + # Reprocess all records in this slice. + slice.processing_record_number = 0 + slice.records + end count = 0 RocketJob::Sliced::Writer::Output.collect(self, input_slice: slice, append: append) do |writer| records.each do |record| slice.processing_record_number += 1 @@ -244,10 +246,10 @@ fail_job = true unless new_record? # Fail job iff no other worker has already finished it # Must set write concern to at least 1 since we need the nModified back - result = self.class.with(write: {w: 1}) do |query| + result = self.class.with(write: {w: 1}) do |query| query. where(id: id, state: :running, sub_state: :processing). update({"$set" => {state: :failed, worker_name: worker_name}}) end fail_job = false unless result.modified_count.positive?