lib/rocket_job/batch/worker.rb in rocketjob-6.0.0.rc3 vs lib/rocket_job/batch/worker.rb in rocketjob-6.0.0
- old
+ new
@@ -65,10 +65,12 @@
# For example, to extract the header row which would be in the first slice.
#
# Returns [Integer] the number of records processed in the slice
#
# Note: The slice will be removed from processing when this method completes
+ #
+ # @deprecated Please open a ticket if you need this behavior.
def work_first_slice(&block)
raise "#work_first_slice can only be called from within before_batch callbacks" unless sub_state == :before
# TODO: Make these settings configurable
count = 0
@@ -140,23 +142,23 @@
end
# Perform individual slice without callbacks
def rocket_job_perform_slice(slice, &block)
slice.processing_record_number ||= 0
- records = []
append = false
- # Skip processed records in this slice if it has no output categpries.
- if slice.processing_record_number > 1
- records = slice.records[slice.processing_record_number - 1..-1]
- append = true
- logger.info("Resuming previously incomplete slice from record number #{slice.processing_record_number}")
- else
- # Reprocess all records in this slice.
- slice.processing_record_number = 0
- records = slice.records
- end
+ # Skip processed records in this slice if it has no output categories.
+ records =
+ if slice.processing_record_number.to_i > 1
+ append = true
+ logger.info("Resuming previously incomplete slice from record number #{slice.processing_record_number}")
+ slice.records[slice.processing_record_number - 1..-1]
+ else
+ # Reprocess all records in this slice.
+ slice.processing_record_number = 0
+ slice.records
+ end
count = 0
RocketJob::Sliced::Writer::Output.collect(self, input_slice: slice, append: append) do |writer|
records.each do |record|
slice.processing_record_number += 1
@@ -244,10 +246,10 @@
fail_job = true
unless new_record?
# Fail job iff no other worker has already finished it
# Must set write concern to at least 1 since we need the nModified back
- result = self.class.with(write: {w: 1}) do |query|
+ result = self.class.with(write: {w: 1}) do |query|
query.
where(id: id, state: :running, sub_state: :processing).
update({"$set" => {state: :failed, worker_name: worker_name}})
end
fail_job = false unless result.modified_count.positive?